RxJava介绍3:源码解析

源码解析

create Observable

Observable
    .fromArray(1, 2, 3, 4)
 	.subscribe (object:Observer<Int>{
         override fun onSubscribe(d: Disposable) { }
         override fun onNext(t: Int) { }
         override fun onError(e: Throwable) { }
         override fun onComplete() { }
    })
复制代码

首先看Observable.fromArray(1, 2, 3, 4),进入Observable类

class Observable{
    public static <T> Observable<T> fromArray(T... items) {
        //判空
        ObjectHelper.requireNonNull(items, "items is null");
        //显而易见的,做一些优化,0个元素和1个元素处理逻辑要简单很多。
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        //1.RxJavaPlugins.onAssembly() 
        //Calls the associated hook function,调用关联的Hook函数。
        //哦,可以在外部提前关联一些log啊什么的,监控什么的。那不影响主流程。
        //后面的分析就都省略这个RxJavaPlugins了。
        //2.创建了ObservableFromArray,所以这个是接下来的核心了。
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
    
    @Override
    public final void subscribe(Observer<? super T> observer) {
       subscribeActual(observer);
    }
}
复制代码

ObservableFromArray代码,朴实无华!
ObservableFromArray继承Observable。子类的subscribeActual()被Observable.subscribe()调用。实际大量的逻辑是发生在这里。分析视角也常在此处。

删减了fusionMode相关代码,此处我们逻辑走不到,后面章节分析。

class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) { this.array = array; }
   
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        s.onSubscribe(d);
        d.run();
    }

    static final class FromArrayDisposable<T> implements Disposable {
        final Observer<? super T> actual;
        final T[] array;
        volatile boolean disposed;

        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }
        void run() {
            T[] a = array;
            int n = a.length;
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
             if (!isDisposed()) actual.onComplete();
        }
        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    
}
复制代码

关于Disposable,RxJava为我们提供了一个开关,可以用来取消订阅。这像个追踪器一般在代码中传来传去,源码读着读着就认不清这个了。还是先看正向功能。

map

Observable.fromArray(1, 2, 3, 4)
    .map { it * 5 }
 	.subscribe { println(it) }
复制代码

从这里开始,代码似乎变得复杂。因为RxJava有多个操作符,为了复用逻辑。做了很多抽象和封装。
当ObservableMap.subscribeActual()时,source.subscribe(MapObserver(yourObserver)),MapObserver是上游的Observer。同时我们把mapper fuction,it * 5,传给了MapObserver。
调用顺序:
–>FromArrayDisposable.run()
–>MapObserver.onNext(value)
–>yourObserver.onNext(mapper.apply(value))

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {
    protected final ObservableSource<T> source;
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            U v = mapper.apply(t);
            actual.onNext(v);
        }
    }
}
复制代码

BasicFuseableObserver去掉fusionmode的部分,还是简单的。MapObserver(即BasicFuseableObserver),包含上游Disposable和下游Observer。

/**
 * Base class for a fuseable intermediate observer.
 * @param <T> the upstream value type
 * @param <R> the downstream value type
 */
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {

    /** The downstream subscriber. */
    protected final Observer<? super R> actual;

    /** The upstream subscription. */
    protected Disposable s;

    public BasicFuseableObserver(Observer<? super R> actual) {
        this.actual = actual;
    }

    @Override
    public final void onSubscribe(Disposable s) {
            this.s = s;
            actual.onSubscribe(this);     
        }
    }
    @Override
    public void onError(Throwable t) { actual.onError(t); }

    @Override
    public void onComplete() { actual.onComplete();}

    @Override
    public void dispose() {s.dispose();}

    @Override
    public boolean isDisposed() {return s.isDisposed(); }
}
复制代码

可以看到map自身的MapObserver订阅上游Observable。
截屏2021-05-11 下午4.48.42.png

subscribeOn

回忆上面章节中提到的代码

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}");
        it.onNext(1) }
	.subscribeOn(Schedulers.newThread())
    .subscribe { println("in next  :${Thread.currentThread()} $it") }

//运行结果
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1,5,main]
in next  :Thread[RxNewThreadScheduler-1,5,main] 5
复制代码

Schedulers.newThread()创建NewThreadScheduler。scheduler内容不是此部分重点。NewThreadScheduler.scheduleDirect(Runnable)最终调用ExecutorService.submit(Runnable)。把runnable扔到线程池中执行。
这里的runnable是SubscribeTask。在新线程中执行source.subscribe(SubscribeOnObserver)。如果上游没有相关线程切换的操作。那么整个执行过程从main线程切换到新线程。
整个链式过程等价于ExecutorService.submit{source.subscribe(SubscribeOnObserver)}.
再看SubscribeOnObserver,包含了yourObserver。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        @Override
        public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); }
        @Override
        public void onNext(T t) {actual.onNext(t);}
        @Override
        public void onError(Throwable t) {actual.onError(t);}
        @Override
        public void onComplete() {actual.onComplete();}
    }
    
    final class SubscribeTask implements Runnable {
    	private final SubscribeOnObserver<T> parent;
    	@Override
    	public void run() {source.subscribe(parent);}
	}
}
复制代码

�subscibeOn可以这个理解
截屏2021-05-11 下午8.14.03.png

observeOn

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}");
        it.onNext(1) }
	.observeOn(Schedulers.newThread())
    .subscribe { println("in next  :${Thread.currentThread()} $it") }
