前言
上一篇文章介绍了基础sds、skipList等数据结构,这篇文章介绍一下dict和zipList。redis是一个K-V数据库,其本身就是一个map结构,dict是redis的基石,redis就是基于dict这个数据结构而成的,着重看一下实现原理以及特点。
正文
dict
dict实际上就是一个hashMap,运用的链式法解决hash冲突,首先看一下struct
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
复制代码
hashMap节点,next链接冲突节点,union表示实际数据
typedef struct dictht {
//table数组
dictEntry **table;
//容量
unsigned long size;
//size - 1
unsigned long sizemask;
//已用
unsigned long used;
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
//两个dictht用于扩容
dictht ht[2];
long rehashidx;
unsigned long iterators;
} dict;
复制代码
可以看到dict内部保存了两个hashTable和rehashidx,是用于渐进式扩容,因为redis单线程的原因,如果用于扩容会阻塞主线程导致段时间无法提供服务,于是采用的渐进式扩容,后面可以通过代码看到渐进式扩容的特点。
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
复制代码
dictType用于里面存放了各种方法,hashFunction计算hash的函数,keyCompare是key比较函数,能够针对不同的类型来编写方法
static dict *dictCreate(dictType *type, void *privDataPtr) {
dict *ht = malloc(sizeof(*ht));
_dictInit(ht,type,privDataPtr);
return ht;
}
static int _dictInit(dict *ht, dictType *type, void *privDataPtr) {
_dictReset(ht);
ht->type = type;
ht->privdata = privDataPtr;
return DICT_OK;
}
复制代码
创建dict代码很简单
int dictAdd(dict *d, void *key, void *val)
{
dictEntry *entry = dictAddRaw(d,key,NULL);
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);
return DICT_OK;
}
复制代码
dictAdd方法调用了dictAddRaw
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d);
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
复制代码
dictIsRehashing判断是否在rehash过程中,如果正在迁移就会协助进行rehash
接下来通过_dictKeyIndex获取插入的索引,如果此时正在rehash中,那么插入数据只会插入第二个表
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
//判断是否需要扩容 如果是就进行扩容
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
//在两个表中查询数据
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return idx;
}
复制代码
以上是获取索引的方法,会在两个表中计算索引,如果正在rehash,那么始终返回的是第二个表的索引。
从以上代码可以得出rehash的特点:rehash不会多线程运行,而是在每次插入或者其他操作的时候进行rehash。从_dictKeyIndex这个方法也可以看出来,当没有进行rehash的时候两个表的sizemask都是一样的,所以计算出来的index是相等的,在进行rehash的时候第二个表sizemask是要比第一个表更大的,计算出来的index对应的是第二个表
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
复制代码
实际上调用了dictRehash,方法参数是1,表示这次rehash只移动一个槽位
int dictRehash(dict *d, int n) {
//允许访问最大的空槽数
int empty_visits = n*10;
if (!dictIsRehashing(d)) return 0;
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
assert(d->ht[0].size > (unsigned long)d->rehashidx);
//rehashidx自增 直到节点不为空
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
while(de) {
uint64_t h;
nextde = de->next;
//计算新表的索引位置
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
//将de节点连上新表上的节点
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
//删除原表的数据
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
//rehash完成后 交换两个表
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
复制代码
empty_visits表示最大访问的空槽数量,while一共循环n次,每次都移动ht[0][rehashidx]到ht[1],ht[1]对应的位置如果有数据,会将新的节点放到链表的头部而不是尾部,因为头部的插入效率是O(1),这是一个单链表。
最后如果ht[0]完全迁移完毕,会交换两个表同时设置表的size为一致。
_dictKeyIndex计算索引的时候会执行_dictExpandIfNeeded来判断是否需要扩容,判断扩容就是在这里进行的
static int _dictExpandIfNeeded(dict *d)
{
//正在迁移过程中不需要扩容
if (dictIsRehashing(d)) return DICT_OK;
// 表size为0表示才初始化 扩容大小为4
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
复制代码
可以看到扩容有一个必要条件就是存储的节点数量 数组容量,还有一个条件就是dict_can_resize为true,dict_can_resize如果为false,则存储节点数量 是数组容量的五倍才会触发扩容,这个时候等于说不得不扩容了。
扩容策略为当前已使用节点数量的两倍。
void updateDictResizePolicy(void) {
if (!hasActiveChildProcess())
dictEnableResize();
else
dictDisableResize();
}
int hasActiveChildProcess() {
return server.rdb_child_pid != -1 ||
server.aof_child_pid != -1 ||
server.module_child_pid != -1;
}
复制代码
dict_can_resize由updateDictResizePolicy函数控制,当rdb或者aof(bgsave)任务没有执行的时候才能够进行扩容,redis是使用Copy on Write策略来执行写盘的,为了避免在bgsave的时候多次COW而设置不能扩容,但是数量太多也不得不扩容。
int dictExpand(dict *d, unsigned long size)
{
//如果正在rehash 或 使用量大于size 是无效的
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n;
//扩容二次幂
unsigned long realsize = _dictNextPower(size);
//相同大小没效果
if (realsize == d->ht[0].size) return DICT_ERR;
//重新分配内存 设置属性
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
//对于刚新创建的表来说 扩容操作等于创建ht[0]
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
//将新创建的表设置成ht[1] 后续插入
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}
复制代码
dictExpand其实什么都没干,仅仅是将rehashidx设置成0,表示此时开始rehash了,接下来的增删改查就会根据上面代码触发一次rehash操作。
当插入操作很少的时候,不代表就不能触发rehash动作,redis还维护了定时任务会触发rehash。
void databasesCron(void) {
.......
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
复制代码
dictRehashMilliseconds一次执行100个槽的rehash操作,没有超过1ms就继续执行,直到超过为止。
总结一下,redis由于单线程的原因,其实现还是非常简单,同时扩容策略也很简单:每次扩容为原来的两倍,但还是会计算成数,扩容采用了渐进式扩容,每次进行增删改查的时候扩容数组的一个槽下所有节点到新表,当完全扩容后就可以进行表的替换了。相比于java的hashMap来说更简单(java还有0.75的阈值)
zipList
压缩链表虽然说是链表,但是并不像普通链表一样有结构体和指向下一个节点的指针,它就是一块内存地址,但是对这个地址进行了特殊的编码
属性 | 类型 | 作用 |
---|---|---|
zlbytes | unint32_t | 记录整个zipList占用内存的字节 |
zltial | unint32_t | 记录zipList的尾部到开始位置有多少字节,可以通过这个值计算出链表尾部 |
zlien | unint16_t | 表示链表节点的数量,小于65535直接可以计算,大于等于65535需要遍历,也就预示ziplist节点不宜太多 |
entry | 不定 | 有多个 实际节点 |
zlend | unint8_t | 特殊值OxFF=255),用于标记压缩列表的末端 |
typedef struct zlentry {
unsigned int prevrawlensize; /* encode前一个entry的字节长度*/
unsigned int prevrawlen; /* 前一个entry长度*/
unsigned int lensize; /* encode当前字节长度*/
unsigned int len; /* 实际长度 */
unsigned int headersize; /* prevrawlensize + lensize. */
unsigned char encoding; /* Set to ZIP_STR_* or ZIP_INT_* depending on
the entry encoding. However for 4 bits
immediate integers this can assume a range
of values and must be range-checked. */
unsigned char *p; /* Pointer to the very start of the entry, that
is, this points to prev-entry-len field. */
} zlentry;
复制代码
每一个entry都保存了前一个entry的长度,这也支持了尾部遍历的特性。
但是这也带来了一个问题,当插入一个新的节点时,有可能会影响后面节点的prevrawlensize,同时要修改后续节点属性
unsigned char *ziplistNew(void) {
// unint32_t + unint32_t + unint16_t + unint8_t
unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
unsigned char *zl = zmalloc(bytes);
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
//设置entry长度
ZIPLIST_LENGTH(zl) = 0;
//设置末尾特殊值
zl[bytes-1] = ZIP_END;
return zl;
}
复制代码
创建一个空的ziplist,由于没有entry,固定大小与上表所示:unint32_t + unint32_t + unint16_t + unint8_t。
unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
return __ziplistInsert(zl,p,s,slen);
}
复制代码
插入代码,进入__ziplistInsert
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
//当前长度
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
//前一个节点长度和编码长度
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
//默认一个初始值
long long value = 123456789;
zlentry tail;
/* Find out prevlen for the entry that is inserted. */
if (p[0] != ZIP_END) {
//获取前一个节点的prevlensize和prevlen
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else {
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
//如果if成立 表示链表为空
if (ptail[0] != ZIP_END) {
prevlen = zipRawEntryLength(ptail);
}
}
//尝试将entry encoding,成功后跟新reqlen
if (zipTryEncoding(s,slen,&value,&encoding)) {
/* 'encoding' is set to the appropriate integer encoding */
reqlen = zipIntSize(encoding);
} else {
/* 'encoding' is untouched, however zipStoreEntryEncoding will use the
* string length to figure out how to encode it. */
reqlen = slen;
}
//将前一个节点的长度和当前节点编码长度相加
reqlen += zipStorePrevEntryLength(NULL,prevlen);
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
复制代码
__ziplistInsert代码有点长这是前半部分,因为entry要保存前一个节点的长度信息,所以要拿出前一个节点的长度信息,并且后面一个节点要跟新prevrawlensize,这段代码就是计算出新的entry需要存储的数据
//如果不是尾行插入 需要计算下一行的PrevLen字段位数是否够
int forcelarge = 0;
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}
//存储p和链表的偏移量 并且重新分配内存
offset = p-zl;
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
p = zl+offset;
if (p[0] != ZIP_END) {
//由于新插入的节点 要将p移动到新的位置
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);
//跟新tail字段 长度要加上reqlen
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
zipEntry(p+reqlen, &tail);
//如果新节点后面还有多个 表尾还要加上nextdiff
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
//如果是尾部直接计算
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}
//一下函数可能要进行连锁跟新
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}
/* Write the entry */
p += zipStorePrevEntryLength(p,prevlen);
p += zipStoreEntryEncoding(p,encoding,slen);
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}
复制代码
由于PrevLen是不定长的,有可能出现:本来只需要4位字节就能保存,但是新插入节点的长度需要8位才能保存。出现这种情况需要跟新整个链表的长度,加上这缺失的一部分。
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
复制代码
这段代码就是计算新的长度,nextdiff表示Prev字段的扩容长度
接下来的一段代码看注释就能明白,涉及到用memmove内存的拷贝(就跟移动数组差不多含义),如果nextdiff不为0,接下来可能还会涉及到连锁跟新(最快情况:PrevLen的扩容导致接下来每一个entry都扩容),进入__ziplistCascadeUpdate
unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
size_t offset, noffset, extra;
unsigned char *np;
zlentry cur, next;
while (p[0] != ZIP_END) {
zipEntry(p, &cur);
//当前entry长度
rawlen = cur.headersize + cur.len;
rawlensize = zipStorePrevEntryLength(NULL,rawlen);
//如果是尾部就跳过
if (p[rawlen] == ZIP_END) break;
//获取下一个entry
zipEntry(p+rawlen, &next);
//如果长度相等跳过这个阶段
if (next.prevrawlen == rawlen) break;
//这种情况说明prevrawlensize的长度不够了 需要扩容
if (next.prevrawlensize < rawlensize) {
//保存p的偏移量 以便后续重新计算p
offset = p-zl;
//需要扩容的字节长度
extra = rawlensize-next.prevrawlensize;
//修改总字节长度
zl = ziplistResize(zl,curlen+extra);
p = zl+offset;
//当前指针和下一个元素的偏移量
np = p+rawlen;
noffset = np-zl;
//当np不是尾部的时候 跟新tail的偏移量 zl+extra
if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
}
//移动数据
memmove(np+rawlensize,
np+next.prevrawlensize,
curlen-noffset-next.prevrawlensize-1);
zipStorePrevEntryLength(np,rawlen);
//跟新指针 下一个entry
p += rawlen;
curlen += extra;
} else {
//空间多余 这将导致收缩,这是我们希望避免的,因此,在可用字节中设置rawlen
if (next.prevrawlensize > rawlensize) {
/* This would result in shrinking, which we want to avoid.
* So, set "rawlen" in the available bytes. */
zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
} else {
zipStorePrevEntryLength(p+rawlen,rawlen);
}
break;
}
}
return zl;
}
复制代码
这一段代码核心就是:
memmove(np+rawlensize,
np+next.prevrawlensize,
curlen-noffset-next.prevrawlensize-1);
复制代码
扩容ziplist要将红线到绿线的区域移动蓝线,移动的长度为curlen-noffset-next.prevrawlensize-1,然后跟新rawlen的长度,随后还要跳转到下一个entry继续判断,直到符合条件为止。
//返回p节点后值是vstr 长度等于vlen的节点,skip为查找间隔
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
int skipcnt = 0;
unsigned char vencoding = 0;
long long vll = 0;
while (p[0] != ZIP_END) {
unsigned int prevlensize, encoding, lensize, len;
unsigned char *q;
ZIP_DECODE_PREVLENSIZE(p, prevlensize);
ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);
//当前节点的实际数据
q = p + prevlensize + lensize;
//跳过了skip 为0的时候判断
if (skipcnt == 0) {
/* Compare current entry with specified entry */
if (ZIP_IS_STR(encoding)) {
if (len == vlen && memcmp(q, vstr, vlen) == 0) {
return p;
}
} else {
//判断vstr能够用整数表示 只执行一次
if (vencoding == 0) {
if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
/* If the entry can't be encoded we set it to
* UCHAR_MAX so that we don't retry again the next
* time. */
vencoding = UCHAR_MAX;
}
/* Must be non-zero by now */
assert(vencoding);
}
//entry地址转换为整数
if (vencoding != UCHAR_MAX) {
long long ll = zipLoadInteger(q, encoding);
if (ll == vll) {
return p;
}
}
}
//重置skip
skipcnt = skip;
} else {
skipcnt--;
}
//下一个entry
p = q + len;
}
return NULL;
}
复制代码
ziplistFind指定了skip参数表示查找间隔,如果为0的话相当于遍历,通过entry的属性来实现节点的遍历,遇到相等的就返回。
总结:ziplist里面涉及到详细的字节计算,因此经常使用JAVA的我来说看起来一头雾水,但是只要画图根据定义来计算长度,还是能够了解其中的含义。zipList节约了传统链表next指针的开销,但是实际上插入的复杂度并不是真正的O(1),即使连锁跟新的概率比较小。zipList能够节约内存,但并不适合存储大量数据,满足数据字节长度尽量统一能够避免连锁跟新,实现既效率高又节约内存。
总结
没啥好总结的