Rxjava的简单使用
//被观察者
Observable.create(object : ObservableOnSubscribe<Int> {
override fun subscribe(emitter: ObservableEmitter<Int>) {
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onComplete()
}
}).subscribeOn(Schedulers.io()) //运行在Io线程
.observeOn(AndroidSchedulers.mainThread())// 切换观察者回调到主线程
.subscribe(object : Observer<Int> {//注册观察者
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: Int) {
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
})
复制代码
我们就先从Rxjava的整体进行分析 。
源码分析
我们先看看 Observable.create() subscribeOn() observeOn() subscribe()
方法 。
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//这里ObservableCreate
return RxJavaPlugins.onAssembly(
new ObservableCreate<T>(source)); //对 source进行一个简单地封装 钩子函数 。
}
//subscribeOn()
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//这里ObservableSubscribeOn ,注意这里的this是上一个ObservableCreate
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
//observeOn()
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//这里 ObservableObserveOn ,注意这里的this是上一个ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
//subscribe()
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//这里的this 是ObservableObserveOn
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//这里我们不妨先猜测是在注册Observer
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码
看完了一遍调用的流程, 现在有3个关键的类在我们的眼里 。ObservableCreate
ObservableSubscribeOn
ObservableObserveOn
,可以看到他们一层一层的调用下来 ,之前都是在配置参数 ,最后一个subscribe()
方法中是真的做事情的地方 。
我们从刚才的 subscribeActual(observer);
看起
/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic and handles the incoming {@link Observer}s.
* <p>There is no need to call any of the plugin hooks on the current {@code Observable} instance or
* the {@code Observer}; all hooks and basic safeguards have been
* applied by {@link #subscribe(Observer)} before this method gets called.
* @param observer the incoming Observer, never null
*/
protected abstract void subscribeActual(Observer<? super T> observer);
复制代码
是个抽象方法(可以想到这里开始一层层的回去) 。
我们先看ObservableObserveOn
中的subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
//注意这里的source 就是上一个调用的对象 也就是ObservableSubscribeOn
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
复制代码
看看source.subscribe(observer);
干了什么
//是个接口
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
//又回来了
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
复制代码
hhh, 又回来了 ,依然走subscribeActual(observer);
函数。
ObservableSubscribeOn
@Override
public void subscribeActual(final Observer<? super T> observer) {
//这里对Observer进行了一个包装
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//回调onSubscribe方法
observer.onSubscribe(parent);
//这里注意一下 ,SubscribeTask
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//是个runnable ? 哦? 子线程切换
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//这里执行了最后一个Observable的subscribeActual
source.subscribe(parent);
}
}
复制代码
我们可以看到,runnable
中执行了最后一个Observable
的subscribeActual
,可想而知 最后的三个结果的回调都在那里了。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//发送了创建发射器 ,这就是真实的 onSubscribe(d: Disposable)回调了
observer.onSubscribe(parent);
try {
//这里执行了发射器
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
复制代码
可以看看发射器内部 CreateEmitter
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//这里
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
//这里
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//这里
observer.onComplete();
} finally {
dispose();
}
}
}
复制代码
回调什么的就实现了 。
个人总结
Rxjava总体上来说是一个非常好的框架 ,虽然学习成本较大 ,但是内部的设计模式和思想是值得借鉴的 。多看源码 ,多学习 。希望大家都可以成长为自己心目中的那个人 。
由于小弟是第一次写文章 ,写的不好的地方还请大家指正和包含 。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END