searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

redis hash实现浅析

2023-10-10 07:12:59
5
0

代码:dict.h/dict.c

对于 Redis 键值数据库来说,Hash 表既是键值对中的一种值类型,同时,Redis 也使用一个全局 Hash 表来保存所有的键值对,从而既满足应用存取 Hash 结构数据需求,又能提供快速查询功能

Redis 采用了链式哈希处理冲突

  • 对于 rehash 开销,Redis 实现了渐进式 rehash 设计
  • 好的设计

节省内存的小技巧:下面使用union关键字,如果数据为uint64_t/int64_t/double,直接保存,无需用val指向了,节省一个指针的开销,代码如下:

typedef struct dictEntry {

void *key;

union {

void *val;

uint64_t u64;

int64_t s64;

double d;

} v;

struct dictEntry *next;

} dictEntry;
 

Redis 如何实现 rehash

 
/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
 dictEntry **table;
 unsigned long size;
 unsigned long sizemask; //用于将hash值映射到table位置的索引,大小为(size-1)
 unsigned long used;
} dictht;

#dict字典的数据结构
typedef struct dict{
 dictType *type; //直线dictType结构,dictType结构中包含自定义的函数,这些函数使得key和value能够存储任何类型的数据
 void *privdata; //私有数据,保存着dictType结构中函数的 参数
 dictht ht[2]; //两张哈希表
 long rehashidx; //rehash的标记,rehashidx=-1表示没有进行rehash,rehash时每迁移一个桶就对rehashidx加一
 int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
}
 

什么时候触发 rehash?

判断条件

  • 哈希表size为0,进行扩容,即进行初始化
  • 使用的大小超过size并且(能进行resize或者负载因子超过dict_force_resize_ration)并且能进行该操作,就进行扩容,这里才是rehash的判断
 
/* Expand the hash table if needed */

static int _dictExpandIfNeeded(dict *d)

