RxJava介绍5:操作符融合

Operator Fusion

翻译自 Operator fusion in RxJava 2

介绍

RxJava是一个非常强大的库,尽管它也存在一些问题。特别是性能和内存的问题。
为了最小化RxJava中的开销,有许多优化措施,称为“操作符融合”。
首先让我们回顾下RxJava如何工作以及他们存在哪些问题。

Observable

截屏2021-05-17 下午8.43.57.png

  • 构建Observable
  • Observer订阅Observable
  • Observable通过Observer.onSubscribe方法,将创建的Disposable传递给Observer。
  • 随后Observable可以调用Observer.onNext传递值

Observable不支持背压,因为Observer无法将自身处理能力通知给Observable。

Flowable

截屏2021-05-17 下午8.42.09.png

与Observable相似,但是没有Observer和Disposable,而是Subscriber和Subscription。
Subscription具有额外的request(n)方法,Subscriber可以使用该方法显式告知Flowable发出item的请求数量。如果不request值,Flowable将不会发出任何东西,这就是Flowable支持背压的原因。

Assembly and subscribe

使用RxJava时,有两个重要的阶段组装与订阅:

  • 组装阶段,建立Rx链
  • 订阅阶段,启动Rx链(触发各种操作符内部的“风暴”)

参考下面一段代码

 Observable.fromArray(1, 2, 3)
	.map { it + 1 }
	.filter { it < 3 }
	.subscribe { println(it) }
复制代码

过程如下:1.组装,2.订阅,3.运行。 仅仅三条Rx链就发生了这么多事情。如果换成Flowable,request(n)将会使得过程更加复杂。
截屏2021-05-17 下午9.19.33.png

Queues and synchronization

操作符的内部实现可能会有内部Queue,用来处理事件。Queue应当被串行访问(这就意味着要有适当的同步机制)。RxJava2具有基于Atomics的无阻塞同步(例如AtomicInteger)和带有compareAndSet方法的无限循环。
假设每个操作符都有自己的内部Queue,那操作符中的Queue和Atomic对象同样会带来额外的开销。
类似下面的代码,

public final class QueueDrainHelper {
    public static <T> boolean postCompleteRequest(...) {
        for (; ; ) {
            long r = state.get();
            long r0 = r & REQUESTED_MASK;
            long u = (r & COMPLETED_MASK) | BackpressureHelper.addCap(r0, n);
            if (state.compareAndSet(r, u)) {
                if (r == COMPLETED_MASK) {
                    postCompleteDrain(n | COMPLETED_MASK, actual, queue, state, isCancelled);
                    return true;
                }
                return false;
            }
        }
    }
}
复制代码
public final class ObservableObserveOn<T>{
     void drainNormal() {
            int missed = 1;
            SimpleQueue<T> q = this.queue;
            Observer a = this.actual;
            do {
                while(true) {
                    Object v;
                    try {
                        v = q.poll();
                    } catch (Throwable var7) {
                       ...
                    }

                    boolean empty = v == null;                  
                    if (empty) {
                        missed = this.addAndGet(-missed);
                        break;
                    }
                    a.onNext(v);
                }
            } while(missed != 0);

        }
}
复制代码

Issues

综上所述,RxJava存在的问题是:

  • 组装开销,创建Rx链,会创建很多对象,这会带来内存开销
  • 订阅开销,会发生大量通信,这会带来性能开销
  • 分配和串行化开销-为每个操作符创建内部结构(比如队列和原子对象),带来内存和性能开销

Operator fusion

为了解决某些性能和内存问题,这就是“操作符融合”
操作符融合用两种类型:

  • 宏融合(Macro fusion),合并操作符,最大程度的减少在组装和订阅阶段创建的对象数量。
  • 微融合(Micro fusion),移除不必要的同步和在操作符间共享内部结构(例如Queue)

Macro fusion on Assembly

组装时

组装时的宏融合专注于最大程度地减少组装期间创建的Observable和对象。
当我们谈“组装时”,我们指的是这个地方。

