1 前言
Rxjava是什么?套用官网的解释:
RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
复制代码
这句好像不太明白;那么简单直白的说
- 用同步的逻辑,来进行异步编程;使逻辑变得简单明了
- 对数据流或者事件流进行各种操作;丰富的操作过程,易于实现更强大的逻辑
如果你只是解决异步编程回调过多或者切换线程麻烦,用Rxjava,我觉得是大材小用;jdk 8提供了CompletableFuture这个类解决异步编程问题,那么它们有什么区别呢
- 适用场景:CompletableFuture是阶段性、一次性流,而Rxjava是不间断的流,可以不是阶段的
- 执行的角度:CompletableFuture源头以及中间过程中流任务立刻会执行,Rxjava只有在具体有消费者/观察者/订阅者时才会进行处理(可以和CompleableFuture一样,但这个没有用过,叫做热启动?热流?)
- 功能完善性:Rxjava具有相对强大的功能,但CompletableFuture大部分可以通过自己定义的提供者、消费者、转换、组合等手段进行实现
- 使用难度:均易上手,但Rxjava使用难度较大
就是由于Rxjava功能封装的已经相当强大,所以让人爱;而其使用好难度高,而又让人弃之;作者在这篇文章内,会从以下几个方面着手
- Rxjava中的概念抽象
- Rxjava使用详情
- Rxjava的一些原理思想
讲解时Rxjava版本为
io.reactivex.rxjava3:rxjava:3.0.8
复制代码
2 Rxjava基本概念
2.1 观察者模式
普通的观察者模式:观察者(Observer)需要在被观察者(Observable)变化的一顺间做出反应。而两者通过注册(Register)或者订阅(Subscrible)的方式进行绑定。
而Rxjava,虽然也是观察者模式,但是实现方式不同,大家都称为扩展的观察者模式(定义嘛,思想就是这个意思了);不同点如下:
- Observer与Observable是通过subscrible()来达成订阅关系
- 不仅存在内容回调,也存在处理时逻辑结果回调;也就是存在完成态方法回调:onCompleted()、onError()
2.2 Rxjava中抽象概念
- Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者
- Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现
- emitter 发射器,是bservable在数据产生时,最常用的一种桥接手段;桥接真实数据产生和向观察者发射数据的
- Disposable:流传递控制器,观察者通知可观察对象,是否停止数据监听
- Operators:数据操作,对数据进行创建、过滤、转换、组合、映射等处理
- Scheduler: 线程切换器;对数据当前节点前、节点后操作执行线程进行变换控制
3 使用
有以下四个步骤
- 创建源头可观察者
- 中间操作过程
- 线程切换
- 进行订阅
例子中均是以Observable-Observer来做说明的,其实还有Single、Maybe、Flowble、Completable分类,但他们操作大致相同,只是处理场景略有不同
3.1 创建源头可观察者
源头可观察者创建可以使用以下方法:
- just系列:参数中即为数据,最多传入10个数据;
- defer:参数为一个提供流源头的提供者,延迟提供源头;如果一个源头热启动,通过此方式,可以进行延迟
- empty:创建一个不发射任何数据,只有正常完成的状态回调
- never:创建一个不发射数据,无任何回调
- error:创建一个不发射数据,只有异常完成状态回调
- from系列:可以来源线程线程相关接口(Runnable、Future、Callable等)以及类、内容提供者,还可以是Rxjava中的一些源头
- Interval系列:周期性发射Long类型数据,从0开始
- timer系列:定时发送数据,仅仅发送一个数据0后即自动结束
- create系列:通过发射器来桥接实现;发射器的相应方法和源头对象一致
- range系列:发射一个整数范围的数据
- generate系列:提供数据源头,并进行初始变换
- concat/combine/amb/using/wrap/switchOnNext/sequenceEqual/merge:对其它可观察者的变化、组合等操作处理而形成的新的可观察者
例子
Observable.create<Int> {
try {
it.onNext(10)
it.onComplete()
} catch (e : Throwable) {
it.onError(e)
}
}
复制代码
使用发射器时,注意完成态的设置;不设置也没有关系,只要你的观察者不需要;但这不是一个正确规范的行为
3.2 中间操作
这个过程是体现Rxjava的强大功能之处,很多的操作,使得数据或者逻辑的状态精彩纷呈;调用者,需要首先了解Rxjava有哪些变化,结合具体场景进行不同的自我抽象,进而达到易于理解的使用逻辑
这些操作方法可以见官方文档
比创建的方法实在是多太多,这也能反馈其功能的多样以及强大之处;其还可以自定义操作
自定义操作符
涉及到compose、lift方法以及XxxOperator、XxxTransformer接口
ObservableOperator实现
class IntOperator : ObservableOperator<Int, Int> {
override fun apply(observer: Observer<in Int>?): Observer<in Int> = object : Observer<Int> {
override fun onComplete() {
observer?.onComplete()
}
override fun onSubscribe(d: Disposable?) {
observer?.onSubscribe(d)
}
override fun onNext(t: Int?) {
observer?.onNext((t ?: 0) * 10)
}
override fun onError(e: Throwable?) {
observer?.onError(e)
}
}
}
复制代码
相当于一个map,Rxjava中实现的操纵变化,和这个操作是一样的思维,观察者被装饰成一个新的观察者
ObservableTransformer实现
class IntTransformer : ObservableTransformer<Int, Int> {
override fun apply(upstream: Observable<Int>?): ObservableSource<Int> = Observable.just(20)
}
复制代码
这完全就是整了一个流出现;
自定义使用
Observable.just(10).lift(IntOperator()).compose(IntTransformer()).subscribe {
println("over $it")
}
复制代码
从这来看ObservableOperator针对的是重定义操作,而ObservableTransformer是可以重定义被观察者;层次不同
3.3 线程切换
总的来说,切换分为两种情况
- 传递的整个过程线程,作用于整个执行过程,直至其它切换出现
- 数据监听的源头线程,作用于之后的监听操作
对应方法为
- subscribeOn:截止到下次调用,这些过程均在这次指定线程中执行;如果之前执行未指定执行线程,则此次调用之前也在此次指定线程中执行;
- observeOn:之后观察者回调执行的线程
例子
Observable.create<Int> {
println("subscribeOn: name ${Thread.currentThread().name}")
try {
it.onNext(10)
it.onComplete()
} catch (e : Throwable) {
it.onError(e)
}
}.subscribeOn(Schedulers.io()).map {
println("after subscribeOn: name ${Thread.currentThread().name}")
it * 10
}.observeOn(Schedulers.newThread()).subscribe(object : Observer<Int> {
override fun onComplete() {
println("name ${Thread.currentThread().name}, complete")
}
override fun onSubscribe(d: Disposable?) {
}
override fun onNext(t: Int?) {
println("name ${Thread.currentThread().name}, $t")
}
override fun onError(e: Throwable?) {
println("name ${Thread.currentThread().name}, error")
}
})
复制代码
3.4 进行订阅
订阅时,对象有两种
- 订阅者,这时订阅者内自己处理所有回调
- Consumer、Action实例分别进行各个方法的回调处理
例如
Observable.just(10).subscribe(Consumer<Int> {
println("onNext $it")
}, Consumer<Throwable> {
println("ex ${it?.message}")
}, Action {
println("complted")
})
复制代码
或者
Observable.just(10).subscribe(object : Observer<Int> {
override fun onComplete() {
print("on Complete")
}
override fun onSubscribe(d: Disposable?) {
println("disposable")
}
override fun onNext(t: Int?) {
println("onNext $t")
}
override fun onError(e: Throwable?) {
println("ex ${e?.message}")
}
})
复制代码
4 源码解读
Rxjava整体流程如下图:这张图来自于官网
可以看到其分成两个流向
- Observable新对象生成时的流;每个操作都会生成一个流,持有了前一个对象的引用和相关操作参数
- 进行订阅的流;发生订阅后,每个订阅方法调用时,当前订阅对象被装饰成新的Observer进行前一个流的订阅方法调用
Rxjava每一步调用可以认为是一个操作,每个操作都有具体的实现原理,但是其也有公用的原理部分;这里并不对每种操作做解读,主要解读以下几个方面
- create生成可观察者源头
- 自定义操作原理
- 中间过程的处理思想逻辑
- 线程切换原理
下面的也是以Observable-Observer来讲解
4.1 create操作
这个操作产生了一个ObservablerCreate类型的Observable流;订阅原理如下图:
其实内部是ObservableOnSubscribe的订阅方法,其参数为ObservableEmitter,和Oberver具有相同的接口;也就是利用参数进行相应数据发射即可
4.2 自定义原理
自定义有两种方式
- lift方法、ObservableOperator接口,其产生了一个ObservableLift类型的Observable流;原理如下图
- compose方法、ObservableTransformer接口;ObservableTransformer生成一个ObervableSource对象,对象被包裹成当前产生流(ObservableFromUnsafeSource,一个代理类)
4.3 中间过程的处理逻辑
从4.1和4.2可以看出,很类似;整体思想就是如下
- 每次操作都会生成一个新的Observable流;这个流持有上个流以及操作参数
- 发生订阅时,Observer会和操作一起装饰成新的Observer对象
- 通过持有的Observable进行订阅装饰成新的Observer
更简单点来说,每次操作相当于生成个代理Observable,订阅发生时,每个操作会生成当前订阅对象的装饰类,然后进行订阅
4.4 线程切换
有两个方法进行线程切换
- observeOn方法:后续的监听操作进行线程切换,原理如下:
-
subscribeOn方法:对订阅动作进行前程切换;起作用范围如下:
- 若之前没有进行过切换,则在此次操作之前,均是在这次指定线程中执行
- 截止到下次线程切换,中间过程均在此次指定线程中执行
5 小结
本篇文章,并没有对很多的操作进行的讲解,也没有对代码的实现进行讲解,更没有更加细致的讲解一些原理;但是本文章,把一个Rxjava的主要功能的框框画出来了;也提供了更进一步解读以及总结的台阶。
技术变化都很快,但基础技术、理论知识永远都是那些;作者希望在余后的生活中,对常用技术点进行基础知识分享;如果你觉得文章写的不错,请给与关注和点赞;如果文章存在错误,也请多多指教!