{

/* Incremental rehashing already in progress. Return. */

if (dictIsRehashing(d)) return DICT_OK;

/* If the hash table is empty expand it to the initial size. */

if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/* If we reached the 1:1 ratio, and we are allowed to resize the hash

* table (global setting) or we should avoid it but the ratio between

* elements/buckets is over the "safe" threshold, we resize doubling

* the number of buckets. */

if (d->ht[0].used >= d->ht[0].size &&

(dict_can_resize ||

d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&

dictTypeExpandAllowed(d))

{

return dictExpand(d, d->ht[0].used + 1);

}

return DICT_OK;

}
 

更新 dict_can_resize 变量(能否进行resize)

该变量会在没有子进程时设置为1,即当前没有 RDB 子进程,并且也没有 AOF 子进程时就可以进行resize

/* This function is called once a background process of some kind terminates, * as we want to avoid resizing the hash tables when there is a child in order to play well with copy-on-write (otherwise when a resize happens lots of memory pages are copied). The goal of this function is to update the ability for dict.c to resize the hash tables accordingly to the fact we have a active fork child running. */
void updateDictResizePolicy(void) {
 if (!hasActiveChildProcess())
 dictEnableResize();
 else
 dictDisableResize();
}

void dictEnableResize(void) {
 dict_can_resize = 1;
}

void dictDisableResize(void) {
 dict_can_resize = 0;
}
 

为什么存在子进程时,不允许resize

大多数操作系统都采用写时复制(copy-on-write)技术来优化子进程的使用效率,所以在子进程存在期间,服务器会提高执行扩展操作所需的负载因子,从而尽可能地避免在子进程存在期间进行哈希表扩展操作,这可以避免不必要的内存写入操作, 最大限度地节约内存,简单来说就是子进程存在时,使用了写时复制,父进程如果写了,需要拷贝一份,浪费了内存空间

三种情况调用判断是否需要进行rehash

需要进行rehash时会分配ht[1]空间,以及设置rehashidx=0,表示正处于rehash,下次操作时会进行渐进式rehash

rehash 扩容扩多大?

对 Hash 表扩容的思路也很简单,就是如果当前表的已用空间大小为 size,那么就将表扩容到 "下一个2的倍数大小",该值大于等于(已用空间大小 + 1)

_dictNextPower(d->ht[0].used + 1) 返回扩容后的大小

static unsigned long _dictNextPower(unsigned long size)

{

//哈希表的初始大小

unsigned long i = DICT_HT_INITIAL_SIZE;

//如果要扩容的大小已经超过最大值,则返回最大值加1

if (size >= LONG_MAX) return LONG_MAX + 1LU;

//扩容大小没有超过最大值

while(1) {

//如果扩容大小大于等于最大值,就返回截至当前扩到的大小

if (i >= size)

return i;

//每一步扩容都在现有大小基础上乘以2

i *= 2;

}

}
 

渐进式 rehash 如何实现?

核心函数:int dictRehash(dict *d, int n)

渐进式 rehash 的意思就是 Redis 并不会一次性把当前 Hash 表中的所有键,都拷贝到新位置,而是会分批拷贝,每次的键拷贝只拷贝 Hash 表中一个 bucket 中的哈希项。这样一来,每次键拷贝的时长有限,对主线程的影响也就有限了

分批拷贝,每次拷贝多少

每次拷贝一个bucket,将ht[0]上某一个bucket(即一个桶上dictEntry链表)上的每一个链表移动到扩容后的ht[1]上(每次只移动一个链表,即渐进式rehash,使用 rehashidx 表示下一次要迁移链表所在桶的位置

/* This is our hash table structure. Every dictionary has two of this as we

* implement incremental rehashing, for the old to the new table. */

typedef struct dictht {

dictEntry **table;

unsigned long size;

unsigned long sizemask;

unsigned long used;

} dictht;

typedef struct dict {

dictType *type;

void *privdata;

dictht ht[2];

long rehashidx; /* rehashing not in progress if rehashidx == -1 */

int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */

} dict;
 

迁移期间增,删,改查怎么操作的

新增数据

在渐进式 rehash 执行期间,新添加到字典的键值对一律会被保存到ht[1]里面,而ht[0]则不再进行任何添加操作。这一措施保证了ht[0]包含的键值对数量会只减不增,并随着rehash操作的执行而最终变成空表

插入到新表,_dictKeyIndex代码中可以看出

/* Returns the index of a free slot that can be populated with
 * a hash entry for the given 'key'.
 * If the key already exists, -1 is returned
 * and the optional output parameter may be filled.
 *
 * Note that if we are in the process of rehashing the hash table, the
 * index is always returned in the context of the second (new) hash table. */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;  // 如果不处于rehash,则直接break,此时为ht[0],即旧表,否则为ht[1],即新表
    }
    return idx;
}
 

删除操作

删除时会访问两个表进行删除操作,见函数 dictGenericDelete

查询操作

程序会先在ht[0]里面进行查找,如果没找到的话,就会继续到ht[1]里面进行查找

dictEntry *dictFind(dict *d, const void *key)

{

dictEntry *he;

uint64_t h, idx, table;

if (dictSize(d) == 0) return NULL; /* dict is empty */

if (dictIsRehashing(d)) _dictRehashStep(d);

h = dictHashKey(d, key);

for (table = 0; table <= 1; table++) {

idx = h & d->ht[table].sizemask;

he = d->ht[table].table[idx];

while(he) {

if (key==he->key || dictCompareKeys(d, key, he->key))

return he;

he = he->next;

}

if (!dictIsRehashing(d)) return NULL;

}

return NULL;

}
 

更新操作

会访问两个表,如果处于rehash,就会返回新表的索引

/* Returns the index of a free slot that can be populated with

* a hash entry for the given 'key'.

* If the key already exists, -1 is returned

* and the optional output parameter may be filled.

*

* Note that if we are in the process of rehashing the hash table, the

* index is always returned in the context of the second (new) hash table. */

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)

