RxJava学习总结

RxJava+Retrofit+OkHttp是最佳的网络请求实践,之前的项目中一直是使用Callback的方式进行网络请求,最近得空,引入了RxJava和Retrofit,重新封装了下项目的网络请求模块,不得不说,使用RxJava的响应式编程思想,面向数据流的编程方式,写出来的代码十分优美。

编程范式概述

响应式编程: 响应式编程是一种面向数据流和变化传播的编程范式。实践:Rxjava

函数式编程: 或称函数程序设计,又称泛函编程,是一种编程典范,它将电脑运算视为数学上的函数计算,并且避免使用程序状态以及易变对象。函数编程语言最重要的基础是λ演算(lambda calculus)。而且λ演算的函数可以接受函数当作输入(引数)和输出(传出值)。比起命令式编程,函数式编程更加强调程序执行的结果而非执行的过程,倡导利用若干简单的执行单元让计算结果不断渐进,逐层推导复杂的运算,而不是设计一个复杂的执行过程。特征:函数作为基本单元,无状态,高阶函数等.

函数响应式编程:响应式编程思想为体,函数式编程思想为用。

命令式编程:是一种描述电脑所需作出的行为的编程典范。几乎所有电脑的硬件工作都是指令式的;几乎所有电脑的硬件都是设计来运行机器码,使用指令式的风格来写的。较高级的指令式编程语言使用变量和更复杂的语句,但仍依从相同的典范。菜谱和行动清单,虽非计算机程序,但与命令式编程有相似的风格:每步都是指令,有形的世界控制情况。因为命令式编程的基础观念,不但概念上比较熟悉,而且较容易具体表现于硬件,所以大部分的编程语言都是指令式的。实践:汇编和机器码.

面向对象编程:把构成问题的事物分解成一个个对象,建立对象的目的不是为了完成某一个步骤,而是为了描述某个事物在整个问题中的行为.

面向过程编程:分析出解决问题所需要的步骤,然后用函数把这些步骤一步步实现,使用的时候一个个依次调用就可以了.

以上编程范式并不互斥,可以相互结合使用,总而言之,编程范式本质就是为了更合理的解决问题而存在的.不要拘泥于概念而被已有的框架所束缚.

RxJava基本概念

Observable: 可被观察的对象
Observer: 观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {//注意: 只有当上游和下游建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件.
@Override
public void onSubscribe(Disposable d) {//注意Disposable在这里
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});

ObservableEmitter: 数据发射器 onNext(T value)、onComplete()和onError(Throwable error)。

  • 上游可以发送无限个onNext, 下游也可以接收无限个onNext.
  • 当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
  • 当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
  • 上游可以不发送onComplete或onError.
  • 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

Disposable: 调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.

subscribe()有多个重载的方法:

1
2
3
4
5
6
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

不带任何参数的subscribe() 表示下游不关心任何事件,带有一个Consumer参数的方法表示下游只关心onNext事件.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
Log.d(TAG, "emit 4");
emitter.onNext(4);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});

线程调度

Rxjava默认上下游在同一个线程。

关键词

subscribeOn:控制在哪个线程发送事件,只有第一次指定有用,后续的都忽视
observeOn:控制在哪个线程接收事件,每次指定后接下来的操作都会在指定的线程,可多次指定

Rxjava内置线程选择:

  • Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
  • Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
  • Schedulers.newThread() 代表一个常规的新线程
  • AndroidSchedulers.mainThread() 代表Android的主线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
observable.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
}
})
.observeOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName());
}
})
.subscribe(consumer);

配合Retrofit

1
2
3
4
5
6
7
public interface Api {
@GET
Observable<LoginResponse> login(@Body LoginRequest request);
@GET
Observable<RegisterResponse> register(@Body RegisterRequest request);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Api api = retrofit.create(Api.class);
api.login(request)
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
.subscribe(new Observer<LoginResponse>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(LoginResponse value) {}
@Override
public void onError(Throwable e) {
Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
}
@Override
public void onComplete() {
Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
}
});

