Redis6.0.6源码阅读——stream(上)

前言

stream是redis5引入的消息队列功能,根据redis作者坦言,stream狠狠的借鉴了kafka的实现,从而在redis当中也能使用到类似kafka的功能。stream设计究竟如何,走进源码一探究竟。

正文

简介

xadd mystream * testKey1 testVal1 testKey2 testVal2
复制代码

这是stream追加消息的命令,由以下组成:

  • mystream:为stream的名称
  • *:自增id,也可以自己指定
  • testKey-Val:消息对应的field以及值

可以看到一条消息可以有多个field键值对,并且可以不指定消费组

image.png

借用网上的图片,stream内部消息id是严格递增的,并且可以创建消费组,一个stream可以有多个消费组,一个消费组里面有多个消费者,特点如下:

  • 消费组用组名唯一标识来区分,每个消费组都可以消费一个stream的全部消息。
  • 消费组内的消费者是竞争关系,同一条消息只能由一个消费者消费。
  • 消费者消费以后需要对消息进行确认。

底层结构

stream的底层结构主要由rax和listpack组成

rax

rax基数树(上)

rax基数树(下)

rax详细见之前的文章

listpack

listpack是rax树的value节点,存储字符串或整型,结构如下:

listpack.png

  • totalBytes是listpack表空间大小,占用4字节
  • num是存储entry的个数,和压缩列表一样,最大只支持65535,如果entry超过了65535,则需要遍历来统计个数
  • entry是实际数据节点:
    • encode编码方式
    • content实际数据
    • backlen表示长度,主要用于从后向前遍历
  • end结束标志,0xFF
/* Allocate an empty listpack: a header immediately followed by the EOF
 * byte. Returns NULL when the allocation fails. */
unsigned char *lpNew(void) {
    unsigned char *fresh = lp_malloc(LP_HDR_SIZE+1);

    if (fresh != NULL) {
        /* Fill in the header, then put the terminator right after it. */
        lpSetTotalBytes(fresh,LP_HDR_SIZE+1);
        lpSetNumElements(fresh,0);
        fresh[LP_HDR_SIZE] = LP_EOF;
    }
    return fresh;
}

/* Listpack header size in bytes: 32-bit total-bytes field plus 16-bit
 * element count. */
#define LP_HDR_SIZE 6
复制代码

通过lpNew方法创建一个空的listpack,32bit的totalBytes加上16bit的num,一共需要6字节;然后设置末尾标志

/* Insert, replace or delete an element of the listpack 'lp'.
 *
 * ele:   bytes of the element to write; NULL means "delete the element at
 *        'p'" (handled as a replace with nothing).
 * size:  length of 'ele' in bytes.
 * p:     position inside 'lp' the operation refers to.
 * where: LP_BEFORE, LP_AFTER or LP_REPLACE.
 * newp:  if not NULL, receives the address where the new element was
 *        written; on deletion, the element that followed the deleted one,
 *        or NULL when that is the EOF marker.
 *
 * Returns the (possibly reallocated) listpack, or NULL if the new total
 * size would exceed UINT32_MAX or a reallocation fails. */
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
    unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
    unsigned char backlen[LP_MAX_BACKLEN_SIZE];

    uint64_t enclen; //Encoded length of 'ele' (0 when deleting)

    //A NULL element means replace, i.e. delete
    if (ele == NULL) where = LP_REPLACE;

    //An insert-after becomes an insert-before the following element,
    //so the code below only has to deal with LP_BEFORE and LP_REPLACE
    if (where == LP_AFTER) {
        p = lpSkip(p);
        where = LP_BEFORE;
    }

    //Remember 'p' as an offset: 'lp' may move when reallocated
    unsigned long poff = p-lp;

    //Work out the encoding type of 'ele'
    int enctype;
    if (ele) {
        enctype = lpEncodeGetType(ele,size,intenc,&enclen);
    } else {
        enctype = -1;
        enclen = 0;
    }

    //Encode the backlen trailer, used when walking the listpack backwards
    unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
    uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
    uint32_t replaced_len  = 0;
    if (where == LP_REPLACE) {
        /* Full size of the element being replaced: encoding + backlen. */
        replaced_len = lpCurrentEncodedSize(p);
        replaced_len += lpEncodeBacklen(NULL,replaced_len);
    }

    uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
                                  - replaced_len;
    if (new_listpack_bytes > UINT32_MAX) return NULL;
    
    unsigned char *dst = lp + poff; /* May be updated after reallocation. */

    /* Growing: obtain the extra room before moving anything. */
    if (new_listpack_bytes > old_listpack_bytes) {
        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
        dst = lp + poff;
    }

    //Shift the existing elements to open (or close) the gap at 'dst'
    if (where == LP_BEFORE) {
        memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
    } else { /* LP_REPLACE. */
        long lendiff = (enclen+backlen_size)-replaced_len;
        memmove(dst+replaced_len+lendiff,
                dst+replaced_len,
                old_listpack_bytes-poff-replaced_len);
    }

    /* Shrinking: give the space back now that the data has been moved. */
    if (new_listpack_bytes < old_listpack_bytes) {
        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
        dst = lp + poff;
    }

    /* Report the resulting position to the caller. */
    if (newp) {
        *newp = dst;
        /* In case of deletion, set 'newp' to NULL if the next element is
         * the EOF element. */
        if (!ele && dst[0] == LP_EOF) *newp = NULL;
    }
    if (ele) {
        /* Write the encoded element followed by its backlen. */
        if (enctype == LP_ENCODING_INT) {
            memcpy(dst,intenc,enclen);
        } else {
            lpEncodeString(dst,ele,size);
        }
        dst += enclen;
        memcpy(dst,backlen,backlen_size);
        dst += backlen_size;
    }

    /* Update the header. The element count only changes on a real insert
     * or on a delete, not when one element replaces another. */
    if (where != LP_REPLACE || ele == NULL) {
        uint32_t num_elements = lpGetNumElements(lp);
        if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
            //One more element on insert, one less on delete
            if (ele)
                lpSetNumElements(lp,num_elements+1);
            else
                lpSetNumElements(lp,num_elements-1);
        }
    }
    lpSetTotalBytes(lp,new_listpack_bytes);

