营销网站制作信ls15227,易语言 网站开发,酒店网站建设范文,百度app安装文章目录 一、创建操作1、基本创建2、快速创建2.1 empty2.2 never2.3 error2.4 from2.5 just 3、定时与延时创建操作3.1 defer3.2 timer3.3 interval3.4 intervalRange3.5 range3.6 repeat 二、过滤操作1、skip/skipLast2、debounce3、distinct——去重4、elementAt——获取指定… 文章目录 一、创建操作1、基本创建2、快速创建2.1 empty2.2 never2.3 error2.4 from2.5 just 3、定时与延时创建操作3.1 defer3.2 timer3.3 interval3.4 intervalRange3.5 range3.6 repeat 二、过滤操作1、skip/skipLast2、debounce3、distinct——去重4、elementAt——获取指定位置元素5、filter——过滤6、first——取第一个数据7、last——取最后一个8、ignoreElements ignoreElement忽略元素9、ofType过滤类型10、sample11 、take takeLast 三、组合可观察对象操作符1、CombineLatest2、merge3、zip4、startWith5、join 四、变化操作符1、map2、flatMap / concatMap3、scan4、buffer5、window 关于RxJava/RxAndroid的全部文章 一、创建操作
1、基本创建 create创建一个基本的被观察者
在使用create()操作符时最好在被观察者的回调函数subscribe()中加上isDisposed()以便在观察者断开连接的时候不在执行subscribe()函数中的相关逻辑避免意想不到的错误出现。 Observable.create(new ObservableOnSubscribeObject() {Overridepublic void subscribe(NonNull ObservableEmitterObject emitter) throws Throwable {try {if(!emitter.isDisposed()){emitter.onNext(a);emitter.onNext(b);}} catch (Exception e) {emitter.onError(e);}}}).subscribe(value - Log.e(TAG, onNext: value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));2、快速创建
完整快速创建被观察者、数组、集合遍历
操作符作用empty创建一个只发送 onComplete 事件的 Observable。never创建一个不发送任何事件的 Observable。error创建一个只发送 onError 事件的 Observable。from操作符用于将其他对象或数据结构转换为 Observable,可发送不同类型的数据流just操作符将对象或一组对象转换为 Observable并立即发送这些对象没有延迟。
2.1 empty 创建一个不发射任何items但正常终止的 Observable——create an Observable that emits no items but terminates normally Observable.empty().subscribe(value - Log.e(TAG, onNext: value ),error - Log.e(TAG, Error: error),()-Log.e(TAG,onComplete));2.2 never 创建一个不发射任何items且不会终止的 Observable——create an Observable that emits no items and does not terminate Observable.never().subscribe(value - Log.e(TAG, onNext: value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));不发送任何事件
2.3 error 创建一个不发射任何items并以错误终止的 Observable——create an Observable that emits no items and terminates with an error Observable.error(new Exception(ERROR)).subscribe(value - Log.e(TAG, onNext: value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));2.4 from 以fromAray举例 Observable.fromArray(1,2,3,4,5).subscribe(value - Log.e(TAG, onNext: value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));2.5 just Observable.just(1,2,3,4,5).subscribe(value - Log.e(TAG, onNext: value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));通过just()创建传入Integer类型的参数构建Observable被观察者相当于执行了onNext(1)~onNext(5)通过链式编程订阅观察者。注意just的数据一般不能超过10个。
注意如果将 null 传递给 Just它将返回一个将 null 作为项目发出的 Observable。不要错误地认为这将返回一个空的 Observable根本不发出任何项目
3、定时与延时创建操作
定时操作、周期性操作
操作符作用defer直到有Observer观察者订阅时才会通过Observeable的工厂方法动态创建Observeable并且发送事件timer用于延时发送,在给定的延迟后发出单个项目interval它按照指定时间间隔发出整数序列通常用于定时操作。intervalRange类似于interval()快速创建一个被观察者对象指定时间间隔就发送事件可以执行发送事件的数量range它发出一个连续的整数序列,可以指定发送的次数repeat重复发送指定次数的某个事件流
3.1 defer 直到有Observer观察者订阅时才会通过Observeable的工厂方法动态创建Observeable并且发送事件
defer不会立即创建 Observable而是等到观察者订阅时才动态创建每个观察者都会得到一个新的 Observable 实例。
defer确保了Observable代码在被订阅后才执行而不是创建后立即执行 ObservableInteger integerObservable Observable.defer(new SupplierObservableSource? extends Integer() {Overridepublic ObservableSource? extends Integer get() throws Throwable {int randomNumber (int) (Math.random() * 100);return Observable.just(randomNumber);}});integerObservable.subscribe(new ConsumerInteger() {Overridepublic void accept(Integer integer) throws Throwable {Log.e(TAG, 第一次 integer.toString());}});integerObservable.subscribe(integer - Log.e(TAG, 第二次 integer.toString()));3.2 timer 构造方法如下 timer(long delay, TimeUnit unit)timer(long delay, TimeUnit unit, Scheduler scheduler)delay延时的时间类型为Longunit表示时间单位有TimeUnit.SECONDS等多种类型scheduler表示调度器用于指定线程。
用于延时发送 final SimpleDateFormat dateFormat new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Log.e(TAG, timer当前时间 dateFormat.format(System.currentTimeMillis()));Observable.timer(5, TimeUnit.SECONDS).subscribe(value - Log.e(TAG, timeronNext dateFormat.format(System.currentTimeMillis())),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));表示延迟5s后发送数据 3.3 interval 用于定时发送数据快速创建Observable被观察者对象每隔指定的时间就发送相应的事件事件序列从0开始无限递增1
//在指定延迟时间后每个多少时间发送一次事件
interval(long initialDelay, long period, TimeUnit unit)//在指定的延迟时间后每隔多少时间发送一次事件可以指定调度器
interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)//每间隔多少时间发送一次事件使用默认的线程
ObservableLong interval(long period, TimeUnit unit)//每间隔多少时间发送一次事件可以指定调度器
interval(long period, TimeUnit unit, Scheduler scheduler)initialDelay 表示延迟开始的时间类型为Longperiod距离下一次发送事件的时间间隔类型Longunit时间单位有TimeUnit.SECONDS等多种类型scheduler表示调度器用于指定线程。
它会从0开始然后每隔 1 秒发射一个递增的整数值 Observable.interval(1,3,TimeUnit.SECONDS).subscribe(value - Log.e(TAG, timeronNext dateFormat.format(System.currentTimeMillis())),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));定时发射指定的结果
// 创建一个每秒发射一个递增整数的 ObservableObservableLong intervalObservable Observable.interval(1, TimeUnit.SECONDS);// 使用 map 操作符将递增的整数值映射为您想要的数据类型ObservableString customObservable intervalObservable.map(index - Data_ index); // 映射为字符串 Data_ index// 订阅并输出结果customObservable.subscribe(data - System.out.println(Received: data),error - System.err.println(Error: error),() - System.out.println(Completed));3.4 intervalRange
类似于interval()快速创建一个被观察者对象指定时间间隔就发送事件可以执行发送事件的数量数据依次递增1。
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)start表示事件开始的数值大小类型为Longcount表示事件执行的次数类型为long不能为负数;initialDelay表示延迟开始的时间类型为Longperiod距离下一次发送事件的时间间隔类型Longunit时间单位有TimeUnit.SECONDS等多种类型scheduler表示调度器用于指定线程。 Observable.intervalRange(10, 3, 2, 1,TimeUnit.SECONDS,Schedulers.io()).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));3.5 range Range 运算符按顺序发出一系列连续整数可以在其中选择范围的起点及其长度。
它发出一个连续的整数序列通常不涉及延迟。类似于intervalRange。 public static ObservableInteger range(int start, int count)public static ObservableLong rangeLong(long start, long count)start事件开始的大小count发送的事件次数 Observable.range(10,5).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));3.6 repeat repeat操作符可以重复发送指定次数的某个事件流repeat操作符默认在trampoline调度器上执行repeat默认重复次数为Long.MAX_VALUE可使用重载方法指定次数以及使用repeatUntil指定条件。 //一直重复Observable.fromArray(1, 2, 3, 4).repeat();//重复发送5次Observable.fromArray(1, 2, 3, 4).repeat(5);//重复发送直到符合条件时停止重复Observable.fromArray(1, 2, 3, 4).repeatUntil(new BooleanSupplier() {Overridepublic boolean getAsBoolean() throws Exception {//自定判断条件为true即可停止默认为falsereturn false;}});二、过滤操作
1、skip/skipLast 可以在Flowable,Observable中使用表示源发射数据前跳过多少个。 skip: skip 操作符用于跳过 Observable 开头的一定数量的事件然后开始发射后续的事件。它忽略序列的头部事件。 例如observable.skip(3) 会跳过前面的 3 个事件然后发射后续的事件。 skipLast: skipLast 操作符用于跳过 Observable 末尾的一定数量的事件然后发射前面的事件。它忽略序列的末尾事件。 例如observable.skipLast(3) 会发射从序列开头到倒数第 3 个事件之前的事件忽略了最后 3 个事件。 ObservableInteger integerObservable Observable.just(1, 2, 3, 4, 5, 6, 7, 8);integerObservable.skipLast(3).subscribe(new ConsumerInteger() {Overridepublic void accept(Integer integer) throws Throwable {Log.e(TAG, accept: integer);}});换成skip后结果如下 2、debounce
仅当特定时间跨度过去而没有发出另一个项目时才从 Observable 发出一个项目 Observable.create(emitter - {emitter.onNext(1);Thread.sleep(1_500);emitter.onNext(2);Thread.sleep(500);emitter.onNext(3);Thread.sleep(2000);emitter.onNext(4);emitter.onComplete();
}).subscribeOn(Schedulers.io()).debounce(1,TimeUnit.SECONDS).blockingSubscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete)
);debounce(1, TimeUnit.SECONDS) 表示将事件流中的事件按照时间窗口的方式进行过滤。具体含义是如果在连续的 1 秒内没有新的事件发射那么才会将最后一个事件传递给观察者否则会丢弃之前的事件。 结合图像理解红色线条为debounce监听的发射节点也就是每隔一秒发送一次数据。
在0s时发送了1。
在1s时由于没有数据就没有发送数据。
在1s—2s期间产生了两次数据分别是2和3。但是debounce只会将距离2s最近一次的数据发送。因此2被不会发送出来。 3、distinct——去重 可作用于Flowable,Observable去掉数据源重复的数据。 Observable.just(1,2,3,1,2,3,4).distinct().subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));distinctUntilChanged()去掉相邻重复数据。 Observable.just(1,3,3,2,2,3,4).distinctUntilChanged().subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));//还可以指定重复条件Observable.just(1,3,3,2,2,3,4).distinctUntilChanged(new FunctionInteger, Boolean() {Overridepublic Boolean apply(Integer integer) throws Throwable {return integer3;}}).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));4、elementAt——获取指定位置元素 //获取索引为1的元素如果不存在返回Error
Observable.just(a,b,c,d,e).elementAt(1,Error).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),
);5、filter——过滤
用于过滤指定的发射元素。 Observable.just(1, 2, 3, 4, 5, 6).filter(new PredicateInteger() {Overridepublic boolean test(Integer integer) throws Throwable {return (integer % 2) ! 0;}}).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));6、first——取第一个数据 //不存在则返回100
Observable.just(1, 2, 3, 4, 5, 6).first(100).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error));7、last——取最后一个 last、lastElement、lastOrError与fist、firstElement、firstOrError相对应。 Observable.just(1, 2, 3, 4, 5, 6).last(100).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error));8、ignoreElements ignoreElement忽略元素 ignoreElements 作用于Flowable、Observable。ignoreElement作用于Maybe、Single。两者都是忽略掉数据不发射任何数据返回完成或者错误时间。
9、ofType过滤类型
作用于Flowable、Observable、Maybe过滤选择类型。 Observable.just(1, 2, 3, 4.4, 5.5, 6.6).ofType(Integer.class).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error));10、sample
debounce它等待一段时间如果在这段时间内没有新事件到达它会发射最后一个事件。它用于处理高频率事件流例如用户输入以确保只处理用户停止输入后的事件。debounce 等待事件流静止然后发射最后一个事件。sample它按照固定的时间间隔从事件流中抽样一个事件并发射该事件。它用于定期采样事件流例如从传感器数据中每隔一段时间获取一次数据。sample 定期获取事件无论事件流是否活跃。 ObservableInteger observable Observable.create(emitter - {emitter.onNext(1);Thread.sleep(1_500);emitter.onNext(2);Thread.sleep(500);emitter.onNext(3);Thread.sleep(2000);emitter.onNext(4);emitter.onComplete();});observable.sample(1, TimeUnit.SECONDS).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error));产生的数据在红线处发送
1在第1s时被发送2在第2s时被发送3在第3s时被发送由于4还未在第5s时就已经onComplete所以4无法被发送 11 、take takeLast 作用于Flowable、Observable。take发射前n个元素。takeLast发射后n个元素。 ObservableInteger source Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);source.take(4).subscribe(value- Log.e(TAG, timeronNext value));//打印:1 2 3 4source.takeLast(4).subscribe(value- Log.e(TAG, timeronNext value));//打印:7 8 9 10三、组合可观察对象操作符
操作符作用combineLatest用于将多个 Observable 中最新的事件进行组合并生成一个新的事件。merge用于将多个 Observable 合并成一个单一的 Observable按照它们发射事件的顺序合并。zip用于一一配对多个 Observable 发射的事件只有当所有 Observable 都有事件时才生成新事件。startWith用于在一个 Observable 发射的事件前插入一个或多个初始事件。join用于将两个 Observable 的事件按照时间窗口的方式进行组合。
1、CombineLatest 通过指定的函数将每个 Observable 发出的最新项目组合在一起并根据该函数的结果发出项目
combineLatest 用于将多个 Observable 中最新的事件进行组合并生成一个新的事件。当任何一个 Observable 发射新数据时都会生成新的组合事件。适用于需要及时反应多个数据源最新值变化的情况。
ObservableInteger source1 Observable.just(1, 2, 3);
ObservableString source2 Observable.just(A, B, C);
ObservableBoolean source3 Observable.just(true, false, true);ObservableString combined Observable.combineLatest(source1,source2,source3,(integer, string, aBoolean) - integer string aBoolean
);combined.subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete)
);2、merge merge 用于将多个 Observable 合并成一个单一的 Observable按照它们发射事件的顺序合并。merge 不会进行事件的组合只是合并多个 Observable 的事件。适用于需要将多个 Observable 的事件合并成一个流的情况。
注意merge只能合并相同类型的Observable ObservableInteger source1 Observable.just(1, 2, 3);ObservableInteger source2 Observable.just(4,5,6);ObservableInteger source3 Observable.just(7,8,9);ObservableInteger combined Observable.merge(source1,source2,source3);combined.subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));3、zip zip 用于一一配对多个 Observable 发射的事件只有当所有 Observable 都有事件时才生成新事件。zip 会等待所有 Observable 都有事件后才会执行组合函数生成新事件。适用于需要将多个数据源的事件一一配对的情况。 ObservableInteger source1 Observable.just(1, 2, 3);ObservableString source2 Observable.just(A, B, C);ObservableBoolean source3 Observable.just(true, false, true);ObservableString combined Observable.zip(source1,source2,source3,(integer, string, aBoolean) - integer string aBoolean);combined.subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));4、startWith startWith 用于在一个 Observable 发射的事件前插入一个或多个初始事件。这些初始事件会作为 Observable 的开头。适用于需要在 Observable 发射事件前添加一些初始数据的情况。 ObservableInteger source Observable.just(1, 2, 3);ObservableInteger withStart source.startWithArray(100,200);withStart.subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));5、join join 用于将两个 Observable 的事件按照时间窗口的方式进行组合。可以为每个 Observable 设置时间窗口然后在这些窗口内组合事件。适用于需要在时间窗口内组合两个 Observable 的事件的情况。
时间窗口 固定时间窗口定义一个固定的时间段将在该时间段内的事件分为一个时间窗口。延时时间窗口定义一个时间段但在事件发生后延迟一段时间后才分为时间窗口。动态时间窗口根据事件的特定条件动态地定义时间窗口。 ObservableInteger left Observable.just(1, 2, 3);ObservableInteger right Observable.just(10, 20, 30);left.join(right,leftDuration - Observable.timer(1, TimeUnit.SECONDS),rightDuration - Observable.timer(1, TimeUnit.SECONDS),(leftValue, rightValue) - Left: leftValue , Right: rightValue).subscribe(value - Log.e(TAG, timeronNext value));在这个示例中我们定义了以下时间窗口规则
左边的时间窗口规则leftDuration - Observable.timer(1, TimeUnit.SECONDS) 表示在左边的事件后等待 1 秒后生成一个时间窗口。右边的时间窗口规则rightDuration - Observable.timer(2, TimeUnit.SECONDS) 表示在右边的事件后等待 2 秒后生成一个时间窗口。
现在让我们看看时间窗口如何影响事件的组合
当左边的事件 1 发生时它会进入左边的时间窗口并等待 1 秒。在此期间右边的事件没有机会进入左边的时间窗口。当右边的事件 10 发生时它会进入右边的时间窗口并等待 2 秒。在此期间左边的事件也没有机会进入右边的时间窗口。
只有在左边和右边的事件都在各自的时间窗口内时它们才会被组合。在这个示例中左边的事件会在右边的时间窗口内被组合。所以在 1 秒后左边的事件 1 和右边的事件 10 被组合成 “Left: 1, Right: 10”。 四、变化操作符
| 操作符 | 说明 |
map()对数据流的类型进行转换flatMap()对数据流的类型进行包装成另一个数据流scan()scan操作符会对发射的数据和上一轮发射的数据进行函数处理并返回的数据供下一轮使用。buffer()缓存指定大小数据window()缓存指定大小数据返回新的integerObservable
对上一轮处理过后的数据流进行函数处理 对所有的数据流进行分组 缓存发射的数据流到一定数量随后发射出数据流集合 缓存发射的数据流到一定数量随后发射出新的事件流
1、map Observable.just(1,2,3).map(new FunctionInteger, Object() {Overridepublic Object apply(Integer integer) throws Throwable {return integer * 100;}}).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));2、flatMap / concatMap Observable observable Observable.just(isLogin(12346)).flatMap(new FunctionBoolean, ObservableSource?() {Overridepublic ObservableSource? apply(Boolean aBoolean) throws Throwable {String Login 登陆失败帐号秘密错误;if (aBoolean) Login 登陆成功;return Observable.just(Login).delay(2, TimeUnit.SECONDS);}});observable.subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));private boolean isLogin(String passWord) {if (passWord.equals(123456)) {return true;}return false;}Observable.just(isLogin(12346)) 创建一个 Observable它会发射一个布尔值表示登录是否成功。.flatMap(new FunctionBoolean, ObservableSource?() { ... }使用 flatMap 操作符将上一步的布尔值结果转换成一个新的 Observable其中包含登录的结果消息。flatMap 中的 apply 方法根据登录结果 aBoolean 决定返回不同的消息。如果登录成功返回 “登陆成功” 消息否则返回 “登陆失败帐号秘密错误” 消息并使用 delay 延迟 2 秒发送消息。observable.subscribe(...)最后订阅 observable并设置了三个回调函数分别处理 onNext、onError、onComplete 事件。
concatMap与flatMap的区别: concatMap是有序的flatMap是无序的。 flatMap(): 不保证内部 Observable 的发射顺序它会尽可能并行地处理内部 Observable并将它们的发射结果合并到一个单一的 Observable 中。内部 Observable 可以乱序发射数据最终结果也可能是乱序的。 concatMap(): 保证内部 Observable 的发射顺序它会按照原始数据的顺序依次处理每个内部 Observable等待一个内部 Observable 完成后再处理下一个。内部 Observable 的发射顺序和最终结果的顺序都与原始数据的顺序一致。 ObservableInteger source Observable.just(1, 2, 3);// 使用 flatMapsource.flatMap(num - Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS)).subscribe(value - Log.e(TAG, timerflatMapOnNext value));// 使用 concatMapsource.concatMap(num - Observable.just(num * 2).delay(num, TimeUnit.MILLISECONDS)).subscribe(value - Log.e(TAG, timerconcatMapOnNext value));3、scan scan操作符会对发射的数据和上一轮发射的数据进行函数处理并返回的数据供下一轮使用。 ObservableInteger observable Observable.just(1,2,3,4).scan(new BiFunctionInteger, Integer, Integer() {Overridepublic Integer apply(Integer integer, Integer integer2) throws Throwable {Log.e(TAG, integer integer integer2 integer2);return integer2-integer;}});observable.subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete));初始情况下前一个累积结果为空因为没有前一个值所以第一个数据项 1 直接发射出来产生的结果是 1。接下来前一个累积结果是 1当前数据项是 2所以执行操作 2 - 1产生的结果是 1。再次执行前一个累积结果是 1当前数据项是 3所以执行操作 3 - 1产生的结果是 2。最后前一个累积结果是 2当前数据项是 4执行操作 4 - 2产生的结果是 2。 4、buffer buffer操作符可以将发射出来的数据流在给定的缓存池中进行缓存当缓存池中的数据项溢满时则将缓存池的数据项进行输出重复上述过程直到将发射出来的数据全部发射出去。 Observable.just(1,2,3,4,5,6,7,8).buffer(3).subscribe(value - Log.e(TAG, timeronNext value),error - Log.e(TAG, Error: error),() - Log.e(TAG, onComplete))5、window window操作符和buffer操作符在功能上实现的效果是一样的但window操作符最大区别在于同样是缓存一定数量的数据项window操作符最终发射出来的是新的事件流integerObservable而buffer操作符发射出来的是新的数据流。
也就是说window操作符发射出来新的事件流中的数据项还可以经过Rxjava其他操作符进行处理 window 操作符用于将一个 Observable 拆分为多个子 Observable每个子 Observable 包含一定数量的连续数据项。window 操作符的两个参数的含义如下 第一个参数count指定每个子 Observable 中包含的数据项的数量。第二个参数skip指定何时启动新的窗口。它定义了窗口之间的重叠或间隔。如果 skip 等于 count则窗口之间不重叠。如果 skip 小于 count则窗口之间有重叠数据。 举个例子来说明 假设有一个 Observable 发出的数据序列如下1, 2, 3, 4, 5, 6, 7, 8, 9。 如果你使用 window(3, 2)它的含义是每个窗口包含 3 个数据项且窗口之间间隔 2 个数据项。那么分割后的子 Observable 将是 窗口11, 2, 3窗口23, 4, 5窗口35, 6, 7窗口47, 8, 9 如果你使用 window(3, 3)窗口之间不重叠每个窗口包含 3 个数据项。那么分割后的子 Observable 将是 窗口11, 2, 3窗口24, 5, 6窗口37, 8, 9 这里只使用了一个参数用于指定窗口的大小。然后更具window发射新的事件流integerObservable的特性对这个事件流进行了去重操作。 Observable.just(1,1,3,4,6,6,7,8).window(3).subscribe(new ConsumerObservableInteger() {Overridepublic void accept(ObservableInteger integerObservable) throws Throwable {integerObservable.distinct().subscribe(value - Log.e(TAG, timeronNext value.toString()));}});关于RxJava/RxAndroid的全部文章 RxJava/RxAndroid的基本使用方法一 RxJava的操作符使用(二) 参考文档
官方文档reactivex
RxJava3 WikiHome · ReactiveX/RxJava Wiki · GitHub
RxJava3官方githubWhat’s different in 3.0 · ReactiveX/RxJava Wiki · GitHub
RxJava2 只看这一篇文章就够了–玉刚说
RxJava2最全面、最详细的讲解–苏火火丶
关于背压Backpressure的介绍关于RxJava最友好的文章——背压Backpressure
RXJava3OKHTTP3Retrofit2观察者设计模式讲解实战