public abstract class Observable<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> fromCallable(Callable<? extends T> supplier) {
        ObjectHelper.requireNonNull(supplier, "supplier is null");
        return RxJavaPlugins.onAssembly(new ObservableFromCallable<T>(supplier));
    }
}
复制代码

组装融合基础

优化某些Observable的最简单方法是添加对特殊情况的检查,以创建更简单的Observable。例如Observable.fromArray可以被降级为Observable.empty或Observable.just。

public static <T> Observable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
复制代码

ScalarCallable

fuseable包中第一个“进阶”优化是ScalarCallable接口

public interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();
}
复制代码

它继承通用的Java Callable,并且具有相同的接口,区别在于不能抛出异常。ScalarCallable是一个标记接口,某个类实现该接口,就意味着该类在组装期间可以安全得提取一个常量值(也可以是null值)。基于以上描述,只有empty和just相关的数据源操作符(Observable/Flowable/Maybe)可以被 scalarCallable标记。
例如在xMap操作符(flatMap,switchMap,concatMap)中,如果source被标记,则可以用简化版本的xMap代替繁琐的完整实现(比较ObservableFlatMap和ObservableScalarXMap)。

public abstract class Observable<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        if (this instanceof ScalarCallable) {
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}
复制代码

FuseToXXX

在fuseable包中有这三个接口

public interface FuseToObservable<T> {
    // Returns a (direct) Observable for the operator.
    Observable<T> fuseToObservable();
}

public interface FuseToMaybe<T> {
    Maybe<T> fuseToMaybe();
}

public interface FuseToFlowable<T> {
    Flowable<T> fuseToFlowable();
}
复制代码

进一步看看FuseToObservable,其他两个接口类似。考虑下面的Rx链:

========================================
Observable.range(1, 10)
	.count()
	.toObservable()
	.subscribe { println(it) }
========================================

class ObservableCountSingle<T> extends Single<Long> 
							implements FuseToObservable<Long> {
    @Override
    public Observable<Long> fuseToObservable() {
        return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
    }
}

abstract class Single {
 	public final Observable<T> toObservable() {
        if (this instanceof FuseToObservable) {
            return ((FuseToObservable<T>)this).fuseToObservable();
        }
        return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
    }
}
复制代码

有或者没有FuseToObservable,组装结构是不一样的。
截屏2021-05-18 下午1.19.11.png

Macro fusion on subscribe

订阅期间的宏融合与组装期间是一样的,只是发生在subscribeActual方法中。有时候在订阅前数据是未知的,订阅时优化会更方便。

public abstract class Observable<T> implements ObservableSource<T> {
    
    @Override
    public final void subscribe(Observer<? super T> observer) {
         subscribeActual(observer);
    }
    
    protected abstract void subscribeActual(Observer<? super T> observer);
}

复制代码

Basic on subscribe fusion

类似于在组装期间,我们添加对特殊情况的检查,对subscribe降级。例如下面ObservableAmb代码