CompositeDisposable: Disposable的容器,用于Activity或者Fragment退出时销毁所有Disposable

变换操作符

map: 对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});

flatMap: 将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里,flatMap并不保证事件的顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, s);
}
});

concatMap: 和flatMap一样,但是保证事件的先后顺序

多个网络请求嵌套:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
api.register(new RegisterRequest()) //发起注册请求
.subscribeOn(Schedulers.io()) //在IO线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求注册结果
.doOnNext(new Consumer<RegisterResponse>() {
@Override
public void accept(RegisterResponse registerResponse) throws Exception {
//先根据注册的响应结果去做一些操作
}
})
.observeOn(Schedulers.io()) //回到IO线程去发起登录请求
.flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {//这里用map也可以实现
@Override
public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
return api.login(new LoginRequest());
}
})
.observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求登录的结果
.subscribe(new Consumer<LoginResponse>() {
@Override
public void accept(LoginResponse loginResponse) throws Exception {
Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
}
});

map和flatMap的区别在于,map是1对1关系,而flatmap是1对多关系

zip

zip操作符用于将多个Observable发送的事件进行合并,它按照严格的顺序应用这个函数,它只发射与发射数据项最少的那个Observable一样多的数据.

异步进行合并:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Thread.sleep(1000);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Thread.sleep(1000);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Thread.sleep(1000);
Log.d(TAG, "emit 4");
emitter.onNext(4);
Thread.sleep(1000);
Log.d(TAG, "emit complete1");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(TAG, "emit A");
emitter.onNext("A");
Thread.sleep(1000);
Log.d(TAG, "emit B");
emitter.onNext("B");
Thread.sleep(1000);
Log.d(TAG, "emit C");
emitter.onNext("C");
Thread.sleep(1000);
Log.d(TAG, "emit complete2");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io());
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

zip发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 在这个例子里我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢? 所以本着节约是美德的思想, 就不让它发了.

如果不发送Complete呢? 答案是显然的, 上游会继续发送事件, 但是下游仍然收不到那些多余的事件.

如果在同一线程中,则不受这个条件约束,会发完一个Obervable的数据再去发另一个.

Backpressure

Backpressure是1.0的概念,用于发送数据和接收数据速率不统一的问题,2.0之后是没有这个概念的,大数据流用Flowable+Subscriber,小数据流用Observable+Observer.

当上下游处于不同线程,且上游速度远大于下游,可能会OOM,可以采用filter,sample,或者降低发送速度来避免这个问题.当然这是在使用Observable+Observer时的情况.

Flowable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE); //注意这句代码,同一个线程中,不request会报MissingBackpressureException异常
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

使用Flowable时,下游的onSubscribe方法中传给我们的不再是Disposable了, 而是Subscription, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管, 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法,不管在不在同一个线程,只有调用request才能收到数据.

使用Flowable时,如果上下游在同一个线程中,那么必须要调用request,否则会报MissingBackpressureException异常

如果不在同一个线程,那么发送数据会有一个缓冲区,大小为128,若存满后继续发送数据

这个时候要看设置Backpressure的strategy:

  • BackpressureStrategy.ERROR 直接报错MissingBackpressureException
  • BackpressureStrategy.BUFFER 和Observable表现一致,可能会OOM
  • BackpressureStrategy.DROP 存不下的数据都丢了
  • BackpressureStrategy.LATEST 存满之后,一直会刷新最新的一个的数据

如果不是自己创建的Flowable,可以通过以下方法设置策略:
onBackpressureBuffer()
onBackpressureDrop()
onBackpressureLatest()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Flowable.interval(1, TimeUnit.MICROSECONDS)
.onBackpressureDrop() //加上背压策略
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});

