Rxjava源码分析

Rxjava的简单使用

//被观察者
Observable.create(object : ObservableOnSubscribe<Int> {
        override fun subscribe(emitter: ObservableEmitter<Int>) {
            emitter.onNext(1)
            emitter.onNext(2)
            emitter.onNext(3)
            emitter.onComplete()
        }
    }).subscribeOn(Schedulers.io()) //运行在Io线程
        .observeOn(AndroidSchedulers.mainThread())// 切换观察者回调到主线程
        .subscribe(object : Observer<Int> {//注册观察者
            override fun onSubscribe(d: Disposable) {

            }

            override fun onNext(t: Int) {

            }

            override fun onError(e: Throwable) {

            }

            override fun onComplete() {

            }

        })
复制代码

我们就先从Rxjava的整体进行分析 。

源码分析

我们先看看 Observable.create() subscribeOn() observeOn() subscribe()方法 。

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    //这里ObservableCreate
    return RxJavaPlugins.onAssembly( 
        new ObservableCreate<T>(source)); //对 source进行一个简单地封装  钩子函数 。
}

//subscribeOn()   
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    //这里ObservableSubscribeOn   ,注意这里的this是上一个ObservableCreate
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
    

//observeOn()  
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    //这里 ObservableObserveOn  ,注意这里的this是上一个ObservableSubscribeOn
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

//subscribe() 
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //这里的this 是ObservableObserveOn
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        //这里我们不妨先猜测是在注册Observer
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
复制代码

看完了一遍调用的流程, 现在有3个关键的类在我们的眼里 。ObservableCreate ObservableSubscribeOn ObservableObserveOn,可以看到他们一层一层的调用下来 ,之前都是在配置参数 ,最后一个subscribe()方法中是真的做事情的地方 。

我们从刚才的 subscribeActual(observer);看起

/**
 * Operator implementations (both source and intermediate) should implement this method that
 * performs the necessary business logic and handles the incoming {@link Observer}s.
 * <p>There is no need to call any of the plugin hooks on the current {@code Observable} instance or
 * the {@code Observer}; all hooks and basic safeguards have been
 * applied by {@link #subscribe(Observer)} before this method gets called.
 * @param observer the incoming Observer, never null
 */
protected abstract void subscribeActual(Observer<? super T> observer);
复制代码

是个抽象方法(可以想到这里开始一层层的回去) 。

我们先看ObservableObserveOn中的subscribeActual

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //注意这里的source  就是上一个调用的对象 也就是ObservableSubscribeOn
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
复制代码

看看source.subscribe(observer);干了什么


//是个接口
public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}



//又回来了
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

复制代码

hhh, 又回来了 ,依然走subscribeActual(observer);函数。

ObservableSubscribeOn

@Override
public void subscribeActual(final Observer<? super T> observer) {
    //这里对Observer进行了一个包装
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    
    //回调onSubscribe方法
    observer.onSubscribe(parent);
    //这里注意一下 ,SubscribeTask
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

//是个runnable ? 哦? 子线程切换 
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        //这里执行了最后一个Observable的subscribeActual
        source.subscribe(parent);
    }
}


复制代码

我们可以看到,runnable中执行了最后一个ObservablesubscribeActual ,可想而知 最后的三个结果的回调都在那里了。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //发送了创建发射器 ,这就是真实的 onSubscribe(d: Disposable)回调了
    observer.onSubscribe(parent);

    try {
        //这里执行了发射器
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
复制代码

可以看看发射器内部 CreateEmitter

static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            //这里
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                //这里
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                //这里
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
复制代码

回调什么的就实现了 。

个人总结

Rxjava总体上来说是一个非常好的框架 ,虽然学习成本较大 ,但是内部的设计模式和思想是值得借鉴的 。多看源码 ,多学习 。希望大家都可以成长为自己心目中的那个人 。

由于小弟是第一次写文章 ,写的不好的地方还请大家指正和包含 。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享