Redis6.0.6源码阅读——stream(中)

前言

上一篇文章解析了stream基础数据结构特点,同时分析了xadd源码了解到了添加消息流程,这篇文章将会完整分析stream命令,通过这些命令源码展现出来stream全部特点。

正文

XGROUP命令

前面分析xadd向stream添加消息源码,但是并不立刻看xread源码,因为xread里面涉及到消费者组的东西,先看一下XGROUP命令可以看出来一些特点,命令结构如下:

  • CREATE <id or $> MKSTREAM:创建命令,指定一个stream为其创建一个group,并且指定id,MKSTREAM表示如果不存在stream可以创建stream
  • SETID <id or $>:设置当前group的id
  • DESTROY :删除一个group
  • DELCONSUMER :删除一个消费者
  • HELP:打开帮助

++$表示stream当前记录的last_id++

int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
    return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
}

int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) {
    char buf[128];
    if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
    memcpy(buf,o->ptr,sdslen(o->ptr)+1);

    // +是最大ID -是最小ID  严格模式下不能设置这种id
    if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
        goto invalid;

    if (buf[0] == '-' && buf[1] == '\0') {
        id->ms = 0;
        id->seq = 0;
        return C_OK;
    } else if (buf[0] == '+' && buf[1] == '\0') {
        id->ms = UINT64_MAX;
        id->seq = UINT64_MAX;
        return C_OK;
    }

    /* Parse <ms>-<seq> form. */
    char *dot = strchr(buf,'-');
    if (dot) *dot = '\0';
    unsigned long long ms, seq;
    if (string2ull(buf,&ms) == 0) goto invalid;
    if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
    if (!dot) seq = missing_seq;
    id->ms = ms;
    id->seq = seq;
    return C_OK;

invalid:
    if (c) addReplyError(c,"Invalid stream ID specified as stream "
                           "command argument");
    return C_ERR;
}
复制代码

这一段是解析id,因为如果命令指定一个id格式是:ms-seq,将字符串解析成对象

streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
    if (s->cgroups == NULL) s->cgroups = raxNew();
    if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
        return NULL;

    streamCG *cg = zmalloc(sizeof(*cg));
    cg->pel = raxNew();
    cg->consumers = raxNew();
    cg->last_id = *id;
    raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
    return cg;
}
复制代码

创建streamCG对象,初始化pel、consumers和last_id为指定id,然后插入到cgroups的rax树中

xgroup没有放出全部源码,因为就是对参数的解析,不涉及到复杂逻辑,主要就是增删group,都是操作rax的api

xread命令

xread支持两种模式,以一个非消费者和消费者模式去读取数据:

XREAD [BLOCK ] [COUNT ] STREAMS key_1 key_2 … key_N ID_1 ID_2 … ID_N

BLOCK表示阻塞时间,COUNT表示读取消息个数,这个命令支持从多个stream对于的某个id开始的数据(id递增的)

同时支持以一个消费者去读取,命令变为:XREADGROUP GROUP group-name consumer-name….