public final class ObservableAmb<T> extends Observable<T> {
    final ObservableSource<? extends T>[] sources;
    final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;

  
    public void subscribeActual(Observer<? super T> s) {
        ObservableSource<? extends T>[] sources = this.sources;
        ...
        if (count == 0) {
            EmptyDisposable.complete(s);
            return;
        } else if (count == 1) {
            sources[0].subscribe(s);
            return;
        }
        AmbCoordinator<T> ac = new AmbCoordinator<T>(s, count);
        ac.subscribe(sources);
    }
复制代码

Callable

类似于组装期间的ScalarCallable。在订阅期间,会通过Callable进行类似优化。

注意:因为ScalarCallable继承Callable,所以在组装期间ScalarCallable上的优化,同样可以在订阅期间应用在Callable上。

比如说 XMap操作符,订阅继承了Callable接口的Observables,就有可能会被替换成简化实现。参考ObservableFlatMap.MergeObserver,这个类太复杂了,我都不想看了。

Observable.fromCallable { 3 }
	.flatMap { Observable.fromArray(it + 1, it + 2) }
    .subscribe { println(it) }
=============================
public final class ObservableFlatMap<T, U>  {
    @Override
    public void subscribeActual(Observer<? super U> t) {
        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
}
public final class ObservableScalarXMap {
     public static <T, R> boolean tryScalarXMapSubscribe(ObservableSource<T> source,
            Observer<? super R> observer,
            Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        if (source instanceof Callable) {
            T t;
            try {
                t = ((Callable<T>)source).call();
            } catch (Throwable ex) {
                ...
                return true;
            }
            if (t == null) {
                EmptyDisposable.complete(observer);
                return true;
            }
            ObservableSource<? extends R> r;
            try {
                r = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable ex) {
                ...
                return true;
            }
            ...
            r.subscribe(observer);
            return true;
        }
        return false;
    }
}
复制代码

Micro fusion

微融合旨在通过减少同步或者共享内部结构来减少开销。

ConditionalSubscriber

考虑 FlowableFilter操作符

public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class FilterSubscriber<T> extends BasicFuseableSubscriber<T, T>
    implements ConditionalSubscriber<T> {
        final Predicate<? super T> filter;
        
        @Override
        public void onNext(T t) {
            if (!tryOnNext(t)) {
                s.request(1);
            }
        }

        @Override
        public boolean tryOnNext(T t) {
            boolean  b = filter.test(t);
            if (b) { 
                actual.onNext(t); 
            }
            return b;
        }
    }
}
复制代码

截屏2021-05-18 下午4.16.06.png
试想如果有两个filter,filter.test(value)执行两次。N,request(1)或者 Y,onNext(value) 加起来会被执行两次。
于是便有了FilterConditionalSubscriber.它聚合了连续的fliter.test(value)

static final class FilterConditionalSubscriber<T> 
					extends BasicFuseableConditionalSubscriber<T, T> {
   
    protected final ConditionalSubscriber<? super R> actual;   
    
    final Predicate<? super T> filter;
        @Override
        public void onNext(T t) {
            if (!tryOnNext(t)) {
                s.request(1);
            }
        }
        @Override
        public boolean tryOnNext(T t) {
            return filter.test(t) && actual.tryOnNext(t);
        }
}
复制代码

截屏2021-05-18 下午4.39.40.png
类似的代码可以在FlowableRange,FlowableMap中看到。

Queue fuseable

最复杂的微融合是基于操作符间共享内部队列。整个优化基于QueueFuseable接口。

public interface QueueFuseable<T> extends SimpleQueue<T> {
    int NONE = 0;
	int SYNC = 1;
	int ASYNC = 2;
	int ANY = SYNC | ASYNC;
	int BOUNDARY = 4;
    //Request a fusion mode from the upstream
	int requestFusion(int mode);
}
public interface QueueSubscription<T> extends QueueFuseable<T>, Subscription {
}
复制代码

我们以Flowable全家桶举例,同样适用于Observable。
QueueSubscription接口继承QueueFuseable和Subscription,允许Flowable子类型操作符之间协商融合模式。
协商(requestFusion),通常发生在订阅期间,即上游调用subscriber.onSubsribe(Subscription)时。下游要在Subscription.request(n)之前调用Subscription.requestFusion(int mode)。
与常规subscriber的onXXX回调方法相比,上游不仅提供subscription,还提供QueueSubscription,从而允许下游直接访问上游内部队列。在融合模式下,下游通过调用上游QueueSubscription.poll(),获取上游数值。

有三种融合模式:

  • NONE — 不允许融合
  • SYNC — 支持同步方式融合
  • ASYNC — 支持异步方式融合


SYNC融合

上游值要么已经可用,要么可以在poll()中同步生成。如果上下游同意适用SYNC融合,则意味着:

  • 下游在需要时直接调用poll()方法
  • poll()会抛出异常,相当于onError
  • poll()可以返回null,相当于onComplete
  • poll可以返回非null值,相当于onNext
  • 上游不会调用任何onXXX回调

参考Flowable.range与observeOn代码

Flowable.range(1, 9)
	.observeOn(Schedulers.newThread())
	.subscribe { println(it) }
复制代码

RangeSubscription只支持SYNC融合

public final class FlowableRange extends Flowable<Integer> {
    abstract static class BaseRangeSubscription extends BasicQueueSubscription<Integer> {
        
