这个比较简单,还是一贯的套路:生成中间层 TakeCountSink 作为源序列的实际订阅者;每收到一个 next 事件,就将剩余计数减 1,并把该元素转发给下游订阅者;当计数减到 0 时,先转发 completed 事件,再调用 dispose 释放资源。(sink 负责将事件处理后转发给订阅者。)
extension ObservableType {
    /**
     Takes only the first `count` elements of an observable sequence.

     A `count` of zero never wraps the source at all: an empty sequence is returned directly.
     Any other value is handed to `TakeCount`, whose initializer calls `rxFatalError` for
     negative counts.

     - seealso: [take operator on reactivex.io](http://reactivex.io/documentation/operators/take.html)

     - parameter count: The number of elements to return.
     - returns: An observable sequence that contains the specified number of elements from the start of the input sequence.
     */
    public func take(_ count: Int) -> Observable<Element> {
        // Nothing to take: skip building a producer for the source.
        if count == 0 {
            return Observable<Element>.empty()
        }

        return TakeCount(source: self.asObservable(), count: count)
    }
}
/// Observer placed between the source sequence and the downstream observer by `TakeCount`.
///
/// It keeps a countdown that starts at `parent.count`. Each `next` event received while the
/// countdown is positive is passed to `forwardOn`; when the countdown reaches zero a
/// `.completed` event is forwarded and the sink disposes itself.
final private class TakeCountSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element
    typealias Parent = TakeCount<Element>

    private let parent: Parent

    /// Number of elements that may still be forwarded.
    private var remaining: Int

    /// - parameter parent: The producer that supplies the initial element quota (`parent.count`).
    /// - parameter observer: Passed through to `Sink`'s initializer.
    /// - parameter cancel: Passed through to `Sink`'s initializer.
    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        self.remaining = parent.count
        super.init(observer: observer, cancel: cancel)
    }

    /// Handles one event coming from the source sequence.
    func on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            // Quota already used up: drop the element.
            guard self.remaining > 0 else { return }

            self.remaining -= 1
            self.forwardOn(.next(element))

            // More elements are still allowed; keep listening.
            guard self.remaining == 0 else { return }

            // That was the last permitted element: complete, then dispose.
            self.forwardOn(.completed)
            self.dispose()

        case .error, .completed:
            // Terminal events from the source are forwarded unchanged, then the sink is disposed.
            self.forwardOn(event)
            self.dispose()
        }
    }
}
/// Producer behind `take(_:)`: stores the source sequence and the number of elements to let
/// through, and subscribes a `TakeCountSink` to the source when run.
final private class TakeCount<Element>: Producer<Element> {
    private let source: Observable<Element>

    /// Element quota; read by `TakeCountSink` to initialise its countdown.
    fileprivate let count: Int

    /// - parameter source: The sequence whose elements are taken.
    /// - parameter count: Number of elements to take. A negative value triggers `rxFatalError`.
    init(source: Observable<Element>, count: Int) {
        if count < 0 { rxFatalError("count can't be negative") }

        self.source = source
        self.count = count
    }

    /// Creates the sink for `observer` and subscribes it to the source.
    ///
    /// - returns: The sink together with the subscription obtained from the source.
    override func run<Observer: ObserverType>(
        _ observer: Observer,
        cancel: Cancelable
    ) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let takeSink = TakeCountSink(parent: self, observer: observer, cancel: cancel)
        return (sink: takeSink, subscription: self.source.subscribe(takeSink))
    }
}
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END