RxJS中的实现自定义pipe

这是我参与8月更文挑战的第3天,活动详情查看:8月更文挑战

这些天在写一些功能的时候,希望可以把一些方法改写为函数式调用的管道写法,但是这些操作很多是基于Promise的链式异步调用,后来发现可以基于rxjs的自定义pipe很好的解决这个问题 ,既可以保障函数调用足够简单,不产生回调地狱类似的问题,由可以很好的生成异步任务队列。

有这样一组任务:

任务A 
任务B
任务C
复制代码

任务A,B,C 之间是完全异步的,但是需要按顺序执行,这是当然可以用await来完成,但还有时需要数据在A,B,C之间进行传递,可能写成这样:

await 任务A(result)
await 任务B(result)
await 任务C(result)
复制代码

这是可以通过一个result对象来存储返回的结果,但是如果在某个任务失败的情况下来终止任务就需要添加throw操作,当然我希望可以写成这样:

x
.任务A()
.任务B()
.任务C()

复制代码

但是在保障任务顺序执行和终止上需要还多额外的工作,后来想到了rxjs的pipe:

obserable.pipe(
    任务A(),
    任务B(),
    任务C()
)
复制代码

这样的调用方式刚好满足我的需要,下载只需要编写对应的自定义pipe即可。

自定应pipe需要返回一个函数<T>(source: Observable<V>) => Observable<V>

这个函数中传入的Observable用来传入pipe目标observable中的数据,返回的Observable用来控制pipe的状态,来控制pipe是否继续向下执行,如任务B发生异常是执行error方法,就会终止后续任务的执行,这样满足了我们在pipe进行异步操作的需求。

public call(handler: (arg) => Promise<any>) {
    return <T>(source: Observable<any>) =>
      new Observable<any>(observer => {
        return source.subscribe({
          next: async (data) => {
            handler(data)
              .then(() => {
                observer.next(tx)
              })
              .catch(err => {
                observer.error(err)
              })
              .finally(() => loading.dismiss())
          },
          error(err) {
            observer.error(err)
          },
          complete() {
            observer.complete()
          }
        })
      })
  }
复制代码

传入的handler是一个promise操作,这样就可以把一个Promise方法封装到这个pipe中。

为了可以进行调用,我们需要在管道外加一层

  public transaction(result): Observable<Transaction> {
    return Observable.create(observer => {
      observer.next(result)
    })
  }
复制代码

result可以作为数据结果共享的载体,这样多个自定义pipe之间就通过result共享数据。

调用时就可以像这样方便的调用了:

transaction(result).pipe(
      this.call(...promise操作A),
      this.call(...promise操作B)
    )
复制代码

如果您觉得这篇文章有帮助到您的的话不妨?关注+点赞+收藏+评论+转发?支持一下哟~~?

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享