RxJava介绍4:背压

Back-pressure

名词来源

Back pressure is a resistance or force opposing the desired flow of fluid through pipes.
背压是抵挡流体在管道中流动的阻力
如下图所示,Pipe1和Pipe2在起始处有着相同的起始压力和距离。Pipe2呢,就遇到了三种backpressure。
image.png

软件概念

背压就是抵挡数据流动的阻力。
背压问题的出现,需要同时满足两个条件:

  • 上下游不在同一个线程
  • 下游处理速度慢于上游

想象一些很多年前,社交应用消息爆炸导致应用界面卡死的情景。
1_feQArAYERutZ451cMQSV6w.gif

背压策略

当我们聊背压时,不仅是背压问题,也包括其处理策略。
那通常我们有哪些背压策略?

  • control producer,控制生产者速度。生产者跟消费者又不是一家,这个挺为难的。
  • Buffer,缓冲数据。如果是有限容量缓存,那就总有溢出的那一天
  • Drop,丢弃数据。这个倒是好,就是数据可能会失准。


RxJava2的Backpressure

Flowable特性

RxJava1中Observable支持Backpressure,由于使用难度,使用者经常会遇到MissingBackpressureException。RxJava2进行了拆分。同时Flowable遵循”reactive-streams-jvm”接口规范,与Observable在名称上略有区别。

Observable Flowable
观察源 ObservableSource Publisher
观察者 Observer Subscriber
支持Backpressure 不支持 支持
观察者.onSubscribe(xx) xx:Disposable xx:Subscription

在Subscription接口中提到,只有当下游调用了Subscription.request(n)后,上游才会发送数据。下游将自身处理数据流的能力告诉了上游,同时需要在后续时常调用request(xx),更新自身的处理能力。

有些下游的onSubscribe(subscription)中会调用subscription.request(Long.MAX_VALUE),这就意味着,告诉上游:不用考虑我的处理能力,尽情发送数据。实际上,最终的YourSubscriber一般都这样干。

FlowableObserveOn

observeOn会切换下游线程。在Observable和Flowable中的区别如下

在Observable中 在Flowable中
观察者 ObserveOnObserver ObserveOnSubscriber
观察者中的SimpleQueue SpscLinkedArrayQueue
通过链表+数组实现
无限容量,
SpscArrayQueue
通过数组实现,
有限容量
根据buffersize计算数组长度
是否有潜在背压问题 无(假定内存无限) 有(队列溢出时)

数组长度为离buffersize最近且>=buffersize大的2指数幂。
在Flowable中,Queue的容量有限。当队列已满时,offer元素会返回false,这意味着溢出。
从BaseObserveOnSubscriber的源码可以看出,FlowableOnserveOn在遇到背压问题时,就只是抛出了异常。

static final class BaseObserveOnSubscriber<T> implements FlowableSubscriber<T>
		@Override
        public final void onNext(T t) {
            ...
            if (!queue.offer(t)) {
            	...
                //抛出异常
                error = new MissingBackpressureException("Queue is full?!");
            }
        }
}
复制代码

我们先来分析ObserveOnSubscriber中的subscription.request(n)
为了研究特性,我们拷贝了FlowableRange,加了log。只有当下游request(x),则会onNext x个数据。

public final class FlowableRange2 extends Flowable<Integer> {
    static class RangeSubscription extends AtomicLong implements Subscription {
       @Override
        public final void request(long n) {
        	System.out.print(String.format("\n[request:%d]", r));
        }
    }
}

FlowableRange2(1,20)//拷贝了FlowableRange,加了log
    .observeOn(Schedulers.newThread(), false, 5)
    .subscribe { Thread.sleep(2);print("$it,") }

//输出结果:
[request:5]1,2,3,4,
[request:4]5,6,7,8,
[request:4]9,10,11,12,
[request:4]13,14,15,16,
[request:4]17,18,19,20,
复制代码

