Back-pressure
名词来源
Back pressure is a resistance or force opposing the desired flow of fluid through pipes.
背压是抵挡流体在管道中流动的阻力。
如下图所示,Pipe1和Pipe2在起始处有着相同的起始压力和距离。Pipe2呢,就遇到了三种backpressure。
软件概念
背压就是抵挡数据流动的阻力。
背压问题的出现,需要同时满足两个条件:
- 上下游不在同一个线程
- 下游处理速度慢于上游
背压策略
当我们聊背压时,不仅是背压问题,也包括其处理策略。
那通常我们有哪些背压策略?
- control producer,控制生产者速度。生产者跟消费者又不是一家,这个挺为难的。
- Buffer,缓冲数据。如果是有限容量缓存,那就总有溢出的那一天
- Drop,丢弃数据。这个倒是好,就是数据可能会失准。
RxJava2的Backpressure
Flowable特性
RxJava1中Observable支持Backpressure,由于使用难度,使用者经常会遇到MissingBackpressureException。RxJava2进行了拆分。同时Flowable遵循”reactive-streams-jvm”接口规范,与Observable在名称上略有区别。
| Observable | Flowable |
---|---|---|
观察源 | ObservableSource | Publisher |
观察者 | Observer | Subscriber |
支持Backpressure | 不支持 | 支持 |
观察者.onSubscribe(xx) | xx:Disposable | xx:Subscription |
在Subscription接口中提到,只有当下游调用了Subscription.request(n)后,上游才会发送数据。下游将自身处理数据流的能力告诉了上游,同时需要在后续时常调用request(xx),更新自身的处理能力。
有些下游的onSubscribe(subscription)中会调用subscription.request(Long.MAX_VALUE),这就意味着,告诉上游:不用考虑我的处理能力,尽情发送数据。实际上,最终的YourSubscriber一般都这样干。
FlowableObserveOn
observeOn会切换下游线程。在Observable和Flowable中的区别如下
在Observable中 | 在Flowable中 | |
---|---|---|
观察者 | ObserveOnObserver | ObserveOnSubscriber |
观察者中的SimpleQueue | SpscLinkedArrayQueue 通过链表+数组实现 无限容量, |
SpscArrayQueue 通过数组实现, 有限容量 根据buffersize计算数组长度 |
是否有潜在背压问题 | 无(假定内存无限) | 有(队列溢出时) |
数组长度为离buffersize最近且>=buffersize大的2指数幂。
在Flowable中,Queue的容量有限。当队列已满时,offer元素会返回false,这意味着溢出。
从BaseObserveOnSubscriber的源码可以看出,FlowableOnserveOn在遇到背压问题时,就只是抛出了异常。
static final class BaseObserveOnSubscriber<T> implements FlowableSubscriber<T>
@Override
public final void onNext(T t) {
...
if (!queue.offer(t)) {
...
//抛出异常
error = new MissingBackpressureException("Queue is full?!");
}
}
}
复制代码
我们先来分析ObserveOnSubscriber中的subscription.request(n)
为了研究特性,我们拷贝了FlowableRange,加了log。只有当下游request(x),则会onNext x个数据。
public final class FlowableRange2 extends Flowable<Integer> {
static class RangeSubscription extends AtomicLong implements Subscription {
@Override
public final void request(long n) {
System.out.print(String.format("\n[request:%d]", r));
}
}
}
FlowableRange2(1,20)//拷贝了FlowableRange,加了log
.observeOn(Schedulers.newThread(), false, 5)
.subscribe { Thread.sleep(2);print("$it,") }
//输出结果:
[request:5]1,2,3,4,
[request:4]5,6,7,8,
[request:4]9,10,11,12,
[request:4]13,14,15,16,
[request:4]17,18,19,20,
复制代码
可以看到并没有每次都请求buffsize5。可以看看FlowableObserveOn代码流程,只适用当前例子。FlowableObserveOn特点:
- FlowableObserveOn会切换线程
- queue.offer(t),生产者在上游线程
- runAsync运行下游消费者线程,即scheduler的新线程(在新线程中串行)
- 有背压问题,当SpscArrayQueue已满时,插入数值会有MissingBackpressureException(“Queue is full?!”)
- 首次预取数据个数为prefetch即bufferSize
- 当处理的数据达到limit,则会向上游request limit个数值。limit为prefetch – (prefetch >> 2)
public final class FlowableObserveOn<T> extends Flowable<T> implements HasUpstreamPublisher<T> {
@Override
public void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new ObserveOnSubscriber<T>(s, scheduler.createWorker(), prefetch));
}
static class ObserveOnSubscriber<T> extends AtomicInteger
implements FlowableSubscriber<T>, Runnable, Subscription {
final Subscriber<? super T> actual; //actual: 下游的,yourSubscriber
Subscription s; //上游的,RangeSubscription(在我们这个例子中)
SimpleQueue<T> queue;
final Worker worker;
final int prefetch;//buffSize
//prefetch - (prefetch >> 2)
final int limit;
final AtomicLong requested;
long produced;
ObserveOnSubscriber(Subscriber<? super T> actual, Worker worker,int prefetch) {
this.actual = actual;
this.worker = worker;
this.prefetch = prefetch;//5
this.requested = new AtomicLong();
this.limit = prefetch - (prefetch >> 2);//4
}
@Override
public void onSubscribe(Subscription s) {
this.s = s;
//observeOn,无锁容量有限队列,
queue = new SpscArrayQueue<T>(prefetch);
actual.onSubscribe(this);
//首次request: 参数为bufferSize,5
s.request(prefetch);
}
}
@Override
//供下游yourSubscriber调用的,实际n为Long.MAX_VALUE
public final void request(long n) {
BackpressureHelper.add(requested, n);
trySchedule();
}
@Override
public final void onNext(T t) {
if (!queue.offer(t)) {//如果队列已满onNext会异常
error = new MissingBackpressureException("Queue is full?!");
}
trySchedule();
}
final void trySchedule() {
if (getAndIncrement() != 0) return;
worker.schedule(this);
}
@Override
public final void run() { runAsync();}
void runAsync() {
int missed = 1;
final Subscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long e = produced;
for (; ; ) {
long r = requested.get();
//首次进入时e为0,r为Long.MAX_VALUE
while (e != r) {
//取出元素
T v = q.poll();
boolean empty = v == null;
if (empty) break;
a.onNext(v);
e++;
//当消费的数量达到limit时,
if (e == limit) {
//会再次请求上游。个数为limit。
s.request(e);
e = 0L;
}
}
//w为while执行过程中trySchedule请求的次数。
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) break;
} else {
missed = w;
}
}
}
}
复制代码
BackpressureStrategy
了解Flowable这部分特性后,我们来看BackpressureStrategy 中提到的5种策略:
- MISSING 写入OnNext事件时不会进行任何缓冲或丢弃,下游要处理溢出。
- �ERROR 抛出 MissingBackpressureException
- BUFFER 缓冲所有onNext值,直到下游消费该值
- DROP 丢弃最新的onNext值
- LATEST 只保留最新的onNext值,一直覆盖前面的值
这里有段代码举例。生产速度1毫秒/次,消费速度4毫秒/次。生产速度是消费速度的4倍。
ObserveOnSubscriber首次request为4(prefetch),当消费3(limit)个数值(12毫秒)后,会request(3)。
这就意味着上游线程在前16毫秒内只能onNext 4个数值,当再次request后onNext 3个数值。
var flowableOnSubscribe = FlowableOnSubscribe<Int> { emitter ->
//1.FlowableEmitter每隔1毫秒发射一次值,从1到20。
for (i in 1..20) {
Thread.sleep(1)
emitter.onNext(i)
}
}
//2.背压策略为DROP
//3.observeOn, bufferSize为4。
//4.消费,sleep4毫秒,再打印数值
Flowable.create(flowableOnSubscribe, BackpressureStrategy.DROP)
.observeOn(Schedulers.newThread(), false, 4)
.subscribe { Thread.sleep(4);println(it) }
DROP--输出结果:
1,2,3,4,13,14,15,
LATEST--输出结果:
1,2,3,4,12,13,14,16,
复制代码
DropAsyncEmitter
abstract static class DropAsyncEmitter<T> extends BaseEmitter<T> {
@Override
public final void onNext(T t) {
if (get() != 0) { //request != 0
actual.onNext(t);
//每次onNext,request--;
BackpressureHelper.produced(this, 1);
} else {
onOverflow();//request == 0
}
}
void onOverflow() {
// nothing to do
}
}
复制代码
LatestAsyncEmitter
static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {
//实际不是队列,只有一个值,每次onNext都会更新这个值
final AtomicReference<T> queue = new AtomicReference<T>();
final AtomicInteger wip;
@Override
public final void request(long n) {
BackpressureHelper.add(this, n); //request += n
}
@Override
public void onNext(T t) {
//每drain一次,request--;
//而request为0时,onNext会更新queue的值,drain不会调用下游的onNext(value)
queue.set(t);
drain();
}
void drain() {
if (wip.getAndIncrement() != 0) return;
int missed = 1;
final Subscriber<? super T> a = actual;
final AtomicReference<T> q = queue;
for (; ; ) {
//在本例子中r开始为4,即下游的request(4)
long r = get();
long e = 0L;
while (e != r) {
//while循环第一趟e为0,r为4,o有值
//while循环第一趟e为1,r为4,o没有值,跳出while
T o = q.getAndSet(null);
boolean empty = o == null;
if (empty) { break;}
a.onNext(o);
e++;
}
每次跳出while循环时,e均为1
if (e != 0) {
//r--;
BackpressureHelper.produced(this, e);
}
missed = wip.addAndGet(-missed);
//由于是单线程生成,每个drain()的for都只会执行一趟
if (missed == 0) {
break;
}
}
}
}
复制代码
BufferAsyncEmitter
static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
//无锁,容量无限队列,帮下游缓存数值。
final SpscLinkedArrayQueue<T> queue;
}
复制代码
MissingEmitter
static final class MissingEmitter<T> extends BaseEmitter<T> {
@Override
public void onNext(T t) {
//不管下游死活,往下游传数据
actual.onNext(t);
}
}
复制代码
ErrorAsyncEmitter
static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
@Override
void onOverflow() {
//当溢出时,本层即报错
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
}
}
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
@Override
public final void onNext(T t) {
if (get() != 0) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
复制代码
总结
- 背压问题的前提:上下游在不同线程。
- 下游调用request(n),上游记录request值,即下游能够处理的个数。
- 上游onNext(数值),request–。当request为0,上游onNext则会有背压问题。
- 出现背压时,一般的处理策略:
- 上游不管,直接onNext,下游自己处理溢出
- 上游抛出异常
- 上游缓存溢出的数值,直到下游再次更新request值。
- 上游丢弃溢出的数值,区分是否保留最后的值。