#if 0
    unsigned char *oldlp = lp;
    lp = lp_malloc(new_listpack_bytes);
    memcpy(lp,oldlp,new_listpack_bytes);
    if (newp) {
        unsigned long offset = (*newp)-oldlp;
        *newp = lp + offset;
    }
    /* Make sure the old allocation contains garbage. */
    memset(oldlp,'A',new_listpack_bytes);
    lp_free(oldlp);
#endif

    return lp;
}
复制代码

listpack的insert代码,具体见注释,大体流程如下:

  1. 将后插入改为对下一个元素的前插入,统一代码
  2. 计算插入元素或者替换元素所需要的空间,重新分配整个listpack内存
  3. 调整old元素的位置,腾出空间给新元素
  4. 如果listpack变小,在移动元素之后用realloc收缩内存;随后在腾出的位置写入新元素(插入或者替换)
  5. 修改header信息
/* Return a pointer to the first element of the listpack, or NULL when the
 * byte right after the header is already the EOF marker. */
unsigned char *lpFirst(unsigned char *lp) {
    unsigned char *first = lp + LP_HDR_SIZE; /* Just past the header. */
    return (first[0] == LP_EOF) ? NULL : first;
}
复制代码

获取首个元素的指针,就是header的下一个而已

/* Return a pointer to the element preceding 'p' in the listpack 'lp', or
 * NULL when 'p' is the element right after the header. */
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
    /* 'p' sits right after the header: nothing comes before it. */
    if (p-lp == LP_HDR_SIZE) return NULL;

    /* Step back one byte and decode the backlen found there. */
    unsigned char *tail = p-1;
    uint64_t payload = lpDecodeBacklen(tail);

    /* Add the size of the backlen field itself, then jump back by the
     * total to reach the start of the previous element. */
    uint64_t whole = payload + lpEncodeBacklen(NULL,payload);
    return tail-whole+1;
}
复制代码

获取前一个元素指针,只需要获取Backlen属性就可以倒序遍历

listpack支持next或prev遍历,剩下的方法就不赘述了

stream解析

/* A stream: entries are kept in one radix tree, consumer groups in a
 * second one. */
typedef struct stream {
    rax *rax;               /* Radix tree holding the stream data. */
    uint64_t length;        /* Entry counter: incremented on append, decreased on trim. */
    streamID last_id;       /* Most recent ID of the stream. */
    rax *cgroups;           /* Consumer groups. */
} stream;
复制代码

stream由一个rax基数树当作数据存储,last_id表示当前最新id,用了另外一个rax树来存储当前stream的消费者组。

