Redis6.0.6源码阅读——stream(下)

前言

上一篇stream文章分析了xread含xreadgroup的源码,通过其中的实现方式了解到了消费者组的特点,这篇文章分析剩下的stream命令,了解一下stream还能干些什么事情。

正文

XCLAIM命令

由于消费者消费了消息后会进入pel里面,需要后续对该消息进行ack,如果该消费者下线了,就会造成消息堆积无法ack,这个时候可以调用XCLAIM将多条消息放入自己的pel中

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
        [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
        [FORCE] [JUSTID]
复制代码

XCLAIM传入stream的key、group的key以及对应的consumer的key,并且支持多个id,额外参数解析:

  • <min-idle-time>:消息的最小空闲时间
  • IDLE <milliseconds>:会更新消息的delivery_time,根据当前时间计算得出:delivery_time = 当前时间 - idle
  • TIME <mstime>:和IDLE的区别就是TIME直接替换delivery_time
  • RETRYCOUNT <count>:修改消息的delivery_count(投递次数)属性
  • FORCE:如果在group的pel里面找不到消息,但该消息在stream中存在,就创建对应的NACK加入group的pel,并认领到当前consumer的pel中
  • JUSTID:只返回id不返回其他信息
  • LASTID <id>:修改group的last_id属性
void xclaimCommand(client *c) {
    streamCG *group = NULL;
    //通过key获取stream
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    long long minidle;
    long long retrycount = -1;
    mstime_t deliverytime = -1;
    int force = 0;
    int justid = 0;

    //解析stream和gruop
    if (o) {
        if (checkType(c,o,OBJ_STREAM)) return;
        group = streamLookupCG(o->ptr,c->argv[2]->ptr);
    }


    if (o == NULL || group == NULL) {
        addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
                              "consumer group '%s'", (char*)c->argv[1]->ptr,
                              (char*)c->argv[2]->ptr);
        return;
    }

    if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
        "Invalid min-idle-time argument for XCLAIM")
        != C_OK) return;
    if (minidle < 0) minidle = 0;
复制代码

首先根据参数找到对应的stream以及group,解析并初始化minidle

    int j;
    for (j = 5; j < c->argc; j++) {
        streamID id;
        if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
    }
int last_id_arg = j-1;
复制代码

然后验证id是否合法,但是这一步没有记录id信息,而是记录了最后一个id参数的索引

//如果last_id大于 当前的 需要修改并且传播
    if (streamCompareID(&last_id,&group->last_id) > 0) {
        group->last_id = last_id;
        propagate_last_id = 1;
    }

    //检查deliverytime的合法以及是否设置 默认值为当前时间
    if (deliverytime != -1) {

        if (deliverytime < 0 || deliverytime > now) deliverytime = now;
    } else {
        deliverytime = now;
    }
复制代码

以上是解析参数后需要检查的项目,解析参数的代码就不放上来了,最终会将上面所有参数都解析出来(如果有的话)

streamConsumer *consumer = NULL;
    void *arraylenptr = addReplyDeferredLen(c);
    size_t arraylen = 0;
    for (int j = 5; j <= last_id_arg; j++) {
        streamID id;
        unsigned char buf[sizeof(streamID)];
        if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
            serverPanic("StreamID invalid after check. Should not be possible.");
        streamEncodeID(buf,&id);

        //获取未提交的消息
        streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));

        //如果设置了force 但是在pel里面没找到 从stream里面获取创建并放入group的pel中
        if (force && nack == raxNotFound) {
            streamIterator myiterator;
            streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
            int64_t numfields;
            int found = 0;
            streamID item_id;
            if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
            streamIteratorStop(&myiterator);


            if (!found) continue;
            //创建
            nack = streamCreateNACK(NULL);
            raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
        }

        if (nack != raxNotFound) {
            //检查最小生存时间
            if (nack->consumer && minidle) {
                mstime_t this_idle = now - nack->delivery_time;
                if (this_idle < minidle) continue;
            }
            //删除旧的消费者nack,并放入当前消费者
            if (nack->consumer)
                raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
            /* Update the consumer and idle time. */
            if (consumer == NULL)
                consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
            nack->consumer = consumer;
            nack->delivery_time = deliverytime;
            if (retrycount >= 0) {
                nack->delivery_count = retrycount;
                //如果没有justid命令 表示这条消息又被处理了一次 所以要自增
            } else if (!justid) {
                nack->delivery_count++;
            }
            //加入新消费者pel
            raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
            //justid表示只返回id不返回其他信息
            if (justid) {
                addReplyStreamID(c,&id);
            } else {
                size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
                                    NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
                if (!emitted) addReplyNull(c);
            }
            arraylen++;

            /* Propagate this change. */
            streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
            propagate_last_id = 0;
            server.dirty++;
        }
    }
复制代码

以上就是循环解析id,从注释中可以看出来这条命令是原子的,之前已经对所有id进行过一次解析验证,所以在这个阶段如果解析id仍然出错,就会触发serverPanic,导致redis进程异常终止

 if (propagate_last_id) {
        streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
        server.dirty++;
    }
    setDeferredArrayLen(c,arraylenptr,arraylen);
    //调用call方法执行后会传播命令 使用该方法让call方法后续不传播命令  因为上面已经传播过了
    preventCommandPropagation(c);
