Rxjava2

Rxjava使用的是观察者模式,Observable和Observer是最重要的两个元素。

Observer是观察者,Observable是发布者,也可以说是被观察者。

这里先从最简单的流程开始分析。

1
2
3
Observable
-> create
-> subscribe

先来分析Observable.create()

1
2
3
4
5
6
7
8
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//主要是分析ObservableCreate(),将外部的内部类放到这个ObservableCreate中,而
//RxJavaPlugins.onAssembly()返回的就是ObservableCreate()
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

ObservableCreate是一个Observable,至此一个发布者(被观察者)诞生了。

ok,这里是对create的分析,接下来进入subscribe阶段。

注意,在create阶段结束之后返回的是就是ObservableCreate类。

所以subscribe方法是ObservableCreate.subscribe。

1
2
3
4
5
6
7
//Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {
return subscribe(onNext, onError,onComplete,Functions.emptyConsumer());
}

下面看方法里的subscribe方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
//将onXXX方法放入Observer中。
LambdaObserver<T> ls = new LambdaObserver<T>(onNext,onError,onComplete, onSubscribe);
//下面会进入这个方法
subscribe(ls);
return ls;
}

这里将onXXX()放到一个LambdaObserver中。我们来看看LambdaObserver是什么

1
2
3
4
5
6
7
8
9
10
11
12
13
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
public LambdaObserver(
Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
}

很明显LambdaObserver是一个Observer,所以Observer观察者找到了。

然后再回看👆的代码,我们再进入subscribe(ls) 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//这是Observable抽象类的抽象方法,具体实现要分析具体的Observable子类。
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

看到上面的注释,subscribeActual调用的就是ObservableCreate.subscribeActual。所以我们接下来看ObservableCreate.subscribeActual

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//CreateEmitter是一个Disposable的实现类,代码在👇
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//通过前面的分析我们知道这个observer是LambdaObserver类,所以接下来看该类的实现方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//ObservableCreate.java
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
//这个类下面还有实现了onXXX方法,具体调用的是observer中的onXXX方法
@Override
public void onNext(T t) {
//省略部分代码
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}

能够感受到这个里面包含一种设计模式,但是却不知道是哪一种,感觉设计模式之间参透不了其中的精髓,还是接着先看observer.onSubscribe(parent)吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//LambdaObserver.java
@Override
public void onSubscribe(Disposable s) {
public LambdaObserver(Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
//这个判断我目前搞不清楚是什么意思...
if (DisposableHelper.setOnce(this, s)) {
try {
//这个onSubscribe是构造函数里传进来的,而这个方法就是我们自己在 subscribe实现的匿名内部类中的onSubscribe方法
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
}
}
}

我们再回到subscribeActual方法,来看看他下面的source.subscribe(parent);这个方法。

source就是构造方法传进来的ObservableOnSubscribe,而这个东西就是我们在外部create中创建的匿名内部类。

所以source.subscribe(parent);是绑定观察者和发布者的方法,而这样也确定了Rxjava2最基本的流程。

1
2
3
Observable
-> create
-> subscribe

第二部我们增加一个流程,加入一个map流程

1
2
3
4
Observable
-> create
-> map
-> subscribe

照着前面的第一步,我们可以知道create之后返回的是ObservableCreate,所以在我们心中是ObservableCreate.map方法,我们直接从这里开始分析。

1
2
3
4
5
6
7
8
9
10
11
//Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//传入的是map的方法,这里和create一样,所以我们直接看ObservableMap
//在这里构造方法传入了两个参数,第二个就不解释了,这第一个传入的是this,从前面我们心中已经标记了他是
//ObservableCreate.map,所以这个this就是ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

接下来看看ObservableMap类

1
2
3
4
5
6
7
8
9
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//这里的super构造方法是把ObservableCreate放到了全局变量中
super(source);
this.function = function;
}
}

接下来就关键了,我们要开始分析下一步也就是map之后subscribe了。

这里中间直接用文字略述一下不需要细究的过程,ObservableMap的父类也是一个Observable类,该类也是实现了subscribeActual方法,而前面我们知道subscribe方法就是调用了该Observable类的subscribeActual方法,所以呢我们就可以直接进入ObservableMap.subscribeActual方法了。

1
2
3
4
5
6
7
8
//ObservableMap.java
@Override
public void subscribeActual(Observer<? super U> t) {
//结合上一段代码的source,这里的source是ObservableCreate类,
//推导一下那就是ObservableMap.subscribeActual调用了ObservableCreate.subscribeActual
//感觉这个链,一下子就连起来了
source.subscribe(new MapObserver<T, U>(t, function));
}

来看看这个MapObserver类

1
2
3
4
5
6
7
8
9
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//这里父类的构造函数把传进来的actual给了一个全局变量
super(actual);
this.mapper = mapper;
}
}

接着我们需要回去再看看ObservableCreate的subscribeActual方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
//这里的observer是MapObserver了
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//我们要进入MapObserver来看看onSubscribe是如何实现的
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

MapObserver的onSubscribe实现方法在他的父类BasicFuseableObserver中,我们来看看具体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//BasicFuseableObserver.java
@SuppressWarnings("unchecked")
@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
this.qs = (QueueDisposable<T>)s;
}
if (beforeDownstream()) {
//这里的actual正好是前面说略述被略过的,根据第一部里的分析,这个actual就是LambdaObserver
//终于在这里,完成了create->map->subscribe的连线
actual.onSubscribe(this);
afterDownstream();
}
}
}

至此,完成了第二部分的分析,Rxjava的源码并不算难懂,命名非常有自己的特点,可以很快地通过方法名来知道这个方法的大概用途,而且她也使用了很多设计模式,让原本的复杂的、感觉上耦合很高的代码变得清晰,在这里要重新学习一下设计模式,才能感受到Rxjava代码的魅力