前言
上一篇stream文章分析了xread含xreadgroup的源码,通过其中的实现方式了解到了消费者组的特点,这篇文章分析剩下的stream命令,了解一下stream还能干些什么事情。
正文
XCLAIM命令
由于消费者消费了消息后会进入pel里面,需要后续对该消息进行ack,如果该消费者下线了,就会造成消息堆积无法ack,这个时候可以调用XCLAIM将多条消息放入自己的pel中
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
[IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
[FORCE] [JUSTID]
复制代码
XCLAIM传入stream的key、group的key以及对应的comsumer的key,并且支持多个id,额外参数解析:
- :消息的最小空闲时间
- IDLE :会跟新消息的delivered_time,idle会根据当前时间计算出delivered_time,当前时间-idle
- TIME :和IDLE的区别就是TIME直接替换delivered_time
- RETRYCOUNT:修改消息的retrycount属性
- FORCE:如果在comsumer的pel里面找不到消息,就从stream里面获取消息加入当前pel
- JUSTID:只返回id不返回其他信息
- LASTID :修改group的last_id属性
void xclaimCommand(client *c) {
streamCG *group = NULL;
//通过key获取stream
robj *o = lookupKeyRead(c->db,c->argv[1]);
long long minidle;
long long retrycount = -1;
mstime_t deliverytime = -1;
int force = 0;
int justid = 0;
//解析stream和gruop
if (o) {
if (checkType(c,o,OBJ_STREAM)) return;
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
}
if (o == NULL || group == NULL) {
addReplyErrorFormat(c,"-NOGROUP No such key '%s' or "
"consumer group '%s'", (char*)c->argv[1]->ptr,
(char*)c->argv[2]->ptr);
return;
}
if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,
"Invalid min-idle-time argument for XCLAIM")
!= C_OK) return;
if (minidle < 0) minidle = 0;
复制代码
首先根据参数找到对应的stream以及group,解析并初始化minidle
int j;
for (j = 5; j < c->argc; j++) {
streamID id;
if (streamParseStrictIDOrReply(NULL,c->argv[j],&id,0) != C_OK) break;
}
int last_id_arg = j-1;
复制代码
然后验证id是否消息,但是这一步没有记录id信息,而是记录了参数id最后的索引
//如果last_id大于 当前的 需要修改并且传播
if (streamCompareID(&last_id,&group->last_id) > 0) {
group->last_id = last_id;
propagate_last_id = 1;
}
//检查deliverytime的合法以及是否设置 默认值为当前时间
if (deliverytime != -1) {
if (deliverytime < 0 || deliverytime > now) deliverytime = now;
} else {
deliverytime = now;
}
复制代码
以上是解析参数后需要检查的项目,解析参数的代码就不放上来了,最终会将上面所有参数都解析出来(如果有的话)
streamConsumer *consumer = NULL;
void *arraylenptr = addReplyDeferredLen(c);
size_t arraylen = 0;
for (int j = 5; j <= last_id_arg; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
//获取未提交的消息
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
//如果设置了force 但是在pel里面没找到 从stream里面获取创建并放入group的pel中
if (force && nack == raxNotFound) {
streamIterator myiterator;
streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
int64_t numfields;
int found = 0;
streamID item_id;
if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
streamIteratorStop(&myiterator);
if (!found) continue;
//创建
nack = streamCreateNACK(NULL);
raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
}
if (nack != raxNotFound) {
//检查最小生存时间
if (nack->consumer && minidle) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue;
}
//删除旧的消费者nack,并放入当前消费者
if (nack->consumer)
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */
if (consumer == NULL)
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
nack->consumer = consumer;
nack->delivery_time = deliverytime;
if (retrycount >= 0) {
nack->delivery_count = retrycount;
//如果没有justid命令 表示这条消息又被处理了一次 所以要自增
} else if (!justid) {
nack->delivery_count++;
}
//加入新消费者pel
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
//justid表示只返回id不返回其他信息
if (justid) {
addReplyStreamID(c,&id);
} else {
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
if (!emitted) addReplyNull(c);
}
arraylen++;
/* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
propagate_last_id = 0;
server.dirty++;
}
}
复制代码
以上就是循环解析id,从注释中可以看出来这条命令是原子的,之前就会对id进行解析来验证,在这个阶段如果获取id错误,那么会导致redis中断
if (propagate_last_id) {
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
server.dirty++;
}
setDeferredArrayLen(c,arraylenptr,arraylen);
//调用call方法执行后会传播命令 使用该方法让call方法后续不传播命令 因为上面已经传播过了
preventCommandPropagation(c);
复制代码
最后进行命令传播
XCLAIM在移动消息的同时可以修改消息的属性,提供了更大的灵活性
XACK命令
xack是消费者对已消费的消息进行提交,其逻辑也很简单
void xackCommand(client *c) {
streamCG *group = NULL;
//获取stream和group
robj *o = lookupKeyRead(c->db,c->argv[1]);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
}
if (o == NULL || group == NULL) {
addReply(c,shared.czero);
return;
}
//检查这些id是有效的 其中一个无效就执行不成功
for (int j = 3; j < c->argc; j++) {
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
}
int acknowledged = 0;
for (int j = 3; j < c->argc; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
//将nack删除 group的pel和consumer的pel中
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
if (nack != raxNotFound) {
raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamFreeNACK(nack);
acknowledged++;
server.dirty++;
}
}
addReplyLongLong(c,acknowledged);
}
复制代码
只是删除group和comsumer的pel对应的数据
xtrim
xtrim支持对stream进行裁剪,指定最大容量,并且可以使用”~”来表示不精确删除,直接看streamTrimByLength源码
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
if (s->length <= maxlen) return 0;
raxIterator ri;
raxStart(&ri,s->rax);
raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0;
复制代码
从rax树的头节点开始迭代
while(s->length > maxlen && raxNext(&ri)) {
unsigned char *lp = ri.data, *p = lpFirst(lp);
int64_t entries = lpGetInteger(p);
//删除整个节点是否还是大于maxLen
if (s->length - entries >= maxlen) {
lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL);
raxSeek(&ri,">=",ri.key,ri.key_len);
s->length -= entries;
deleted += entries;
continue;
}
//如果无法删除整个节点 就中断了
if (approx) break;
复制代码
while条件是stream的长度必须小于指定的长度,从这里可以看出来,当使用了”~”后,表示每次删除只删除整个节点,删除整个节点后必须大于等于maxLen,意味着删除后strema的长度是大于等于maxLen的
//计算需要删除多少
int64_t to_delete = s->length - maxlen;
serverAssert(to_delete < entries);
//修改 未删除节点信息 和 已删除节点信息
lp = lpReplaceInteger(lp,&p,entries-to_delete);
p = lpNext(lp,p);
int64_t marked_deleted = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
p = lpNext(lp,p);
//跳过masterEntry
int64_t master_fields_count = lpGetInteger(p);
p = lpNext(lp,p);
for (int64_t j = 0; j < master_fields_count; j++)
p = lpNext(lp,p);
p = lpNext(lp,p);
复制代码
计算当前节点上可以删除的数量,跳过listpack上的masterEntry
while(p) {
int flags = lpGetInteger(p);
int to_skip;
//标记 未删除节点为已删除
if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp,&p,flags);
deleted++;
s->length--;
if (s->length <= maxlen) break; /* Enough entries deleted. */
}
//跳过ms seq 和 num-field节点
p = lpNext(lp,p);
p = lpNext(lp,p);
p = lpNext(lp,p);
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
to_skip = master_fields_count;
} else {
to_skip = lpGetInteger(p);
to_skip = 1+(to_skip*2);
}
while(to_skip--) p = lpNext(lp,p);
p = lpNext(lp,p);
}
//太多垃圾了 需要清理以下 但是这里并没有实现
entries -= to_delete;
marked_deleted += to_delete;
if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
/* TODO: perform a garbage collection. */
}
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
break;
复制代码
然后循环对listpack进行删除,这里的删除是标记删除,将节点的flag标记为已删除。++当标记删除的节点数量太多了,就需要进行回收,但是这里并没有实现++
最后将listpack重新插入到stream中
总结
这篇文章分析了XCLAIM、XACK、XTRIM命令的源码,可以看出来在实现了rax和listpack数据结构的基础上,这些功能的实现都非常的简单,这些简单的实现共同组成了类似kafka的消息队列