复制代码

最后进行命令传播

XCLAIM在移动消息的同时可以修改消息的属性,提供了更大的灵活性

XACK命令

xack是消费者对已消费的消息进行提交,其逻辑也很简单

/* XACK handler: argv[1] is the stream key, argv[2] the consumer group name,
 * and argv[3..argc-1] the entry IDs to acknowledge.
 *
 * For every ID that has a NACK in the group's PEL, the NACK is removed from
 * both the group's PEL and its owning consumer's PEL and then freed. The
 * reply is the number of entries acknowledged this way. When the key or the
 * group does not exist the reply is shared.czero and nothing is modified. */
void xackCommand(client *c) {
    streamCG *group = NULL;
    /* Look up the stream object and, if present, the consumer group. */
    robj *o = lookupKeyRead(c->db,c->argv[1]);
    if (o) {
        if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
        group = streamLookupCG(o->ptr,c->argv[2]->ptr);
    }

    /* No key or no group: there is nothing to acknowledge. */
    if (o == NULL || group == NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* Validate every ID up front: if any one of them fails to parse, return
     * before a single entry has been acknowledged (all-or-nothing). */
    for (int j = 3; j < c->argc; j++) {
        streamID id;
        if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
    }

    int acknowledged = 0;
    for (int j = 3; j < c->argc; j++) {
        streamID id;
        unsigned char buf[sizeof(streamID)];
        /* The IDs were already validated by the loop above, so a parse
         * failure here is treated as an internal error. */
        if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
            serverPanic("StreamID invalid after check. Should not be possible.");
        streamEncodeID(buf,&id);

        /* Remove the NACK from the group's PEL and from the owning
         * consumer's PEL, then release it. IDs with no NACK in the group's
         * PEL are skipped and not counted. */
        streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
        if (nack != raxNotFound) {
            raxRemove(group->pel,buf,sizeof(buf),NULL);
            raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
            streamFreeNACK(nack);
            acknowledged++;
            server.dirty++;
        }
    }
    addReplyLongLong(c,acknowledged);
}
复制代码

只是删除group和consumer的pel中对应的数据

XTRIM命令

xtrim支持对stream进行裁剪,指定最大容量,并且可以使用“~”来表示不精确删除,直接看streamTrimByLength源码

int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
 if (s->length <= maxlen) return 0;

    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"^",NULL,0);

    int64_t deleted = 0;
复制代码

从rax树的头节点开始迭代

while(s->length > maxlen && raxNext(&ri)) {
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);

        //删除整个节点是否还是大于maxLen
        if (s->length - entries >= maxlen) {
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            raxSeek(&ri,">=",ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }

        //如果无法删除整个节点 就中断了
        if (approx) break;
复制代码

while的循环条件是stream的长度必须大于指定的长度(s->length > maxlen)。从这里可以看出来,当使用了“~”后,每次只会删除整个节点,并且删除整个节点后的长度必须大于等于maxLen,意味着删除后stream的长度是大于等于maxLen的

//计算需要删除多少
        int64_t to_delete = s->length - maxlen;
        serverAssert(to_delete < entries);
        //修改 未删除节点信息 和 已删除节点信息
        lp = lpReplaceInteger(lp,&p,entries-to_delete);
        p = lpNext(lp,p);
        int64_t marked_deleted = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
        p = lpNext(lp,p);

        //跳过masterEntry
        int64_t master_fields_count = lpGetInteger(p);
        p = lpNext(lp,p);
        for (int64_t j = 0; j < master_fields_count; j++)
            p = lpNext(lp,p);
        p = lpNext(lp,p);
复制代码

计算当前节点上可以删除的数量,跳过listpack上的masterEntry

while(p) {
            int flags = lpGetInteger(p);
            int to_skip;

            //标记 未删除节点为已删除
            if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
                flags |= STREAM_ITEM_FLAG_DELETED;
                lp = lpReplaceInteger(lp,&p,flags);
                deleted++;
                s->length--;
                if (s->length <= maxlen) break; /* Enough entries deleted. */
            }

            //跳过ms seq 和 num-field节点
            p = lpNext(lp,p);
            p = lpNext(lp,p);
            p = lpNext(lp,p);
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                to_skip = master_fields_count;
            } else {
                to_skip = lpGetInteger(p);
                to_skip = 1+(to_skip*2);
            }

            while(to_skip--) p = lpNext(lp,p);
            p = lpNext(lp,p);
        }

        //太多垃圾了 需要清理以下 但是这里并没有实现
        entries -= to_delete;
        marked_deleted += to_delete;
        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
            /* TODO: perform a garbage collection. */
        }

        raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);

        break;
复制代码

然后循环对listpack进行删除,这里的删除是标记删除,将节点的flag标记为已删除。当标记删除的节点数量太多了,就需要进行回收,但是这里并没有实现

最后将listpack重新插入到stream中

总结

这篇文章分析了XCLAIM、XACK、XTRIM命令的源码,可以看出来在实现了rax和listpack数据结构的基础上,这些功能的实现都非常的简单,这些简单的实现共同组成了类似kafka的消息队列

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享