{

unsigned long idx, table;

dictEntry *he;

if (existing) *existing = NULL;

/* Expand the hash table if needed */

if (_dictExpandIfNeeded(d) == DICT_ERR)

return -1;

for (table = 0; table <= 1; table++) {

idx = hash & d->ht[table].sizemask;

/* Search if this slot does not already contain the given key */

he = d->ht[table].table[idx];

while(he) {

if (key==he->key || dictCompareKeys(d, key, he->key)) {

if (existing) *existing = he;

return -1;

}

he = he->next;

}
 

操作时rehash

在rehash进行期间,每次对字典执行添加、删除、查找或者更新操作时,程序除了执行指定的操作以外,还会顺带将ht[0]哈希表在 rehashidx索引(table[rehashidx]桶上的链表)上的所有键值对rehash到ht[1]上,当rehash工作完成之后,将rehashidx属性的值增一,表示下一次要迁移链表所在桶的位置

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)

{

long index;

dictEntry *entry;

dictht *ht;

if (dictIsRehashing(d)) _dictRehashStep(d);

/* Get the index of the new element, or -1 if

* the element already exists. */

if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)

return NULL;

/* Allocate the memory and store the new entry.

* Insert the element in top, with the assumption that in a database

* system it is more likely that recently added entries are accessed

* more frequently. */

ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];

entry = zmalloc(sizeof(*entry));

entry->next = ht->table[index];

ht->table[index] = entry;

ht->used++;

/* Set the hash entry fields. */

dictSetKey(d, entry, key);

return entry;

}
 

迁移时,如果遇到empty bucket怎么办

渐进式 rehash 在执行时设置了一个变量 empty_visits,用来表示已经检查过的空 bucket,当检查了一定数量的空 bucket 后,这一轮的 rehash 就停止执行,转而继续处理外来请求,避免了对 Redis 性能的影响

0条评论
0 / 1000
9****m
15文章数
1粉丝数
9****m
15 文章 | 1 粉丝
原创

redis hash实现浅析

2023-10-10 07:12:59
5
0

代码:dict.h/dict.c

对于 Redis 键值数据库来说,Hash 表既是键值对中的一种值类型,同时,Redis 也使用一个全局 Hash 表来保存所有的键值对,从而既满足应用存取 Hash 结构数据需求,又能提供快速查询功能

Redis 采用了链式哈希处理冲突

  • 对于 rehash 开销,Redis 实现了渐进式 rehash 设计
  • 好的设计

节省内存的小技巧:下面使用union关键字,如果数据为uint64_t/int64_t/double,直接保存,无需用val指向了,节省一个指针的开销,代码如下:

typedef struct dictEntry {

void *key;

union {

void *val;

uint64_t u64;

int64_t s64;

double d;

} v;

struct dictEntry *next;

} dictEntry;
 

Redis 如何实现 rehash

 
/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
 dictEntry **table;
 unsigned long size;
 unsigned long sizemask; //用于将hash值映射到table位置的索引,大小为(size-1)
 unsigned long used;
} dictht;

#dict字典的数据结构
typedef struct dict{
 dictType *type; //直线dictType结构,dictType结构中包含自定义的函数,这些函数使得key和value能够存储任何类型的数据
 void *privdata; //私有数据,保存着dictType结构中函数的 参数
 dictht ht[2]; //两张哈希表
 long rehashidx; //rehash的标记,rehashidx=-1表示没有进行rehash,rehash时每迁移一个桶就对rehashidx加一
 int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
}
 

什么时候触发 rehash?

判断条件

  • 哈希表size为0,进行扩容,即进行初始化
  • 使用的大小超过size并且(能进行resize或者负载因子超过dict_force_resize_ration)并且能进行该操作,就进行扩容,这里才是rehash的判断
 
/* Expand the hash table if needed */

static int _dictExpandIfNeeded(dict *d)

