源码解析
create Observable
Observable
.fromArray(1, 2, 3, 4)
.subscribe (object:Observer<Int>{
override fun onSubscribe(d: Disposable) { }
override fun onNext(t: Int) { }
override fun onError(e: Throwable) { }
override fun onComplete() { }
})
复制代码
首先看Observable.fromArray(1, 2, 3, 4)
,进入Observable类
class Observable{
public static <T> Observable<T> fromArray(T... items) {
//判空
ObjectHelper.requireNonNull(items, "items is null");
//显而易见的,做一些优化,0个元素和1个元素处理逻辑要简单很多。
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
//1.RxJavaPlugins.onAssembly()
//Calls the associated hook function,调用关联的Hook函数。
//哦,可以在外部提前关联一些log啊什么的,监控什么的。那不影响主流程。
//后面的分析就都省略这个RxJavaPlugins了。
//2.创建了ObservableFromArray,所以这个是接下来的核心了。
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
@Override
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
}
复制代码
看ObservableFromArray
代码,朴实无华!
ObservableFromArray继承Observable。子类的subscribeActual()被Observable.subscribe()调用。实际大量的逻辑是发生在这里。分析视角也常在此处。
删减了fusionMode相关代码,此处我们逻辑走不到,后面章节分析。
class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) { this.array = array; }
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
d.run();
}
static final class FromArrayDisposable<T> implements Disposable {
final Observer<? super T> actual;
final T[] array;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) actual.onComplete();
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
复制代码
关于Disposable,RxJava为我们提供了一个开关,可以用来取消订阅。这像个追踪器一般在代码中传来传去,源码读着读着就认不清这个了。还是先看正向功能。
map
Observable.fromArray(1, 2, 3, 4)
.map { it * 5 }
.subscribe { println(it) }
复制代码
从这里开始,代码似乎变得复杂。因为RxJava有多个操作符,为了复用逻辑。做了很多抽象和封装。
当ObservableMap.subscribeActual()时,source.subscribe(MapObserver(yourObserver)),MapObserver是上游的Observer。同时我们把mapper fuction,it * 5
,传给了MapObserver。
调用顺序:
–>FromArrayDisposable.run()
–>MapObserver.onNext(value)
–>yourObserver.onNext(mapper.apply(value))
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {
protected final ObservableSource<T> source;
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
U v = mapper.apply(t);
actual.onNext(v);
}
}
}
复制代码
BasicFuseableObserver去掉fusionmode的部分,还是简单的。MapObserver(即BasicFuseableObserver),包含上游Disposable和下游Observer。
/**
* Base class for a fuseable intermediate observer.
* @param <T> the upstream value type
* @param <R> the downstream value type
*/
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
/** The downstream subscriber. */
protected final Observer<? super R> actual;
/** The upstream subscription. */
protected Disposable s;
public BasicFuseableObserver(Observer<? super R> actual) {
this.actual = actual;
}
@Override
public final void onSubscribe(Disposable s) {
this.s = s;
actual.onSubscribe(this);
}
}
@Override
public void onError(Throwable t) { actual.onError(t); }
@Override
public void onComplete() { actual.onComplete();}
@Override
public void dispose() {s.dispose();}
@Override
public boolean isDisposed() {return s.isDisposed(); }
}
复制代码
可以看到map自身的MapObserver订阅上游Observable。
subscribeOn
回忆上面章节中提到的代码
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//运行结果
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 5
复制代码
Schedulers.newThread()创建NewThreadScheduler。scheduler内容不是此部分重点。NewThreadScheduler.scheduleDirect(Runnable)最终调用ExecutorService.submit(Runnable)。把runnable扔到线程池中执行。
这里的runnable是SubscribeTask。在新线程中执行source.subscribe(SubscribeOnObserver)
。如果上游没有相关线程切换的操作。那么整个执行过程从main线程切换到新线程。
整个链式过程等价于ExecutorService.submit{source.subscribe(SubscribeOnObserver)}
.
再看SubscribeOnObserver,包含了yourObserver。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); }
@Override
public void onNext(T t) {actual.onNext(t);}
@Override
public void onError(Throwable t) {actual.onError(t);}
@Override
public void onComplete() {actual.onComplete();}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
@Override
public void run() {source.subscribe(parent);}
}
}
复制代码
observeOn
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.observeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//运行结果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 5
复制代码
我们来看ObservableObserveOn
代码, 对照源码,这里面的代码我做了大量的删减。 但不考虑外部取消或者内部异常,确实是在这么执行的。 删掉了fusion的代码,ObservableCreate还用不到。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
//实际运行的线程相关
final Scheduler scheduler;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker w = scheduler.createWorker();
//subscribe()仍然在源线程执行哦,而subscribeOn把整个source.subscribe扔进新线程。
//把Scheduler和yourObserver传给了新的ObserveOnObserver。
source.subscribe(new ObserveOnObserver<T>(observer, w));
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
final Observer<? super T> actual;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker) {
this.actual = actual;
this.worker = worker;
}
@Override
public void onSubscribe(Disposable s) {
//SpscLinkedArrayQueue,我只知道是线程安全且无synchronized代码的队列
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
//在Scheduler线程中执行run(),即drainNormal()。我们发现上下游不在同一个线程了。
//生产者消费者问题来了,这里我们假定生产速度 > 消费速度。
@Override
public void onNext(T t) {
//放入线程共享的队列,生产行为
queue.offer(t);
schedule();
}
void schedule() {
//首次调用时,get值为0,同时值+1,值理解为需要处理的次数
//如果get值如果为0,触发消费行为,否则不触发。
if (getAndIncrement() == 0) worker.schedule(this);
}
@Override
public void run() { drainNormal(); }
//drain,我们要把管道中的水给排干。
void drainNormal() {
//能够进入drain,至少有一个数据。
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
//初次看到这for-for,也是懵的。实际这是一种处理生产者消费者问题的代码模型。
for (;;) {
for (;;) {
//1.先看内层循环,q即SpscLinkedArrayQueue,是不阻塞的。
//排空一个个数据,没有数据了就跳出。
T v = q.poll();
if (v == null) break;
a.onNext(v);//yourObserver.onNext
}
//2.还记得上面的假定生产速度 > 消费速度。
//在消费过程中,队列又加入了一些数据,而getAndIncrement()!=0,无法进入消费线程。
//missed记录了当前内层循环需要处理的请求次数。
//更新missed值,当前请求次数-上轮处理的请求次数。
missed = addAndGet(-missed);
if (missed == 0) break;
//3.值得注意的是,这个地方请求次数与数据数量是相等的。但这个模型中,并不强制要求。
}
}
...
}
}
复制代码
关于observeOn,大胆点讲最终就只有yourObserver运行在新线程中。
上述代码中第一次见识到了drain代码。显然, 可以加锁来解决。但这里介绍一种wip技巧(Working-In-Progress)使用CAS(Compare And Set)来解决。RxJava2中,通常使用这种方式做多线程处理。
Scheduler
从上面来看,我们大概知道Scheduler就是把代码场景扔进另外的线程运行。Scheduler通过java线程池管理线程。
核心代码
下面是一段伪代码
public abstract class Scheduler {
public abstract Worker createWorker();
public Disposable scheduleDirect(@NonNull Runnable run) {
createWorker().schedule(run);
}
public abstract static class Worker implements Disposable {
ExecutorService executor;
public Disposable schedule(@NonNull Runnable run) {
Task task = new Task(run);
executor.submit(task)
}
}
//在RxJava中,Task也许叫ScheduledRunnable,也许叫ScheduledDirectTask
public static class Task implements Callable<Void>,Disposable{
private Runnable run;
@Override
public Void call() {run.run();return null;}
@Override
public void dispose() {...}
@Override
public boolean isDisposed() {...}
}
}
复制代码
实际的调用是这样的.
scheduler.worker.schedule(Runnable{需要扔进新线程的代码})
executor.submit(Task(runnable))
复制代码
关于Task
在RxJava中,Task也许叫ScheduledRunnable或者ScheduledDirectTask等等。
Task包裹了我们的runnable,同时提供了对runnable的控制。
在ObserveOnObserver中,每次next执行都会产生一个task。而在ObservableSubcribeOn中 source.subcribe,就只有一个task,因为是一次性的啊。
每个Worker中的task是串行执行的。
关于Scheduler
Schedulers提供了适用不同场景的调度器:
- Schedulers.immediate()
默认,不指定线程
- Schedulers.newThread():
在一个新线程执行,实际真的只有一个线程。每个worker都有个独立的线程池。
- Schedulers.computation():
适用CPU计算操作线程,限定了最大线程数量,数量为JVM处理器个数。预先生成固定数量PoolWorker。会出现不同场景共用PoolWorker的情况。
- Schedulers.io():
适用io操作线程,线程数量并没有上限。有一个CachedWorkerPool,那实际的worker是可以复用的。
- Schedulers.trampoline():
当前线程执行,不会立即执行,等前一个任务完成。当前任务入队执行。
- Schedulers.single():
相较于Schedulers.newThread(),single创建的所有worker公用线程池。
- AndroidSchedulers.mainThread()
在Android主线程执行,内部通过Handler实现,也只能通过Handler。关于RxAndroid。