前言
stream是redis5引入的消息队列功能。据redis作者所说,stream大量借鉴了kafka的设计,使得在redis中也能使用类似kafka的功能。stream究竟是如何设计的?下面走进源码一探究竟。
正文
简介
xadd mystream * testKey1 testVal1 testKey2 testVal2
复制代码
这是stream追加消息的命令,由以下组成:
- mystream:为stream的名称
- *:自增id,也可以自己指定
- testKey-Val:消息对应的field以及值
可以看到一条消息可以有多个field键值对,并且可以不指定消费组
借用网上的图片,stream内部消息id是严格递增的,并且可以创建消费组,一个stream可以有多个消费组,一个消费组里面有多个消费者,特点如下:
- 消费组用组名唯一标识来区分,每个消费组都可以消费一个stream的全部消息。
- 消费组内的消费者是竞争关系,同一条消息只能由一个消费者消费。
- 消费者消费以后需要对消息进行确认。
底层结构
stream的底层结构主要由rax和listpack组成
rax
rax详细见之前的文章
listpack
listpack是rax树节点中存储的value,用于存储字符串或整型,结构如下:
- totalBytes是listpack表空间大小,占用4字节
- num是存储entry的个数,占用2字节。和压缩列表一样,它最多只能直接记录65534个;当entry数量达到65535(LP_HDR_NUMELE_UNKNOWN)时,num不再准确,需要遍历整个listpack来统计个数
- entry是实际数据节点:
- encode编码方式
- content实际数据
- backlen记录本entry中encode+content的长度,主要用于从后向前遍历
- end结束标志,0xFF
#define LP_HDR_SIZE 6 /* 32-bit total-bytes field + 16-bit element-count field. */

/* Create a new, empty listpack.
 *
 * The allocation holds only the LP_HDR_SIZE-byte header followed by the
 * single LP_EOF terminator byte; the header records that total size and an
 * element count of 0.
 *
 * Returns the new listpack, or NULL if the allocation fails. */
unsigned char *lpNew(void) {
    unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
    if (lp == NULL) return NULL;
    lpSetTotalBytes(lp,LP_HDR_SIZE+1);
    lpSetNumElements(lp,0);
    lp[LP_HDR_SIZE] = LP_EOF;
    return lp;
}
复制代码
通过lpNew方法创建一个空的listpack,32bit的totalBytes加上16bit的num,一共需要6字节;然后设置末尾标志
/* Insert, replace or delete a single listpack element.
 *
 * ele:   bytes of the element to write; NULL means "delete the element at p".
 * size:  length of ele in bytes.
 * p:     element relative to which the operation is performed.
 * where: LP_BEFORE, LP_AFTER or LP_REPLACE (forced to LP_REPLACE when ele
 *        is NULL).
 * newp:  if not NULL, receives a pointer to the element just written; for a
 *        deletion it receives the element that now occupies the deleted
 *        position, or NULL if that is the EOF marker.
 *
 * Returns the (possibly reallocated) listpack, or NULL if the resulting size
 * would exceed UINT32_MAX or a reallocation fails. */
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
unsigned char backlen[LP_MAX_BACKLEN_SIZE];
uint64_t enclen; /* Length of encoding type + content of the new element. */
/* A NULL element means deletion, handled as "replace with nothing". */
if (ele == NULL) where = LP_REPLACE;
/* Turn "insert after p" into "insert before the element following p", so
 * the rest of the function only deals with LP_BEFORE and LP_REPLACE. */
if (where == LP_AFTER) {
p = lpSkip(p);
where = LP_BEFORE;
}
/* Remember p as an offset: the pointer is invalid after a reallocation. */
unsigned long poff = p-lp;
/* Encode ele (as integer or string) and compute its encoded length. */
int enctype;
if (ele) {
enctype = lpEncodeGetType(ele,size,intenc,&enclen);
} else {
enctype = -1;
enclen = 0;
}
/* The backlen stores enclen so the element can be walked backwards. */
unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
uint32_t replaced_len = 0;
/* When replacing, account for the bytes of the element being overwritten
 * (its encoding + content plus its own backlen). */
if (where == LP_REPLACE) {
replaced_len = lpCurrentEncodedSize(p);
replaced_len += lpEncodeBacklen(NULL,replaced_len);
}
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
- replaced_len;
/* The total-bytes header field is only 32 bits wide. */
if (new_listpack_bytes > UINT32_MAX) return NULL;
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
/* Grow before moving data, so the shifted tail fits in the allocation. */
if (new_listpack_bytes > old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Shift the tail (EOF byte included) to open a gap for the new element,
 * or to close the gap left by a shorter/deleted one. */
if (where == LP_BEFORE) {
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
} else { /* LP_REPLACE. */
long lendiff = (enclen+backlen_size)-replaced_len;
memmove(dst+replaced_len+lendiff,
dst+replaced_len,
old_listpack_bytes-poff-replaced_len);
}
/* Shrink only after the move, so no tail bytes are cut off. */
if (new_listpack_bytes < old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Report the position of the written element (set before dst advances). */
if (newp) {
*newp = dst;
/* In case of deletion, set 'newp' to NULL if the next element is
* the EOF element. */
if (!ele && dst[0] == LP_EOF) *newp = NULL;
}
/* Write encoding + content, then the backlen. */
if (ele) {
if (enctype == LP_ENCODING_INT) {
memcpy(dst,intenc,enclen);
} else {
lpEncodeString(dst,ele,size);
}
dst += enclen;
memcpy(dst,backlen,backlen_size);
dst += backlen_size;
}
/* Update the header: a pure replace keeps the element count unchanged, and
 * an "unknown" count (LP_HDR_NUMELE_UNKNOWN) is left as is. */
if (where != LP_REPLACE || ele == NULL) {
uint32_t num_elements = lpGetNumElements(lp);
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
/* One element added (insert) or removed (delete). */
if (ele)
lpSetNumElements(lp,num_elements+1);
else
lpSetNumElements(lp,num_elements-1);
}
}
lpSetTotalBytes(lp,new_listpack_bytes);
/* Disabled debugging aid: when enabled, it forces a fresh allocation on
 * every call and fills the old one with garbage, to expose callers that
 * keep stale pointers into the listpack. */
#if 0
unsigned char *oldlp = lp;
lp = lp_malloc(new_listpack_bytes);
memcpy(lp,oldlp,new_listpack_bytes);
if (newp) {
unsigned long offset = (*newp)-oldlp;
*newp = lp + offset;
}
/* Make sure the old allocation contains garbage. */
memset(oldlp,'A',new_listpack_bytes);
lp_free(oldlp);
#endif
return lp;
}
复制代码
listpack的insert代码,具体见注释,大体流程如下:
- 将后插入(LP_AFTER)改为对下一个元素的前插入(LP_BEFORE),统一后续代码
- 计算插入或替换元素后listpack所需的总空间,如果需要更多空间,先通过realloc扩容
- 调整old元素的位置,腾出空间给新元素
- 如果新的总长度比原来小(替换为更短的元素或删除元素),在移动数据之后再通过realloc缩小内存;然后在腾出的位置写入新元素的encode、content和backlen
- 修改header信息
/* Return a pointer to the first element of the listpack, which sits right
 * after the LP_HDR_SIZE-byte header, or NULL when that byte is the LP_EOF
 * terminator (empty listpack). */
unsigned char *lpFirst(unsigned char *lp) {
    unsigned char *first = lp + LP_HDR_SIZE;
    return (*first == LP_EOF) ? NULL : first;
}
复制代码
获取首个元素的指针,也就是紧跟在header之后的位置;如果该位置就是结束标志0xFF,说明listpack为空,返回NULL
/* Return the element that precedes p, or NULL when p is the first element
 * (it starts right after the header).
 *
 * The byte just before p is the last byte of the previous element's
 * backlen. Decoding it gives that element's encoding+content length; adding
 * the size of the backlen itself gives the full element size, which is
 * subtracted to reach the element's first byte. */
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
    if (p - lp == LP_HDR_SIZE) return NULL;
    unsigned char *last_backlen_byte = p - 1;
    uint64_t entry_len = lpDecodeBacklen(last_backlen_byte);
    uint64_t total_len = entry_len + lpEncodeBacklen(NULL, entry_len);
    return last_backlen_byte - total_len + 1;
}
复制代码
获取前一个元素的指针:读取前一个元素末尾的backlen,再加上backlen自身占用的字节数,就能定位到前一个元素的起始位置,从而实现倒序遍历
listpack支持next或prev遍历,剩下的方法就不赘述了
stream解析
/* In-memory representation of a stream value. */
typedef struct stream {
rax *rax;          /* Entry storage: keys are encoded master IDs, values are listpacks. */
uint64_t length;   /* Number of (non-deleted) entries in the stream. */
streamID last_id;  /* ID of the most recently added entry; new IDs must be greater. */
rax *cgroups; /* Consumer groups, kept in a second rax. */
} stream;
复制代码
stream由一个rax基数树当作数据存储,last_id表示当前最新id,用了另外一个rax树来存储当前stream的消费者组。
/* Stream entry ID, written as "<ms>-<seq>". */
typedef struct streamID {
uint64_t ms;   /* Millisecond part. */
uint64_t seq;  /* Sequence number within the same millisecond part. */
} streamID;
复制代码
streamID由毫秒和自增序号组成
/* Advance *id to the next possible stream ID.
 *
 * The sequence number is incremented. When it is already UINT64_MAX, it
 * resets to 0 and the millisecond part is incremented instead. When both
 * parts are UINT64_MAX, the ID wraps around to 0-0. */
void streamIncrID(streamID *id) {
    if (id->seq != UINT64_MAX) {
        id->seq++;
        return;
    }
    id->seq = 0;
    if (id->ms == UINT64_MAX)
        id->ms = 0;
    else
        id->ms++;
}
复制代码
可以看到seq达到UINT64最大值后会进位:ms加1,seq归零;如果ms和seq都已经是UINT64的最大值,整个id会回绕为0-0
消息格式
根据上文的命令,xadd了多个field,而这些field以及value都是存储在listpack中的,有如下几个特点:
- listpack有一个masterEntry节点,用于存储创建listpack时的field信息(如首次xadd会将两个field存储在masterEntry中)。这是因为一个stream里的消息大多结构相似,这样后续插入相同field的消息时就不用再存储field信息
- 一个listpack是会存储多个消息的
- 一个消息会占用多个listpack entry
下面介绍一下masterEntry
masterEntry是listpack的一个节点,相当于元数据节点,节点含义如下:
- count:该listpack中未删除消息的个数
- deleted:该listpack中删除消息的个数
- num-field:field的数量
- field1-fieldN:首次创建listpack时插入的field
- 0:末尾标志
如同上一个命令一样,插入了testKey1和testKey2两个field以及value,masterEntry内部的field就是上述这两个
如果后续插入了新的消息,并且消息的field和masterEntry中的一样,那么listpack entry如下图所示:
entry不再存储field信息,达到了节约空间的目的
lp-count表示该消息占用的listpack entry数量(不含lp-count自身),即3+N:flags、ms、seq加上N个value
如果后续插入消息的field不同,那么存储方式就会变化
这个时候消息会额外存储num-field,并且每个field会紧挨在对应的value前面,lp-count也相应变为3+N+(N+1)
消费者
/* A consumer group attached to a stream. */
typedef struct streamCG {
streamID last_id;  /* NOTE(review): upstream stream.h describes this as the last ID *delivered* (not acknowledged) to the group; confirm for the Redis version in use. */
rax *pel;          /* Group pending entries list: delivered but not yet acknowledged messages (streamNACK values). */
rax *consumers;    /* Consumers belonging to this group. */
} streamCG;
复制代码
消费者组用两个rax树分别存放未确认的消息(pel)和消费者(consumers),last_id表示该消费组最后一次投递给消费者的消息id(注意是已投递,而不是已确认的id)
/* A single consumer inside a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time;  /* Last time this consumer was seen active. */
sds name;            /* Consumer name. */
rax *pel;            /* Messages delivered to this consumer and not yet acknowledged. */
} streamConsumer;
复制代码
seen_time是最后活跃时间,内部同样用rax来表示当前消费者未确认的消息
/* A pending (not yet acknowledged) message. The same object is referenced
 * from the group's PEL and from the owning consumer's PEL. */
typedef struct streamNACK {
mstime_t delivery_time; /* Last time the message was delivered. */
uint64_t delivery_count; /* Number of times the message was delivered. */
streamConsumer *consumer; /* Consumer that currently owns the message. */
} streamNACK;
复制代码
未确认消息的结构体。消费者组的pel和消费者的pel共享同一个streamNACK对象:当某个消费者通过消费者组读取消息后,会创建一个streamNACK,并同时放入这两个pel中
源码解析
按照开头所示XADD来解析一下源码过程
XADD key [MAXLEN [~|=] ] <ID or *> [field value] [field value] …
XADD支持用MAXLEN限制stream的最大长度,并且可以使用~进行近似裁剪:只删除整个listpack节点,实际长度可能略大于MAXLEN,但性能更好
/* XADD command, first part (fragment): option parsing and argument
 * validation. */
void xaddCommand(client *c) {
streamID id;
/* Set to 1 when the caller supplies an explicit ID instead of '*'. */
int id_given = 0;
long long maxlen = -1; /* -1: no MAXLEN option given. */
int approx_maxlen = 0; /* 1 if "MAXLEN ~" (approximate trimming) was given. */
int maxlen_arg_idx = 0; /* argv index of the MAXLEN count argument. */
int i = 2;
for (; i < c->argc; i++) {
/* Number of arguments remaining after argv[i]. */
int moreargs = (c->argc-1) - i;
char *opt = c->argv[i]->ptr;
/* "*" requests an auto-generated ID. */
if (opt[0] == '*' && opt[1] == '\0') {
/* This is just a fast path for the common case of auto-ID
* creation. */
break;
} else if (!strcasecmp(opt,"maxlen") && moreargs) {
approx_maxlen = 0;
char *next = c->argv[i+1]->ptr;
/* Optional "~" (approximate) or "=" (exact) modifier after MAXLEN. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
if (maxlen < 0) {
addReplyError(c,"The MAXLEN argument must be >= 0.");
return;
}
i++;
maxlen_arg_idx = i;
} else {
/* Any other argument must be an explicit ID. */
if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
id_given = 1;
break;
}
}
/* argv index of the first field; argv[i] is the ID ('*' or explicit). */
int field_pos = i+1;
/* Require at least one field/value pair, and only complete pairs. */
if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
addReplyError(c,"wrong number of arguments for XADD");
return;
}
/* 0-0 is not accepted as an explicit ID. */
if (id_given && id.ms == 0 && id.seq == 0) {
addReplyError(c,"The ID specified in XADD must be greater than 0-0");
return;
}
复制代码
这一段代码是对参数的解析以及合法性验证,具体看注释
/* XADD, second part (fragment): look up the stream and append the entry. */
robj *o;
stream *s;
/* Look up the stream stored at argv[1], creating it if it does not exist. */
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
s = o->ptr;
/* No ID can be greater than UINT64_MAX-UINT64_MAX: nothing more can be added. */
if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
addReplyError(c,"The stream has exhausted the last possible ID, "
"unable to add more items");
return;
}
/* Append; this fails when the ID is not greater than the stream's last ID.
 * On success, id receives the ID actually used. */
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
&id, id_given ? &id : NULL)
== C_ERR)
{
addReplyError(c,"The ID specified in XADD is equal or smaller than the "
"target stream top item");
return;
}
/* Reply with the ID of the added entry. */
addReplyStreamID(c,&id);
复制代码
首先调用streamTypeLookupWriteOrCreate查找或者创建一个stream,对stream的last_id进行验证是否超出最大值,随后调用streamAppendItem添加
/* Return the stream object stored at key, creating and adding an empty
 * stream to the client's db when the key does not exist.
 *
 * If the key holds a value of another type, a WRONGTYPE error is sent to
 * the client and NULL is returned. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *obj = lookupKeyWrite(c->db,key);
    if (obj != NULL) {
        if (obj->type == OBJ_STREAM) return obj;
        addReply(c,shared.wrongtypeerr);
        return NULL;
    }
    obj = createStreamObject();
    dbAdd(c->db,key,obj);
    return obj;
}
复制代码
stream和普通的key-val一样都存储在db中,key不存在时调用createStreamObject创建空stream。而streamAppendItem则按照上文介绍的消息格式,调用listpack的接口写入消息
/* Append an entry with numfields field/value pairs (argv holds field, value,
 * field, value, ...) to stream s. If use_id is NULL the next ID is derived
 * from s->last_id, otherwise *use_id is used. On success the ID is stored in
 * *added_id (when not NULL) and C_OK is returned; C_ERR is returned when the
 * ID is not greater than s->last_id.
 * First part (fragment): choose the ID and locate the tail listpack. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
/* Use the caller's ID, or derive the next one from last_id. */
streamID id;
if (use_id)
id = *use_id;
else
streamNextID(&s->last_id,&id);
/* IDs must strictly increase; a rejected ID makes the append fail. */
if (streamCompareID(&id,&s->last_id) <= 0) return C_ERR;
/* "$" seeks to the last rax node: the listpack holding the newest entries. */
raxIterator ri;
raxStart(&ri,s->rax);
raxSeek(&ri,"$",NULL,0);
size_t lp_bytes = 0;
unsigned char *lp = NULL;
if (raxNext(&ri)) {
lp = ri.data;
lp_bytes = lpBytes(lp);
}
raxStop(&ri);
uint64_t rax_key[2];
streamID master_id;
/* Stop reusing the tail listpack once it reached the configured byte or
 * entry limit; a limit set to 0 disables that particular check. */
if (lp != NULL) {
if (server.stream_node_max_bytes &&
lp_bytes >= server.stream_node_max_bytes)
{
lp = NULL;
} else if (server.stream_node_max_entries) {
int64_t count = lpGetInteger(lpFirst(lp));
if (count >= server.stream_node_max_entries) lp = NULL;
}
}
复制代码
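首先确定本次使用的id(自动生成或使用指定的id),并检查它必须大于stream的last_id。然后获取listpack:因为id总是递增的,最新的消息一定在rax的最后一个节点中,所以raxSeek传入"$"来定位最后一个节点。如果该listpack的大小或消息数已经达到stream-node-max-bytes/stream-node-max-entries的限制,就把lp置为NULL,后续会新建一个listpack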
f (lp == NULL || lp_bytes >= server.stream_node_max_bytes) {
master_id = id;
//将消息id encode成rax_key
streamEncodeID(rax_key,&id);
lp = lpNew();
//count 为删除消息个数1
lp = lpAppendInteger(lp,1);
//deleted 已删除消息个数0
lp = lpAppendInteger(lp,0);
//field数量
lp = lpAppendInteger(lp,numfields);
for (int64_t i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr;
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
}
//末尾标志位0
lp = lpAppendInteger(lp,0);
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
复制代码
这是新建listpack的情况:可以看到三个连续的lpAppendInteger,对应了上文提到的masterEntry格式(count、deleted、num-field),然后插入了各个field,最后追加一个0作为masterEntry的结束标志。同时设置STREAM_ITEM_FLAG_SAMEFIELDS,表示本次插入的fields和masterEntry一致,可以使用上文提到的优化
/* Otherwise append to the existing tail listpack (fragment: else branch of
 * the "no reusable listpack" check). */
else {
serverAssert(ri.key_len == sizeof(rax_key));
memcpy(rax_key,ri.key,sizeof(rax_key));
/* The node's rax key encodes its master ID. */
streamDecodeID(rax_key,&master_id);
unsigned char *lp_ele = lpFirst(lp);
/* Increment the node's live-entry count. */
int64_t count = lpGetInteger(lp_ele);
lp = lpReplaceInteger(lp,&lp_ele,count+1);
/* Move past the 'deleted' counter to num-fields. */
lp_ele = lpNext(lp,lp_ele);
lp_ele = lpNext(lp,lp_ele);
/* Compare the new entry's field names with the master fields. */
int64_t master_fields_count = lpGetInteger(lp_ele);
lp_ele = lpNext(lp,lp_ele);
if (numfields == master_fields_count) {
int64_t i;
/* Fields must match in number, order and bytes. */
for (i = 0; i < master_fields_count; i++) {
sds field = argv[i*2]->ptr;
int64_t e_len;
unsigned char buf[LP_INTBUF_SIZE];
unsigned char *e = lpGet(lp_ele,&e_len,buf);
/* Stop at the first field that differs. */
if (sdslen(field) != (size_t)e_len ||
memcmp(e,field,e_len) != 0) break;
lp_ele = lpNext(lp,lp_ele);
}
/* All fields match: the entry can omit its field names. */
if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
}
}
复制代码
如果已经存在可用的listpack,则会比较masterEntry上的field信息与当前插入消息的field,如果完全相同,就设置STREAM_ITEM_FLAG_SAMEFIELDS
/* Entry header: flags, then the ID stored as a delta from the master ID. */
lp = lpAppendInteger(lp,flags);
lp = lpAppendInteger(lp,id.ms - master_id.ms);
lp = lpAppendInteger(lp,id.seq - master_id.seq);
/* Entries with their own field names also store the field count... */
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
lp = lpAppendInteger(lp,numfields);
for (int64_t i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
/* ...and each field name right before its value. */
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
}
int64_t lp_count = numfields;
lp_count += 3; /* Fixed elements: flags + ms + seq. */
/* Plus num-fields and the numfields field names. */
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
lp_count += numfields+1;
}
/* lp-count: number of listpack elements of this entry, not counting itself. */
lp = lpAppendInteger(lp,lp_count);
/* Appending may have reallocated the listpack: update the rax value when
 * the pointer changed. */
if (ri.data != lp)
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++;
s->last_id = id;
if (added_id) *added_id = id;
return C_OK;
复制代码
然后是具体值的插入,具有以下特点:
- listpack entry保存的id是当前id减去masterEntry-id的值
- 如果field不相同,插入的时候还需要在value前面插入field
- 最后追加lp-count字段;由于追加过程中listpack可能被realloc,如果地址发生了变化,就重新插入rax以更新节点的value
/* XADD, final part (fragment): notifications, optional trimming, and
 * rewriting of the ID argument for propagation. */
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
if (maxlen >= 0) {
/* Notify xtrim event if needed. */
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
}
/* NOTE(review): presumably rewrites the "~" MAXLEN argument at
 * maxlen_arg_idx for propagation; confirm in streamRewriteApproxMaxlen. */
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
}
/* Replace the ID argument argv[i] (possibly "*") with the concrete ID, so
 * replicas and the AOF record the same ID. */
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c,i,idarg);
decrRefCount(idarg);
/* We need to signal to blocked clients that there is new data on this
* stream. */
if (server.blocked_clients_by_type[BLOCKED_STREAM])
signalKeyAsReady(c->db, c->argv[1]);
复制代码
随后会触发键空间通知,根据MAXLEN参数裁剪stream,并把命令中的ID参数重写为实际生成的id:因为自增id的生成依赖当前时间,只有将生成好的id写回命令再传播,才能保证主从复制以及AOF的一致性
注意signalKeyAsReady这个方法,后面还会提到。
/* Trim stream s down to at most maxlen entries, starting from the first rax
 * node.
 *
 * Whole listpack nodes are freed as long as doing so still leaves at least
 * maxlen entries. If approx is non-zero, trimming stops at the first node
 * that cannot be removed entirely, so more than maxlen entries may remain.
 * Otherwise, entries inside that node are only flagged as deleted; the
 * listpack is not compacted.
 *
 * Returns the number of entries removed or flagged as deleted. */
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
if (s->length <= maxlen) return 0;
raxIterator ri;
raxStart(&ri,s->rax);
raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0;
while(s->length > maxlen && raxNext(&ri)) {
unsigned char *lp = ri.data, *p = lpFirst(lp);
int64_t entries = lpGetInteger(p);
/* Can this whole node be dropped while keeping >= maxlen entries? */
if (s->length - entries >= maxlen) {
lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL);
/* Re-seek after the removal so iteration continues from the next key. */
raxSeek(&ri,">=",ri.key,ri.key_len);
s->length -= entries;
deleted += entries;
continue;
}
/* In approximate mode, never trim inside a node. */
if (approx) break;
/* Number of entries that must go from this node. */
int64_t to_delete = s->length - maxlen;
serverAssert(to_delete < entries);
/* Update the master entry counters: live count down, deleted count up. */
lp = lpReplaceInteger(lp,&p,entries-to_delete);
p = lpNext(lp,p);
int64_t marked_deleted = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
p = lpNext(lp,p);
/* Skip the rest of the master entry: num-fields, the field names and the
 * terminating 0; p then points at the first entry's flags. */
int64_t master_fields_count = lpGetInteger(p);
p = lpNext(lp,p);
for (int64_t j = 0; j < master_fields_count; j++)
p = lpNext(lp,p);
p = lpNext(lp,p);
while(p) {
int flags = lpGetInteger(p);
int to_skip;
/* Flag the next live entry as deleted. */
if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp,&p,flags);
deleted++;
s->length--;
if (s->length <= maxlen) break; /* Enough entries deleted. */
}
/* Move from flags past ms and seq: p now points at num-fields, or at the
 * first value for SAMEFIELDS entries. */
p = lpNext(lp,p);
p = lpNext(lp,p);
p = lpNext(lp,p);
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
to_skip = master_fields_count;
} else {
to_skip = lpGetInteger(p);
to_skip = 1+(to_skip*2);
}
/* Skip the remaining elements to land on the entry's lp-count; the next
 * lpNext reaches the following entry's flags. */
while(to_skip--) p = lpNext(lp,p);
p = lpNext(lp,p);
}
/* Garbage collection of nodes full of deleted entries is not implemented. */
entries -= to_delete;
marked_deleted += to_delete;
if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
/* TODO: perform a garbage collection. */
}
/* lpReplaceInteger may have reallocated the listpack: store the current pointer. */
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
break;
}
raxStop(&ri);
return deleted;
}
复制代码
这是对stream的长度进行裁剪的代码,主要流程如下:
- 从rax的头节点开始,approx的含义是只删除整个listpack,如果删除不了整个节点就停止删除
- 删除单个entry为标记删除,仅只把flag标记为deleted,并且修改计数
- 如果标记删除节点太多,需要清理listpack,但是这里代码并没有实现
总结
以上是stream结构以及内部listpack结构的源码,根据图片可以看出来listpack记录的信息,在xadd源码也可以严格的看出来每一步操作都是按照listpack结构去进行的。