//运行结果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next  :Thread[RxNewThreadScheduler-1,5,main] 5
复制代码

我们来看ObservableObserveOn代码, 对照源码,这里面的代码我做了大量的删减。 但不考虑外部取消或者内部异常,确实是在这么执行的。 删掉了fusion的代码,ObservableCreate还用不到。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    //实际运行的线程相关
    final Scheduler scheduler;
    
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        Scheduler.Worker w = scheduler.createWorker();
        //subscribe()仍然在源线程执行哦,而subscribeOn把整个source.subscribe扔进新线程。
        //把Scheduler和yourObserver传给了新的ObserveOnObserver。
		source.subscribe(new ObserveOnObserver<T>(observer, w));
    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        SimpleQueue<T> queue;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker) {
            this.actual = actual;
            this.worker = worker;
        }
        @Override
        public void onSubscribe(Disposable s) {
            //SpscLinkedArrayQueue,我只知道是线程安全且无synchronized代码的队列
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
			actual.onSubscribe(this);
        }

        //在Scheduler线程中执行run(),即drainNormal()。我们发现上下游不在同一个线程了。
        //生产者消费者问题来了,这里我们假定生产速度 > 消费速度。
        @Override
        public void onNext(T t) {
            //放入线程共享的队列,生产行为
            queue.offer(t);
            schedule();
        }
        void schedule() {
            //首次调用时,get值为0,同时值+1,值理解为需要处理的次数
            //如果get值如果为0,触发消费行为,否则不触发。
            if (getAndIncrement() == 0) worker.schedule(this);
        }
        @Override
        public void run() { drainNormal(); } 

       //drain,我们要把管道中的水给排干。
        void drainNormal() {
            //能够进入drain,至少有一个数据。
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
            //初次看到这for-for,也是懵的。实际这是一种处理生产者消费者问题的代码模型。
            for (;;) {
                for (;;) {
                    //1.先看内层循环,q即SpscLinkedArrayQueue,是不阻塞的。
                    //排空一个个数据,没有数据了就跳出。
                    T v = q.poll();
					if (v == null)  break;
                   	a.onNext(v);//yourObserver.onNext
                }
                //2.还记得上面的假定生产速度 > 消费速度。
                //在消费过程中,队列又加入了一些数据,而getAndIncrement()!=0,无法进入消费线程。
                //missed记录了当前内层循环需要处理的请求次数。
                //更新missed值,当前请求次数-上轮处理的请求次数。
                missed = addAndGet(-missed);
                if (missed == 0) break;
                //3.值得注意的是,这个地方请求次数与数据数量是相等的。但这个模型中,并不强制要求。
            }
        }
       ...
    }
}
复制代码

关于observeOn,大胆点讲最终就只有yourObserver运行在新线程中。
截屏2021-05-11 下午8.56.43.png
上述代码中第一次见识到了drain代码。显然, 可以加锁来解决。但这里介绍一种wip技巧(Working-In-Progress)使用CAS(Compare And Set)来解决。RxJava2中,通常使用这种方式做多线程处理。

Scheduler

从上面来看,我们大概知道Scheduler就是把代码场景扔进另外的线程运行。Scheduler通过java线程池管理线程。

核心代码

下面是一段伪代码

public abstract class Scheduler {
    
    public abstract Worker createWorker();

    public Disposable scheduleDirect(@NonNull Runnable run) {
        createWorker().schedule(run);
    }
    public abstract static class Worker implements Disposable {
        ExecutorService  executor;
        public Disposable schedule(@NonNull Runnable run) {
            Task task = new Task(run);
            executor.submit(task)
        }
    }
    //在RxJava中,Task也许叫ScheduledRunnable,也许叫ScheduledDirectTask
    public static class Task implements Callable<Void>,Disposable{
        private Runnable run;
    	@Override
    	public Void call() {run.run();return null;}
        @Override
        public void dispose() {...}
        @Override
        public boolean isDisposed() {...}
    }
}
复制代码

实际的调用是这样的.

scheduler.worker.schedule(Runnable{需要扔进新线程的代码})
executor.submit(Task(runnable))
复制代码

关于Task

在RxJava中,Task也许叫ScheduledRunnable或者ScheduledDirectTask等等。
Task包裹了我们的runnable,同时提供了对runnable的控制。
在ObserveOnObserver中,每次next执行都会产生一个task。而在ObservableSubcribeOn中 source.subcribe,就只有一个task,因为是一次性的啊。
每个Worker中的task是串行执行的。

关于Scheduler

Schedulers提供了适用不同场景的调度器:

  • Schedulers.immediate()

默认,不指定线程

  • Schedulers.newThread():

在一个新线程执行,实际真的只有一个线程。每个worker都有个独立的线程池。

  • Schedulers.computation():

适用CPU计算操作线程,限定了最大线程数量,数量为JVM处理器个数。预先生成固定数量PoolWorker。会出现不同场景共用PoolWorker的情况。

  • Schedulers.io():

适用io操作线程,线程数量并没有上限。有一个CachedWorkerPool,那实际的worker是可以复用的。

  • Schedulers.trampoline():

当前线程执行,不会立即执行,等前一个任务完成。当前任务入队执行。

  • Schedulers.single():

相较于Schedulers.newThread(),single创建的所有worker公用线程池。

  • AndroidSchedulers.mainThread()

在Android主线程执行,内部通过Handler实现,也只能通过Handler。关于RxAndroid

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享