        @Override
        public final int requestFusion(int mode) {
            return mode & SYNC;
        }

        @Nullable
        @Override
        public final Integer poll() {
            int i = index;
            if (i == end) {
                return null;
            }
            index = i + 1;
            return i;
        }
    }
}
复制代码

我们再看FlowableObserveOn.FlowableSubscriber.onSubscribe()方法中,

  • s为上游RangeSubscription。
  • actual为下游yourSubscriber
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
    implements FlowableSubscriber<T> {

        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.validate(this.s, s)) {
                //s为上游RangeSubscription
                this.s = s;

                if (s instanceof QueueSubscription) {
                    QueueSubscription<T> f = (QueueSubscription<T>) s;
                    int m = f.requestFusion(ANY | BOUNDARY);
                    if (m == SYNC) {
                        sourceMode = SYNC;
                        queue = f;
                        done = true;
                        //actual为下游yourSubscriber,
                        //此时下游会调用本层.request(Long.MAX_VALUE)
                        actual.onSubscribe(this);
                        return;
                    } 
                }
            }
        }
        @Override
        public final void run() {
           	if (sourceMode == SYNC) {
                runSync();
            }
        }
         @Override
        void runSync() {
           	...
            final Subscriber<? super T> a = actual;
            //上游RangeSubScription
            final SimpleQueue<T> q = queue;
            for (;;) {
                while (e != r) {
                    T v;
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        //需要处理异常
                        a.onError(ex);
                        return;
                    }
                    //需要判断null值
                    if (v == null) {
                        a.onComplete();
                        return;
                    }
                    a.onNext(v);
					...
                }
				...
            }
        } 
    }
}
复制代码

fuse SYNC相比于no fuse:

  • observerOn少了request(x)
  • observerOn不需要维护内部队列SpscArrayQueue,使用上游QueueSubScription
  • observerOn与range少了onNext,这就意味着少了worker.schedule线程调度。

ObserveOn内部会去判断上游subscription是否为QueueSubscription。FlowableRange内部为RangeSubscription属于QueueSubscription,且仅支持SYNC融合模式
截屏2021-05-19 下午3.46.13.png

ASYNC融合

相比于SYNC融合,poll()无法立即同步获取上游值。
如果上下游同意使用ASYNC融合,则意味着:

  • 上游仍然会调用onError,onNext,onComplete
  • 当onNext实际上为null值时,下游可以调用poll()获取实际数值
  • 下游仍然要调用request(x)

ObserveOnSubscriber继承QueueSubscription,只支持异步融合。

public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
    implements FlowableSubscriber<T> {

        @Override
        public final void run() {
           	if (outputFused) {
                runBackfused();
            }
        }
         @Override
        void runBackfused() {
            for (;;) {
                boolean d = done;
                actual.onNext(null);
                ...
            }
        }
        
        @Override
        public final int requestFusion(int requestedMode) {
            if ((requestedMode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }
        
        @Nullable
        @Override
        public T poll() throws Exception {
            T v = queue.poll();
            if (v != null && sourceMode != SYNC) {
                long p = produced + 1;
                if (p == limit) {
                    produced = 0;
                    s.request(p);
                } else {
                    produced = p;
                }
            }
            return v;
        }
    }
}
复制代码

我们自定义一个FlowableSubscriber

var qs: QueueSubscription<*>? = null
Flowable.rangeLong(1, 5)
	.observeOn(Schedulers.newThread())
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
			if (s is QueueSubscription<*>) {
				s.requestFusion(QueueFuseable.ASYNC)
                qs = s
             }
             s.request(Long.MAX_VALUE)
        }

        //onNext值为null,需要自己去poll()
        override fun onNext(t: Long?) {
        	println("onNext:$t ")
            while (true) {
            	 var value = qs?.poll()
                 if (value == null) {break }
                 println("onNext poll:$value")
            }
        }
        override fun onError(t: Throwable?) { println("onError:$t") }
        override fun onComplete() {println("onComplete") }
	})