{

/* Incremental rehashing already in progress. Return. */

if (dictIsRehashing(d)) return DICT_OK;

/* If the hash table is empty expand it to the initial size. */

if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/* If we reached the 1:1 ratio, and we are allowed to resize the hash

* table (global setting) or we should avoid it but the ratio between

* elements/buckets is over the "safe" threshold, we resize doubling

* the number of buckets. */

if (d->ht[0].used >= d->ht[0].size &&

(dict_can_resize ||

d->ht[0].used/d->ht[0].size > dict_force_resize_ratio) &&

dictTypeExpandAllowed(d))

{

return dictExpand(d, d->ht[0].used + 1);

}

return DICT_OK;

}
 

更新 dict_can_resize 变量(能否进行resize)

该变量会在没有子进程时设置为1,即当前没有 RDB 子进程,并且也没有 AOF 子进程时就可以进行resize

/* This function is called once a background process of some kind terminates, * as we want to avoid resizing the hash tables when there is a child in order to play well with copy-on-write (otherwise when a resize happens lots of memory pages are copied). The goal of this function is to update the ability for dict.c to resize the hash tables accordingly to the fact we have a active fork child running. */
void updateDictResizePolicy(void) {
 if (!hasActiveChildProcess())
 dictEnableResize();
 else
 dictDisableResize();
}

void dictEnableResize(void) {
 dict_can_resize = 1;
}

void dictDisableResize(void) {
 dict_can_resize = 0;
}
 

为什么存在子进程时,不允许resize

大多数操作系统都采用写时复制(copy-on-write)技术来优化子进程的使用效率,所以在子进程存在期间,服务器会提高执行扩展操作所需的负载因子,从而尽可能地避免在子进程存在期间进行哈希表扩展操作,这可以避免不必要的内存写入操作, 最大限度地节约内存,简单来说就是子进程存在时,使用了写时复制,父进程如果写了,需要拷贝一份,浪费了内存空间

三种情况调用判断是否需要进行rehash

需要进行rehash时会分配ht[1]空间,以及设置rehashidx=0,表示正处于rehash,下次操作时会进行渐进式rehash

rehash 扩容扩多大?

对 Hash 表扩容的思路也很简单,就是如果当前表的已用空间大小为 size,那么就将表扩容到 "下一个2的倍数大小",该值大于等于(已用空间大小 + 1)

_dictNextPower(d->ht[0].used + 1) 返回扩容后的大小

static unsigned long _dictNextPower(unsigned long size)

{

//哈希表的初始大小

unsigned long i = DICT_HT_INITIAL_SIZE;

//如果要扩容的大小已经超过最大值,则返回最大值加1

if (size >= LONG_MAX) return LONG_MAX + 1LU;

//扩容大小没有超过最大值

while(1) {

//如果扩容大小大于等于最大值,就返回截至当前扩到的大小

if (i >= size)

return i;

//每一步扩容都在现有大小基础上乘以2

i *= 2;

}

}
 

渐进式 rehash 如何实现?

核心函数:int dictRehash(dict *d, int n)

渐进式 rehash 的意思就是 Redis 并不会一次性把当前 Hash 表中的所有键,都拷贝到新位置,而是会分批拷贝,每次的键拷贝只拷贝 Hash 表中一个 bucket 中的哈希项。这样一来,每次键拷贝的时长有限,对主线程的影响也就有限了

分批拷贝,每次拷贝多少

