Android 架构师之路21 响应式编程RxJava 线程变换原理
12 RxJava2 observeOn原理分析
12.1 RxJava2(无背压) observeOn原理分析
**
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
**
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}
actual.onNext(null);
if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
}
复制代码
- 继承自AbstractObservableWithUpstream
- 利用subscribeActual方法
- 创建一个新的Observer包裹旧的
- 在调用到onNext等方法时丢到线程中去执行
12.2 RxJava2(有背压) observeOn原理分析
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
复制代码
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int prefetch;
public FlowableObserveOn(
Flowable<T> source,
Scheduler scheduler,
boolean delayError,
int prefetch) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.prefetch = prefetch;
}
@Override
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();
if (s instanceof ConditionalSubscriber) {
source.subscribe(new ObserveOnConditionalSubscriber<T>(
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
} else {
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
}
}
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
private static final long serialVersionUID = -8241002408341274697L;
final Worker worker;
final boolean delayError;
final int prefetch;
final int limit;
final AtomicLong requested;
Subscription s;
SimpleQueue<T> queue;
volatile boolean cancelled;
volatile boolean done;
Throwable error;
int sourceMode;
long produced;
boolean outputFused;
BaseObserveOnSubscriber(
Worker worker,
boolean delayError,
int prefetch) {
this.worker = worker;
this.delayError = delayError;
this.prefetch = prefetch;
this.requested = new AtomicLong();
this.limit = prefetch - (prefetch >> 2);
}
@Override
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
if (!queue.offer(t)) {
s.cancel();
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
trySchedule();
}
@Override
public final void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
trySchedule();
}
@Override
public final void onComplete() {
if (!done) {
done = true;
trySchedule();
}
}
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
trySchedule();
}
}
@Override
public final void cancel() {
if (cancelled) {
return;
}
cancelled = true;
s.cancel();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
final void trySchedule() {
if (getAndIncrement() != 0) {
return;
}
worker.schedule(this);
}
@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}
abstract void runBackfused();
abstract void runSync();
abstract void runAsync();
final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
if (cancelled) {
clear();
return true;
}
if (d) {
if (delayError) {
if (empty) {
Throwable e = error;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
Throwable e = error;
if (e != null) {
clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
```
@Override
public final int requestFusion(int requestedMode) {
if ((requestedMode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Override
public final void clear() {
queue.clear();
}
@Override
public final boolean isEmpty() {
return queue.isEmpty();
}
}
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -4547113800637756442L;
final Subscriber<? super T> actual;
ObserveOnSubscriber(
Subscriber<? super T> actual,
Worker worker,
boolean delayError,
int prefetch) {
super(worker, delayError, prefetch);
this.actual = actual;
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> f = (QueueSubscription<T>) s;
int m = f.requestFusion(ANY | BOUNDARY);
if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;
actual.onSubscribe(this);
return;
} else
if (m == ASYNC) {
sourceMode = ASYNC;
queue = f;
actual.onSubscribe(this);
s.request(prefetch);
return;
}
}
queue = new SpscArrayQueue<T>(prefetch);
actual.onSubscribe(this);
s.request(prefetch);
}
}
@Override
void runSync() {
int missed = 1;
final Subscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
a.onError(ex);
worker.dispose();
return;
}
if (cancelled) {
return;
}
if (v == null) {
a.onComplete();
worker.dispose();
return;
}
a.onNext(v);
e++;
}
if (cancelled) {
return;
}
if (q.isEmpty()) {
a.onComplete();
worker.dispose();
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runAsync() {
int missed = 1;
final Subscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
e++;
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
s.request(e);
e = 0L;
}
}
```
if (e == r && checkTerminated(done, q.isEmpty(), a)) {
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runBackfused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
actual.onNext(null);
if (d) {
Throwable e = error;
if (e != null) {
actual.onError(e);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
if (v != null && sourceMode != SYNC) {
long p = produced + 1;
if (p == limit) {
produced = 0;
s.request(p);
} else {
produced = p;
}
}
return v;
}
}
static final class ObserveOnConditionalSubscriber<T>
extends BaseObserveOnSubscriber<T> {
private static final long serialVersionUID = 644624475404284533L;
final ConditionalSubscriber<? super T> actual;
long consumed;
ObserveOnConditionalSubscriber(
ConditionalSubscriber<? super T> actual,
Worker worker,
boolean delayError,
int prefetch) {
super(worker, delayError, prefetch);
this.actual = actual;
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> f = (QueueSubscription<T>) s;
int m = f.requestFusion(ANY | BOUNDARY);
if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;
actual.onSubscribe(this);
return;
} else
if (m == ASYNC) {
sourceMode = ASYNC;
queue = f;
actual.onSubscribe(this);
s.request(prefetch);
return;
}
}
queue = new SpscArrayQueue<T>(prefetch);
actual.onSubscribe(this);
s.request(prefetch);
}
}
@Override
void runSync() {
int missed = 1;
final ConditionalSubscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
a.onError(ex);
worker.dispose();
return;
}
if (cancelled) {
return;
}
if (v == null) {
a.onComplete();
worker.dispose();
return;
}
if (a.tryOnNext(v)) {
e++;
}
}
if (cancelled) {
return;
}
if (q.isEmpty()) {
a.onComplete();
worker.dispose();
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
```
@Override
void runAsync() {
int missed = 1;
final ConditionalSubscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long emitted = produced;
long polled = consumed;
for (;;) {
long r = requested.get();
while (emitted != r) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
if (a.tryOnNext(v)) {
emitted++;
}
polled++;
if (polled == limit) {
s.request(polled);
polled = 0L;
}
}
if (emitted == r && checkTerminated(done, q.isEmpty(), a)) {
return;
}
int w = get();
if (missed == w) {
produced = emitted;
consumed = polled;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runBackfused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
actual.onNext(null);
if (d) {
Throwable e = error;
if (e != null) {
actual.onError(e);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
if (v != null && sourceMode != SYNC) {
long p = consumed + 1;
if (p == limit) {
consumed = 0;
s.request(p);
} else {
consumed = p;
}
}
return v;
}
}
}
复制代码
- 继承自AbstractFlowableWithUpstream
- 利用subscribeActual方法
- 创建一个新的Subscriber包裹旧的.
- 在调用到onNext等方法时丢到线程中去执行
13 RxJava1 observeOn仿写
**
/**
* Created by Xionghu on 2018/6/14.
* Desc: 用于CallbackOn
*/
public class OperatorCallbackOn<T> implements Caller.Operator<T, T> {
private final Switcher switcher;
public OperatorCallbackOn(Switcher switcher) {
this.switcher = switcher;
}
@Override
public Receiver<T> call(final Receiver<T> tReceiver) {
return new CallbackOnReceiver<>(tReceiver, switcher);
}
private static final class CallbackOnReceiver<T> extends Receiver<T> implements Action0 {
private final Receiver<T> tReceiver;
private final Switcher.Worker worker;
private final Queue<T> tQueue = new LinkedList<>();
public CallbackOnReceiver(Receiver<T> tReceiver, Switcher switcher) {
this.tReceiver = tReceiver;
this.worker = switcher.createWorker();
}
@Override
public void call() {
T t = tQueue.poll(); //移除元素,如果队列为空,则返回null
tReceiver.onReceive(t);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(T t) {
tQueue.offer(t);// offer:添加一个元素并返回true 如果队列已满,则返回false
switches();
}
private void switches() {
worker.switches(this);
}
}
}
复制代码
OperatorCallbackOn
- 持有Switcher
- call方法中返回用于callbackOn的Receiver
CallbackOnReceiver
- 持有原Caller和Switcher
- 在onReceive等方法中做调度
- 调度后用原Receiver再调用onReceive
实例
**
/**
* Created by Xionghu on 2018/6/11.
* Desc: .RxJava1 observeOn
*/
public class Lesson3_4Activity extends AppCompatActivity {
public static final String TAG = "kpioneer";
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked() {
Caller.
create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
stringReceiver.onReceive("test");
Log.d(TAG, "currentThread:" + Thread.currentThread());
stringReceiver.onCompleted();
}
}).
callOn(new NewThreadSwitcher()).
callbackOn(new LooperSwitcher(getMainLooper())).
call(new Receiver<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(String s) {
Log.d(TAG, "onReceive:" + s);
Log.d(TAG, "currentThread:" + Thread.currentThread());
}
});
}
}
复制代码
**
06-19 10:05:36.245 11194-11346/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[ NewThreadWorker,5,main]
06-19 10:05:36.265 11194-11194/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-19 10:05:36.265 11194-11194/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[main,5,main]
复制代码
14 RxJava2 observeOn仿写
14.1 RxJava2(无背压) observeOn
public Caller<T> callbackOn(Switcher switcher) {
return new CallerCallbackOn<>(this, switcher);
}
复制代码
/**
* Created by Xionghu on 2018/6/19.
* Desc: 用于callbackon
*/
public class CallerCallbackOn<T> extends CallerWithUpstream<T, T> {
private final Switcher mSwitcher;
public CallerCallbackOn(Caller<T> source, Switcher mSwitcher) {
super(source);
this.mSwitcher = mSwitcher;
}
@Override
protected void callActual(Callee<T> callee) {
source.call(new CallbackOnCallee<T>(callee, mSwitcher));
}
private static final class CallbackOnCallee<T> implements Callee<T>, Runnable {
private final Callee<T> mCallee;
private final Switcher.Worker worker;
private final Queue<T> tQueue = new LinkedList<>();
public CallbackOnCallee(Callee<T> mCallee, Switcher switcher) {
this.mCallee = mCallee;
this.worker = switcher.createWorker();
}
@Override
public void onCall(Release release) {
}
@Override
public void onReceive(T t) {
tQueue.offer(t);
switches();
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void run() {
T t = tQueue.poll();
mCallee.onReceive(t);
}
private void switches() {
worker.switches(this);
}
}
}
复制代码
CallerCallbackOn
- 持有Switcher
- 实现callActual方法
- 用原有Caller去call一个用于线程切换的Callee
CallbackOnCallee
- 持有原Callee和Switcher
- 在onReceive等方法中做调度
- 调度后用原Callee在调用onReceive
14.2 RxJava2(有背压) observeOn
public Telephoner<T> callbackOn(Switcher switcher){
return new TelephonerCallbackOn<>(this,switcher);
}
复制代码
/**
* Created by Xionghu on 2018/6/19.
* Desc:用于callbackon
*/
public class TelephonerCallbackOn<T> extends TelephonerWithUpstream<T, T> {
private final Switcher mSwitcher;
public TelephonerCallbackOn(Telephoner<T> source, Switcher mSwitcher) {
super(source);
this.mSwitcher = mSwitcher;
}
@Override
protected void callActual(Receiver<T> receiver) {
source.call(new CallbackOnReceiver<>(receiver, mSwitcher));
}
private static final class CallbackOnReceiver<T> implements Receiver<T>, Runnable {
private final Receiver<T> tReceiver;
private final Switcher.Worker worker;
private final Queue<T> tQueue = new LinkedList<>();
public CallbackOnReceiver(Receiver<T> tReceiver, Switcher switcher) {
this.tReceiver = tReceiver;
this.worker = switcher.createWorker();
}
@Override
public void onCall(Drop d) {
tReceiver.onCall(d);
}
@Override
public void onReceive(T t) {
tQueue.offer(t);
switches();
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
@Override
public void run() {
T t = tQueue.poll();
tReceiver.onReceive(t);
}
private void switches() {
worker.switches(this);
}
}
}
复制代码
TelephonerCallbackOn
- 持有Switcher
- 实现callActual方法
- 用原Telephoner去call一个用于线程切换的Receiver
CallbackOnReceiver
- 持有原Telephoner和Switcher
- 在onReceive等方法中做调度
- 调度后用原Receiver再调用onReceive
实例
/**
* Created by Xionghu on 2018/6/11.
* Desc: .RxJava2 observeOn仿写
*/
public class Lesson3_5Activity extends AppCompatActivity {
public static final String TAG = "kpioneer";
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked() {
/*---------无背压---------*/
Caller.
create(new CallerOnCall<String>() {
@Override
public void call(CallerEmitter<String> callerEmitter) {
callerEmitter.onReceive("test");
Log.d(TAG, "无背压 currentThread:" + Thread.currentThread());
callerEmitter.onCompleted();
}
}).
callOn(new NewThreadSwitcher()).
callbackOn(new LooperSwitcher(getMainLooper())).
call(new Callee<String>() {
@Override
public void onCall(Release release) {
}
@Override
public void onReceive(String s) {
Log.d(TAG, "无背压 onReceive:" + s);
Log.d(TAG, "无背压 currentThread:" + Thread.currentThread());
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
});
/*---------有背压---------*/
Telephoner.
create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("test");
Log.d(TAG, "有背压 currentThread:" + Thread.currentThread());
}
}).
callOn(new NewThreadSwitcher()).
callbackOn(new LooperSwitcher(getMainLooper())).
call(new Receiver<String>() {
@Override
public void onCall(Drop d) {
d.request(Long.MAX_VALUE);
}
@Override
public void onReceive(String s) {
Log.d(TAG, "有背压 onReceive:" + s);
Log.d(TAG, "有背压 currentThread:" + Thread.currentThread());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
}
}
复制代码
06-19 13:19:29.185 25626-25771/com.haocai.rxjavademo D/kpioneer: 无背压 currentThread:Thread[NewThreadWorker,5,main]
06-19 13:19:29.195 25626-25626/com.haocai.rxjavademo D/kpioneer: 无背压 onReceive:test
06-19 13:19:29.195 25626-25626/com.haocai.rxjavademo D/kpioneer: 无背压 currentThread:Thread[main,5,main]
06-19 13:19:29.205 25626-25772/com.haocai.rxjavademo D/kpioneer: 有背压 currentThread:Thread[NewThreadWorker,5,main]
06-19 13:19:29.205 25626-25626/com.haocai.rxjavademo D/kpioneer: 有背压 onReceive:test
06-19 13:19:29.205 25626-25626/com.haocai.rxjavademo D/kpioneer: 有背压 currentThread:Thread[main,5,main]
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END