void xreadCommand(client *c) {
    //BLOCK 阻塞时间
    long long timeout = -1;
    long long count = 0;
    int streams_count = 0;
    int streams_arg = 0;
    //表示不需要ack
    int noack = 0;
    #define STREAMID_STATIC_VECTOR_LEN 8
    streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
    streamID *ids = static_ids;
    streamCG **groups = NULL;
    //是否为xreadgroup命令
    int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
    robj *groupname = NULL;
    robj *consumername = NULL;

    
    for (int i = 1; i < c->argc; i++) {
        int moreargs = c->argc-i-1;
        char *o = c->argv[i]->ptr;
        //使用block在lua中没有意义
        if (!strcasecmp(o,"BLOCK") && moreargs) {
            if (c->flags & CLIENT_LUA) {
                /* There is no sense to use BLOCK option within LUA */
                addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
                return;
            }
            i++;
            if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
                UNIT_MILLISECONDS) != C_OK) return;
        } else if (!strcasecmp(o,"COUNT") && moreargs) {
            i++;
            if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
                return;
            if (count < 0) count = 0;
        } else if (!strcasecmp(o,"STREAMS") && moreargs) {
            //获取流的数量 id和key一一对应的
            streams_arg = i+1;
            streams_count = (c->argc-streams_arg);
            if ((streams_count % 2) != 0) {
                addReplyError(c,"Unbalanced XREAD list of streams: "
                                "for each stream key an ID or '$' must be "
                                "specified.");
                return;
            }
            streams_count /= 2;
            break;
        } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
            if (!xreadgroup) {
                addReplyError(c,"The GROUP option is only supported by "
                                "XREADGROUP. You called XREAD instead.");
                return;
            }
            groupname = c->argv[i+1];
            consumername = c->argv[i+2];
            i += 2;
        } else if (!strcasecmp(o,"NOACK")) {
            if (!xreadgroup) {
                addReplyError(c,"The NOACK option is only supported by "
                                "XREADGROUP. You called XREAD instead.");
                return;
            }
            noack = 1;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
复制代码

首先是对参数的解析,有以下几个特点:

  • 不能在lua脚本中设置BLOCK参数
  • 命令中stream的key数量必须和id数量一致的
  • group和noack必须是xreadgroup命令才支持
//必填参数校验
    if (streams_arg == 0) {
        addReply(c,shared.syntaxerr);
        return;
    }

    if (xreadgroup && groupname == NULL) {
        addReplyError(c,"Missing GROUP option for XREADGROUP");
        return;
    }

    
    if (streams_count > STREAMID_STATIC_VECTOR_LEN)
        ids = zmalloc(sizeof(streamID)*streams_count);
    if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);

    for (int i = streams_arg + streams_count; i < c->argc; i++) {
        //给每一个key找到对应的stream 并且找到对应的group(如果是xreadgroup命令) 放入数组
        int id_idx = i - streams_arg - streams_count;
        robj *key = c->argv[i-streams_count];
        robj *o = lookupKeyRead(c->db,key);
        if (o && checkType(c,o,OBJ_STREAM)) goto cleanup;
        streamCG *group = NULL;

        if (groupname) {
            if (o == NULL ||
                (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
            {
                addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
                                       "group '%s' in XREADGROUP with GROUP "
                                       "option",
                                    (char*)key->ptr,(char*)groupname->ptr);
                goto cleanup;
            }
            groups[id_idx] = group;
        }

        if (strcmp(c->argv[i]->ptr,"$") == 0) {
            if (xreadgroup) {
                addReplyError(c,"The $ ID is meaningless in the context of "
                                "XREADGROUP: you want to read the history of "
                                "this consumer by specifying a proper ID, or "
                                "use the > ID to get new messages. The $ ID would "
                                "just return an empty result set.");
                goto cleanup;
            }
            if (o) {
                stream *s = o->ptr;
                ids[id_idx] = s->last_id;
            } else {
                ids[id_idx].ms = 0;
                ids[id_idx].seq = 0;
            }
            continue;
            //解析 >
        } else if (strcmp(c->argv[i]->ptr,">") == 0) {
            if (!xreadgroup) {
                addReplyError(c,"The > ID can be specified only when calling "
                                "XREADGROUP using the GROUP <group> "
                                "<consumer> option.");
                goto cleanup;
            }
            /* We use just the maximum ID to signal this is a ">" ID, anyway
             * the code handling the blocking clients will have to update the
             * ID later in order to match the changing consumer group last ID. */
            ids[id_idx].ms = UINT64_MAX;
            ids[id_idx].seq = UINT64_MAX;
            continue;
        }
        if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
            goto cleanup;
    }
复制代码

这一段是将多个stream的key,以及指定的id放入数组中,几个关键特点如下:

  • stream和group都是必须存在的
  • 使用group模式下id不能指定为,因为,因为是当前stream最大id,如果这么指定那么group读取不到任何数据
  • group模式下id可以使用”>”
size_t arraylen = 0;
    void *arraylen_ptr = NULL;
    for (int i = 0; i < streams_count; i++) {
        robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
        if (o == NULL) continue;
        stream *s = o->ptr;
        streamID *gt = ids+i;
        int serve_synchronously = 0;
        //id没有指定为 > 为true
        int serve_history = 0;
        
        if (groups) {
            //如果id没有指定为 ">" 设置serve_history需要从历史数据中读取
            if (gt->ms != UINT64_MAX ||
                gt->seq != UINT64_MAX)
            {
                serve_synchronously = 1;
                serve_history = 1;
                //处理 id = > 检查group的last_id和stream最新提交的大小
            } else if (s->length) {
                streamID maxid, *last = &groups[i]->last_id;
                streamLastValidID(s, &maxid);
                //如果maxid比last要大 说明有新内容
                if (streamCompareID(&maxid, last) > 0) {
                    serve_synchronously = 1;
                    *gt = *last;
                }
            }
        } else if (s->length) {
            //没有指定group 则无需判断group的id 同时也是判断新内容
            streamID maxid;
            streamLastValidID(s, &maxid);
            if (streamCompareID(&maxid, gt) > 0) {
                serve_synchronously = 1;
            }
        }
复制代码

这里循环stream的key获取stream,主要特点:

  • 通过serve_synchronously来确定是否可以读取一个值,在普通模式下读取一个id,只需要判断有没有比他更大的id,因为id递增,如果有则说明可能会读到(没有话一定不会读到)
  • 如果xreadgroup模式下指定了一个准确的id值,表示要读取一个历史值,如果id指定为”>”,表示从group属性里面的last_id开始读。也就是说当加入一个消费组以后,一般都是通过“>”去读取新消息
if (serve_synchronously) {
            arraylen++;
            if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
            streamID start = *gt;
            streamIncrID(&start);
            
            if (c->resp == 2) addReplyArrayLen(c,2);
            addReplyBulk(c,c->argv[streams_arg+i]);
            streamConsumer *consumer = NULL;
            //查找这个group里面的消费者 如果没有创建
            if (groups) consumer = streamLookupConsumer(groups[i],
                                                        consumername->ptr,
                                                        SLC_NONE);
            streamPropInfo spi = {c->argv[i+streams_arg],groupname};
            int flags = 0;
            if (noack) flags |= STREAM_RWR_NOACK;
            if (serve_history) flags |= STREAM_RWR_HISTORY;
            streamReplyWithRange(c,s,&start,NULL,count,0,
                                 groups ? groups[i] : NULL,
                                 consumer, flags, &spi);
            if (groups) server.dirty++;
        }
    }
复制代码

serve_synchronously为真表示可以读,同时处理comsumer,没有就以当前client创建一个comsumer,调用streamReplyWithRange去读取消息

streamReplyWithRange指定了stream,group、startId,endId以及需要读取的条数cont,这里的flag主要是处理noack和history

这个noackhistory是做什么用的看方法源码:

size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
    void *arraylen_ptr = NULL;
    size_t arraylen = 0;
    streamIterator si;
    int64_t numfields;
    streamID id;
    int propagate_last_id = 0;
    int noack = flags & STREAM_RWR_NOACK;
    
    //group模式请求历史
    if (group && (flags & STREAM_RWR_HISTORY)) {
        return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
                                                   consumer);
    }
   if (!(flags & STREAM_RWR_RAWENTRIES))
        arraylen_ptr = addReplyDeferredLen(c);
