RxSwift源码解析6: 操作符 Concat (一)

Concat官方介绍

整体流程:将 Sequence ([observable1, observable2]) 中的 Element 放入 FIFO 的 Queue 中。内部创建 Sink 对 Sequence.Element 逐个订阅并转发,订阅后一个的前提是,前一个发出 onComplete。依次完成对所有 Sequence.Element 订阅。需要注意一点:如果其中某个元素发出 error,会终止对整个序列的订阅。

Example:

        Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3)).subscribe { value in
            print(value)
        }
        .disposed(by: disposeBag)
        
        打印:
        next(1)
        next(2)
        next(3)
        completed
复制代码

对象方法定义:

extension ObservableType {
    public func concat<Source: ObservableConvertibleType>(_ second: Source) -> Observable<Element> where Source.Element == Element {
        Observable.concat([self.asObservable(), second.asObservable()])
    }
}
复制代码

类方法定义:

extension ObservableType {
    public static func concat<Sequence: Swift.Sequence>(_ sequence: Sequence) -> Observable<Element>
        where Sequence.Element == Observable<Element> {
            return Concat(sources: sequence, count: nil)
    }

    public static func concat<Collection: Swift.Collection>(_ collection: Collection) -> Observable<Element>
        where Collection.Element == Observable<Element> {
            return Concat(sources: collection, count: Int64(collection.count))
    }

    public static func concat(_ sources: Observable<Element> ...) -> Observable<Element> {
        Concat(sources: sources, count: Int64(sources.count))
    }
}
复制代码

这几个方法都是用不同的形式生成 Concat 对象

Concat:

// 继承自 Producer。提供初始化方法,和 重写 run 方法。
final private class Concat<Sequence: Swift.Sequence>: Producer<Sequence.Element.Element> where Sequence.Element: ObservableConvertibleType {
    typealias Element = Sequence.Element.Element
    
    fileprivate let sources: Sequence
    fileprivate let count: IntMax?

    init(sources: Sequence, count: IntMax?) {
        self.sources = sources
        self.count = count
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ConcatSink<Sequence, Observer>(observer: observer, cancel: cancel)
        let subscription = sink.run((self.sources.makeIterator(), self.count))
        return (sink: sink, subscription: subscription)
    }
}
复制代码

文章开头的例子:

Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3))
复制代码

返回的 Concat 实例,属性 sources 格式应该是这样的"()" 中的内容表示Concat sources 的值

[Concat([just1, just2]), just3]
复制代码

以此类推,如果有四个元素,格式将成这样:

[Concat([Concat([just1, just2]), just3]), just4]
复制代码

怎么将该 Sequence 解开,依次按顺序拿到 just1 ,just2, just3 …

let subscription = sink.run((self.sources.makeIterator(), self.count))
复制代码

此方法传入 Sequence 的迭代器,和元素个数,内部处理,将 Sequence 解开

来看下 Sink:

final private class ConcatSink<Sequence: Swift.Sequence, Observer: ObserverType>
    : TailRecursiveSink<Sequence, Observer>
    , ObserverType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element {
    typealias Element = Observer.Element 
    
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>){
        switch event {
        case .next:
            self.forwardOn(event)
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.schedule(.moveNext)
        }
    }

    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)
    }
    
    override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        if let source = observable as? Concat<Sequence> {
            return (source.sources.makeIterator(), source.count)
        }
        else {
            return nil
        }
    }
}
复制代码

此类主要完成以下操作:

    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)
    }
复制代码

从 FIFO 的 Queue 队列中取出元素,订阅

func on(_ event: Event<Element>){
        switch event {
        case .next:
            self.forwardOn(event)
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.schedule(.moveNext)
        }
    }
复制代码

转发订阅内容。如果收到 completed , 从队列 Queue 取下一个元素,订阅。

    override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        if let source = observable as? Concat<Sequence> {
            return (source.sources.makeIterator(), source.count)
        }
        else {
            return nil
        }
    }
复制代码

返回 迭代器 和 序列元素个数。还是前面说的 Squence 嵌套,解开 Squence 时,会递归调用这里。这里的参数 observable 类似[Concat([Concat([just1, just2]), just3]), just4]) 中的 Concat([just1, just2])

———- 未完———

父类:TailRecursiveSink 是一个公用的类,也比较复杂,下一节单独分析。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享