/* Stream entry ID: a millisecond part plus a sequence number. */
typedef struct streamID {
    uint64_t ms;        /* Milliseconds part. */
    uint64_t seq;       /* Sequence number part. */
} streamID;
复制代码

streamID由毫秒和自增序号组成

/* Advance 'id' in place to the next ID. The sequence part is incremented
 * first; when it is already UINT64_MAX it restarts from 0 and the
 * milliseconds part is incremented instead. If both parts are UINT64_MAX
 * the ID wraps around to 0-0. */
void streamIncrID(streamID *id) {
    /* Common case: there is still room in the sequence part. */
    if (id->seq != UINT64_MAX) {
        id->seq++;
        return;
    }

    /* Sequence exhausted: it restarts from zero in both remaining cases. */
    id->seq = 0;
    if (id->ms == UINT64_MAX)
        id->ms = 0;
    else
        id->ms++;
}
复制代码

可以看到seq和ms最大值都是UINT64的最大值,超出以后重新设置为0


消息格式

根据上文的命令,xadd了多个field,而这些field以及value都是存储在listpack中的,有如下几个特点:

  • listpack有一个masterEntry节点,用于存储创建listpack时的field信息(如首次xadd会将两个field存储在masterEntry中),这是因为一个stream流里面的消息大多数都是相似的,这样的话后续插入消息可以不用存储field信息
  • 一个listpack是会存储多个消息的
  • 一个消息会占用多个listpack entry

下面介绍一下masterEntry

master.png

masterEntry是listpack的一个节点,相当于元数据节点,节点含义如下:

  • count:该listpack中未删除消息的个数
  • deleted:该listpack中删除消息的个数
  • num-field:field的数量
  • field1-fieldN:首次创建listpack时插入的field
  • 0:末尾标志

如同上一个命令一样,插入了testKey1和testKey2两个field以及value,masterEntry内部的field就是上述这两个

++如果后续又插入了新消息,但是消息field和之前的一样,那么listpack entry如下图所示:++

same.png

entry就不存field信息,达到了节约空间的目的

lp-count表示该消息占用的listpack entry数量,field相同时为3+N(flags、ms、seq三个固定字段加上N个value)

如果后续插入消息的field不同,那么存储方式就会变化

diff.png

这个时候新的field信息就会紧靠着value

消费者

/* Consumer group. */
typedef struct streamCG {
    streamID last_id;       /* Last ID tracked for this group.
                               NOTE(review): upstream Redis documents this as
                               the last *delivered* (not acknowledged) ID;
                               confirm. */
    rax *pel;               /* Radix tree of the group's pending (not yet acknowledged) messages. */
    rax *consumers;         /* Radix tree of the consumers in this group. */
} streamCG;
复制代码

消费者组用两个rax树来存放未确认的消息和消费者,last_id表示该消费组最后投递(不一定已确认)的消息id

/* A single consumer inside a consumer group. */
typedef struct streamConsumer {
    mstime_t seen_time;         /* Last time this consumer was active. */
    sds name;                   /* Consumer name. */
    rax *pel;                   /* Radix tree of this consumer's pending (not yet acknowledged) messages. */
} streamConsumer;
复制代码

seen_time是最后活跃时间,内部同样用rax来表示当前消费者未确认的消息

/* A message that was delivered but not yet acknowledged. */
typedef struct streamNACK {
    mstime_t delivery_time;     /* Last time this message was delivered. */
    uint64_t delivery_count;    /* Number of times this message was delivered. */
    streamConsumer *consumer;   /* The consumer this message belongs to. */
} streamNACK;
复制代码

未确认消息结构体,消费者组内的pel和消费者内部pel是共享的,当某个消费者通过消费者组消费了消息后,会创建streamNACK放入到两个pel中

源码解析

按照开头所示XADD来解析一下源码过程

XADD key [MAXLEN [~|=] ] <ID or *> [field value] [field value] …

XADD支持MAXLEN来指定stream的长度,并且可以使用~来优化长度

/* XADD command handler. This first part parses the options that precede the
 * field-value pairs (MAXLEN and the ID) and validates the arguments. */