复制代码

STREAM_RWR_HISTORY会调用streamReplyWithRangeFromConsumerPEL方法,先不看这个,假设现在是普通模式读取 或者 xreadgroup的id指定为”>”

streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
        //迭代id 然后修改group的lastId
        if (group && streamCompareID(&id,&group->last_id) > 0) {
            group->last_id = id;
            //noack模式需要传播最后一条id
            if (noack) propagate_last_id = 1;
        }

        addReplyArrayLen(c,2);
        addReplyStreamID(c,&id);

        addReplyArrayLen(c,numfields*2);

        /* 循环当前消息listpack上面的numfields */
        while(numfields--) {
            unsigned char *key, *value;
            int64_t key_len, value_len;
            streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
            addReplyBulkCBuffer(c,key,key_len);
            addReplyBulkCBuffer(c,value,value_len);
        }
复制代码

这里对stream进行迭代,之前讲过stream存储结构是rax,而rax存储了listpack,主要逻辑如下:

  1. group.last_id记录了当前group消费的id最大值,所以如果是xreadgroup要跟新这个值
  2. 获取listpack上面一条消息的numfields,循环获取键值对加入返回的buffer里面
if (group && !noack) {
            unsigned char buf[sizeof(streamID)];
            streamEncodeID(buf,&id);

            //创建一个未提交消息 插入group和consumer的pel
            streamNACK *nack = streamCreateNACK(consumer);
            int group_inserted =
                raxTryInsert(group->pel,buf,sizeof(buf),nack,NULL);
            int consumer_inserted =
                raxTryInsert(consumer->pel,buf,sizeof(buf),nack,NULL);

            //调用raxTryInsert失败说明已有了一个相同的节点 remove掉然修改信息重新insert
            if (group_inserted == 0) {
                streamFreeNACK(nack);
                nack = raxFind(group->pel,buf,sizeof(buf));
                serverAssert(nack != raxNotFound);
                raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
                //修改信息
                nack->consumer = consumer;
                nack->delivery_time = mstime();
                nack->delivery_count = 1;
                /* Add the entry in the new consumer local PEL. */
                raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
            } else if (group_inserted == 1 && consumer_inserted == 0) {
                serverPanic("NACK half-created. Should not be possible.");

 if (spi) {
                robj *idarg = createObjectFromStreamID(&id);
                streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
                decrRefCount(idarg);
            }
arraylen++;
        if (count && count == arraylen) break;
            }

if (spi && propagate_last_id)
        streamPropagateGroupID(c,spi->keyname,group,spi->groupname);

    streamIteratorStop(&si);
    if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
    return arraylen;
复制代码

通过这段代码可以看出来,如果没有noack,表示这些消息需要客户端消费以后再ack给服务器。上面主要是创建streamNACK一个未提交的消息,分别insert到group和comsumer的pel里面,表示这些消息消费者消费了,但是没有ack

每成功读取一条消息会增加arraylen,等于count后中断循环++但是这里的不一定会读到完全等于count++,即使指定了count但是也可能没有这么多数据可以读,类似于mysql的limit。同时最后还要传播last_id修改到主从。

size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer) {
    raxIterator ri;
    unsigned char startkey[sizeof(streamID)];
    unsigned char endkey[sizeof(streamID)];
    streamEncodeID(startkey,start);
    if (end) streamEncodeID(endkey,end);

    size_t arraylen = 0;
    void *arraylen_ptr = addReplyDeferredLen(c);
    raxStart(&ri,consumer->pel);
    raxSeek(&ri,">=",startkey,sizeof(startkey));
    //注意这里
    while(raxNext(&ri) && (!count || arraylen < count)) {
        if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
        streamID thisid;
        streamDecodeID(ri.key,&thisid);
        if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
                                 STREAM_RWR_RAWENTRIES,NULL) == 0)
        {
            addReplyArrayLen(c,2);
            addReplyStreamID(c,&thisid);
            addReplyNullArray(c);
        } else {
            streamNACK *nack = ri.data;
            nack->delivery_time = mstime();
            nack->delivery_count++;
        }
        arraylen++;
    }
    raxStop(&ri);
    setDeferredArrayLen(c,arraylen_ptr,arraylen);
    return arraylen;
}
复制代码

