介绍 RxJava,响应式编程,告别AsyncTask,与Retrofit组成了日常开发经典组合。之前,好读书不求甚解得态度,让我对其印象并不深刻,知识层面比较浅薄。为了加深印象,重新学习。
添加依赖 1 2 implementation 'io.reactivex.rxjava2:rxandroid:2.0.1' implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
举例说明 上游和下游(被观察者和观察者) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe (ObservableEmitter<Integer> e) throws Exception { e.onNext(111 ); e.onNext(222 ); e.onComplete(); } }); Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe (Disposable d) { Logger("onSubscribe" ); } @Override public void onNext (Integer integer) { Logger("onNext integer = " + integer); } @Override public void onError (Throwable e) { Logger("onError e = " + e.getMessage()); } @Override public void onComplete () { Logger("onComplete" ); } }; observable.subscribe(observer);
运行结果
流程
创建Observable
创建Observer
Observerable订阅(subscribe)Observer
Observable Observable是一个抽象类,实现ObservableSource接口
1 public abstract class Observable <T > implements ObservableSource <T >
而ObservableSource接口中只有一个订阅方法
1 2 3 4 5 6 7 8 9 public interface ObservableSource <T > { void subscribe (@NonNull Observer<? super T> observer) ; }
通过create方法创建Observable,稍微追下源码,帮助理解
1 2 3 4 public static <T> Observable<T> create (ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null" ); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
参数类型为ObservableOnSubscribe接口
1 2 3 4 5 6 7 8 9 public interface ObservableOnSubscribe <T > { void subscribe (@NonNull ObservableEmitter<T> e) throws Exception ; }
ObservableEmitter可以理解成发射器,看下它的源码。它继承Emitter。而Emitter中才是我们最基础最常用的功能。
1 2 3 4 5 6 7 8 public interface ObservableEmitter <T > extends Emitter <T > { void setDisposable (@Nullable Disposable d) ; void setCancellable (@Nullable Cancellable c) ; boolean isDisposed () ; ObservableEmitter<T> serialize () ; @Experimental boolean tryOnError (@NonNull Throwable t) ; }
上面这部分代码,删除了部分注释,可在源码中查阅。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public interface Emitter <T > { void onNext (@NonNull T value) ; void onError (@NonNull Throwable error) ; void onComplete () ; }
从上面Emitter的代码可以看出,我们可以发射三种消息: onNext, onComplete, onError。从语义上就很容易看出他们的作用。 针对三个事件,做一些说明:
发射器可以发送无限个onNext, Observer也可以接收无限个onNext;
发射器发送一个onComplete后, onComplete之后的事件将会继续发送, 而Observer收到第一个onComplete事件之后将不再继续接收事件;
发射器发送了一个onError后, onError之后的事件将继续发送, 而Observer收到onError事件之后将不再继续接收事件;
发射器可以不发送onComplete或onError;
发射器不能同时发出onComplete和onError;
不允许发出多个onError或者onComplete。
Observer 相对于Observable而言,Observer就简单很多,我们只需要针对我们感兴趣的事件作出相应的处理即可。
1 2 3 4 5 6 public interface Observer <T > { void onSubscribe (@NonNull Disposable d) ; void onNext (@NonNull T t) ; void onError (@NonNull Throwable e) ; void onComplete () ; }
这里有一个新内容Disposable,先看代码,“象征一个一次性资源”,从注释上看是这个意思。但是如何理解?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public interface Disposable { void dispose () ; boolean isDisposed () ; }
举例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe (ObservableEmitter<Integer> e) throws Exception { e.onNext(111 ); e.onNext(222 ); e.onNext(333 ); e.onComplete(); e.onNext(444 ); } }); Observer<Integer> observer = new Observer<Integer>() { Disposable mDisposable; @Override public void onSubscribe (Disposable d) { Logger("onSubscribe" ); mDisposable = d; } @Override public void onNext (Integer integer) { Logger("onNext integer = " + integer); if (integer== 222 && !mDisposable.isDisposed()) { mDisposable.dispose(); } } @Override public void onError (Throwable e) { Logger("onError e = " + e.getMessage()); } @Override public void onComplete () { Logger("onComplete" ); } }; observable.subscribe(observer);
运行结果:
可以看到发射器发送了222之后,剩下的事件Observer并没有处理,那么剩下的事件发射器有没有发送呢?在发射器发射事件之前添加log。
可以看到发射器的事件并没有受到Disposable影响,只是Observer没有接收到222之后的事件而已。
Disposable的作用应该是,截断发射器与Observer之间的事件通道。