void xaddCommand(client *c) {
    streamID id;
    //Set to 1 when the caller supplied an explicit ID
    int id_given = 0;
    long long maxlen = -1;  /* MAXLEN value; stays -1 when not given. */
    int approx_maxlen = 0;  /* 1 when MAXLEN was given with '~'. */
    int maxlen_arg_idx = 0; /* Index of the MAXLEN count argument. */

    int i = 2; 
    for (; i < c->argc; i++) {
        //Number of arguments that follow the current one
        int moreargs = (c->argc-1) - i;
        char *opt = c->argv[i]->ptr;
        //"*" asks for an auto-generated ID
        if (opt[0] == '*' && opt[1] == '\0') {
            /* This is just a fast path for the common case of auto-ID
             * creation. */
            break;
        } else if (!strcasecmp(opt,"maxlen") && moreargs) {
            approx_maxlen = 0;
            char *next = c->argv[i+1]->ptr;
            //Optional '~' or '=' modifier between MAXLEN and the count
            if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
                approx_maxlen = 1;
                i++;
            } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
                i++;
            }
            if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
                != C_OK) return;

            if (maxlen < 0) {
                addReplyError(c,"The MAXLEN argument must be >= 0.");
                return;
            }
            i++;
            maxlen_arg_idx = i;
        } else {
            //Anything else must be an explicit ID
            if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
            id_given = 1;
            break;
        }
    }
    //Index of the first field name
    int field_pos = i+1;

    //At least one pair is required, and fields and values must come in pairs
    if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
        addReplyError(c,"wrong number of arguments for XADD");
        return;
    }

    //0-0 cannot be used as an explicit ID
    if (id_given && id.ms == 0 && id.seq == 0) {
        addReplyError(c,"The ID specified in XADD must be greater than 0-0");
        return;
    }
复制代码

这一段代码是对参数的解析以及合法性验证,具体看注释

robj *o;
    stream *s;
    //Look up the stream stored at the key, creating it when missing
    if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    s = o->ptr;

    //The stream already reached the largest possible ID
    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
        addReplyError(c,"The stream has exhausted the last possible ID, "
                        "unable to add more items");
        return;
    }
    
    /* Append the entry; pass the parsed ID only when one was given. */
    if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
        &id, id_given ? &id : NULL)
        == C_ERR)
    {
        addReplyError(c,"The ID specified in XADD is equal or smaller than the "
                        "target stream top item");
        return;
    }
    //Reply to the client with the ID of the new entry
    addReplyStreamID(c,&id);
复制代码

首先调用streamTypeLookupWriteOrCreate查找或者创建一个stream,对stream的last_id进行验证是否超出最大值,随后调用streamAppendItem添加

/* Look up 'key' for writing in the client's database and return the stream
 * object stored there. A missing key gets a newly created empty stream,
 * which is added to the database. If the key holds a value of another
 * type, the wrong-type error is sent to the client and NULL is returned. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *found = lookupKeyWrite(c->db,key);

    if (found != NULL) {
        /* Existing key: it must already be a stream. */
        if (found->type == OBJ_STREAM) return found;
        addReply(c,shared.wrongtypeerr);
        return NULL;
    }

    /* Missing key: create the stream and register it. */
    robj *created = createStreamObject();
    dbAdd(c->db,key,created);
    return created;
}
复制代码

stream和普通的key-val一样都存储在db上面,调用createStreamObject创建空stream。而streamAppendItem插入流程则会像上面分析的listpack一样的插入方式

/* Append an entry made of 'numfields' field-value pairs taken from 'argv'
 * to the stream 's'. When 'use_id' is not NULL it is used as the entry ID,
 * otherwise the ID is obtained from streamNextID(). Returns C_ERR when the
 * resulting ID is not greater than the stream's last_id. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
    
    //Use the caller's ID, or generate the next one
    streamID id;
    if (use_id)
        id = *use_id;
    else
        streamNextID(&s->last_id,&id);

    //Reject IDs that are not strictly greater than last_id
    if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;

    /* Seek to the last node of the radix tree. */
    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"$",NULL,0);

    size_t lp_bytes = 0;
    unsigned char *lp = NULL;

    if (raxNext(&ri)) {
        lp = ri.data;
        lp_bytes = lpBytes(lp);
    }
    raxStop(&ri);

    uint64_t rax_key[2];
    streamID master_id;
    //Drop this listpack (lp = NULL) once it reaches the configured byte or entry limit
    if (lp != NULL) {
        if (server.stream_node_max_bytes &&
            lp_bytes >= server.stream_node_max_bytes)
        {
            lp = NULL;
        } else if (server.stream_node_max_entries) {
            int64_t count = lpGetInteger(lpFirst(lp));
            if (count >= server.stream_node_max_entries) lp = NULL;
        }
    }

复制代码

