前言
上一篇文章解析了stream基础数据结构特点,同时分析了xadd源码了解到了添加消息流程,这篇文章将会完整分析stream命令,通过这些命令源码展现出来stream全部特点。
正文
XGROUP命令
前面分析xadd向stream添加消息源码,但是并不立刻看xread源码,因为xread里面涉及到消费者组的东西,先看一下XGROUP命令可以看出来一些特点,命令结构如下:
- CREATE <id or $> MKSTREAM:创建命令,指定一个stream为其创建一个group,并且指定id,MKSTREAM表示如果不存在stream可以创建stream
- SETID <id or $>:设置当前group的id
- DESTROY :删除一个group
- DELCONSUMER :删除一个消费者
- HELP:打开帮助
++$表示stream当前记录的last_id++
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
}
int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) {
char buf[128];
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
memcpy(buf,o->ptr,sdslen(o->ptr)+1);
// +是最大ID -是最小ID 严格模式下不能设置这种id
if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
goto invalid;
if (buf[0] == '-' && buf[1] == '\0') {
id->ms = 0;
id->seq = 0;
return C_OK;
} else if (buf[0] == '+' && buf[1] == '\0') {
id->ms = UINT64_MAX;
id->seq = UINT64_MAX;
return C_OK;
}
/* Parse <ms>-<seq> form. */
char *dot = strchr(buf,'-');
if (dot) *dot = '\0';
unsigned long long ms, seq;
if (string2ull(buf,&ms) == 0) goto invalid;
if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
if (!dot) seq = missing_seq;
id->ms = ms;
id->seq = seq;
return C_OK;
invalid:
if (c) addReplyError(c,"Invalid stream ID specified as stream "
"command argument");
return C_ERR;
}
复制代码
这一段是解析id,因为如果命令指定一个id格式是:ms-seq,将字符串解析成对象
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
if (s->cgroups == NULL) s->cgroups = raxNew();
if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
return NULL;
streamCG *cg = zmalloc(sizeof(*cg));
cg->pel = raxNew();
cg->consumers = raxNew();
cg->last_id = *id;
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg;
}
复制代码
创建streamCG对象,初始化pel、consumers和last_id为指定id,然后插入到cgroups的rax树中
xgroup没有放出全部源码,因为就是对参数的解析,不涉及到复杂逻辑,主要就是增删group,都是操作rax的api
xread命令
xread支持两种模式,以一个非消费者和消费者模式去读取数据:
XREAD [BLOCK ] [COUNT ] STREAMS key_1 key_2 … key_N ID_1 ID_2 … ID_N
BLOCK表示阻塞时间,COUNT表示读取消息个数,这个命令支持从多个stream对于的某个id开始的数据(id递增的)
同时支持以一个消费者去读取,命令变为:XREADGROUP GROUP group-name consumer-name….
void xreadCommand(client *c) {
//BLOCK 阻塞时间
long long timeout = -1;
long long count = 0;
int streams_count = 0;
int streams_arg = 0;
//表示不需要ack
int noack = 0;
#define STREAMID_STATIC_VECTOR_LEN 8
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
streamCG **groups = NULL;
//是否为xreadgroup命令
int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
robj *groupname = NULL;
robj *consumername = NULL;
for (int i = 1; i < c->argc; i++) {
int moreargs = c->argc-i-1;
char *o = c->argv[i]->ptr;
//使用block在lua中没有意义
if (!strcasecmp(o,"BLOCK") && moreargs) {
if (c->flags & CLIENT_LUA) {
/* There is no sense to use BLOCK option within LUA */
addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
return;
}
i++;
if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
UNIT_MILLISECONDS) != C_OK) return;
} else if (!strcasecmp(o,"COUNT") && moreargs) {
i++;
if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
return;
if (count < 0) count = 0;
} else if (!strcasecmp(o,"STREAMS") && moreargs) {
//获取流的数量 id和key一一对应的
streams_arg = i+1;
streams_count = (c->argc-streams_arg);
if ((streams_count % 2) != 0) {
addReplyError(c,"Unbalanced XREAD list of streams: "
"for each stream key an ID or '$' must be "
"specified.");
return;
}
streams_count /= 2;
break;
} else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
if (!xreadgroup) {
addReplyError(c,"The GROUP option is only supported by "
"XREADGROUP. You called XREAD instead.");
return;
}
groupname = c->argv[i+1];
consumername = c->argv[i+2];
i += 2;
} else if (!strcasecmp(o,"NOACK")) {
if (!xreadgroup) {
addReplyError(c,"The NOACK option is only supported by "
"XREADGROUP. You called XREAD instead.");
return;
}
noack = 1;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
复制代码
首先是对参数的解析,有以下几个特点:
- 不能在lua脚本中设置BLOCK参数
- 命令中stream的key数量必须和id数量一致的
- group和noack必须是xreadgroup命令才支持
//必填参数校验
if (streams_arg == 0) {
addReply(c,shared.syntaxerr);
return;
}
if (xreadgroup && groupname == NULL) {
addReplyError(c,"Missing GROUP option for XREADGROUP");
return;
}
if (streams_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*streams_count);
if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);
for (int i = streams_arg + streams_count; i < c->argc; i++) {
//给每一个key找到对应的stream 并且找到对应的group(如果是xreadgroup命令) 放入数组
int id_idx = i - streams_arg - streams_count;
robj *key = c->argv[i-streams_count];
robj *o = lookupKeyRead(c->db,key);
if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
streamCG *group = NULL;
if (groupname) {
if (o == NULL ||
(group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
{
addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
"group '%s' in XREADGROUP with GROUP "
"option",
(char*)key->ptr,(char*)groupname->ptr);
goto cleanup;
}
groups[id_idx] = group;
}
if (strcmp(c->argv[i]->ptr,"$") == 0) {
if (xreadgroup) {
addReplyError(c,"The $ ID is meaningless in the context of "
"XREADGROUP: you want to read the history of "
"this consumer by specifying a proper ID, or "
"use the > ID to get new messages. The $ ID would "
"just return an empty result set.");
goto cleanup;
}
if (o) {
stream *s = o->ptr;
ids[id_idx] = s->last_id;
} else {
ids[id_idx].ms = 0;
ids[id_idx].seq = 0;
}
continue;
//解析 >
} else if (strcmp(c->argv[i]->ptr,">") == 0) {
if (!xreadgroup) {
addReplyError(c,"The > ID can be specified only when calling "
"XREADGROUP using the GROUP <group> "
"<consumer> option.");
goto cleanup;
}
/* We use just the maximum ID to signal this is a ">" ID, anyway
* the code handling the blocking clients will have to update the
* ID later in order to match the changing consumer group last ID. */
ids[id_idx].ms = UINT64_MAX;
ids[id_idx].seq = UINT64_MAX;
continue;
}
if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
goto cleanup;
}
复制代码
这一段是将多个stream的key,以及指定的id放入数组中,几个关键特点如下:
- stream和group都是必须存在的
- 使用group模式下id不能指定为是当前stream最大id,如果这么指定那么group读取不到任何数据
- group模式下id可以使用”>”
size_t arraylen = 0;
void *arraylen_ptr = NULL;
for (int i = 0; i < streams_count; i++) {
robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
if (o == NULL) continue;
stream *s = o->ptr;
streamID *gt = ids+i;
int serve_synchronously = 0;
//id没有指定为 > 为true
int serve_history = 0;
if (groups) {
//如果id没有指定为 ">" 设置serve_history需要从历史数据中读取
if (gt->ms != UINT64_MAX ||
gt->seq != UINT64_MAX)
{
serve_synchronously = 1;
serve_history = 1;
//处理 id = > 检查group的last_id和stream最新提交的大小
} else if (s->length) {
streamID maxid, *last = &groups[i]->last_id;
streamLastValidID(s, &maxid);
//如果maxid比last要大 说明有新内容
if (streamCompareID(&maxid, last) > 0) {
serve_synchronously = 1;
*gt = *last;
}
}
} else if (s->length) {
//没有指定group 则无需判断group的id 同时也是判断新内容
streamID maxid;
streamLastValidID(s, &maxid);
if (streamCompareID(&maxid, gt) > 0) {
serve_synchronously = 1;
}
}
复制代码
这里循环stream的key获取stream,主要特点:
- 通过serve_synchronously来确定是否可以读取一个值,在普通模式下读取一个id,只需要判断有没有比他更大的id,因为id递增,如果有则说明可能会读到(没有话一定不会读到)
- 如果xreadgroup模式下指定了一个准确的id值,表示要读取一个历史值,如果id指定为”>”,表示从group属性里面的last_id开始读。也就是说当加入一个消费组以后,一般都是通过“>”去读取新消息
if (serve_synchronously) {
arraylen++;
if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
streamID start = *gt;
streamIncrID(&start);
if (c->resp == 2) addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL;
//查找这个group里面的消费者 如果没有创建
if (groups) consumer = streamLookupConsumer(groups[i],
consumername->ptr,
SLC_NONE);
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK;
if (serve_history) flags |= STREAM_RWR_HISTORY;
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
consumer, flags, &spi);
if (groups) server.dirty++;
}
}
复制代码
serve_synchronously为真表示可以读,同时处理comsumer,没有就以当前client创建一个comsumer,调用streamReplyWithRange去读取消息
streamReplyWithRange指定了stream,group、startId,endId以及需要读取的条数cont,这里的flag主要是处理noack和history
这个noack和history是做什么用的看方法源码:
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
void *arraylen_ptr = NULL;
size_t arraylen = 0;
streamIterator si;
int64_t numfields;
streamID id;
int propagate_last_id = 0;
int noack = flags & STREAM_RWR_NOACK;
//group模式请求历史
if (group && (flags & STREAM_RWR_HISTORY)) {
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
consumer);
}
if (!(flags & STREAM_RWR_RAWENTRIES))
arraylen_ptr = addReplyDeferredLen(c);
复制代码
STREAM_RWR_HISTORY会调用streamReplyWithRangeFromConsumerPEL方法,先不看这个,假设现在是普通模式读取 或者 xreadgroup的id指定为”>”
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
//迭代id 然后修改group的lastId
if (group && streamCompareID(&id,&group->last_id) > 0) {
group->last_id = id;
//noack模式需要传播最后一条id
if (noack) propagate_last_id = 1;
}
addReplyArrayLen(c,2);
addReplyStreamID(c,&id);
addReplyArrayLen(c,numfields*2);
/* 循环当前消息listpack上面的numfields */
while(numfields--) {
unsigned char *key, *value;
int64_t key_len, value_len;
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
addReplyBulkCBuffer(c,key,key_len);
addReplyBulkCBuffer(c,value,value_len);
}
复制代码
这里对stream进行迭代,之前讲过stream存储结构是rax,而rax存储了listpack,主要逻辑如下:
- group.last_id记录了当前group消费的id最大值,所以如果是xreadgroup要跟新这个值
- 获取listpack上面一条消息的numfields,循环获取键值对加入返回的buffer里面
if (group && !noack) {
unsigned char buf[sizeof(streamID)];
streamEncodeID(buf,&id);
//创建一个未提交消息 插入group和consumer的pel
streamNACK *nack = streamCreateNACK(consumer);
int group_inserted =
raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
int consumer_inserted =
raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
//调用raxTryInsert失败说明已有了一个相同的节点 remove掉然修改信息重新insert
if (group_inserted == 0) {
streamFreeNACK(nack);
nack = raxFind(group->pel,buf,sizeof(buf));
serverAssert(nack != raxNotFound);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
//修改信息
nack->consumer = consumer;
nack->delivery_time = mstime();
nack->delivery_count = 1;
/* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
} else if (group_inserted == 1 && consumer_inserted == 0) {
serverPanic("NACK half-created. Should not be possible.");
if (spi) {
robj *idarg = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg);
}
arraylen++;
if (count && count == arraylen) break;
}
if (spi && propagate_last_id)
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
streamIteratorStop(&si);
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen;
复制代码
通过这段代码可以看出来,如果没有noack,表示这些消息需要客户端消费以后再ack给服务器。上面主要是创建streamNACK一个未提交的消息,分别insert到group和comsumer的pel里面,表示这些消息消费者消费了,但是没有ack
每成功读取一条消息会增加arraylen,等于count后中断循环++但是这里的不一定会读到完全等于count++,即使指定了count但是也可能没有这么多数据可以读,类似于mysql的limit。同时最后还要传播last_id修改到主从。
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
raxIterator ri;
unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)];
streamEncodeID(startkey,start);
if (end) streamEncodeID(endkey,end);
size_t arraylen = 0;
void *arraylen_ptr = addReplyDeferredLen(c);
raxStart(&ri,consumer->pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
//注意这里
while(raxNext(&ri) && (!count || arraylen < count)) {
if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL) == 0)
{
addReplyArrayLen(c,2);
addReplyStreamID(c,&thisid);
addReplyNullArray(c);
} else {
streamNACK *nack = ri.data;
nack->delivery_time = mstime();
nack->delivery_count++;
}
arraylen++;
}
raxStop(&ri);
setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen;
}
复制代码
以上是xreadgroup指定了一个准确的id会触发history模式,它的含义是:history只能在当前comsumer未提交的列表里面读取这个id的数据,最终交给了streamReplyWithRange去stream里面,当能够读取到一条未提交的消息时,会将delivery_count增加
//如果读到了数据可以返回
if (arraylen) {
if (c->resp == 2)
setDeferredArrayLen(c,arraylen_ptr,arraylen);
else
setDeferredMapLen(c,arraylen_ptr,arraylen);
goto cleanup;
}
复制代码
一旦读到了任何数据就可以返回了,就不需要timeout
if (timeout != -1) {
//处于事务中
if (c->flags & CLIENT_MULTI) {
addReplyNullArray(c);
goto cleanup;
}
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
timeout, NULL, ids);
c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
if (groupname) {
incrRefCount(groupname);
incrRefCount(consumername);
c->bpop.xread_group = groupname;
c->bpop.xread_consumer = consumername;
c->bpop.xread_group_noack = noack;
} else {
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
}
goto cleanup;
}
addReplyNullArray(c);
cleanup:
preventCommandPropagation(c);
if (ids != static_ids) zfree(ids);
zfree(groups);
}
复制代码
如果指定了block但是没有读取到数据,就会调用blockForKeys将客户端block….注意cleanup里面有一个preventCommandPropagation,这是阻止后续的命令传播,因为通过call方法调用了命令,调用完成后会传播命令到aof和主从节点,但是这个过程中已经传播了,所以要阻止后面再次传播。
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
dictEntry *de;
list *l;
int j;
c->bpop.timeout = timeout;
c->bpop.target = target;
if (target != NULL) incrRefCount(target);
for (j = 0; j < numkeys; j++) {
bkinfo *bki = zmalloc(sizeof(*bki));
if (btype == BLOCKED_STREAM)
bki->stream_id = ids[j];
if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
zfree(bki);
continue;
}
incrRefCount(keys[j]);
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
//client添加进去
listAddNodeTail(l,c);
bki->listnode = listLast(l);
}
blockClient(c,btype);
}
复制代码
blockForKeys将当前信息创建对象,并且添加进db的blocking_keys里面,blocking_keys是一个dict,value是list,用于存储因为读取某key而block的client
++当某个查询不到stream的某个信息时候,client会被block,如果stream有了新的信息,则会进行通知并且发送信息给client,这个操作是在哪里触发的?++
xadd触发
if (server.blocked_clients_by_type[BLOCKED_STREAM])
signalKeyAsReady(c->db, c->argv[1]);
复制代码
xadd结束后会判断是否有block中的客户端, 然后调用signalKeyAsReady
void signalKeyAsReady(redisDb *db, robj *key) {
readyList *rl;
//没有block的客户端不用通知
if (dictFind(db->blocking_keys,key) == NULL) return;
//已经添加
if (dictFind(db->ready_keys,key) != NULL) return;
//添加
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);
incrRefCount(key);
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
复制代码
向ready_keys添加当前key,表示key已经准备好了
//具体执行代码方法
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
复制代码
之前分析过的processCommand方法会调用call来执行具体方法,在调用结束后会判断是否有准备好的key,调用handleClientsBlockedOnKeys触发
void handleClientsBlockedOnKeys(void) {
while(listLength(server.ready_keys) != 0) {
list *l;
l = server.ready_keys;
server.ready_keys = listCreate();
while(listLength(l) != 0) {
listNode *ln = listFirst(l);
readyList *rl = ln->value;
dictDelete(rl->db->ready_keys,rl->key);
server.fixed_time_expire++;
updateCachedTime(0);
/* Serve clients blocked on list key. */
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL) {
if (o->type == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
else if (o->type == OBJ_ZSET)
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
serveClientsBlockedOnKeyByModule(rl);
}
server.fixed_time_expire--;
/* Free this item. */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l);
}
}
复制代码
这方法会清除ready_keys的内存,根据type的不同调用不同的方法,我们这里是触发stream,所以进入serveClientsBlockedOnStreamKey
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
stream *s = o->ptr;
if (de) {
list *clients = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(clients,&li);
复制代码
首先从blocking_keys获取客户端list
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_STREAM) continue;
bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
streamID *gt = &bki->stream_id;
streamCG *group = NULL;
if (receiver->bpop.xread_group) {
group = streamLookupCG(s,
receiver->bpop.xread_group->ptr);
if (!group) {
addReplyError(receiver,
"-NOGROUP the consumer group this client "
"was blocked on no longer exists");
unblockClient(receiver);
continue;
} else {
*gt = group->last_id;
}
}
//注意这里
if (streamCompareID(&s->last_id, gt) > 0) {
streamID start = *gt;
streamIncrID(&start);
//获取对应的comsumer
streamConsumer *consumer = NULL;
int noack = 0;
if (group) {
consumer =
streamLookupConsumer(group,
receiver->bpop.xread_consumer->ptr,
SLC_NONE);
noack = receiver->bpop.xread_group_noack;
}
//一个key就用数组返回,多个key用map返回
if (receiver->resp == 2) {
addReplyArrayLen(receiver,1);
addReplyArrayLen(receiver,2);
} else {
addReplyMapLen(receiver,1);
}
addReplyBulk(receiver,rl->key);
streamPropInfo pi = {
rl->key,
receiver->bpop.xread_group
};
streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,
0, group, consumer, noack, &pi);
unblockClient(receiver);
}
}
复制代码
接下来就是遍历client的list获取每一个client以及block信息,还是调用streamReplyWithRange方法获取stream消息发送
stream的group有一个特性,就是同一个group里面的消费者是竞争关系,一条消息不能多个group内的消费者消费,从上面注释中的代码也能够体现出来:
这里会比较stream的last_id和group,如果stream的比较大表示有新消息提交,然后跟新group的last_id,意味着当一个client竞争到消息以后,跟新了group的last_id为stream.last_id,后续的client在这个whlie循环中的判断会不通过。
++blocking_keys的值是一个list结构,说明了一个group里面多个consumer-client同时block主,先到的那个client会获取消息(list获取是顺序的),如果说这个client又调用了xreadgroup去读消息,那么他也竞争不过其他block的client++
总结
总结一下读取stream消息的特点:
- 可以一次性从多个stream的不同id位置读取消息,count参数类似于mysql的limit,可以控制消息返回的数量
- 通过非消费者模式去读取stream的消息,只要这条消息id是存在于stream中的任意位置,都可以返回数据
- 通过xreadgroup去读消息,只能指定”>”去读取当前group.last_id以后的消息,或者指定一个id去读取当前comsumer已经读取了但是未提交的消息。也就是说如果使用noack或者调用ack命令去ack了消息,那么这一条消息对于comsumer来说就是不可见的
- 当xreadgroup读取不到消息时会block客户端,当其他客户端xadd了对应stream消息后服务器会通知block的客户端,但是只有一个客户端能够消费它