Redis6.0.6源码阅读——基础数据结构(dict、ziplist)

前言

上一篇文章介绍了基础sds、skipList等数据结构,这篇文章介绍一下dict和zipList。redis是一个K-V数据库,其本身就是一个map结构,dict是redis的基石,redis就是基于dict这个数据结构而成的,着重看一下实现原理以及特点。

正文

dict

dict实际上就是一个hashMap,运用的链式法解决hash冲突,首先看一下struct

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;
复制代码

hashMap节点,next链接冲突节点,union表示实际数据

typedef struct dictht {
    //table数组
    dictEntry **table;
    //容量
    unsigned long size;
    //size - 1
    unsigned long sizemask;
    //已用
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    //两个dictht用于扩容
    dictht ht[2];
    long rehashidx;
    unsigned long iterators; 
} dict;
复制代码

可以看到dict内部保存了两个hashTable和rehashidx,是用于渐进式扩容,因为redis单线程的原因,如果用于扩容会阻塞主线程导致段时间无法提供服务,于是采用的渐进式扩容,后面可以通过代码看到渐进式扩容的特点。

typedef struct dictType {
    uint64_t (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;
复制代码

dictType用于里面存放了各种方法,hashFunction计算hash的函数,keyCompare是key比较函数,能够针对不同的类型来编写方法

static dict *dictCreate(dictType *type, void *privDataPtr) {
    dict *ht = malloc(sizeof(*ht));
    _dictInit(ht,type,privDataPtr);
    return ht;
}

static int _dictInit(dict *ht, dictType *type, void *privDataPtr) {
    _dictReset(ht);
    ht->type = type;
    ht->privdata = privDataPtr;
    return DICT_OK;
}
复制代码

创建dict代码很简单

int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d,key,NULL);

    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}
复制代码

dictAdd方法调用了dictAddRaw

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;
    
    if (dictIsRehashing(d)) _dictRehashStep(d);
    
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}
复制代码

dictIsRehashing判断是否在rehash过程中,如果正在迁移就会协助进行rehash

接下来通过_dictKeyIndex获取插入的索引,如果此时正在rehash中,那么插入数据只会插入第二个表

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    //判断是否需要扩容 如果是就进行扩容
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    //在两个表中查询数据
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
       
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}
复制代码

以上是获取索引的方法,会在两个表中计算索引,如果正在rehash,那么始终返回的是第二个表的索引。

从以上代码可以得出rehash的特点:rehash不会多线程运行,而是在每次插入或者其他操作的时候进行rehash。从_dictKeyIndex这个方法也可以看出来,当没有进行rehash的时候两个表的sizemask都是一样的,所以计算出来的index是相等的,在进行rehash的时候第二个表sizemask是要比第一个表更大的,计算出来的index对应的是第二个表

static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}
复制代码

实际上调用了dictRehash,方法参数是1,表示这次rehash只移动一个槽位

int dictRehash(dict *d, int n) {
    //允许访问最大的空槽数
    int empty_visits = n*10;
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        //rehashidx自增 直到节点不为空
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        while(de) {
            uint64_t h;

            nextde = de->next;

            //计算新表的索引位置
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            //将de节点连上新表上的节点
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        //删除原表的数据
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    //rehash完成后 交换两个表
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}
复制代码

empty_visits表示最大访问的空槽数量,while一共循环n次,每次都移动ht[0][rehashidx]到ht[1],ht[1]对应的位置如果有数据,会将新的节点放到链表的头部而不是尾部,因为头部的插入效率是O(1),这是一个单链表。

最后如果ht[0]完全迁移完毕,会交换两个表同时设置表的size为一致。

_dictKeyIndex计算索引的时候会执行_dictExpandIfNeeded来判断是否需要扩容,判断扩容就是在这里进行的

static int _dictExpandIfNeeded(dict *d)
{
    //正在迁移过程中不需要扩容
    if (dictIsRehashing(d)) return DICT_OK;

    // 表size为0表示才初始化 扩容大小为4
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
    
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}
复制代码

可以看到扩容有一个必要条件就是存储的节点数量 \geq 数组容量,还有一个条件就是dict_can_resize为true,dict_can_resize如果为false,则存储节点数量 是数组容量的五倍才会触发扩容,这个时候等于说不得不扩容了。

扩容策略为当前已使用节点数量的两倍。

void updateDictResizePolicy(void) {
    if (!hasActiveChildProcess())
        dictEnableResize();
    else
        dictDisableResize();
}

int hasActiveChildProcess() {
    return server.rdb_child_pid != -1 ||
           server.aof_child_pid != -1 ||
           server.module_child_pid != -1;
}
复制代码

dict_can_resize由updateDictResizePolicy函数控制,当rdb或者aof(bgsave)任务没有执行的时候才能够进行扩容,redis是使用Copy on Write策略来执行写盘的,为了避免在bgsave的时候多次COW而设置不能扩容,但是数量太多也不得不扩容。

int dictExpand(dict *d, unsigned long size)
{
    //如果正在rehash 或 使用量大于size 是无效的
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    dictht n;
    //扩容二次幂
    unsigned long realsize = _dictNextPower(size);

    //相同大小没效果
    if (realsize == d->ht[0].size) return DICT_ERR;

    //重新分配内存 设置属性
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;
    
    //对于刚新创建的表来说 扩容操作等于创建ht[0]
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    //将新创建的表设置成ht[1] 后续插入
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}
复制代码

dictExpand其实什么都没干,仅仅是将rehashidx设置成0,表示此时开始rehash了,接下来的增删改查就会根据上面代码触发一次rehash操作。

当插入操作很少的时候,不代表就不能触发rehash动作,redis还维护了定时任务会触发rehash。

void databasesCron(void) {
.......

if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash, we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }

if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }

int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}
复制代码

dictRehashMilliseconds一次执行100个槽的rehash操作,没有超过1ms就继续执行,直到超过为止。

总结一下,redis由于单线程的原因,其实现还是非常简单,同时扩容策略也很简单:每次扩容为原来的两倍,但还是会计算成2n2^n数,扩容采用了渐进式扩容,每次进行增删改查的时候扩容数组的一个槽下所有节点到新表,当完全扩容后就可以进行表的替换了。相比于java的hashMap来说更简单(java还有0.75的阈值)

zipList

压缩链表虽然说是链表,但是并不像普通链表一样有结构体和指向下一个节点的指针,它就是一块内存地址,但是对这个地址进行了特殊的编码

yasuo.png

属性 类型 作用
zlbytes unint32_t 记录整个zipList占用内存的字节
zltial unint32_t 记录zipList的尾部到开始位置有多少字节,可以通过这个值计算出链表尾部
zlien unint16_t 表示链表节点的数量,小于65535直接可以计算,大于等于65535需要遍历,也就预示ziplist节点不宜太多
entry 不定 有多个 实际节点
zlend unint8_t 特殊值OxFF=255),用于标记压缩列表的末端
typedef struct zlentry {
    unsigned int prevrawlensize; /* encode前一个entry的字节长度*/
    unsigned int prevrawlen;     /* 前一个entry长度*/
    unsigned int lensize;        /* encode当前字节长度*/
    unsigned int len;            /* 实际长度 */
    unsigned int headersize;     /* prevrawlensize + lensize. */
    unsigned char encoding;      /* Set to ZIP_STR_* or ZIP_INT_* depending on
                                    the entry encoding. However for 4 bits
                                    immediate integers this can assume a range
                                    of values and must be range-checked. */
    unsigned char *p;            /* Pointer to the very start of the entry, that
                                    is, this points to prev-entry-len field. */
} zlentry;
复制代码

每一个entry都保存了前一个entry的长度,这也支持了尾部遍历的特性。

但是这也带来了一个问题,当插入一个新的节点时,有可能会影响后面节点的prevrawlensize,同时要修改后续节点属性

unsigned char *ziplistNew(void) {
    // unint32_t + unint32_t + unint16_t + unint8_t
    unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    //设置entry长度
    ZIPLIST_LENGTH(zl) = 0;
    //设置末尾特殊值
    zl[bytes-1] = ZIP_END;
    return zl;
}
复制代码

创建一个空的ziplist,由于没有entry,固定大小与上表所示:unint32_t + unint32_t + unint16_t + unint8_t。

unsigned char *ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    return __ziplistInsert(zl,p,s,slen);
}
复制代码

插入代码,进入__ziplistInsert

unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    //当前长度
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    //前一个节点长度和编码长度
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    //默认一个初始值
    long long value = 123456789;
    zlentry tail;

    /* Find out prevlen for the entry that is inserted. */
    if (p[0] != ZIP_END) {
        //获取前一个节点的prevlensize和prevlen
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        //如果if成立 表示链表为空
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

    //尝试将entry encoding,成功后跟新reqlen
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipStoreEntryEncoding will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    //将前一个节点的长度和当前节点编码长度相加
    reqlen += zipStorePrevEntryLength(NULL,prevlen);
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
复制代码

__ziplistInsert代码有点长这是前半部分,因为entry要保存前一个节点的长度信息,所以要拿出前一个节点的长度信息,并且后面一个节点要跟新prevrawlensize,这段代码就是计算出新的entry需要存储的数据

//如果不是尾行插入 需要计算下一行的PrevLen字段位数是否够
    int forcelarge = 0;
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    if (nextdiff == -4 && reqlen < 4) {
        nextdiff = 0;
        forcelarge = 1;
    }

    //存储p和链表的偏移量 并且重新分配内存
    offset = p-zl;
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    if (p[0] != ZIP_END) {
        //由于新插入的节点 要将p移动到新的位置
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);


        if (forcelarge)
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            zipStorePrevEntryLength(p+reqlen,reqlen);

        //跟新tail字段 长度要加上reqlen
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        zipEntry(p+reqlen, &tail);
        //如果新节点后面还有多个 表尾还要加上nextdiff
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        //如果是尾部直接计算
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    //一下函数可能要进行连锁跟新
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    p += zipStorePrevEntryLength(p,prevlen);
    p += zipStoreEntryEncoding(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}
复制代码

由于PrevLen是不定长的,有可能出现:本来只需要4位字节就能保存,但是新插入节点的长度需要8位才能保存。出现这种情况需要跟新整个链表的长度,加上这缺失的一部分。

zl = ziplistResize(zl,curlen+reqlen+nextdiff);
复制代码

这段代码就是计算新的长度,nextdiff表示Prev字段的扩容长度

接下来的一段代码看注释就能明白,涉及到用memmove内存的拷贝(就跟移动数组差不多含义),如果nextdiff不为0,接下来可能还会涉及到连锁跟新(最快情况:PrevLen的扩容导致接下来每一个entry都扩容),进入__ziplistCascadeUpdate

unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize;
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;

    while (p[0] != ZIP_END) {
        zipEntry(p, &cur);
        //当前entry长度
        rawlen = cur.headersize + cur.len;
        rawlensize = zipStorePrevEntryLength(NULL,rawlen);

        //如果是尾部就跳过
        if (p[rawlen] == ZIP_END) break;
        //获取下一个entry
        zipEntry(p+rawlen, &next);

        //如果长度相等跳过这个阶段
        if (next.prevrawlen == rawlen) break;
        //这种情况说明prevrawlensize的长度不够了 需要扩容
        if (next.prevrawlensize < rawlensize) {
            //保存p的偏移量 以便后续重新计算p
            offset = p-zl;
            //需要扩容的字节长度
            extra = rawlensize-next.prevrawlensize;
            //修改总字节长度
            zl = ziplistResize(zl,curlen+extra);
            p = zl+offset;

            //当前指针和下一个元素的偏移量
            np = p+rawlen;
            noffset = np-zl;

            //当np不是尾部的时候 跟新tail的偏移量 zl+extra
            if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
                ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
            }

            //移动数据
            memmove(np+rawlensize,
                np+next.prevrawlensize,
                curlen-noffset-next.prevrawlensize-1);
            zipStorePrevEntryLength(np,rawlen);

            //跟新指针 下一个entry
            p += rawlen;
            curlen += extra;
        } else {
            //空间多余 这将导致收缩,这是我们希望避免的,因此,在可用字节中设置rawlen
            if (next.prevrawlensize > rawlensize) {
                /* This would result in shrinking, which we want to avoid.
                 * So, set "rawlen" in the available bytes. */
                zipStorePrevEntryLengthLarge(p+rawlen,rawlen);
            } else {
                zipStorePrevEntryLength(p+rawlen,rawlen);
            }
            break;
        }
    }
    return zl;
}
复制代码

这一段代码核心就是:

memmove(np+rawlensize,
                np+next.prevrawlensize,
                curlen-noffset-next.prevrawlensize-1);
复制代码

IMG_E72E998C4D511.jpeg

扩容ziplist要将红线到绿线的区域移动蓝线,移动的长度为curlen-noffset-next.prevrawlensize-1,然后跟新rawlen的长度,随后还要跳转到下一个entry继续判断,直到符合条件为止。

//返回p节点后值是vstr 长度等于vlen的节点,skip为查找间隔
unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int vlen, unsigned int skip) {
    int skipcnt = 0;
    unsigned char vencoding = 0;
    long long vll = 0;

    while (p[0] != ZIP_END) {
        unsigned int prevlensize, encoding, lensize, len;
        unsigned char *q;

        ZIP_DECODE_PREVLENSIZE(p, prevlensize);
        ZIP_DECODE_LENGTH(p + prevlensize, encoding, lensize, len);

        //当前节点的实际数据
        q = p + prevlensize + lensize;

        //跳过了skip 为0的时候判断
        if (skipcnt == 0) {
            /* Compare current entry with specified entry */
            if (ZIP_IS_STR(encoding)) {
                if (len == vlen && memcmp(q, vstr, vlen) == 0) {
                    return p;
                }
            } else {
                //判断vstr能够用整数表示 只执行一次
                if (vencoding == 0) {
                    if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
                        /* If the entry can't be encoded we set it to
                         * UCHAR_MAX so that we don't retry again the next
                         * time. */
                        vencoding = UCHAR_MAX;
                    }
                    /* Must be non-zero by now */
                    assert(vencoding);
                }

                //entry地址转换为整数
                if (vencoding != UCHAR_MAX) {
                    long long ll = zipLoadInteger(q, encoding);
                    if (ll == vll) {
                        return p;
                    }
                }
            }
            //重置skip
            skipcnt = skip;
        } else {
            skipcnt--;
        }

        //下一个entry
        p = q + len;
    }

    return NULL;
}
复制代码

ziplistFind指定了skip参数表示查找间隔,如果为0的话相当于遍历,通过entry的属性来实现节点的遍历,遇到相等的就返回。

总结:ziplist里面涉及到详细的字节计算,因此经常使用JAVA的我来说看起来一头雾水,但是只要画图根据定义来计算长度,还是能够了解其中的含义。zipList节约了传统链表next指针的开销,但是实际上插入的复杂度并不是真正的O(1),即使连锁跟新的概率比较小。zipList能够节约内存,但并不适合存储大量数据,满足数据字节长度尽量统一能够避免连锁跟新,实现既效率高又节约内存。

总结

没啥好总结的

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享