整体流程:将 Sequence ([observable1, observable2]
) 中的 Element 放入 FIFO 的 Queue 中。内部创建 Sink 对 Sequence.Element 逐个订阅并转发,订阅后一个的前提是,前一个发出 onComplete。依次完成对所有 Sequence.Element 订阅。需要注意一点:如果其中某个元素发出 error
,会终止对整个序列的订阅。
Example:
Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3)).subscribe { value in
print(value)
}
.disposed(by: disposeBag)
打印:
next(1)
next(2)
next(3)
completed
复制代码
对象方法定义:
extension ObservableType {
public func concat<Source: ObservableConvertibleType>(_ second: Source) -> Observable<Element> where Source.Element == Element {
Observable.concat([self.asObservable(), second.asObservable()])
}
}
复制代码
类方法定义:
extension ObservableType {
public static func concat<Sequence: Swift.Sequence>(_ sequence: Sequence) -> Observable<Element>
where Sequence.Element == Observable<Element> {
return Concat(sources: sequence, count: nil)
}
public static func concat<Collection: Swift.Collection>(_ collection: Collection) -> Observable<Element>
where Collection.Element == Observable<Element> {
return Concat(sources: collection, count: Int64(collection.count))
}
public static func concat(_ sources: Observable<Element> ...) -> Observable<Element> {
Concat(sources: sources, count: Int64(sources.count))
}
}
复制代码
这几个方法都是用不同的形式生成 Concat
对象
Concat:
// 继承自 Producer。提供初始化方法,和 重写 run 方法。
final private class Concat<Sequence: Swift.Sequence>: Producer<Sequence.Element.Element> where Sequence.Element: ObservableConvertibleType {
typealias Element = Sequence.Element.Element
fileprivate let sources: Sequence
fileprivate let count: IntMax?
init(sources: Sequence, count: IntMax?) {
self.sources = sources
self.count = count
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ConcatSink<Sequence, Observer>(observer: observer, cancel: cancel)
let subscription = sink.run((self.sources.makeIterator(), self.count))
return (sink: sink, subscription: subscription)
}
}
复制代码
文章开头的例子:
Observable.just(1).concat(Observable.just(2)).concat(Observable.just(3))
复制代码
返回的 Concat 实例,属性 sources 格式应该是这样的"()" 中的内容表示Concat sources 的值
:
[Concat([just1, just2]), just3]
复制代码
以此类推,如果有四个元素,格式将成这样:
[Concat([Concat([just1, just2]), just3]), just4]
复制代码
怎么将该 Sequence 解开,依次按顺序拿到 just1 ,just2, just3 …
let subscription = sink.run((self.sources.makeIterator(), self.count))
复制代码
此方法传入 Sequence 的迭代器,和元素个数,内部处理,将 Sequence 解开
来看下 Sink:
final private class ConcatSink<Sequence: Swift.Sequence, Observer: ObserverType>
: TailRecursiveSink<Sequence, Observer>
, ObserverType where Sequence.Element: ObservableConvertibleType, Sequence.Element.Element == Observer.Element {
typealias Element = Observer.Element
override init(observer: Observer, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>){
switch event {
case .next:
self.forwardOn(event)
case .error:
self.forwardOn(event)
self.dispose()
case .completed:
self.schedule(.moveNext)
}
}
override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
source.subscribe(self)
}
override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
if let source = observable as? Concat<Sequence> {
return (source.sources.makeIterator(), source.count)
}
else {
return nil
}
}
}
复制代码
此类主要完成以下操作:
override func subscribeToNext(_ source: Observable<Element>) -> Disposable {
source.subscribe(self)
}
复制代码
从 FIFO 的 Queue 队列中取出元素,订阅
func on(_ event: Event<Element>){
switch event {
case .next:
self.forwardOn(event)
case .error:
self.forwardOn(event)
self.dispose()
case .completed:
self.schedule(.moveNext)
}
}
复制代码
转发订阅内容。如果收到 completed
, 从队列 Queue 取下一个元素,订阅。
override func extract(_ observable: Observable<Element>) -> SequenceGenerator? {
if let source = observable as? Concat<Sequence> {
return (source.sources.makeIterator(), source.count)
}
else {
return nil
}
}
复制代码
返回 迭代器 和 序列元素个数。还是前面说的 Squence 嵌套,解开 Squence 时,会递归调用这里。这里的参数 observable
类似[Concat([Concat([just1, just2])
, just3]), just4]) 中的 Concat([just1, just2])
———- 未完———
父类:TailRecursiveSink 是一个公用的类,也比较复杂,下一节单独分析。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END