可以看到并没有每次都请求buffsize5。可以看看FlowableObserveOn代码流程,只适用当前例子。FlowableObserveOn特点:

  • FlowableObserveOn会切换线程
    • queue.offer(t),生产者在上游线程
    • runAsync运行下游消费者线程,即scheduler的新线程(在新线程中串行)
  • 有背压问题,当SpscArrayQueue已满时,插入数值会有MissingBackpressureException(“Queue is full?!”)
  • 首次预取数据个数为prefetch即bufferSize
  • 当处理的数据达到limit,则会向上游request limit个数值。limit为prefetch – (prefetch >> 2)
 public final class FlowableObserveOn<T> extends Flowable<T> implements HasUpstreamPublisher<T> {
    
    @Override
    public void subscribeActual(Subscriber<? super T> s) {
        source.subscribe(new ObserveOnSubscriber<T>(s, scheduler.createWorker(), prefetch));
    }

     static class ObserveOnSubscriber<T> extends AtomicInteger
            implements FlowableSubscriber<T>, Runnable, Subscription {	
        final Subscriber<? super T> actual; //actual: 下游的,yourSubscriber    
        Subscription s; //上游的,RangeSubscription(在我们这个例子中)
		SimpleQueue<T> queue;
        final Worker worker;
        final int prefetch;//buffSize
		//prefetch - (prefetch >> 2)
        final int limit;
        
         final AtomicLong requested;
        long produced;
       
		ObserveOnSubscriber(Subscriber<? super T> actual, Worker worker,int prefetch) {
            this.actual = actual;
            this.worker = worker;
            this.prefetch = prefetch;//5
            this.requested = new AtomicLong();
            this.limit = prefetch - (prefetch >> 2);//4
        }

        @Override
        public void onSubscribe(Subscription s) {
                this.s = s;
            	//observeOn,无锁容量有限队列,
                queue = new SpscArrayQueue<T>(prefetch);
                actual.onSubscribe(this);
				//首次request: 参数为bufferSize,5
                s.request(prefetch);
            }
        }
     
     	@Override
       //供下游yourSubscriber调用的,实际n为Long.MAX_VALUE
        public final void request(long n) {          
            BackpressureHelper.add(requested, n);
            trySchedule();
        }
     
        @Override
        public final void onNext(T t) {
            if (!queue.offer(t)) {//如果队列已满onNext会异常
                error = new MissingBackpressureException("Queue is full?!");
            }
            trySchedule();
        }
		final void trySchedule() {
            if (getAndIncrement() != 0)  return;
            worker.schedule(this);
        }
        @Override
        public final void run() { runAsync();}
        void runAsync() {
            int missed = 1;

            final Subscriber<? super T> a = actual;
            final SimpleQueue<T> q = queue;
            long e = produced;
            for (; ; ) {
                long r = requested.get();  
                //首次进入时e为0,r为Long.MAX_VALUE
                while (e != r) {
                    //取出元素
                    T v = q.poll();
                    boolean empty = v == null;
                    if (empty) break;
                    a.onNext(v);
                    e++;
                    //当消费的数量达到limit时,
                    if (e == limit) {
                        //会再次请求上游。个数为limit。
                        s.request(e);
                        e = 0L;
                    }
                }
               //w为while执行过程中trySchedule请求的次数。
                int w = get();
                if (missed == w) {
                    produced = e;
                    missed = addAndGet(-missed);
                    if (missed == 0) break;
                } else {
                    missed = w;
                }
            }
        }
    }
复制代码

BackpressureStrategy

了解Flowable这部分特性后,我们来看BackpressureStrategy 中提到的5种策略:

  • MISSING 写入OnNext事件时不会进行任何缓冲或丢弃,下游要处理溢出。
  • �ERROR 抛出 MissingBackpressureException
  • BUFFER 缓冲所有onNext值,直到下游消费该值
  • DROP 丢弃最新的onNext值
  • LATEST 只保留最新的onNext值,一直覆盖前面的值

这里有段代码举例。生产速度1毫秒/次,消费速度4毫秒/次。生产速度是消费速度的4倍。
ObserveOnSubscriber首次request为4(prefetch),当消费3(limit)个数值(12毫秒)后,会request(3)。
这就意味着上游线程在前16毫秒内只能onNext 4个数值,当再次request后onNext 3个数值。

