RxJava-线程调度

RxJava 线程模型分析

RxJava的被观察者在使用操作符时可以利用线程调度器–Scheduler来切换线程,例如

Observable.just("hello","Rxjava")
        .observeOn(Schedulers.newThread())
        .map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                return s.toUpperCase();
            }
        })
        .subscribeOn(Schedulers.single())
        .observeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println(s);
            }
        });

被观察者(Observable、Flowable…)发射数据流之后,其操作符可以在不同的线程中加工数据流,最后被观察者在前台线程中接受并响应数据。
下图不同的箭头颜色表示不同的线程。
schedulers.png

线程调度

在默认情况下不做任何线程处理,观察者和被观察者是处于同一线程中的。RxJava提供了可以切换线程的便利 API ====>> subscribeOn(), observeOn()。

我们知道,在最后一步 subscribe(…) 未调用时,之前的链式是不会执行的,那我们先来看看 subscribe 的细节

 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ...
    // 构建一个 LambdaObserver,装饰者模式,对方法进行了增强
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    // 执行订阅
    subscribe(ls);

    return ls;
}

public final void subscribe(Observer<? super T> observer) {
    ...
    ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    // 订阅
    subscribeActual(observer);
    ...
}

可以看到,最后会执行 subscribeActual 方法,但是该方法实际上是一个抽象方法。也就是说,会执行 subscribe 上一步链式所创建的 Observable 实现,那我们往链式的上游看,即示例代码的 observeOn(Schedulers.io())

线程调度 observeOn

首先,先说结论:

  • observeOn(scheduler) 接收一个 Scheduler 参数,用来指定后续操作运行在特定的线程调度器 Scheduler 上。
  • 若多次执行 observeOn,则每次均起作用,线程会一直切换。

进入 observeOn() 的源码可以看到,每次调用 observeOn() 都会创建一个 ObservableObserveOn 对象。

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ...
    // 注意: 这里的 this 表示调用 observeOn 的 Observable 对象,也就是上一步的链式,这里有可能就是操作符,或者 subscribeOn 的 Observable实现,我们统称为 上游事件
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

这时综上所述,你会发现示例里 subscribe 执行的其实就是 ObservableObserveOn 的 subscribeActual 实现。
这个设计就很巧妙了,采用 装饰者模式 + 责任链模式 的方式设计出简洁便利的API。

继续看 subscribeActual 代码,

