redux-saga 简介
redux-saga,一个响亮的名字,虽然上一篇已经介绍过了,但读了它的源码后,我忍不住再郑重的再介绍一遍.这是一个管理程序”副作用”的框架,虽然说大多数情况下都是作为 redux 中间件使用,但根本使用上,它不依赖任何的其他库,可以单独使用.使用它来管理程序副作用有以下的优点:
- 更好的测试
- 更清晰的代码逻辑
- 轻松管理副作用的启动和取消
PS:这库的代码写的真的太好了,充满了计算机专业名词,让我感到亲近熟悉,fork、channel、task、io 概念、semaphore、buffer,就单凭概念这层,saga 赢太多
一些概念
Effect
官网强调了无数遍了,Effect 就是 saga 中间件的执行单元,通过内部的 API 生成,是一些简单对象,包含一些信息,比如 type 属性,saga 中间件可以根据这些信息选择向下执行、堵塞、dispatch action 等等操作(take、fork、put)
saga
对于什么是 saga,个人理解就是一些 Effect 的集合,可以分成 work saga 和 root saga,root saga 负责分发 action,work saga 负责对指定 action 进行反应,两者之间的组合嵌套可以根据 Generator 函数语法进行操作
function * worksaga(getstate){
try{
yield call(some async op);
yield put({type:'SOME ACTION WHEN SUCCESS'});
}catch(err){
yield put({type:'SOME ACTION WHEN FAIL'})
}
}
复制代码
function* rootsaga() {
yield* takeEvery("SOME ACTION", worksaga);
}
复制代码
task
每个saga对应一个task,用于管理迭代器运行,task 有很多类型,Main Task 是主要跟踪整个 Main Flow,Fork Task 就是 fork 创建出来的 task,Parent Task 是管理 Main Task 和若干 Fork Tasks 的
proc
主要执行逻辑集中在 proc,它定义了 saga 中间件执行 Effect 的逻辑,通过拿到 Generator 函数的迭代器(iterator),从而获得函数控制权,通过辅助函数之间的相互迭代来不断的调用 iterator.next
channel
保存 task 回调和触发 task 回调的地方,channel.take 进行回调注册,channel.put 匹配监听当前 action 的回调,进行触发
(源码有删减)
put(input) {
const takers = (currentTakers = nextTakers)
for (let i = 0, len = takers.length; i < len; i++) {
const taker = takers[i]
if (taker[MATCH](input)) {
taker.cancel()
taker(input)
}
}
},
复制代码
take(cb, matcher = matchers.wildcard) {
cb[MATCH] = matcher
nextTakers.push(cb)
cb.cancel = once(() => {
remove(nextTakers, cb)
})
},
复制代码
上述的列出一些概念是为了更好的理解接下来的源码解读,源码比较复杂,加之本人水平有限,不可能讲太细,也不需要讲太细.
源码
一般按照惯例,说源码之前都是要复习下使用的,但上篇已经说过了,不熟悉可以去看下再看下去,这里就不说了,直接从入口开始分析
入口
function sagaMiddleware({ getState, dispatch }) {
boundRunSaga = runSaga.bind(null, {
...options,
context,
channel,
dispatch,
getState,
sagaMonitor,
})
return next => action => {
if (sagaMonitor && sagaMonitor.actionDispatched) {
sagaMonitor.actionDispatched(action)
}
const result = next(action) // hit reducers
channel.put(action)
return result
}
复制代码
saga 的入口还是接收 redux 传给中间件的两个 API,其实外面还有一层 Factory 函数的,用于与环境解耦,这里就略了,入口绑定了一个boundRunSaga
函数也就是sagaMiddleware.run
调用的函数,然后返回的高阶函数主要逻辑就是当 action 被 dispatch,照常调用别的中间件封装过的 dispatch 函数,也就是 next 函数,但返回结果之前,使用channel.put(action)
唤醒 saga 中间件本身的逻辑,也就相当于独立于 redux 流,自己开一条处理副作用的流.
前面说过,channel.put(action)
是触发监听了 action 的回调,那这些回调是什么时候注册的呢?
runSaga 与 proc
runSaga 即 sagaMiddleware.run 调用的函数
在 runSaga 中,首先取得传入 root saga 的迭代器
const iterator = saga(...args);
复制代码
然后生成环境,可以看成是 task 进程运行的系统环境
const env = {
channel,
dispatch: wrapSagaDispatch(dispatch),
getState,
sagaMonitor,
onError,
finalizeRunEffect,
};
复制代码
然后马上执行一个函数,该函数用于创建管理该 root saga 的 parent task,监视总的 flow,其上有一些控制函数,比如 cancel,然后执行这个 saga 函数的迭代器,根据迭代器返回的 Effect 类型再进一步执行
immediately(() => {
const task = proc(
env,
iterator,
context,
effectId,
getMetaInfo(saga),
/* isRoot */ true,
undefined
);
if (sagaMonitor) {
sagaMonitor.effectResolved(effectId, task);
}
return task;
});
复制代码
(源码有删减)
export default function proc(
env,
iterator,
parentContext,
parentEffectId,
meta,
isRoot,
cont
) {
next.cancel = noop;
/** Creates a main task to track the main flow */
const mainTask = { meta, cancel: cancelMain, status: RUNNING };
/**
Creates a new task descriptor for this generator.
A task is the aggregation of it's mainTask and all it's forked tasks.
**/
const task = newTask(
env,
mainTask,
parentContext,
parentEffectId,
meta,
isRoot,
cont
);
const executingContext = {
task,
digestEffect,
};
/**
cancellation of the main task. We'll simply resume the Generator with a TASK_CANCEL
**/
function cancelMain() {
if (mainTask.status === RUNNING) {
mainTask.status = CANCELLED;
next(TASK_CANCEL);
}
}
/**
attaches cancellation logic to this task's continuation
this will permit cancellation to propagate down the call chain
**/
if (cont) {
cont.cancel = task.cancel;
}
// kicks up the generator
next();
// then return the task descriptor to the caller
return task;
}
复制代码
next 和 EffectRunner
next 就是 saga middleware 中执行 Effect 指令的地方,他会根据 Effect.type 的类型找到对应的 EffectRunner,执行这个 runner 函数
function runEffect(effect, effectId, currCb) {
if (is.promise(effect)) {
resolvePromise(effect, currCb);
} else if (is.iterator(effect)) {
// resolve iterator
proc(env, effect, task.context, effectId, meta, /* isRoot */ false, currCb);
} else if (effect && effect[IO]) {
const effectRunner = effectRunnerMap[effect.type];
effectRunner(env, effect.payload, currCb, executingContext);
} else {
// anything else returned as is
currCb(effect);
}
}
复制代码
其中effectRunnerMap[effect.type]
就是找到该 runner,然后执行,看下effectRunnerMap
const effectRunnerMap = {
[effectTypes.TAKE]: runTakeEffect,
[effectTypes.PUT]: runPutEffect,
[effectTypes.ALL]: runAllEffect,
[effectTypes.RACE]: runRaceEffect,
[effectTypes.CALL]: runCallEffect,
[effectTypes.CPS]: runCPSEffect,
[effectTypes.FORK]: runForkEffect,
[effectTypes.JOIN]: runJoinEffect,
[effectTypes.CANCEL]: runCancelEffect,
[effectTypes.SELECT]: runSelectEffect,
[effectTypes.ACTION_CHANNEL]: runChannelEffect,
[effectTypes.CANCELLED]: runCancelledEffect,
[effectTypes.FLUSH]: runFlushEffect,
[effectTypes.GET_CONTEXT]: runGetContextEffect,
[effectTypes.SET_CONTEXT]: runSetContextEffect,
};
复制代码
就是一些我们熟悉的 effectType 对应的 runner,看几个常见的 runner
- put
function runPutEffect(env, { channel, action, resolve }, cb) {
/**
Schedule the put in case another saga is holding a lock.
The put will be executed atomically. ie nested puts will execute after
this put has terminated.
**/
asap(() => {
let result;
try {
result = (channel ? channel.put : env.dispatch)(action);
} catch (error) {
cb(error, true);
return;
}
if (resolve && is.promise(result)) {
resolvePromise(result, cb);
} else {
cb(result);
}
});
// Put effects are non cancellables
}
复制代码
显然是直接 dispatch(action),跟普通直接 dispatch 的区别就是 put(action)返回的是 Effect,方便测试
- call
function runCallEffect(env, { context, fn, args }, cb, { task }) {
// catch synchronous failures; see #152
try {
const result = fn.apply(context, args);
if (is.promise(result)) {
resolvePromise(result, cb);
return;
}
if (is.iterator(result)) {
// resolve iterator
proc(
env,
result,
task.context,
currentEffectId,
getMetaInfo(fn),
/* isRoot */ false,
cb
);
return;
}
cb(result);
} catch (error) {
cb(error, true);
}
}
复制代码
call 的逻辑可以猜到,就是把迭代器的控制权包装成 cb,在 promise resolve 的时候再执行,就等于阻塞了.
- take
function runTakeEffect(env, { channel = env.channel, pattern, maybe }, cb) {
const takeCb = (input) => {
if (input instanceof Error) {
cb(input, true);
return;
}
if (isEnd(input) && !maybe) {
cb(TERMINATE);
return;
}
cb(input);
};
try {
channel.take(takeCb, is.notUndef(pattern) ? matcher(pattern) : null);
} catch (err) {
cb(err, true);
return;
}
cb.cancel = takeCb.cancel;
}
复制代码
take 是等待 action 的到来,说是控制反转,其实就是把 take 的回调注册到 channel 中,等待 action 到来时 channel 的唤醒
-
fork
fork 是先返回结果,而不是阻塞迭代器的执行,显然需要一个新的 task 去负责 fork 的进程,但并不等该 task 完成,而是直接返回
function runForkEffect(
env,
{ context, fn, args, detached },
cb,
{ task: parent }
) {
const taskIterator = createTaskIterator({ context, fn, args });
const meta = getIteratorMetaInfo(taskIterator, fn);
immediately(() => {
const child = proc(
env,
taskIterator,
parent.context,
currentEffectId,
meta,
detached,
undefined
);
if (detached) {
cb(child);
} else {
if (child.isRunning()) {
parent.queue.addTask(child);
cb(child);
} else if (child.isAborted()) {
parent.queue.abort(child.error());
} else {
cb(child);
}
}
});
// Fork effects are non cancellables
}
复制代码
- takeEvery
takeEvery 是高级 API,底层实现是 take+fork,源码逻辑不太好看出来,官网的例子比较清晰
export function takeEvery(pattern, saga) {
function* takeEveryHelper() {
while (true) {
yield take(pattern);
yield fork(saga);
}
}
return fork(takeEveryHelper);
}
复制代码
等待 aciton 被发起,fork 一个 task 去执行,不等返回,继续监听 action 的发起
总结
- redux-saga 通过自己建立的一套处理副作用系统独立的进行工作
- 每个 saga 函数表示一些个需要进行的副作用,saga 函数在内部表示为 task,task 负责监听函数运行,其上挂载了随时执行取消的方法,task 的执行逻辑由 proc 负责
- proc 是处理每个 task 的地方,首先取得 task 的迭代器,在内置的 next 函数中,根据每条 yield 语句返回的 Effect 选择对应的 EffectRunner 执行,完成阻塞逻辑
- Effect 是运行最小单元,可以理解为传给 saga middleware 的指令
- take Effect 会注册到 channel,等待 action 的到来再执行迭代器
- fork Effect 会生成新的 task 管理,并马上返回,不阻塞迭代器
- takeEvery 是构建在 fork 和 take 之上的,它等待 action 的到来,并 fork 一个 task 执行,因为 fork 不会阻塞迭代器,所以可以响应每次 action
另外,迭代器的取消是 saga 比较复杂的部分,此处并没有涉及,日后看是否有机会再单独出一期.