Operator Fusion
翻译自 Operator fusion in RxJava 2
介绍
RxJava是一个非常强大的库,尽管它也存在一些问题。特别是性能和内存的问题。
为了最小化RxJava中的开销,有许多优化措施,称为“操作符融合”。
首先让我们回顾下RxJava如何工作以及他们存在哪些问题。
Observable
- 构建Observable
- Observer订阅Observable
- Observable通过Observer.onSubscribe方法,将创建的Disposable传递给Observer。
- 随后Observable可以调用Observer.onNext传递值
Observable不支持背压,因为Observer无法将自身处理能力通知给Observable。
Flowable
与Observable相似,但是没有Observer和Disposable,而是Subscriber和Subscription。
Subscription具有额外的request(n)方法,Subscriber可以使用该方法显式告知Flowable发出item的请求数量。如果不request值,Flowable将不会发出任何东西,这就是Flowable支持背压的原因。
Assembly and subscribe
使用RxJava时,有两个重要的阶段组装与订阅:
- 组装阶段,建立Rx链
- 订阅阶段,启动Rx链(触发各种操作符内部的“风暴”)
参考下面一段代码
Observable.fromArray(1, 2, 3)
.map { it + 1 }
.filter { it < 3 }
.subscribe { println(it) }
复制代码
过程如下:1.组装,2.订阅,3.运行。 仅仅三条Rx链就发生了这么多事情。如果换成Flowable,request(n)将会使得过程更加复杂。
Queues and synchronization
操作符的内部实现可能会有内部Queue,用来处理事件。Queue应当被串行访问(这就意味着要有适当的同步机制)。RxJava2具有基于Atomics的无阻塞同步(例如AtomicInteger)和带有compareAndSet方法的无限循环。
假设每个操作符都有自己的内部Queue,那操作符中的Queue和Atomic对象同样会带来额外的开销。
类似下面的代码,
public final class QueueDrainHelper {
public static <T> boolean postCompleteRequest(...) {
for (; ; ) {
long r = state.get();
long r0 = r & REQUESTED_MASK;
long u = (r & COMPLETED_MASK) | BackpressureHelper.addCap(r0, n);
if (state.compareAndSet(r, u)) {
if (r == COMPLETED_MASK) {
postCompleteDrain(n | COMPLETED_MASK, actual, queue, state, isCancelled);
return true;
}
return false;
}
}
}
}
复制代码
public final class ObservableObserveOn<T>{
void drainNormal() {
int missed = 1;
SimpleQueue<T> q = this.queue;
Observer a = this.actual;
do {
while(true) {
Object v;
try {
v = q.poll();
} catch (Throwable var7) {
...
}
boolean empty = v == null;
if (empty) {
missed = this.addAndGet(-missed);
break;
}
a.onNext(v);
}
} while(missed != 0);
}
}
复制代码
Issues
综上所述,RxJava存在的问题是:
- 组装开销,创建Rx链,会创建很多对象,这会带来内存开销
- 订阅开销,会发生大量通信,这会带来性能开销
- 分配和串行化开销-为每个操作符创建内部结构(比如队列和原子对象),带来内存和性能开销
Operator fusion
为了解决某些性能和内存问题,这就是“操作符融合”
操作符融合用两种类型:
- 宏融合(Macro fusion),合并操作符,最大程度的减少在组装和订阅阶段创建的对象数量。
- 微融合(Micro fusion),移除不必要的同步和在操作符间共享内部结构(例如Queue)
Macro fusion on Assembly
组装时
组装时的宏融合专注于最大程度地减少组装期间创建的Observable和对象。
当我们谈“组装时”,我们指的是这个地方。
public abstract class Observable<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableFromCallable<T>(supplier));
}
}
复制代码
组装融合基础
优化某些Observable的最简单方法是添加对特殊情况的检查,以创建更简单的Observable。例如Observable.fromArray可以被降级为Observable.empty或Observable.just。
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
复制代码
ScalarCallable
fuseable包中第一个“进阶”优化是ScalarCallable接口
public interface ScalarCallable<T> extends Callable<T> {
@Override
T call();
}
复制代码
它继承通用的Java Callable,并且具有相同的接口,区别在于不能抛出异常。ScalarCallable是一个标记接口,某个类实现该接口,就意味着该类在组装期间可以安全得提取一个常量值(也可以是null值)。基于以上描述,只有empty和just相关的数据源操作符(Observable/Flowable/Maybe)可以被 scalarCallable标记。
例如在xMap操作符(flatMap,switchMap,concatMap)中,如果source被标记,则可以用简化版本的xMap代替繁琐的完整实现(比较ObservableFlatMap和ObservableScalarXMap)。
public abstract class Observable<T> {
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
if (this instanceof ScalarCallable) {
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
}
复制代码
FuseToXXX
在fuseable包中有这三个接口
public interface FuseToObservable<T> {
// Returns a (direct) Observable for the operator.
Observable<T> fuseToObservable();
}
public interface FuseToMaybe<T> {
Maybe<T> fuseToMaybe();
}
public interface FuseToFlowable<T> {
Flowable<T> fuseToFlowable();
}
复制代码
进一步看看FuseToObservable,其他两个接口类似。考虑下面的Rx链:
========================================
Observable.range(1, 10)
.count()
.toObservable()
.subscribe { println(it) }
========================================
class ObservableCountSingle<T> extends Single<Long>
implements FuseToObservable<Long> {
@Override
public Observable<Long> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
}
}
abstract class Single {
public final Observable<T> toObservable() {
if (this instanceof FuseToObservable) {
return ((FuseToObservable<T>)this).fuseToObservable();
}
return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
}
}
复制代码
有或者没有FuseToObservable,组装结构是不一样的。
Macro fusion on subscribe
订阅期间的宏融合与组装期间是一样的,只是发生在subscribeActual方法中。有时候在订阅前数据是未知的,订阅时优化会更方便。
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
protected abstract void subscribeActual(Observer<? super T> observer);
}
复制代码
Basic on subscribe fusion
类似于在组装期间,我们添加对特殊情况的检查,对subscribe降级。例如下面ObservableAmb代码
public final class ObservableAmb<T> extends Observable<T> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T>[] sources = this.sources;
...
if (count == 0) {
EmptyDisposable.complete(s);
return;
} else if (count == 1) {
sources[0].subscribe(s);
return;
}
AmbCoordinator<T> ac = new AmbCoordinator<T>(s, count);
ac.subscribe(sources);
}
复制代码
Callable
类似于组装期间的ScalarCallable。在订阅期间,会通过Callable进行类似优化。
注意:因为ScalarCallable继承Callable,所以在组装期间ScalarCallable上的优化,同样可以在订阅期间应用在Callable上。
比如说 XMap操作符,订阅继承了Callable接口的Observables,就有可能会被替换成简化实现。参考ObservableFlatMap.MergeObserver,这个类太复杂了,我都不想看了。
Observable.fromCallable { 3 }
.flatMap { Observable.fromArray(it + 1, it + 2) }
.subscribe { println(it) }
=============================
public final class ObservableFlatMap<T, U> {
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
}
public final class ObservableScalarXMap {
public static <T, R> boolean tryScalarXMapSubscribe(ObservableSource<T> source,
Observer<? super R> observer,
Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
if (source instanceof Callable) {
T t;
try {
t = ((Callable<T>)source).call();
} catch (Throwable ex) {
...
return true;
}
if (t == null) {
EmptyDisposable.complete(observer);
return true;
}
ObservableSource<? extends R> r;
try {
r = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable ex) {
...
return true;
}
...
r.subscribe(observer);
return true;
}
return false;
}
}
复制代码
Micro fusion
ConditionalSubscriber
考虑 FlowableFilter操作符
public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
static final class FilterSubscriber<T> extends BasicFuseableSubscriber<T, T>
implements ConditionalSubscriber<T> {
final Predicate<? super T> filter;
@Override
public void onNext(T t) {
if (!tryOnNext(t)) {
s.request(1);
}
}
@Override
public boolean tryOnNext(T t) {
boolean b = filter.test(t);
if (b) {
actual.onNext(t);
}
return b;
}
}
}
复制代码
试想如果有两个filter,filter.test(value)执行两次。N,request(1)或者 Y,onNext(value) 加起来会被执行两次。
于是便有了FilterConditionalSubscriber.它聚合了连续的fliter.test(value)
static final class FilterConditionalSubscriber<T>
extends BasicFuseableConditionalSubscriber<T, T> {
protected final ConditionalSubscriber<? super R> actual;
final Predicate<? super T> filter;
@Override
public void onNext(T t) {
if (!tryOnNext(t)) {
s.request(1);
}
}
@Override
public boolean tryOnNext(T t) {
return filter.test(t) && actual.tryOnNext(t);
}
}
复制代码
类似的代码可以在FlowableRange,FlowableMap中看到。
Queue fuseable
最复杂的微融合是基于操作符间共享内部队列。整个优化基于QueueFuseable接口。
public interface QueueFuseable<T> extends SimpleQueue<T> {
int NONE = 0;
int SYNC = 1;
int ASYNC = 2;
int ANY = SYNC | ASYNC;
int BOUNDARY = 4;
//Request a fusion mode from the upstream
int requestFusion(int mode);
}
public interface QueueSubscription<T> extends QueueFuseable<T>, Subscription {
}
复制代码
我们以Flowable全家桶举例,同样适用于Observable。
QueueSubscription接口继承QueueFuseable和Subscription,允许Flowable子类型操作符之间协商融合模式。
协商(requestFusion),通常发生在订阅期间,即上游调用subscriber.onSubsribe(Subscription)时。下游要在Subscription.request(n)之前调用Subscription.requestFusion(int mode)。
与常规subscriber的onXXX回调方法相比,上游不仅提供subscription,还提供QueueSubscription,从而允许下游直接访问上游内部队列。在融合模式下,下游通过调用上游QueueSubscription.poll(),获取上游数值。
�
有三种融合模式:
- NONE — 不允许融合
- SYNC — 支持同步方式融合
- ASYNC — 支持异步方式融合
SYNC融合
上游值要么已经可用,要么可以在poll()中同步生成。如果上下游同意适用SYNC融合,则意味着:
- 下游在需要时直接调用poll()方法
- poll()会抛出异常,相当于onError
- poll()可以返回null,相当于onComplete
- poll可以返回非null值,相当于onNext
- 上游不会调用任何onXXX回调
参考Flowable.range与observeOn代码
Flowable.range(1, 9)
.observeOn(Schedulers.newThread())
.subscribe { println(it) }
复制代码
RangeSubscription只支持SYNC融合
public final class FlowableRange extends Flowable<Integer> {
abstract static class BaseRangeSubscription extends BasicQueueSubscription<Integer> {
@Override
public final int requestFusion(int mode) {
return mode & SYNC;
}
@Nullable
@Override
public final Integer poll() {
int i = index;
if (i == end) {
return null;
}
index = i + 1;
return i;
}
}
}
复制代码
我们再看FlowableObserveOn.FlowableSubscriber.onSubscribe()方法中,
- s为上游RangeSubscription。
- actual为下游yourSubscriber
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
//s为上游RangeSubscription
this.s = s;
if (s instanceof QueueSubscription) {
QueueSubscription<T> f = (QueueSubscription<T>) s;
int m = f.requestFusion(ANY | BOUNDARY);
if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;
//actual为下游yourSubscriber,
//此时下游会调用本层.request(Long.MAX_VALUE)
actual.onSubscribe(this);
return;
}
}
}
}
@Override
public final void run() {
if (sourceMode == SYNC) {
runSync();
}
}
@Override
void runSync() {
...
final Subscriber<? super T> a = actual;
//上游RangeSubScription
final SimpleQueue<T> q = queue;
for (;;) {
while (e != r) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
//需要处理异常
a.onError(ex);
return;
}
//需要判断null值
if (v == null) {
a.onComplete();
return;
}
a.onNext(v);
...
}
...
}
}
}
}
复制代码
fuse SYNC相比于no fuse:
- observerOn少了request(x)
- observerOn不需要维护内部队列SpscArrayQueue,使用上游QueueSubScription
- observerOn与range少了onNext,这就意味着少了worker.schedule线程调度。
ObserveOn内部会去判断上游subscription是否为QueueSubscription。FlowableRange内部为RangeSubscription属于QueueSubscription,且仅支持SYNC融合模式
ASYNC融合
相比于SYNC融合,poll()无法立即同步获取上游值。
如果上下游同意使用ASYNC融合,则意味着:
- 上游仍然会调用onError,onNext,onComplete
- 当onNext实际上为null值时,下游可以调用poll()获取实际数值
- 下游仍然要调用request(x)
ObserveOnSubscriber继承QueueSubscription,只支持异步融合。
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
@Override
public final void run() {
if (outputFused) {
runBackfused();
}
}
@Override
void runBackfused() {
for (;;) {
boolean d = done;
actual.onNext(null);
...
}
}
@Override
public final int requestFusion(int requestedMode) {
if ((requestedMode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
if (v != null && sourceMode != SYNC) {
long p = produced + 1;
if (p == limit) {
produced = 0;
s.request(p);
} else {
produced = p;
}
}
return v;
}
}
}
复制代码
我们自定义一个FlowableSubscriber
var qs: QueueSubscription<*>? = null
Flowable.rangeLong(1, 5)
.observeOn(Schedulers.newThread())
.subscribe(object : FlowableSubscriber<Long> {
override fun onSubscribe(s: Subscription) {
if (s is QueueSubscription<*>) {
s.requestFusion(QueueFuseable.ASYNC)
qs = s
}
s.request(Long.MAX_VALUE)
}
//onNext值为null,需要自己去poll()
override fun onNext(t: Long?) {
println("onNext:$t ")
while (true) {
var value = qs?.poll()
if (value == null) {break }
println("onNext poll:$value")
}
}
override fun onError(t: Throwable?) { println("onError:$t") }
override fun onComplete() {println("onComplete") }
})
//输出结果为:
onNext:null
onNext poll:1
onNext poll:2
onNext poll:3
onNext poll:4
onNext poll:5
onComplete
//把”Flowable.rangeLong(1, 5)“
//换成”Flowable.intervalRange(1,5,0,100,TimeUnit.MILLISECONDS)“
//输出结果为:
onNext:null
onNext:null
onNext poll:1
onNext:null
onNext poll:2
onNext:null
onNext poll:3
onNext:null
onNext poll:4
onNext:null
onNext poll:5
onComplete
复制代码
前后两次onNext次数不同,区别在于FlowableRangeLong支持同步融合,而FlowableIntervalRange
�不支持融合。
融合的线程问题
poll()可能会使得上游数值计算被切换到新线程。
requestFusion参数包含BOUNDARY时,则告诉上游,poll()会被切换线程。如果上游不希望计算被切换线程,则不通过下游的融合请求。
static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
protected final int transitiveBoundaryFusion(int mode) {
QueueSubscription<T> qs = this.qs;
if (qs != null) {
if ((mode & BOUNDARY) == 0) {
int m = qs.requestFusion(mode);
if (m != NONE) {
sourceMode = m;
}
return m;
}
}
return NONE;
}
}
复制代码
var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
.map { println("int map: value $it,${Thread.currentThread()}");it }
.subscribe(object : FlowableSubscriber<Long> {
override fun onSubscribe(s: Subscription) {
if (s is QueueSubscription<*>) {
var mode = s.requestFusion(QueueFuseable.SYNC)
println("onSubscribe fusion mode result: $mode")
qs = s
}
s.request(Long.MAX_VALUE)
}
override fun onNext(t: Long?) {
executor.submit { qs?.poll() }
}
override fun onError(t: Throwable?) { }
override fun onComplete() {}
})
//输出结果:
//可以看到map计算是在新线程而非原始线程
onSubscribe fusion mode result: 1
int map: value 1,Thread[pool-1-thread-1,5,main]
int map: value 2,Thread[pool-1-thread-1,5,main]
int map: value 3,Thread[pool-1-thread-1,5,main]
int map: value 4,Thread[pool-1-thread-1,5,main]
int map: value 5,Thread[pool-1-thread-1,5,main]
复制代码
注释掉executor.submit { qs?.poll() } ,requestFusion参数新增QueueFuseable.BOUNDARY
var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
.map { println("int map: value $it,${Thread.currentThread()}");it }
.subscribe(object : FlowableSubscriber<Long> {
override fun onSubscribe(s: Subscription) {
if (s is QueueSubscription<*>) {
var mode = s.requestFusion(QueueFuseable.SYNC or QueueFuseable.BOUNDARY)
println("onSubscribe fusion mode result: $mode")
qs = s
}
s.request(Long.MAX_VALUE)
}
override fun onNext(t: Long?) {}
override fun onError(t: Throwable?) { }
override fun onComplete() {}
})
//输出结果:
//可以看到map计算是在新线程而非原始线程
onSubscribe fusion mode result: 0
int map: value 1,Thread[main,5,main]
int map: value 2,Thread[main,5,main]
int map: value 3,Thread[main,5,main]
int map: value 4,Thread[main,5,main]
int map: value 5,Thread[main,5,main]
复制代码
结论
运算符融合很酷炫,但明没有在所有操作符中得到应用。有些运算符的优化,看着简单,做起来难。
你可以写很长的Rx链,但千万不要以为RxJava能高效地完成所有事情。你自己能优化的,还是要优化。
参考
Operator fusion in RxJava 2
RxJava2作者亲笔:Operator-fusion (Part 1)
RxJava2作者亲笔:Operator fusion (part 2 – final)