var flowableOnSubscribe = FlowableOnSubscribe<Int> { emitter ->
    //1.FlowableEmitter每隔1毫秒发射一次值,从1到20。
	for (i in 1..20) {
    	Thread.sleep(1)
		emitter.onNext(i)
	}
}
//2.背压策略为DROP
//3.observeOn, bufferSize为4。
//4.消费,sleep4毫秒,再打印数值
Flowable.create(flowableOnSubscribe, BackpressureStrategy.DROP)
	.observeOn(Schedulers.newThread(), false, 4)
	.subscribe { Thread.sleep(4);println(it)  }
    
DROP--输出结果:
1,2,3,4,13,14,15,

LATEST--输出结果:
1,2,3,4,12,13,14,16,
复制代码

DropAsyncEmitter

abstract static class DropAsyncEmitter<T> extends BaseEmitter<T> {

        @Override
        public final void onNext(T t) {
            if (get() != 0) {  //request != 0
                actual.onNext(t);
                //每次onNext,request--;
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();//request == 0
            }
        }
     	void onOverflow() {
            // nothing to do
        }
    }
复制代码

截屏2021-05-17 下午1.48.06.png

LatestAsyncEmitter

static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {

    	//实际不是队列,只有一个值,每次onNext都会更新这个值
        final AtomicReference<T> queue = new AtomicReference<T>();
		final AtomicInteger wip;
    
    	@Override
        public final void request(long n) {
           	BackpressureHelper.add(this, n); //request += n
        }

        @Override
        public void onNext(T t) {
        //每drain一次,request--;
    	//而request为0时,onNext会更新queue的值,drain不会调用下游的onNext(value)
			queue.set(t);
            drain();
        }
    
    
        void drain() {
            if (wip.getAndIncrement() != 0) return;
            int missed = 1;
            final Subscriber<? super T> a = actual;
            final AtomicReference<T> q = queue;

            for (; ; ) {
                //在本例子中r开始为4,即下游的request(4)
                long r = get();
                long e = 0L;
              
                while (e != r) {
                    //while循环第一趟e为0,r为4,o有值
                	//while循环第一趟e为1,r为4,o没有值,跳出while
                    T o = q.getAndSet(null);
                    boolean empty = o == null;
                    if (empty) { break;}
                    a.onNext(o);
                    e++;
                }
                每次跳出while循环时,e均为1
                if (e != 0) {
                    //r--;
                    BackpressureHelper.produced(this, e);
                }
               
                missed = wip.addAndGet(-missed);
                 //由于是单线程生成,每个drain()的for都只会执行一趟
                if (missed == 0) {
                    break;
                }
            }
        }
    }
复制代码

截屏2021-05-17 下午1.48.29.png

BufferAsyncEmitter

static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {

    	//无锁,容量无限队列,帮下游缓存数值。
        final SpscLinkedArrayQueue<T> queue;
}
复制代码

MissingEmitter

static final class MissingEmitter<T> extends BaseEmitter<T> {

        @Override
        public void onNext(T t) {
            //不管下游死活,往下游传数据
            actual.onNext(t);  
        }
}
复制代码

ErrorAsyncEmitter

 static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        @Override
        void onOverflow() {
            //当溢出时,本层即报错
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }

    }
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
        @Override
        public final void onNext(T t) {
            if (get() != 0) {
                actual.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

        abstract void onOverflow();
    }
复制代码

总结

  • 背压问题的前提:上下游在不同线程。
  • 下游调用request(n),上游记录request值,即下游能够处理的个数。
  • 上游onNext(数值),request–。当request为0,上游onNext则会有背压问题。
  • 出现背压时,一般的处理策略:
    • 上游不管,直接onNext,下游自己处理溢出
    • 上游抛出异常
    • 上游缓存溢出的数值,直到下游再次更新request值。
    • 上游丢弃溢出的数值,区分是否保留最后的值。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享