整体流程:将  Sequence ([observable1, observable2]) 中的 Element 放入 FIFO 的 Queue 中。内部创建 Sink 对 Sequence.Element 逐个订阅并转发,订阅后一个的前提是,前一个发出 onComplete。依次完成对所有 Sequence.Element 订阅。需要注意一点:如果其中某个元素发出 error,会终止对整个序列的订阅。
Example:
        Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3)).subscribe { value in
            print(value)
        }
        .disposed(by: disposeBag)
        
        打印:
        next(1)
        next(2)
        next(3)
        completed
复制代码对象方法定义:
extension ObservableType {
    public func concat<Source: ObservableConvertibleType>(_ second: Source) -> Observable<Element> where Source.Element == Element {
        Observable.concat([self.asObservable(), second.asObservable()])
    }
}
复制代码类方法定义:
extension ObservableType {
    public static func concat<Sequence: Swift.Sequence>(_ sequence: Sequence) -> Observable<Element>
        where Sequence.Element == Observable<Element> {
            return Concat(sources: sequence, count: nil)
    }
    public static func concat<Collection: Swift.Collection>(_ collection: Collection) -> Observable<Element>
        where Collection.Element == Observable<Element> {
            return Concat(sources: collection, count: Int64(collection.count))
    }
    public static func concat(_ sources: Observable<Element> ...) -> Observable<Element> {
        Concat(sources: sources, count: Int64(sources.count))
    }
}
复制代码这几个方法都是用不同的形式生成 Concat 对象
Concat:
// 继承自 Producer。提供初始化方法,和 重写 run 方法。
final private class Concat<Sequence: Swift.Sequence>: Producer<Sequence.Element.Element> where Sequence.Element: ObservableConvertibleType {
    typealias Element = Sequence.Element.Element
    
    fileprivate let sources: Sequence
    fileprivate let count: IntMax?
    init(sources: Sequence, count: IntMax?) {
        self.sources = sources
        self.count = count
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = ConcatSink<Sequence, Observer>(observer: observer, cancel: cancel)
        let subscription = sink.run((self.sources.makeIterator(), self.count))
        return (sink: sink, subscription: subscription)
    }
}
复制代码文章开头的例子:
Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3))
复制代码返回的 Concat 实例,属性 sources 格式应该是这样的"()" 中的内容表示Concat sources 的值:
[Concat([just1, just2]), just3]
复制代码以此类推,如果有四个元素,格式将成这样:
[Concat([Concat([just1, just2]), just3]), just4]
复制代码怎么将该 Sequence 解开,依次按顺序拿到 just1 ,just2, just3 …
let subscription = sink.run((self.sources.makeIterator(), self.count))
复制代码此方法传入 Sequence 的迭代器,和元素个数,内部处理,将 Sequence 解开
来看下 Sink:
final private class ConcatSink<Sequence: Swift.Sequence, Observer: ObserverType>
    : TailRecursiveSink<Sequence, Observer>
    , ObserverType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element {
    typealias Element = Observer.Element 
    
    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>){
        switch event {
        case .next:
            self.forwardOn(event)
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.schedule(.moveNext)
        }
    }
    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)
    }
    
    override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        if let source = observable as? Concat<Sequence> {
            return (source.sources.makeIterator(), source.count)
        }
        else {
            return nil
        }
    }
}
复制代码此类主要完成以下操作:
    override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
        source.subscribe(self)
    }
复制代码从 FIFO 的 Queue 队列中取出元素,订阅
func on(_ event: Event<Element>){
        switch event {
        case .next:
            self.forwardOn(event)
        case .error:
            self.forwardOn(event)
            self.dispose()
        case .completed:
            self.schedule(.moveNext)
        }
    }
复制代码转发订阅内容。如果收到 completed , 从队列 Queue 取下一个元素,订阅。
    override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
        if let source = observable as? Concat<Sequence> {
            return (source.sources.makeIterator(), source.count)
        }
        else {
            return nil
        }
    }
复制代码返回 迭代器 和 序列元素个数。还是前面说的 Squence 嵌套,解开 Squence 时,会递归调用这里。这里的参数 observable 类似[Concat([Concat([just1, just2]), just3]), just4])  中的 Concat([just1, just2])
———- 未完———
父类:TailRecursiveSink 是一个公用的类,也比较复杂,下一节单独分析。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END
    






















![[桜井宁宁]COS和泉纱雾超可爱写真福利集-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/4d3cf227a85d7e79f5d6b4efb6bde3e8.jpg)

![[桜井宁宁] 爆乳奶牛少女cos写真-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/d40483e126fcf567894e89c65eaca655.jpg)
