RxJava 线程模型分析
RxJava的被观察者在使用操作符时可以利用线程调度器–Scheduler来切换线程,例如
Observable.just("hello","Rxjava")
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
被观察者(Observable、Flowable…)发射数据流之后,其操作符可以在不同的线程中加工数据流,最后被观察者在前台线程中接受并响应数据。
下图不同的箭头颜色表示不同的线程。
线程调度
在默认情况下不做任何线程处理,观察者和被观察者是处于同一线程中的。RxJava提供了可以切换线程的便利 API ====>> subscribeOn(), observeOn()。
我们知道,在最后一步 subscribe(…) 未调用时,之前的链式是不会执行的,那我们先来看看 subscribe 的细节
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
...
// 构建一个 LambdaObserver,装饰者模式,对方法进行了增强
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
// 执行订阅
subscribe(ls);
return ls;
}
public final void subscribe(Observer<? super T> observer) {
...
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 订阅
subscribeActual(observer);
...
}
可以看到,最后会执行 subscribeActual 方法,但是该方法实际上是一个抽象方法。也就是说,会执行 subscribe 上一步链式所创建的 Observable 实现,那我们往链式的上游看,即示例代码的 observeOn(Schedulers.io())
线程调度 observeOn
首先,先说结论:
- observeOn(scheduler) 接收一个 Scheduler 参数,用来指定后续操作运行在特定的线程调度器 Scheduler 上。
- 若多次执行 observeOn,则每次均起作用,线程会一直切换。
进入 observeOn() 的源码可以看到,每次调用 observeOn() 都会创建一个 ObservableObserveOn 对象。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
...
// 注意: 这里的 this 表示调用 observeOn 的 Observable 对象,也就是上一步的链式,这里有可能就是操作符,或者 subscribeOn 的 Observable实现,我们统称为 上游事件
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
这时综上所述,你会发现示例里 subscribe 执行的其实就是 ObservableObserveOn 的 subscribeActual 实现。
这个设计就很巧妙了,采用 装饰者模式 + 责任链模式 的方式设计出简洁便利的API。
继续看 subscribeActual 代码,
public ObservableObserveOn(
ObservableSource<T> source,
Scheduler scheduler,
boolean delayError,
int bufferSize) {
// source 即上游事件对象
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
// ObservableObserveOn 订阅 subscribe 里的 LambdaObserver,也就是所谓的 上游事件和下游事件立即产生订阅
source.subscribe(observer);
} else {
// 线程调度器创建任务封装 Scheduler.Worker
Scheduler.Worker w = scheduler.createWorker();
// 上游事件对象订阅下游事件 observer(被封装为ObserveOnObserver对象)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObserveOnObserver 是 ObservableObserveOn 的内部类,实现了 Observer、Runnable 接口。
此时,如果你稍微看一下其他操作符的实现时,你会发现执行流程基本是一致的,
因此,再一次综上所述,我们逆向层层递推,当推到链式反应的第一步时,此时的上游事件订阅下游事件,变成了当前事件(没有上游事件了)订阅下游事件,如示例中的 Observable.create(),
public final class ObservableCreate<T> extends Observable<T> {
// source 即 Observable.create 时的 ObservableOnSubscribe 实现
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 方法增强 emitter 发射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 下游事件调用 onSubscribe
observer.onSubscribe(parent);
try {
// 执行 source 实现, 通常我们会调用 emitter.onNext() 等操作,开始链式反应
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
当最上层的 emitter 发射数据的时候,优先调用了当前 Observer 的 onNext(),等到执行完后,再开始调用下一个 Observer 的 onNext(),这样达到了层层传递,层层处理的目的。
最终会到达 ObserveOnObserver 的 onNext() 方法。schedule() 执行了具体调度的方法。
@Override
public void onNext(T t) {
...
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
其中,worker 是当前 scheduler 创建的 Worker,this 指的是当前的 ObserveOnObserver 对象, 实现了 Runnable 接口。
以 IoScheduler 为例,找到 io.reactivex.internal.schedulers.IoScheduler 的 worker 实现
static final class EventLoopWorker extends Scheduler.Worker {
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// 缓存 worker 池
this.threadWorker = pool.get();
}
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
...
// ThreadWorker 继承 NewThreadWorker
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
最终会调用 scheduleActual 方法
public NewThreadWorker(ThreadFactory threadFactory) {
// executor 是 核心线程数为 1 的线程池 ScheduledExecutorService
executor = SchedulerPoolFactory.create(threadFactory);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
...
Future<?> f;
try {
if (delayTime <= 0) {
// 执行线程
f = executor.submit((Callable<Object>)sr);
} else {
// 执行线程
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
...
}
return sr;
}
最终通过线程池执行 ObserveOnObserver 这个 Runnable 实现,即下游事件,来实现线程的切换
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
因此,下游多次调用 observeOn() 的话,线程会一直切换。每一次切换线程,都会把对应的 Observer 对象的各个方法的处理交给指定线程池去调度
线程调度 subscribeOn
subscribeOn的结论:
- subscribeOn 通过接收一个 Scheduler 参数,来指定对数据的处理运行在特定的线程调度器 Scheduler 上。
- 若多次执行 subscribeOn,则只有一次起作用。
进入 subscribeOn() 的源码可以看到,每次调用 subscribeOn() 都会创建一个 ObservableSubscribeOn 对象。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn 真正发生订阅的方法是 subscribeActual(Observer<? super T> observer)。
@Override
public void subscribeActual(final Observer<? super T> s) {
// SubscribeOnObserver 是下游的 Observer 通过装饰器模式生成的。它实现了 Observer、Disposable 接口
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 在上游的线程中执行下游 Observer 的 onSubscribe(Disposable disposabel)方法。
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
然后,将子线程的操作加入 Disposable 管理中,加入 Disposable 后可以方便上下游的统一管理。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
在这里,已经调用对应 scheduler 的 scheduleDirect() 方法,最终会调用 对应 Worker 的 schedule 方法,通过线程池完成调度。
scheduleDirect() 传入的是一个Runnable,也就是下面的 SubscribeTask。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
// 对上游的 Observable 进行订阅。此时,已经在对应的 Scheduler 线程中运行了。
source.subscribe(parent);
}
}
在RxJava的链式操作中,数据的处理是自下而上,这点跟数据发射正好相反。如果多次调用 subscribeOn,最上面的线程切换最晚执行,所以变成了只有第一次切换线程才有效。
为什么 subscribeOn() 只有第一次切换有效
写到这里我们这个问题也就能回答了
因为 RxJava 最终能影响 ObservableOnSubscribe 这个匿名实现接口的运行环境的只能是最后一次运行的 subscribeOn() ,又因为 RxJava 订阅的时候是从下往上订阅,所以从上往下第一个 subscribeOn() 就是最后运行的,这就造成了写多个 subscribeOn() 并没有什么用的现象。
示例
RxJava 线程切换非常方便,直接看代码
线程切换示例代码:
Observable.create(ObservableOnSubscribe<String> {
Log.e("RxThreadTest", "create " + Thread.currentThread().name)
it.onNext("hello")
it.onComplete()
})
.observeOn(AndroidSchedulers.mainThread()) //指定下面的 call 在主线程中执行
.flatMap {
Log.e("RxThreadTest", "flatMap1 " + Thread.currentThread().name)
Observable.just(it)
}
.observeOn(Schedulers.io()) //指定下面的 call 在IO 线程中执行
.flatMap {
Log.e("RxThreadTest", "flatMap2 " + Thread.currentThread().name)
Observable.just(it)
}
.subscribeOn(Schedulers.io()) // 指定上面未指定的 Observable 在 IO 线程中执行,即最开始的ObservableOnSubscribe
.observeOn(AndroidSchedulers.mainThread()) // 指定下面的 subscribe 在主线程执行
.subscribe({
Log.e("RxThreadTest", "$it - ${Thread.currentThread().name}")
}, {
Log.e("RxThreadTest", it.message)
}, {
Log.e("RxThreadTest", "OnComplete")
})
输出结果:
15:20:21.073 create RxCachedThreadScheduler-1
15:20:21.149 flatMap1 main
15:20:21.157 flatMap2 RxCachedThreadScheduler-2
15:20:21.287 hello - main
15:20:21.287 OnComplete
示例代码2
val o1 = Observable.create(ObservableOnSubscribe<String> {
Log.e("RxThreadTest", "o1 ${Thread.currentThread().name}")
it.onNext("o1")
}).subscribeOn(Schedulers.io())
val o2 = Observable.create(ObservableOnSubscribe<String> {
Log.e("RxThreadTest", "o2 ${Thread.currentThread().name}")
it.onNext("o2")
}).subscribeOn(Schedulers.io())
val o3 = Observable.create(ObservableOnSubscribe<String> {
Log.e("RxThreadTest", "o3 ${Thread.currentThread().name}")
it.onNext("o3")
}).subscribeOn(Schedulers.io())
Observable.create(ObservableOnSubscribe<String> {
Log.e("RxThreadTest", "create ${Thread.currentThread().name}")
it.onNext("create")
it.onComplete()
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap {
Log.e("RxThreadTest", "flatMap1 ${Thread.currentThread().name}")
o1
}
.observeOn(AndroidSchedulers.mainThread())
.flatMap {
Log.e("RxThreadTest", "flatMap2 ${Thread.currentThread().name}")
o2
}
.flatMap {
Log.e("RxThreadTest", "flatMap3 ${Thread.currentThread().name}")
o3
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e("RxThreadTest", "onNext ${Thread.currentThread().name}")
}
输出结果
15:50:46.763 E/RxThreadTest: create RxCachedThreadScheduler-1
15:50:46.848 E/RxThreadTest: flatMap1 main
15:50:46.848 E/RxThreadTest: o1 RxCachedThreadScheduler-1
15:50:46.968 E/RxThreadTest: flatMap2 main
15:50:46.969 E/RxThreadTest: o2 RxCachedThreadScheduler-1
15:50:46.969 E/RxThreadTest: flatMap3 RxCachedThreadScheduler-1
15:50:46.971 E/RxThreadTest: o3 RxCachedThreadScheduler-2
15:50:46.986 E/RxThreadTest: onNext main
flatMap3 没有指定线程,会继续使用之前 Observable 操作符使用的线程,事例中即 o2 执行的线程 RxCachedThreadScheduler-1。 需要注意的是,如果 flatMap2 修改为 map 操作符时,因为 map 没有返回操作符,此时 flatMap3 则会用 map 的线程即 main 线程