Flowable本质是为了响应式拉取,同一个线程中上游可以拿到下游request的数量,request的值可以累加,每次发送事件之后request数量自动减少,当为0时,说明下游没有消耗事件的能力了,这个时候就不要发了,
再发就要报MissingBackpressureException异常了.

而在不同线程中,不管下游request多少个,上游拿到的都是128,我们在上游做判断,如果拿到的request值等于0时,证明缓冲区满了不能发了,当下游消耗掉第96个数据后,上游又可以拿到request值了,等于96,然后继续发.
至于为什么是96,源码就是这样写的.

用于验证的demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public static void request() {
mSubscription.request(96); //请求96个事件
}
public static void demo4() {
Flowable
.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "First requested = " + emitter.requested());
boolean flag;
for (int i = 0; ; i++) {
flag = false;
while (emitter.requested() == 0) {
if (!flag) {
Log.d(TAG, "Oh no! I can't emit value!");
flag = true;
}
}
emitter.onNext(i);
Log.d(TAG, "emit " + i + " , requested = " + emitter.requested());
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
}

RxJava实践搭配

Rxlifecycle 可以避免不当的使用导致的内存泄漏,基础Activity和Fragment可以继承于Rxlifecycle组件

经过观察发现,Rxlifecycle只是在Activity等退出后,发消息阻断数据向下的传递,并不能解决内存泄漏问题,要解决这个,还是要从源头上解决,在退出后要停止发送数据,这样才能释放内存.

Rxlifecycle在View退出后,会回调onComplete.

内存泄漏本质上是因为长周期的组件引用了短周期的组件,在短周期的组件结束后还没有释放,就产生了内存泄漏,关键点在于引用,跟Rxjava关系不大,因为在创建Observer或者其它类时,经常会采用匿名内部类的方式创建,而匿名内部类默认持有对外部类的引用(所以我们才能在匿名内部类中访问外部类的成员变量(应该是在编译期自动生成了getter和setter方法)),所以会导致内存泄漏.而在MVP模式中,一般在Presenter中进行Rxjava的使用,但是MVP也会持有Activity的引用,所以在Activity退出时一定要与Presenter解绑,这个时候引用就被切断了,但是Presenter本身也会内存泄漏啊,不要以为内存泄漏只存在于Activity等组件中,所以这个时候还是要切断发送数据源,这样才可以,而Rxlifecycle的作用仅仅只是在Activity退出后让发送出的数据不往下传递了而已,功能仅限于此(这样我们就不用手动View判空了,因为View被回收后这边消息已经传递不下来了,哈哈).

总结,什么是内存泄漏?一切对象该被回收的时候,由于各种意外状况导致该对象仍然被引用,就是内存泄漏.多么精辟的一句话啊!

并不是每一个订阅都要手动解除,当onComplete或onError时会自动解除订阅关系,说是解除订阅关系,其实只是消息阻断而已,中间加了判断,确定要不要继续将消息传递下去

Rxbinding帮助我们将View的行为转化为Observable数据源

ReplaySubject (释放接收到的所有数据)
BehaviorSubject (释放订阅前最后一个数据和订阅后接收到的所有数据)
PublishSubject (释放订阅后接收到的数据)
AsyncSubject (仅释放接收到的最后一个数据)
SerializedSubject(串行Subject)
UnicastSubject (仅支持订阅一次的Subject)
TestSubject(已废弃,在2.x中被TestScheduler和TestObserver替代)

takeUntil操作符:当第二个Observable发射了一项数据或者终止时,丢弃原Observable发射的任何数据。

takeUntil本质上第二个Observable发送了数据之后,会调用第一个Observable中的dispose和onComplete方法.

文章目录
  1. 1. 编程范式概述
  2. 2. RxJava基本概念
  3. 3. 线程调度
    1. 3.1. 关键词
    2. 3.2. 配合Retrofit
  4. 4. 变换操作符
  5. 5. zip
  6. 6. Backpressure
    1. 6.1. Flowable
  7. 7. RxJava实践搭配
|