以上是xreadgroup指定了一个准确的id会触发history模式,它的含义是:history只能在当前comsumer未提交的列表里面读取这个id的数据,最终交给了streamReplyWithRange去stream里面,当能够读取到一条未提交的消息时,会将delivery_count增加

//如果读到了数据可以返回
    if (arraylen) {
        if (c->resp == 2)
            setDeferredArrayLen(c,arraylen_ptr,arraylen);
        else
            setDeferredMapLen(c,arraylen_ptr,arraylen);
        goto cleanup;
    }
复制代码

一旦读到了任何数据就可以返回了,就不需要timeout

    if (timeout != -1) {
        //处于事务中
        if (c->flags & CLIENT_MULTI) {
            addReplyNullArray(c);
            goto cleanup;
        }
        blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
                     timeout, NULL, ids);
        c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;

        if (groupname) {
            incrRefCount(groupname);
            incrRefCount(consumername);
            c->bpop.xread_group = groupname;
            c->bpop.xread_consumer = consumername;
            c->bpop.xread_group_noack = noack;
        } else {
            c->bpop.xread_group = NULL;
            c->bpop.xread_consumer = NULL;
        }
        goto cleanup;
    }
    
    addReplyNullArray(c);
cleanup: 
    
    preventCommandPropagation(c);
    if (ids != static_ids) zfree(ids);
    zfree(groups);
}
复制代码

