总览
RxJava 里,我们使用 AndroidSchedulers,Schdulers API 来指定操作符所执行的线程。核心实现是采用线程池,管理线程池资源,快速响应各种 Observable 操作符。Schedulers 提供了一系列的静态工厂方法来获取 Scheduler。常用的如,
- AndroidSchedulers.mainThread()
- Schedulers.computation()
- Schedulers.io()
- Schedulers.newThread()
- Schedulers.single()
源码分析
单例管理
查看 Schedules 源码,我们会发现,他们各自对应不同的 Scheduler 实现。Schedules 类内部定义了多个静态的内部类,
内部类里分别持有一个静态的 Scheduler 实现。
static final Scheduler SINGLE;
static final Scheduler COMPUTATION;
static final Scheduler IO;
static final Scheduler TRAMPOLINE;
static final Scheduler NEW_THREAD;
/** 对应 Schedulers.single() */
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
/** 对应 Schedulers.computation() */
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
/** 对应 Schedulers.single() */
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
/** 对应 Schedulers.newThread() */
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
然后通过静态方法,将单例返回。这里 RxJavaPlugins.onSingleScheduler 等类似的方法,作用是给用户开放 API,自定义 hook 函数,可用于监控线程的使用
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}
那么,我们来看看 RxJava 究竟是怎么管理这些线程的。
Schedules.newThread() & NewThreadScheduler
首先是 NewThreadScheduler,
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
...
static {
...
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
构造方法很简单,将静态单例 RxThreadFactory 赋值给 ThreadFactory 引用。 通过之前的学习,我们知道,在 Observable 真正执行的时候会调用到 createWorker() ,创建 Worker 实例。 而这些 Worker 正是管理线程池的细节。
来看看 NewThreadWorker 的构造方法,创建了一个 ScheduledExecutorService 对象。 我们知道,ScheduledExecutorService 是用来管理线程池的接口,因此 NewThreadWorker 组合了一个 ScheduledExecutorService 管理线程池,同时又做了一些扩展。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
// 创建的细节在 SchedulerPoolFactory 内部
executor = SchedulerPoolFactory.create(threadFactory);
}
@Override
public void dispose() {
// dispose 提供关闭线程池 executor
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
public void shutdown() {
// dispose 提供关闭线程池 executor
if (!disposed) {
disposed = true;
executor.shutdown();
}
}
那么,我们来看 SchedulerPoolFactory,创建了核心数为 1 的调度线程池服务。
public static ScheduledExecutorService create(ThreadFactory factory) {
// 创建了一个核心数为 1 的线程池服务 ScheduledExecutorService
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
// 记录该线程池,用于清理释放内存
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
这里 ThreadFactory 被当做参数传递了,ThreadFactory是一个超级简单的接口,
public interface ThreadFactory {
Thread newThread(Runnable r);
}
作用是当线程池服务创建线程的时候会调用该回调。 因此,这个 ThreadFactory 的实现就是创建线程的代码,到这里不难看出,在NewThreadScheduler 构造方法里,RxThreadFactory 就是 ThreadFactory 的实现。
RxThreadFactory 代码不是很长就直接贴出来了,分析就看注释了
/**
* RxThreadFactory 继承 AtomicLong(原子操作,用于计数),主要提供两个功能
* 1. 计算创建了多少多线线程
* 2. 给这些线程命名为:prefix-count
*/
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
private static final long serialVersionUID = -7789753024099756196L;
final String prefix;
final int priority;
final boolean nonBlocking;
public RxThreadFactory(String prefix) {
this(prefix, Thread.NORM_PRIORITY, false);
}
public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}
@Override
public Thread newThread(Runnable r) {
// 线程命名,格式: prefix-count
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
// new Thread() 在这里
// 根据 nonBlocking 创建是否实现 NonBlockingThread 接口的线程
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
// 该线程其实是守护线程
t.setDaemon(true);
return t;
}
@Override
public String toString() {
return "RxThreadFactory[" + prefix + "]";
}
static final class RxCustomThread extends Thread implements NonBlockingThread {
RxCustomThread(Runnable run, String name) {
super(run, name);
}
}
}
小结一下,
- NewThreadScheduler 持有创建线程的 ThreadFactory 实现
- createWorker 创建 NewThreadWorker,NewThreadWorker 管理着线程池服务的生命周期
- SchedulerPoolFactory.create 方法里调用 Executors.newScheduledThreadPool(1, factory) 创建核心数为 1 的可调度线程池服务
- 创建线程的实现在最初 NewThreadScheduler 提供的 RxThreadFactory
- RxThreadFactory 创建守护线程,通过实现 NonBlockingThread 接口标识是否用于阻塞
至此,指定的线程是 Schedules.newThread()时,每次 Observable 执行时,NewThreadScheduler 都会创建新的线程(守护线程),在核心数为 1 的可调度线程池中执行和管理
Schedules.single() & SingleScheduler
IoScheduler 和 NewThreadScheduler 实现上大部分都是类似的,不同的点在于线程池服务的创建位置。
public final class SingleScheduler extends Scheduler {
final ThreadFactory threadFactory;
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
public SingleScheduler() {
this(SINGLE_THREAD_FACTORY);
}
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
executor.lazySet(createExecutor(threadFactory));
}
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}
...
}
可以发现,在 SingleScheduler 里完成了 executor 的创建,而 NewThreadScheduler 是在 NewThreadWorker 创建 executor 的。同时
创建的方式二者是一样的,都是通过 SchedulerPoolFactory.create(threadFactory) 完成。
我们知道 SingleScheduler 在 Schedulers 里是保证单例的,因此,SingleScheduler 里的 executor 相当于也是单例的,这样就不会多次创建线程池了,浪费资源了。
Schedules.io() & IoScheduler
那么,所谓的 IoScheduler 又是什么呢? 构造方法与 NewThreadScheduler 不同的是初始化了一个 AtomicReference
public final class IoScheduler extends Scheduler {
...
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;
static final CachedWorkerPool NONE;
private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
static final RxThreadFactory WORKER_THREAD_FACTORY;
private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
static final RxThreadFactory EVICTOR_THREAD_FACTORY;
static {
...
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
// NONE 使用的是 WORKER_THREAD_FACTORY
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
// 传入 NONE,pool 里的 ThreadFactory 也是WORKER_THREAD_FACTORY
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
...
}
所以 IoScheduler 创建的 Worker 是 EventLoopWorker,那么我们可以猜测IoScheduler 的线程池管理要会在 CachedWorkerPool 里面,要么就是在 EventLoopWorker 里面。 其实看命名就能知道大概,十有八九肯定在 CachedWorkerPool 这个内部类中。
那么我们就来看看 CachedWorkerPool,
// CachedWorkerPool 实现了 Runnable 接口
static final class CachedWorkerPool implements Runnable {
...
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
// 并发链表队列管理 Worker
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
// CompositeDisposable 管理所有的 Disposable 对象
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
// 创建核心数为 1 的可调度线程池服务
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
// 线程池服务马上执行 this,CachedWorkerPool 这个Runnable
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
// 更新超时时间,同时加入超时队列中等待复用
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
// 添加到队列
expiringWorkerQueue.offer(threadWorker);
}
...
}
expiringWorkerQueue 队列管理着 ThreadWorker,evictor 调度器线程池服务的核心数为 1,并且已经立即运行了 CachedWorkerPool,那么其他的线程要使用 evictor 是不可能了。因此,线程池的调度管理并不在 CachedWorkerPool 这里。
那么,应该就是在 EventLoopWorker 里头了。但是一看构造方法会发现,这里也没有初始化线程池,emmm…
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
public void dispose() {
// EventLoopWorker 执行完任务 disponse 掉后,
// pool 会更新复用 threadWorker 的超时时间
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
...
}
但是奇怪的地方在,EventLoopWorker 本身就是一个 Worker,内部又定义了一个ThreadWorker。从设计模式来看,这里很像静态代理模式或者装饰者模式,但不管哪个,真实的 Worker 是通过 pool.get() 所拿到的那个,即
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
// 从队列中轮训出来已有的 ThreadWorker 返回
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// 没有缓存的 worker时,创建新的 ThreadWorker
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
这里的 ThreadWorker,其实就是 NewThreadWorker。前两个小节可以知道 NewThreadWorker 里自行管理了一个核心数为1的线程池服务
static final class ThreadWorker extends NewThreadWorker {
// 超时时间
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
...
}
因此,尽管我们每次执行操作符时都会创建 EventLoopWorker,但实际会从缓存中复用已有的 ThreadWorker,即复用已有的线程池服务。功能上跟 Schedulers.io() 有点像,不同的是在 CachedWorkerPool 里会检测 ThreadWorker 超时时间
private static final long KEEP_ALIVE_TIME = 60;
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
// ThreadWorker 已超时,移除
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// 队列是有序的,最先超时的肯定在第一个,当找到没有
// 超时的任务后,后面的也不会超时,break
break;
}
}
}
}
并且,默认的超时时间是 60s。
小结一下,
- IoScheduler 初始化 ThreadWorker 管理者 CachedWorkerPool,ThreadWorker 的超时时间为 60s
- IoScheduler 是单例,故 CachedWorkerPool 也是单例对象
- EventLoopWorker 是 ThreadWorker 的代理对象。在任务执行完时,把ThreadWorker 添加到超时队列中等待复用
- ThreadWorker 内部管理着核心数为 1 的调度线程池服务
因此,Schedulers.io() 比其他的调度器更适合时间短的密集型任务,如常见的IO操作
Schedules.computation() & ComputationScheduler
看了那么多 Scheduler,相信 ComputationScheduler 也不会太复杂
public final class ComputationScheduler extends Scheduler {
/** 没有任何线程时的 pool */
static final FixedSchedulerPool NONE;
static final RxThreadFactory THREAD_FACTORY;
/** 计算调度线程的最大数量. */
static final int MAX_THREADS;
static final PoolWorker SHUTDOWN_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;
...
static {
// 根据 CPU 核数确定最大线程数
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown"));
SHUTDOWN_WORKER.dispose();
...
// 这里依然是 RxThreadFactory,PREFIX 名字不一样
// 以及 noBlocking 参数为 true,阻塞超出容量的线程
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority, true);
// FixedSchedulerPool 管理 worker 缓存
NONE = new FixedSchedulerPool(0, THREAD_FACTORY);
NONE.shutdown();
}
public ComputationScheduler() {
this(THREAD_FACTORY);
}
public ComputationScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
// 初始化 NULL 的 pool
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
start();
}
public void start() {
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
// 更新新的 pool
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
public Worker createWorker() {
// 这里创建的 Worker 依然是叫 EventLoopWorker,可以猜测跟 IoScheduler 里的 EventLoopWorker 作用类似
return new EventLoopWorker(pool.get().getEventLoop());
}
}
到这里跟 IoScheduler 非常相似。 基本可以知道是怎样的一个逻辑了,这里 EventLoopWorker 的参数是从 pool.get().getEventLoop() 方法拿到的,那么我们来看看这个 pool FixedSchedulerPool
static final class FixedSchedulerPool {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}
// 拿到 PoolWorker 的副本
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
return eventLoops[(int)(n++ % c)];
}
可以看到这里的构造也是很简单,初始化了核心线程数量 cores,以及初始化数量为cores 的 PoolWorker 数组。
那么这个 PoolWorker 又是什么?大佬可能已经猜出来了,绝壁就是用来复用的 Worker 缓存。果然,大佬猜的没错
static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
}
在工作机制上跟 IoScheduler 基本一致,只是副本的数量不一样。IoScheduler 始终只有一个线程池,而 ComputationScheduler 是根据 CPU 核心数量确定线程和线程池数量,相比更加适合做 CPU 密集的运算。
但是由于是静态单例的,而实际开发中相对比较少用或者基本不用,那么ComputationScheduler 初始化的资源就有点浪费了,没有关系,RxJava其实是有开发释放资源的 API 的。
ComputationScheduler里,调用Scheduler 的 shutdown() 即可
@Override
public void shutdown() {
for (;;) {
FixedSchedulerPool curr = pool.get();
if (curr == NONE) {
return;
}
if (pool.compareAndSet(curr, NONE)) {
curr.shutdown();
return;
}
}
}
FixedSchedulerPool 里
public void shutdown() {
for (PoolWorker w : eventLoops) {
w.dispose();
}
}
Schedulers.from()
RxJava 还很人性化的提供了 from API,让开发者可以自己定义 Scheduler 来满足自己的设计。 如何自定义,相信通过以上的学习,大家都能完成自己的实现了。
总结
简单做一下总结,
- NewThreadScheduler
- SingleScheduler
- IoScheduler
- ComputationScheduler
相同点:
- 这四个 Scheduler,线程池的管理都离不开 NewThreadWorker,都是维护这一个或多个 NewThreadWorker 副本,而 NewThreadWorker 里都是创建核心数为1 的 ScheduledExecutorService
不同点:
NewThreadScheduler 没有限制,每次都会创建新的 NewThreadWorker 来执行任务,因此性能相对会低一点,好处是用完就会移除,内存停留时间短
SingleScheduler 使用单例维护了唯一的 ScheduledExecutorService 对象
- IoScheduler 里的 CachedWorkerPool 使用队列 ConcurrentLinkedQueue 维护 ThreadWorker,超时时间为 60s
- ComputationScheduler 里 FixedSchedulerPool 维护了CPU核心数量的 ThreadWorker 副本