首先判断是否需要自增id,然后获取listpack,因为id总是递增的,所以raxSeek方法会传入”$”来表示获取最后一个节点

f (lp == NULL || lp_bytes >= server.stream_node_max_bytes) {
        master_id = id;
        //将消息id encode成rax_key
        streamEncodeID(rax_key,&id);
        lp = lpNew();
        //count 为删除消息个数1
        lp = lpAppendInteger(lp,1);
        //deleted 已删除消息个数0
        lp = lpAppendInteger(lp,0);
        //field数量
        lp = lpAppendInteger(lp,numfields);
        for (int64_t i = 0; i < numfields; i++) {
            sds field = argv[i*2]->ptr;
            lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        }
        //末尾标志位0
        lp = lpAppendInteger(lp,0);
        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
        flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
复制代码

这是新插入listpack的情况,可以看到三个连续的lpAppendInteger,对应了上文提到的masterEntry格式,然后插入了fields,并设置为STREAM_ITEM_FLAG_SAMEFIELDS,表示本次插入的fields和masterEntry是一致的,可以用到上文的优化

else {
        /* Reuse the existing listpack: recover its key and master ID. */
        serverAssert(ri.key_len == sizeof(rax_key));
        memcpy(rax_key,ri.key,sizeof(rax_key));

        //Decode rax_key back into master_id
        streamDecodeID(rax_key,&master_id);
        unsigned char *lp_ele = lpFirst(lp);

        //Read the count field and store it back incremented by one
        int64_t count = lpGetInteger(lp_ele);
        lp = lpReplaceInteger(lp,&lp_ele,count+1);
        //Step over the deleted field, landing on num-fields
        lp_ele = lpNext(lp,lp_ele);
        lp_ele = lpNext(lp,lp_ele);

        //Check whether the entry being added has the same fields as the master entry
        int64_t master_fields_count = lpGetInteger(lp_ele);
        lp_ele = lpNext(lp,lp_ele);
        if (numfields == master_fields_count) {
            int64_t i;
            //Compare each master field with the field being inserted
            for (i = 0; i < master_fields_count; i++) {
                sds field = argv[i*2]->ptr;
                int64_t e_len;
                unsigned char buf[LP_INTBUF_SIZE];
                unsigned char *e = lpGet(lp_ele,&e_len,buf);
                //Stop at the first field whose length or bytes differ
                if (sdslen(field) != (size_t)e_len ||
                    memcmp(e,field,e_len) != 0) break;
                lp_ele = lpNext(lp,lp_ele);
            }
            //Every field matched: the field names need not be stored again
            if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
        }
    }
复制代码

如果已经存在了listpack,则会检查masterEntry上的field信息与当前插入的差异,如果完全相同,设置STREAM_ITEM_FLAG_SAMEFIELDS

//Three fixed header fields: flags, then the ms and seq deltas from the master ID
    lp = lpAppendInteger(lp,flags);
    lp = lpAppendInteger(lp,id.ms - master_id.ms);
    lp = lpAppendInteger(lp,id.seq - master_id.seq);
    //Fields differ from the master entry: store the number of fields
    if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
        lp = lpAppendInteger(lp,numfields);
    for (int64_t i = 0; i < numfields; i++) {
        sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
        //Fields differ from the master entry: store each field name before its value
        if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
            lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
        lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
    }
    /* lp-count: how many listpack elements this entry is made of. */
    int64_t lp_count = numfields;
    lp_count += 3; /* Add the 3 fixed fields flags + ms + seq. */
    //Fields differ: also count the field names and the num-fields element
    if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
        lp_count += numfields+1;
    }
    lp = lpAppendInteger(lp,lp_count);

    //Store the listpack pointer in the radix tree again if it changed
    if (ri.data != lp)
        raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
    s->length++;
    s->last_id = id;
    /* Hand the ID that was used back to the caller when requested. */
    if (added_id) *added_id = id;
    return C_OK;
复制代码