如果指定了block但是没有读取到数据,就会调用blockForKeys将客户端block….注意cleanup里面有一个preventCommandPropagation,这是阻止后续的命令传播,因为通过call方法调用了命令,调用完成后会传播命令到aof和主从节点,但是这个过程中已经传播了,所以要阻止后面再次传播。

void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, streamID *ids) {
    dictEntry *de;
    list *l;
    int j;

    c->bpop.timeout = timeout;
    c->bpop.target = target;

    if (target != NULL) incrRefCount(target);

    for (j = 0; j < numkeys; j++) {
        
        bkinfo *bki = zmalloc(sizeof(*bki));
        if (btype == BLOCKED_STREAM)
            bki->stream_id = ids[j];


        if (dictAdd(c->bpop.keys,keys[j],bki) != DICT_OK) {
            zfree(bki);
            continue;
        }
        incrRefCount(keys[j]);
        
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            l = listCreate();
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
            serverAssertWithInfo(c,keys[j],retval == DICT_OK);
        } else {
            l = dictGetVal(de);
        }
        //client添加进去
        listAddNodeTail(l,c);
        bki->listnode = listLast(l);
    }
    blockClient(c,btype);
}
复制代码

blockForKeys将当前信息创建对象,并且添加进db的blocking_keys里面,blocking_keys是一个dict,value是list,用于存储因为读取某key而block的client

++当某个查询不到stream的某个信息时候,client会被block,如果stream有了新的信息,则会进行通知并且发送信息给client,这个操作是在哪里触发的?++

xadd触发

if (server.blocked_clients_by_type[BLOCKED_STREAM])
        signalKeyAsReady(c->db, c->argv[1]);
复制代码

xadd结束后会判断是否有block中的客户端, 然后调用signalKeyAsReady

void signalKeyAsReady(redisDb *db, robj *key) {
    readyList *rl;

    //没有block的客户端不用通知
    if (dictFind(db->blocking_keys,key) == NULL) return;

    //已经添加
    if (dictFind(db->ready_keys,key) != NULL) return;

    //添加
    rl = zmalloc(sizeof(*rl));
    rl->key = key;
    rl->db = db;
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,rl);

    incrRefCount(key);
    serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
复制代码

向ready_keys添加当前key,表示key已经准备好了

//具体执行代码方法
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
复制代码

之前分析过的processCommand方法会调用call来执行具体方法,在调用结束后会判断是否有准备好的key,调用handleClientsBlockedOnKeys触发

void handleClientsBlockedOnKeys(void) {
    while(listLength(server.ready_keys) != 0) {
        list *l;
        
        l = server.ready_keys;
        server.ready_keys = listCreate();

        while(listLength(l) != 0) {
            listNode *ln = listFirst(l);
            readyList *rl = ln->value;
            
            dictDelete(rl->db->ready_keys,rl->key);
            
            server.fixed_time_expire++;
            updateCachedTime(0);

            /* Serve clients blocked on list key. */
            robj *o = lookupKeyWrite(rl->db,rl->key);

            if (o != NULL) {
                if (o->type == OBJ_LIST)
                    serveClientsBlockedOnListKey(o,rl);
                else if (o->type == OBJ_ZSET)
                    serveClientsBlockedOnSortedSetKey(o,rl);
                else if (o->type == OBJ_STREAM)
                    serveClientsBlockedOnStreamKey(o,rl);
                serveClientsBlockedOnKeyByModule(rl);
            }
            server.fixed_time_expire--;

            /* Free this item. */
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(l,ln);
        }
        listRelease(l); 
    }
}
复制代码