public ObservableObserveOn(
    ObservableSource<T> source, 
    Scheduler scheduler, 
    boolean delayError, 
    int bufferSize) {
        // source 即上游事件对象
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        // ObservableObserveOn 订阅 subscribe 里的 LambdaObserver,也就是所谓的 上游事件和下游事件立即产生订阅
        source.subscribe(observer);
    } else {
        // 线程调度器创建任务封装 Scheduler.Worker
        Scheduler.Worker w = scheduler.createWorker();
        // 上游事件对象订阅下游事件 observer(被封装为ObserveOnObserver对象)  
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

ObserveOnObserver 是 ObservableObserveOn 的内部类,实现了 Observer、Runnable 接口。
此时,如果你稍微看一下其他操作符的实现时,你会发现执行流程基本是一致的,
因此,再一次综上所述,我们逆向层层递推,当推到链式反应的第一步时,此时的上游事件订阅下游事件,变成了当前事件(没有上游事件了)订阅下游事件,如示例中的 Observable.create(),

public final class ObservableCreate<T> extends Observable<T> {
// source 即 Observable.create 时的 ObservableOnSubscribe 实现
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 方法增强 emitter 发射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 下游事件调用 onSubscribe
    observer.onSubscribe(parent);

    try {
        // 执行 source 实现, 通常我们会调用 emitter.onNext() 等操作,开始链式反应
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

当最上层的 emitter 发射数据的时候,优先调用了当前 Observer 的 onNext(),等到执行完后,再开始调用下一个 Observer 的 onNext(),这样达到了层层传递,层层处理的目的。

最终会到达 ObserveOnObserver 的 onNext() 方法。schedule() 执行了具体调度的方法。

@Override
public void onNext(T t) {
    ...
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

其中,worker 是当前 scheduler 创建的 Worker,this 指的是当前的 ObserveOnObserver 对象, 实现了 Runnable 接口。
以 IoScheduler 为例,找到 io.reactivex.internal.schedulers.IoScheduler 的 worker 实现

 static final class EventLoopWorker extends Scheduler.Worker {
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        // 缓存 worker 池
        this.threadWorker = pool.get();
    }

    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        ...
        // ThreadWorker 继承 NewThreadWorker
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
}

最终会调用 scheduleActual 方法

public NewThreadWorker(ThreadFactory threadFactory) {
    // executor 是 核心线程数为 1 的线程池 ScheduledExecutorService
    executor = SchedulerPoolFactory.create(threadFactory);
}

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    ...
    Future<?> f;
    try {
        if (delayTime <= 0) {
            // 执行线程 
            f = executor.submit((Callable<Object>)sr);
        } else {
            // 执行线程 
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        ...
    }

    return sr;
}

最终通过线程池执行 ObserveOnObserver 这个 Runnable 实现,即下游事件,来实现线程的切换

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

因此,下游多次调用 observeOn() 的话,线程会一直切换。每一次切换线程,都会把对应的 Observer 对象的各个方法的处理交给指定线程池去调度

线程调度 subscribeOn

subscribeOn的结论:

  • subscribeOn 通过接收一个 Scheduler 参数,来指定对数据的处理运行在特定的线程调度器 Scheduler 上。
  • 若多次执行 subscribeOn,则只有一次起作用。

进入 subscribeOn() 的源码可以看到,每次调用 subscribeOn() 都会创建一个 ObservableSubscribeOn 对象。

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

ObservableSubscribeOn 真正发生订阅的方法是 subscribeActual(Observer<? super T> observer)。

@Override
public void subscribeActual(final Observer<? super T> s) {
    // SubscribeOnObserver 是下游的 Observer 通过装饰器模式生成的。它实现了 Observer、Disposable 接口
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    // 在上游的线程中执行下游 Observer 的 onSubscribe(Disposable disposabel)方法。
    s.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

然后,将子线程的操作加入 Disposable 管理中,加入 Disposable 后可以方便上下游的统一管理。

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

在这里,已经调用对应 scheduler 的 scheduleDirect() 方法,最终会调用 对应 Worker 的 schedule 方法,通过线程池完成调度。
scheduleDirect() 传入的是一个Runnable,也就是下面的 SubscribeTask。

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        // 对上游的 Observable 进行订阅。此时,已经在对应的 Scheduler 线程中运行了。
        source.subscribe(parent);
    }
}

在RxJava的链式操作中,数据的处理是自下而上,这点跟数据发射正好相反。如果多次调用 subscribeOn,最上面的线程切换最晚执行,所以变成了只有第一次切换线程才有效。

为什么 subscribeOn() 只有第一次切换有效

写到这里我们这个问题也就能回答了
因为 RxJava 最终能影响 ObservableOnSubscribe 这个匿名实现接口的运行环境的只能是最后一次运行的 subscribeOn() ,又因为 RxJava 订阅的时候是从下往上订阅,所以从上往下第一个 subscribeOn() 就是最后运行的,这就造成了写多个 subscribeOn() 并没有什么用的现象。

示例

RxJava 线程切换非常方便,直接看代码
线程切换示例代码:

Observable.create(ObservableOnSubscribe<String> {
        Log.e("RxThreadTest", "create " + Thread.currentThread().name)
        it.onNext("hello")
        it.onComplete()
    })
            .observeOn(AndroidSchedulers.mainThread())   //指定下面的 call 在主线程中执行
            .flatMap {
                Log.e("RxThreadTest", "flatMap1 " + Thread.currentThread().name)
                Observable.just(it)
            }
            .observeOn(Schedulers.io()) //指定下面的 call  在IO 线程中执行
            .flatMap {
                Log.e("RxThreadTest", "flatMap2 " + Thread.currentThread().name)
                Observable.just(it)
            }
            .subscribeOn(Schedulers.io())   //  指定上面未指定的 Observable 在 IO 线程中执行,即最开始的ObservableOnSubscribe
            .observeOn(AndroidSchedulers.mainThread())  // 指定下面的 subscribe 在主线程执行
            .subscribe({
                Log.e("RxThreadTest", "$it - ${Thread.currentThread().name}")
            }, {
                Log.e("RxThreadTest", it.message)
            }, {
                Log.e("RxThreadTest", "OnComplete")
            })

输出结果:

15:20:21.073  create RxCachedThreadScheduler-1
15:20:21.149  flatMap1 main
15:20:21.157  flatMap2 RxCachedThreadScheduler-2
15:20:21.287  hello - main
15:20:21.287  OnComplete

示例代码2

val o1 = Observable.create(ObservableOnSubscribe<String> {
           Log.e("RxThreadTest", "o1 ${Thread.currentThread().name}")
           it.onNext("o1")
       }).subscribeOn(Schedulers.io())

       val o2 = Observable.create(ObservableOnSubscribe<String> {
           Log.e("RxThreadTest", "o2 ${Thread.currentThread().name}")
           it.onNext("o2")
       }).subscribeOn(Schedulers.io())

       val o3 = Observable.create(ObservableOnSubscribe<String> {
           Log.e("RxThreadTest", "o3 ${Thread.currentThread().name}")
           it.onNext("o3")
       }).subscribeOn(Schedulers.io())

       Observable.create(ObservableOnSubscribe<String> {
           Log.e("RxThreadTest", "create ${Thread.currentThread().name}")
           it.onNext("create")
           it.onComplete()
       })
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .flatMap {
                   Log.e("RxThreadTest", "flatMap1 ${Thread.currentThread().name}")
                   o1
               }
               .observeOn(AndroidSchedulers.mainThread())
               .flatMap {
                   Log.e("RxThreadTest", "flatMap2 ${Thread.currentThread().name}")
                   o2
               }
               .flatMap {
                   Log.e("RxThreadTest", "flatMap3 ${Thread.currentThread().name}")
                   o3
               }
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe {
                   Log.e("RxThreadTest", "onNext ${Thread.currentThread().name}")
               }

输出结果

15:50:46.763 E/RxThreadTest: create RxCachedThreadScheduler-1
15:50:46.848 E/RxThreadTest: flatMap1 main
15:50:46.848 E/RxThreadTest: o1 RxCachedThreadScheduler-1
15:50:46.968 E/RxThreadTest: flatMap2 main
15:50:46.969 E/RxThreadTest: o2 RxCachedThreadScheduler-1
15:50:46.969 E/RxThreadTest: flatMap3 RxCachedThreadScheduler-1
15:50:46.971 E/RxThreadTest: o3 RxCachedThreadScheduler-2
15:50:46.986 E/RxThreadTest: onNext main

flatMap3 没有指定线程,会继续使用之前 Observable 操作符使用的线程,事例中即 o2 执行的线程 RxCachedThreadScheduler-1。 需要注意的是,如果 flatMap2 修改为 map 操作符时,因为 map 没有返回操作符,此时 flatMap3 则会用 map 的线程即 main 线程

参考 详解 RxJava2的线程切换原理