然后是具体值的插入,具有以下特点:

  • listpack entry保存的id是当前id减去masterEntry-id的值
  • 如果field不相同,插入的时候还需要在value前面插入field
  • 最后更新lp_count字段,然后重新插入rax中
    /* The key was modified: signal it, emit the keyspace event and mark
     * the dataset dirty. */
    signalModifiedKey(c,c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
    server.dirty++;

    /* Apply MAXLEN when it was given. */
    if (maxlen >= 0) {
        /* Notify xtrim event if needed. */
        if (streamTrimByLength(s,maxlen,approx_maxlen)) {
            notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
        }
        if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
    }

    //Rewrite the ID argument with the actual ID, for propagation (AOF and so on)
    robj *idarg = createObjectFromStreamID(&id);
    rewriteClientCommandArgument(c,i,idarg);
    decrRefCount(idarg);

    /* We need to signal to blocked clients that there is new data on this
     * stream. */
    if (server.blocked_clients_by_type[BLOCKED_STREAM])
        signalKeyAsReady(c->db, c->argv[1]);
复制代码

随后会触发通知、处理maxlen参数以及重写自增id。因为自增id的生成依赖当前时间,需要将已生成的id保存下来并改写命令,才能保证主从以及aof的一致性

++注意signalKeyAsReady这个方法,后面还会提到。++

/* Trim the stream 's' so that it holds at most 'maxlen' entries, starting
 * from the first node of the radix tree. With 'approx' non-zero only whole
 * nodes are removed, so the stream may be left longer than 'maxlen'.
 * Returns the number of entries deleted. */
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
    if (s->length <= maxlen) return 0;

    raxIterator ri;
    raxStart(&ri,s->rax);
    raxSeek(&ri,"^",NULL,0);

    int64_t deleted = 0;
    while(s->length > maxlen && raxNext(&ri)) {
        /* The first listpack element is the count of non-deleted entries. */
        unsigned char *lp = ri.data, *p = lpFirst(lp);
        int64_t entries = lpGetInteger(p);

        //Can the whole node go while still keeping at least maxlen entries?
        if (s->length - entries >= maxlen) {
            lpFree(lp);
            raxRemove(s->rax,ri.key,ri.key_len,NULL);
            /* Seek the iterator again after the removal. */
            raxSeek(&ri,">=",ri.key,ri.key_len);
            s->length -= entries;
            deleted += entries;
            continue;
        }

        //The whole node cannot be removed: in approximate mode stop here
        if (approx) break;

        //How many entries of this node must be deleted
        int64_t to_delete = s->length - maxlen;
        serverAssert(to_delete < entries);
        //Update the non-deleted count and the deleted count in the master entry
        lp = lpReplaceInteger(lp,&p,entries-to_delete);
        p = lpNext(lp,p);
        int64_t marked_deleted = lpGetInteger(p);
        lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
        p = lpNext(lp,p);

        //Skip the rest of the master entry: num-fields, the fields, the terminator
        int64_t master_fields_count = lpGetInteger(p);
        p = lpNext(lp,p);
        for (int64_t j = 0; j < master_fields_count; j++)
            p = lpNext(lp,p);
        p = lpNext(lp,p);

        while(p) {
            int flags = lpGetInteger(p);
            int to_skip;

            //Flag an entry that is not deleted yet as deleted
            if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
                flags |= STREAM_ITEM_FLAG_DELETED;
                lp = lpReplaceInteger(lp,&p,flags);
                deleted++;
                s->length--;
                if (s->length <= maxlen) break; /* Enough entries deleted. */
            }

            //Step over flags, ms and seq: p lands on num-fields or on the first value
            p = lpNext(lp,p);
            p = lpNext(lp,p);
            p = lpNext(lp,p);
            if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
                /* Only the values are stored, one per master field. */
                to_skip = master_fields_count;
            } else {
                /* num-fields itself plus a field and a value per pair. */
                to_skip = lpGetInteger(p);
                to_skip = 1+(to_skip*2);
            }

            while(to_skip--) p = lpNext(lp,p);
            /* Skip the trailing lp-count element. */
            p = lpNext(lp,p);
        }

        //Too much garbage would call for a cleanup, but it is not implemented here
        entries -= to_delete;
        marked_deleted += to_delete;
        if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
            /* TODO: perform a garbage collection. */
        }

        /* Store the listpack pointer back under the same key. */
        raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);

        break;
    }

    raxStop(&ri);
    return deleted;
}
复制代码

这是对stream的长度进行裁剪的代码,主要流程如下:

  • 从rax的头节点开始,approx的含义是只删除整个listpack,如果删除不了整个节点就停止删除
  • 删除单个entry为标记删除,仅把flag标记为deleted,并且修改计数
  • 如果标记删除节点太多,需要清理listpack,但是这里代码并没有实现

总结

以上是stream结构以及内部listpack结构的源码,根据图片可以看出来listpack记录的信息,在xadd源码也可以严格的看出来每一步操作都是按照listpack结构去进行的。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享