//输出结果为:
onNext:null 
onNext poll:1
onNext poll:2
onNext poll:3
onNext poll:4
onNext poll:5
onComplete

//把”Flowable.rangeLong(1, 5)“
//换成”Flowable.intervalRange(1,5,0,100,TimeUnit.MILLISECONDS)“
//输出结果为:
onNext:null 
onNext:null 
onNext poll:1
onNext:null 
onNext poll:2
onNext:null 
onNext poll:3
onNext:null 
onNext poll:4
onNext:null 
onNext poll:5
onComplete
复制代码

前后两次onNext次数不同,区别在于FlowableRangeLong支持同步融合,而FlowableIntervalRange
�不支持融合。

融合的线程问题

poll()可能会使得上游数值计算被切换到新线程。
requestFusion参数包含BOUNDARY时,则告诉上游,poll()会被切换线程。如果上游不希望计算被切换线程,则不通过下游的融合请求。

static final class MapSubscriber<T, U> extends BasicFuseableSubscriber<T, U> {
	@Override
	public int requestFusion(int mode) {
		return transitiveBoundaryFusion(mode);
	}
    
    protected final int transitiveBoundaryFusion(int mode) {
        QueueSubscription<T> qs = this.qs;
        if (qs != null) {
            if ((mode & BOUNDARY) == 0) {
                int m = qs.requestFusion(mode);
                if (m != NONE) {
                    sourceMode = m;
                }
                return m;
            }
        }
        return NONE;
    }
}  
复制代码
var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
	.map { println("int map: value $it,${Thread.currentThread()}");it }
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
				if (s is QueueSubscription<*>) {
					var mode = s.requestFusion(QueueFuseable.SYNC)
                    println("onSubscribe fusion mode result: $mode")
                    qs = s
                 }
                 s.request(Long.MAX_VALUE)
        }

        override fun onNext(t: Long?) {
            executor.submit { qs?.poll() } 
        }
		override fun onError(t: Throwable?) { }
		override fun onComplete() {}
     })
//输出结果: 
//可以看到map计算是在新线程而非原始线程
onSubscribe fusion mode result: 1
int map: value 1,Thread[pool-1-thread-1,5,main]
int map: value 2,Thread[pool-1-thread-1,5,main]
int map: value 3,Thread[pool-1-thread-1,5,main]
int map: value 4,Thread[pool-1-thread-1,5,main]
int map: value 5,Thread[pool-1-thread-1,5,main]
复制代码

注释掉executor.submit { qs?.poll() } ,requestFusion参数新增QueueFuseable.BOUNDARY

var qs: QueueSubscription<*>? = null
var executor = Executors.newSingleThreadExecutor()
Flowable.rangeLong(1, 5)
	.map { println("int map: value $it,${Thread.currentThread()}");it }
	.subscribe(object : FlowableSubscriber<Long> {
		override fun onSubscribe(s: Subscription) {
			if (s is QueueSubscription<*>) {
				var mode = s.requestFusion(QueueFuseable.SYNC or QueueFuseable.BOUNDARY)
                println("onSubscribe fusion mode result: $mode")
                qs = s
            }
            s.request(Long.MAX_VALUE)
        }

        override fun onNext(t: Long?) {}
		override fun onError(t: Throwable?) { }
		override fun onComplete() {}
     })
//输出结果: 
//可以看到map计算是在新线程而非原始线程
onSubscribe fusion mode result: 0
int map: value 1,Thread[main,5,main]
int map: value 2,Thread[main,5,main]
int map: value 3,Thread[main,5,main]
int map: value 4,Thread[main,5,main]
int map: value 5,Thread[main,5,main]
复制代码

结论

运算符融合很酷炫,但明没有在所有操作符中得到应用。有些运算符的优化,看着简单,做起来难。
你可以写很长的Rx链,但千万不要以为RxJava能高效地完成所有事情。你自己能优化的,还是要优化。

参考

Operator fusion in RxJava 2
RxJava2作者亲笔:Operator-fusion (Part 1)
RxJava2作者亲笔:Operator fusion (part 2 – final)

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享