每次拷贝一个bucket,将ht[0]上某一个bucket(即一个桶上dictEntry链表)上的每一个链表移动到扩容后的ht[1]上(每次只移动一个链表,即渐进式rehash,使用 rehashidx 表示下一次要迁移链表所在桶的位置

/* This is our hash table structure. Every dictionary has two of this as we

* implement incremental rehashing, for the old to the new table. */

typedef struct dictht {

dictEntry **table;

unsigned long size;

unsigned long sizemask;

unsigned long used;

} dictht;

typedef struct dict {

dictType *type;

void *privdata;

dictht ht[2];

long rehashidx; /* rehashing not in progress if rehashidx == -1 */

int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */

} dict;
 

迁移期间增,删,改查怎么操作的

新增数据

在渐进式 rehash 执行期间,新添加到字典的键值对一律会被保存到ht[1]里面,而ht[0]则不再进行任何添加操作。这一措施保证了ht[0]包含的键值对数量会只减不增,并随着rehash操作的执行而最终变成空表

插入到新表,_dictKeyIndex代码中可以看出

/* Returns the index of a free slot that can be populated with
 * a hash entry for the given 'key'.
 * If the key already exists, -1 is returned
 * and the optional output parameter may be filled.
 *
 * Note that if we are in the process of rehashing the hash table, the
 * index is always returned in the context of the second (new) hash table. */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;  // 如果不处于rehash,则直接break,此时为ht[0],即旧表,否则为ht[1],即新表
    }
    return idx;
}
 

删除操作

删除时会访问两个表进行删除操作,见函数 dictGenericDelete

查询操作

程序会先在ht[0]里面进行查找,如果没找到的话,就会继续到ht[1]里面进行查找

dictEntry *dictFind(dict *d, const void *key)

{

dictEntry *he;

uint64_t h, idx, table;

if (dictSize(d) == 0) return NULL; /* dict is empty */

if (dictIsRehashing(d)) _dictRehashStep(d);

h = dictHashKey(d, key);

for (table = 0; table <= 1; table++) {

idx = h & d->ht[table].sizemask;

he = d->ht[table].table[idx];

while(he) {

if (key==he->key || dictCompareKeys(d, key, he->key))

return he;

he = he->next;

}

if (!dictIsRehashing(d)) return NULL;

}

return NULL;

}
 

更新操作

会访问两个表,如果处于rehash,就会返回新表的索引

/* Returns the index of a free slot that can be populated with

* a hash entry for the given 'key'.

* If the key already exists, -1 is returned

* and the optional output parameter may be filled.

*

* Note that if we are in the process of rehashing the hash table, the

* index is always returned in the context of the second (new) hash table. */

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)

{

unsigned long idx, table;

dictEntry *he;

if (existing) *existing = NULL;

/* Expand the hash table if needed */

if (_dictExpandIfNeeded(d) == DICT_ERR)

return -1;

for (table = 0; table <= 1; table++) {

idx = hash & d->ht[table].sizemask;

/* Search if this slot does not already contain the given key */

he = d->ht[table].table[idx];

while(he) {

if (key==he->key || dictCompareKeys(d, key, he->key)) {

if (existing) *existing = he;

return -1;

}

he = he->next;

}
 

操作时rehash

在rehash进行期间,每次对字典执行添加、删除、查找或者更新操作时,程序除了执行指定的操作以外,还会顺带将ht[0]哈希表在 rehashidx索引(table[rehashidx]桶上的链表)上的所有键值对rehash到ht[1]上,当rehash工作完成之后,将rehashidx属性的值增一,表示下一次要迁移链表所在桶的位置

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)

{

long index;

dictEntry *entry;

dictht *ht;

if (dictIsRehashing(d)) _dictRehashStep(d);

/* Get the index of the new element, or -1 if

* the element already exists. */

if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)

return NULL;

/* Allocate the memory and store the new entry.

* Insert the element in top, with the assumption that in a database

* system it is more likely that recently added entries are accessed

* more frequently. */

ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];

entry = zmalloc(sizeof(*entry));

entry->next = ht->table[index];

ht->table[index] = entry;

ht->used++;

/* Set the hash entry fields. */

dictSetKey(d, entry, key);

return entry;

}
 

迁移时,如果遇到empty bucket怎么办

渐进式 rehash 在执行时设置了一个变量 empty_visits,用来表示已经检查过的空 bucket,当检查了一定数量的空 bucket 后,这一轮的 rehash 就停止执行,转而继续处理外来请求,避免了对 Redis 性能的影响

文章来自个人专栏
redis代码剖析
9 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0