这方法会清除ready_keys的内存,根据type的不同调用不同的方法,我们这里是触发stream,所以进入serveClientsBlockedOnStreamKey

void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
    dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
    stream *s = o->ptr;
    
    if (de) {
        list *clients = dictGetVal(de);
        listNode *ln;
        listIter li;
        listRewind(clients,&li);
复制代码

首先从blocking_keys获取客户端list

while((ln = listNext(&li))) {
            client *receiver = listNodeValue(ln);
            if (receiver->btype != BLOCKED_STREAM) continue;
            bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
            streamID *gt = &bki->stream_id;

            streamCG *group = NULL;
            if (receiver->bpop.xread_group) {
                group = streamLookupCG(s,
                        receiver->bpop.xread_group->ptr);
                if (!group) {
                    addReplyError(receiver,
                        "-NOGROUP the consumer group this client "
                        "was blocked on no longer exists");
                    unblockClient(receiver);
                    continue;
                } else {
                    *gt = group->last_id;
                }
            }

	    //注意这里
            if (streamCompareID(&s->last_id, gt) > 0) {
                streamID start = *gt;
                streamIncrID(&start);

               //获取对应的comsumer
                streamConsumer *consumer = NULL;
                int noack = 0;

                if (group) {
                    consumer =
                        streamLookupConsumer(group,
                                             receiver->bpop.xread_consumer->ptr,
                                             SLC_NONE);
                    noack = receiver->bpop.xread_group_noack;
                }

                //一个key就用数组返回,多个key用map返回
                if (receiver->resp == 2) {
                    addReplyArrayLen(receiver,1);
                    addReplyArrayLen(receiver,2);
                } else {
                    addReplyMapLen(receiver,1);
                }
                addReplyBulk(receiver,rl->key);

                streamPropInfo pi = {
                    rl->key,
                    receiver->bpop.xread_group
                };
                streamReplyWithRange(receiver,s,&start,NULL,
                                     receiver->bpop.xread_count,
                                     0, group, consumer, noack, &pi);

                unblockClient(receiver);
            }
        }
复制代码

接下来就是遍历client的list获取每一个client以及block信息,还是调用streamReplyWithRange方法获取stream消息发送

stream的group有一个特性,就是同一个group里面的消费者是竞争关系,一条消息不能多个group内的消费者消费,从上面注释中的代码也能够体现出来:

这里会比较stream的last_id和group,如果stream的比较大表示有新消息提交,然后跟新group的last_id,意味着当一个client竞争到消息以后,跟新了group的last_id为stream.last_id,后续的client在这个whlie循环中的判断会不通过。

++blocking_keys的值是一个list结构,说明了一个group里面多个consumer-client同时block主,先到的那个client会获取消息(list获取是顺序的),如果说这个client又调用了xreadgroup去读消息,那么他也竞争不过其他block的client++

总结

未命名文件.png

总结一下读取stream消息的特点:

  • 可以一次性从多个stream的不同id位置读取消息,count参数类似于mysql的limit,可以控制消息返回的数量
  • 通过非消费者模式去读取stream的消息,只要这条消息id是存在于stream中的任意位置,都可以返回数据
  • 通过xreadgroup去读消息,只能指定”>”去读取当前group.last_id以后的消息,或者指定一个id去读取当前comsumer已经读取了但是未提交的消息。也就是说如果使用noack或者调用ack命令去ack了消息,那么这一条消息对于comsumer来说就是不可见的
  • 当xreadgroup读取不到消息时会block客户端,当其他客户端xadd了对应stream消息后服务器会通知block的客户端,但是只有一个客户端能够消费它
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享