使用通道
到目前为止,我们一直使用 take
和 put
效果与 Redux Store 进行通信。通道将这些效果泛化,以便与外部事件源或 Saga 本身之间进行通信。它们也可以用来从 Store 中排队特定动作。
在本节中,我们将看到
如何使用
yield actionChannel
Effect 来缓冲来自 Store 的特定操作。如何使用
eventChannel
工厂函数将take
Effect 连接到外部事件源。如何使用通用的
channel
工厂函数创建通道,并在take
/put
Effect 中使用它来在两个 Saga 之间进行通信。
使用 actionChannel
Effect
让我们回顾一下规范示例
import { take, fork, ... } from 'redux-saga/effects'
function* watchRequests() {
while (true) {
const {payload} = yield take('REQUEST')
yield fork(handleRequest, payload)
}
}
function* handleRequest(payload) { ... }
上面的示例说明了典型的观察和分叉模式。watchRequests
saga 使用 fork
来避免阻塞,从而不会错过来自 store 的任何操作。每个 REQUEST
操作都会创建一个 handleRequest
任务。因此,如果在快速速率下触发了许多操作,则可能会有许多 handleRequest
任务同时执行。
现在想象一下,我们的要求如下:我们希望按顺序处理 REQUEST
。如果我们有任何时候有四个操作,我们希望处理第一个 REQUEST
操作,然后只有在完成此操作后,我们才处理第二个操作,依此类推...
所以我们希望将所有未处理的操作排队,一旦我们完成当前请求的处理,我们就从队列中获取下一条消息。
Redux-Saga 提供了一个小助手 Effect actionChannel
,它可以为我们处理这个问题。让我们看看如何用它重写前面的示例
import { take, actionChannel, call, ... } from 'redux-saga/effects'
function* watchRequests() {
// 1- Create a channel for request actions
const requestChan = yield actionChannel('REQUEST')
while (true) {
// 2- take from the channel
const {payload} = yield take(requestChan)
// 3- Note that we're using a blocking call
yield call(handleRequest, payload)
}
}
function* handleRequest(payload) { ... }
第一步是创建操作通道。我们使用 yield actionChannel(pattern)
,其中 pattern 使用与之前使用 take(pattern)
提到的相同规则进行解释。两种形式之间的区别在于 actionChannel
可以缓冲传入的消息,如果 Saga 尚未准备好接收它们(例如,在 API 调用中被阻塞)。
接下来是 yield take(requestChan)
。除了与 pattern
一起使用以从 Redux Store 中获取特定操作之外,take
也可以与通道一起使用(上面我们从特定的 Redux 操作创建了一个通道对象)。take
将阻塞 Saga,直到通道上有消息可用。如果底层缓冲区中有消息存储,则 take 也可能立即恢复。
需要注意的重要一点是我们如何使用阻塞的 call
。Saga 将保持阻塞状态,直到 call(handleRequest)
返回。但与此同时,如果 Saga 仍然被阻塞,而其他 REQUEST
操作被分派,它们将在 requestChan
内部排队。当 Saga 从 call(handleRequest)
恢复并执行下一个 yield take(requestChan)
时,take 将使用排队的消息解析。
默认情况下,actionChannel
会无限制地缓冲所有传入的消息。如果您想要更精细地控制缓冲,可以在创建 Effect 时提供一个 Buffer 参数。Redux-Saga 提供了一些常见的缓冲区(none、dropping、sliding),但您也可以提供自己的缓冲区实现。有关更多详细信息,请参阅 API 文档。
例如,如果您只想处理最近的五个项目,可以使用
import { buffers } from 'redux-saga'
import { actionChannel } from 'redux-saga/effects'
function* watchRequests() {
const requestChan = yield actionChannel('REQUEST', buffers.sliding(5))
...
}
使用 eventChannel
工厂连接到外部事件
与 actionChannel
(Effect)类似,eventChannel
(一个工厂函数,而不是一个 Effect)会为事件创建一个 Channel,但来自 Redux Store 以外的事件源。
这个基本示例从一个间隔创建了一个 Channel
import { eventChannel, END } from 'redux-saga'
function countdown(secs) {
return eventChannel(emitter => {
const iv = setInterval(() => {
secs -= 1
if (secs > 0) {
emitter(secs)
} else {
// this causes the channel to close
emitter(END)
}
}, 1000);
// The subscriber must return an unsubscribe function
return () => {
clearInterval(iv)
}
}
)
}
eventChannel
中的第一个参数是一个订阅者函数。订阅者的作用是初始化外部事件源(上面使用 setInterval
),然后通过调用提供的 emitter
将来自源的所有传入事件路由到通道。在上面的示例中,我们每秒钟调用一次 emitter
。
注意:您需要对事件源进行清理,以避免通过事件通道传递 null 或 undefined。虽然传递数字是可以的,但我们建议您像 Redux 操作一样构建事件通道数据。
{ number }
而不是number
。
还要注意 emitter(END)
的调用。我们使用它来通知任何通道使用者通道已关闭,这意味着不会再有其他消息通过此通道。
让我们看看如何从我们的 Saga 中使用此通道。(这取自存储库中的 cancellable-counter 示例。)
import { take, put, call } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'
// creates an event Channel from an interval of seconds
function countdown(seconds) { ... }
export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
// take(END) will cause the saga to terminate by jumping to the finally block
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
console.log('countdown terminated')
}
}
因此,Saga 正在生成一个 take(chan)
。这会导致 Saga 阻塞,直到通道上有消息。在我们上面的示例中,它对应于我们调用 emitter(secs)
的时候。还要注意,我们在 try/finally
块中执行整个 while (true) {...}
循环。当间隔终止时,倒计时函数通过调用 emitter(END)
关闭事件通道。关闭通道的效果是终止所有在该通道上的 take
上阻塞的 Saga。在我们的示例中,终止 Saga 将导致它跳到其 finally
块(如果提供,否则 Saga 终止)。
订阅者返回一个 unsubscribe
函数。通道使用它在事件源完成之前取消订阅。在从事件通道消费消息的 Saga 中,如果我们想要在事件源完成之前提前退出(例如 Saga 已被取消),您可以调用 chan.close()
来关闭通道并从源取消订阅。
例如,我们可以让我们的 Saga 支持取消
import { take, put, call, cancelled } from 'redux-saga/effects'
import { eventChannel, END } from 'redux-saga'
// creates an event Channel from an interval of seconds
function countdown(seconds) { ... }
export function* saga() {
const chan = yield call(countdown, value)
try {
while (true) {
let seconds = yield take(chan)
console.log(`countdown: ${seconds}`)
}
} finally {
if (yield cancelled()) {
chan.close()
console.log('countdown cancelled')
}
}
}
以下是如何使用事件通道将 WebSocket 事件传递到您的 saga 中的另一个示例(例如:使用 socket.io 库)。假设您正在等待服务器消息 ping
,然后在延迟一段时间后回复 pong
消息。
import { take, put, call, apply, delay } from 'redux-saga/effects'
import { eventChannel } from 'redux-saga'
import { createWebSocketConnection } from './socketConnection'
// this function creates an event channel from a given socket
// Setup subscription to incoming `ping` events
function createSocketChannel(socket) {
// `eventChannel` takes a subscriber function
// the subscriber function takes an `emit` argument to put messages onto the channel
return eventChannel(emit => {
const pingHandler = (event) => {
// puts event payload into the channel
// this allows a Saga to take this payload from the returned channel
emit(event.payload)
}
const errorHandler = (errorEvent) => {
// create an Error object and put it into the channel
emit(new Error(errorEvent.reason))
}
// setup the subscription
socket.on('ping', pingHandler)
socket.on('error', errorHandler)
// the subscriber must return an unsubscribe function
// this will be invoked when the saga calls `channel.close` method
const unsubscribe = () => {
socket.off('ping', pingHandler)
}
return unsubscribe
})
}
// reply with a `pong` message by invoking `socket.emit('pong')`
function* pong(socket) {
yield delay(5000)
yield apply(socket, socket.emit, ['pong']) // call `emit` as a method with `socket` as context
}
export function* watchOnPings() {
const socket = yield call(createWebSocketConnection)
const socketChannel = yield call(createSocketChannel, socket)
while (true) {
try {
// An error from socketChannel will cause the saga jump to the catch block
const payload = yield take(socketChannel)
yield put({ type: INCOMING_PONG_PAYLOAD, payload })
yield fork(pong, socket)
} catch(err) {
console.error('socket error:', err)
// socketChannel is still open in catch block
// if we want end the socketChannel, we need close it explicitly
// socketChannel.close()
}
}
}
注意:eventChannel 上的消息默认不会被缓冲。您必须在创建 eventChannel 时提供一个缓冲区,以指定该通道的缓冲策略(例如:
eventChannel(subscriber, buffer)
)。查看 API 文档了解更多信息。
在这个 WebSocket 示例中,socketChannel 在发生某些套接字错误时可能会发出错误,这将中止我们正在此 eventChannel 上等待的 yield take(socketChannel)
。请注意,默认情况下,发出错误不会中止通道,如果我们希望在错误后结束通道,我们需要显式地关闭通道。
使用通道在 Saga 之间进行通信
除了动作通道和事件通道,您还可以直接创建通道,这些通道默认情况下不连接到任何源。然后,您可以手动将消息 put
到通道中。当您想使用通道在 Saga 之间进行通信时,这非常方便。
为了说明,让我们回顾一下之前处理请求的示例。
import { take, fork, ... } from 'redux-saga/effects'
function* watchRequests() {
while (true) {
const {payload} = yield take('REQUEST')
yield fork(handleRequest, payload)
}
}
function* handleRequest(payload) { ... }
我们看到,观察和分叉模式允许同时处理多个请求,对并发执行的工作任务数量没有限制。然后,我们使用 actionChannel
效果将并发性限制为一次一个任务。
所以假设我们的要求是最多同时执行三个任务。当我们收到一个请求并且正在执行的任务少于三个时,我们会立即处理该请求,否则我们会将该任务排队并等待三个槽位中的一个变为空闲。
下面是一个使用通道的解决方案示例
import { channel } from 'redux-saga'
import { take, fork, ... } from 'redux-saga/effects'
function* watchRequests() {
// create a channel to queue incoming requests
const chan = yield call(channel)
// create 3 worker 'threads'
for (var i = 0; i < 3; i++) {
yield fork(handleRequest, chan)
}
while (true) {
const {payload} = yield take('REQUEST')
yield put(chan, payload)
}
}
function* handleRequest(chan) {
while (true) {
const payload = yield take(chan)
// process the request
}
}
在上面的示例中,我们使用 channel
工厂创建一个通道。我们得到一个通道,默认情况下它会缓冲我们放入其中的所有消息(除非有一个挂起的接收者,在这种情况下,接收者会立即恢复并接收消息)。
然后,watchRequests
Saga 分叉三个工作 Saga。请注意,创建的通道被提供给所有分叉的 Saga。watchRequests
将使用此通道将工作分派给三个工作 Saga。对于每个 REQUEST
动作,Saga 将把有效负载放入通道中。然后,有效负载将被任何空闲的工作者获取。否则,它将被通道排队,直到一个工作 Saga 准备好获取它。
所有三个工作者都运行一个典型的 while 循环。在每次迭代中,一个工作者将获取下一个请求,或者阻塞直到有消息可用。请注意,这种机制在 3 个工作者之间提供了自动负载平衡。快速工作者不会被慢速工作者拖慢。
使用 multicastChannel
与不同的 worker 通信
在上面的部分中,我们看到了如何使用 channels
在相同的 worker 之间进行负载均衡,这些 worker 被fork了多次。如果我们需要将一个 action put
到一个 channel 中,并让多个不同的 worker 消费它,该怎么办呢?
我们可能需要将传入的请求传递给不同的 worker 以执行不同的副作用。
以下是一个使用 channels
的示例,我们可以看到问题所在:当我们使用 yield put(chan, payload)
将 action put
到 channel
中时,我们只会运行一个 worker(logWorker
或 mainWorker
),而不是两个。
import { channel } from 'redux-saga'
import { take, fork, call, put } from 'redux-saga/effects'
function* watchRequests() {
// create a channel to queue incoming requests
const chan = yield call(channel)
// fork both workers
yield fork(logWorker, chan)
yield fork(mainWorker, chan)
while (true) {
const { payload } = yield take('REQUEST')
// put here will reach only one worker, not both!
yield put(chan, payload)
}
}
function* logWorker(channel) {
while (true) {
const payload = yield take(channel)
// Log the request somewhere..
console.log('logWorker:', payload)
}
}
function* mainWorker(channel) {
while (true) {
const payload = yield take(channel)
// Process the request
console.log('mainWorker', payload)
}
}
为了解决这个问题,我们需要使用 multicastChannel
,它会将 action 广播给所有 worker,同时进行。
请注意,使用
multicastChannel
的take
需要我们传递额外的参数pattern - 你可以使用它来过滤要take
的 action。
请看下面的示例
import { multicastChannel } from 'redux-saga'
import { take, fork, call, put } from 'redux-saga/effects'
function* watchRequests() {
// create a multicastChannel to queue incoming requests
const channel = yield call(multicastChannel)
// fork different workers
yield fork(logWorker, channel)
yield fork(mainWorker, channel)
while (true) {
const { payload } = yield take('REQUEST')
yield put(channel, payload)
}
}
function* logWorker(channel) {
while (true) {
// Pattern '*' for simplicity
const payload = yield take(channel, '*')
// Log the request somewhere..
console.log('logWorker:', payload)
}
}
function* mainWorker(channel) {
while (true) {
// Pattern '*' for simplicity
const payload = yield take(channel, '*')
// Process the request
console.log('mainWorker', payload)
}
}