Redis源码剖析之快速列表
- 数据结构
- quicklist
- quicklistNode
- quicklist的操作
- 创建
- 头插和尾插
- 特定位置插入
- 数据删除
- 其他API
- 参考资料
何为quicklist,上次说到ziplist每次变更的时间复杂度都非常高,因为必须要重新生成一个新的ziplist来作为更新后的list,如果一个list非常大且更新频繁,那就会给redis带来非常大的负担。如何既保留ziplist的空间高效性,又能不让其更新复杂度过高? redis的作者给出的答案就是quicklist。
其实说白了就是把ziplist和普通的双向链表结合起来。每个双链表节点中保存一个ziplist,然后每个ziplist中存一批list中的数据(具体ziplist大小可配置),这样既可以避免大量链表指针带来的内存消耗,也可以避免ziplist更新导致的大量性能损耗,将大的ziplist化整为零。
数据结构
quicklist
一图胜千言,我们来看下一个实际的quicklist在内存中长啥样。
大致介绍下上图中不同的节点,所有redis中list其实都是quicklist,所以像pop push等命令中的list参数都是quicklist。quicklist各字段及其含义如下。
typedef struct quicklist {
quicklistNode *head; /* 头结点 */
quicklistNode *tail; /* 尾结点 */
unsigned long count; /* 在所有的ziplist中的entry总数 */
unsigned long len; /* quicklist节点总数 */
int fill : QL_FILL_BITS; /* 16位,每个节点的最大容量 */
unsigned int compress : QL_COMP_BITS; /* 16位,quicklist的压缩深度,0表示所有节点都不压缩,否则就表示从两端开始有多少个节点不压缩 */
unsigned int bookmark_count: QL_BM_BITS; /*4位,bookmarks数组的大小,bookmarks是一个可选字段,用来quicklist重新分配内存空间时使用,不使用时不占用空间*/
quicklistBookmark bookmarks[];
} quicklist;
可以看出quicklist其实就是简单的双链表,但这里多出来几个字段,先重点介绍下compress。在上图中我用了两种不同颜色的节点,其中绿色是普通的ziplist节点,而红色是被压缩后的ziplist节点(LZF节点),LZF是种无损压缩算法。redis为了节省内存空间,会将quicklist的节点用LZF压缩后存储,但这里不是全部压缩,可以配置compress的值,compress为0表示所有节点都不压缩,否则就表示从两端开始有多少个节点不压缩,像我上图图示中,compress就是1,表示从两端开始,有1个节点不做LZF压缩。**compress默认是0(不压缩),具体可以根据你们业务实际使用场景去配置。 **
为什么不全部节点都压缩,而是流出compress这个可配置的口子呢?其实从统计而已,list两端的数据变更最为频繁,像lpush,rpush,lpop,rpop等命令都是在两端操作,如果频繁压缩或解压缩会代码不必要的性能损耗。从这里可以看出 redis其实并不是一味追求性能,它也在努力减少存储占用、在存储和性能之间做trade-off。
这里还有个fill字段,它的含义是每个quicknode的节点最大容量,不同的数值有不同的含义,默认是-2,当然也可以配置为其他数值,具体数值含义如下:
- -1: 每个quicklistNode节点的ziplist所占字节数不能超过4kb。(建议配置)
- -2: 每个quicklistNode节点的ziplist所占字节数不能超过8kb。(默认配置&建议配置)
- -3: 每个quicklistNode节点的ziplist所占字节数不能超过16kb。
- -4: 每个quicklistNode节点的ziplist所占字节数不能超过32kb。
- -5: 每个quicklistNode节点的ziplist所占字节数不能超过64kb。
- 任意正数: 表示:ziplist结构所最多包含的entry个数,最大为215215。
quicklistNode
quicklistNode就是双链表的节点封装了,除了前后节点的指针外,这里还包含一些本节点的其他信息。比如是否是LZF压缩的节点、ziplist相关信息…… 具体如下:
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl; /* quicklist节点对应的ziplist */
unsigned int sz; /* ziplist的字节数 */
unsigned int count : 16; /* ziplist的item数*/
unsigned int encoding : 2; /* 数据类型,RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* 这个节点以前压缩过吗? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* 未使用到的10位 */
} quicklistNode;
从上文中我们已经了解了一个quicklist某个时刻在内存中的样子,接下我们来看下它是如何在数据插入删除时变化的。
quicklist的操作
创建
/* 创建一个新的quicklist.
* 使用quicklistRelease()释放quicklist. */
quicklist *quicklistCreate(void) {
struct quicklist *quicklist;
quicklist = zmalloc(sizeof(*quicklist));
quicklist->head = quicklist->tail = NULL;
quicklist->len = 0;
quicklist->count = 0;
quicklist->compress = 0;
quicklist->fill = -2;
quicklist->bookmark_count = 0;
return quicklist;
}
create就没啥好说的了,但这里需要提醒下,fill值默认是-2,也就是说每个quicklistNode中的ziplist最长是8k字节,可以更具自己业务需求调整具体配置。
头插和尾插
对于list而已,头部或者尾部插入是最常见的操作了,但其实头插和尾插还算是比较简单。
/* 在quicklist的头部插入一条数据
* 如果在已存在节点插入,返回0
* 如果是在新的头结点插入,返回1 */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); // 在头结点对应的ziplist中插入
quicklistNodeUpdateSz(quicklist->head);
} else { // 否则新建一个头结点,然后插入数据
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}
/* 在quicklist的尾部插入一条数据
* 如果在已存在节点插入,返回0
* 如果是在新的头结点插入,返回1 */
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_tail = quicklist->tail;
if (likely(
_quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {
quicklist->tail->zl =
ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);
quicklistNodeUpdateSz(quicklist->tail);
} else {
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeAfter(quicklist, quicklist->tail, node);
}
quicklist->count++;
quicklist->tail->count++;
return (orig_tail != quicklist->tail);
}
头插和尾插都调用了_quicklistNodeAllowInsert
先判断了是否能直接在当前头|尾节点能插入,如果能就直接插入到对应的ziplist里,否则就需要新建一个新节点再操作了。 还记得上文中我们说的fill
字段吗,_quicklistNodeAllowInsert
其实就是根据fill的具体值来判断是否已经超过最大容量。
特定位置插入
头插尾插比较简单,但quicklist在非头尾插入就比较繁琐了,因为需要考虑到插入位置、前节点、后节点的存储情况。
/* 在一个已经存在的entry前面或者后面插入一个新的entry
* 如果after==1表示插入到后面,否则是插入到前面 */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
int fill = quicklist->fill;
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;
if (!node) {
/* 如果entry中未填node,则重新创建一个node并插入到quicklist中 */
D("No node given!");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
__quicklistInsertNode(quicklist, NULL, new_node, after);
new_node->count++;
quicklist->count++;
return;
}
/* 检查要插入的节点是否是满的 */
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
D("Current node is full with count %d with requested fill %lu",
node->count, fill);
full = 1;
}
if (after && (entry->offset == node->count)) {
D("At Tail of current ziplist");
at_tail = 1;
if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
D("Next node is full too.");
full_next = 1;
}
}
if (!after && (entry->offset == 0)) {
D("At Head");
at_head = 1;
if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
D("Prev node is full too.");
full_prev = 1;
}
}
/* 不确定把新元素插到哪 */
if (!full && after) { // 如果当前节点不满,就直接插入
D("Not full, inserting after current position.");
quicklistDecompressNodeForUse(node);
unsigned char *next = ziplistNext(node->zl, entry->zi);
if (next == NULL) {
node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
} else {
node->zl = ziplistInsert(node->zl, next, value, sz);
}
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (!full && !after) {
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node);
node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (full && at_tail && node->next && !full_next && after) {
/* 如果当前节点是满的,要插入的位置是当前节点的尾部,且后一个节点有空间,那就插到后一个节点的头部。*/
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && at_head && node->prev && !full_prev && !after) {
/* 如果当前节点是满的,要插入的位置是当前节点的头部,且前一个节点有空间,那就插到前一个节点的尾部。 */
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && ((at_tail && node->next && full_next && after) ||
(at_head && node->prev && full_prev && !after))) {
/* 如果当前节点是满的,前后节点也都是满的,那就创建一个新的节点插进去 */
D("\tprovisioning new node...");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
} else if (full) {
/* 否则,当前节点是满的,我们需要把它分裂成两个新节点,一般用于插入到当前节点ziplist中间某个位置时 */
D("\tsplitting node...");
quicklistDecompressNodeForUse(node);
new_node = _quicklistSplitNode(node, entry->offset, after);
new_node->zl = ziplistPush(new_node->zl, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
_quicklistMergeNodes(quicklist, node);
}
quicklist->count++;
}
代码比较长,总结如下:
- 如果当前被插入节点不满,直接插入。
- 如果当前被插入节点是满的,要插入的位置是当前节点的尾部,且后一个节点有空间,那就插到后一个节点的头部。
- 如果当前被插入节点是满的,要插入的位置是当前节点的头部,且前一个节点有空间,那就插到前一个节点的尾部。
- 如果当前被插入节点是满的,前后节点也都是满的,要插入的位置是当前节点的头部或者尾部,那就创建一个新的节点插进去。
- 否则,当前节点是满的,且要插入的位置在当前节点的中间位置,我们需要把当前节点分裂成两个新节点,然后再插入。
数据删除
数据删除相对于插入而言应该是反着来的,看完下面的代码后你就会发现不完全是:
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
quicklistNode *prev = entry->node->prev;
quicklistNode *next = entry->node->next;
int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
entry->node, &entry->zi);
/* after delete, the zi is now invalid for any future usage. */
iter->zi = NULL;
/* If current node is deleted, we must update iterator node and offset. */
if (deleted_node) {
if (iter->direction == AL_START_HEAD) {
iter->current = next;
iter->offset = 0;
} else if (iter->direction == AL_START_TAIL) {
iter->current = prev;
iter->offset = -1;
}
}
}
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
unsigned char **p) {
int gone = 0;
node->zl = ziplistDelete(node->zl, p);
node->count--;
if (node->count == 0) {
gone = 1;
__quicklistDelNode(quicklist, node);
} else {
quicklistNodeUpdateSz(node);
}
quicklist->count--;
/* If we deleted the node, the original node is no longer valid */
return gone ? 1 : 0;
}
删除相对于插入而言简单多了,我先看的插入逻辑,插入中有节点的分裂,但删除里却没有节点的合并,quicklist有节点最大容量,但没有最小容量限制。
其他API
理解了quicklist数据结构的设计,也基本就能猜测到每个api的具体实现了,这里我就不再罗列代码了,有兴趣可以自行查阅。
quicklist *quicklistCreate(void); // 创建quicklist
quicklist *quicklistNew(int fill, int compress); // 用一些指定参数创建一个新的quicklist
void quicklistSetCompressDepth(quicklist *quicklist, int depth); // 设置压缩深度
void quicklistSetFill(quicklist *quicklist, int fill); // 设置容量上限
void quicklistSetOptions(quicklist *quicklist, int fill, int depth);
void quicklistRelease(quicklist *quicklist); // 释放quicklist
int quicklistPushHead(quicklist *quicklist, void *value, const size_t sz); // 头部插入
int quicklistPushTail(quicklist *quicklist, void *value, const size_t sz); // 尾部插入
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where); // 指定头部或者尾部插入
void quicklistAppendZiplist(quicklist *quicklist, unsigned char *zl); // 把一个ziplist放到quicklist中
quicklist *quicklistAppendValuesFromZiplist(quicklist *quicklist,
unsigned char *zl); // 把ziplist中的所有数据放到quicklist中
quicklist *quicklistCreateFromZiplist(int fill, int compress,
unsigned char *zl); // 从ziplist生成一个quicklist
void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *node,
void *value, const size_t sz);
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *node,
void *value, const size_t sz);
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry); // 数据删除
int quicklistReplaceAtIndex(quicklist *quicklist, long index, void *data,
int sz); // 数据替换
int quicklistDelRange(quicklist *quicklist, const long start, const long stop); // 范围删除
quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction); // 迭代器
quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
int direction, const long long idx); // 从指定位置开始的迭代器
int quicklistNext(quicklistIter *iter, quicklistEntry *node); // 迭代器下一个位置
void quicklistReleaseIterator(quicklistIter *iter); // 释放迭代器
quicklist *quicklistDup(quicklist *orig); // 去重
int quicklistIndex(const quicklist *quicklist, const long long index,
quicklistEntry *entry); // 找到entry的下标索引
void quicklistRewind(quicklist *quicklist, quicklistIter *li);
void quicklistRewindTail(quicklist *quicklist, quicklistIter *li);
void quicklistRotate(quicklist *quicklist); // 选择quicklist
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *sval,
void *(*saver)(unsigned char *data, unsigned int sz));
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *slong); // 数据pop
unsigned long quicklistCount(const quicklist *ql);
int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len); // 比较大小
size_t quicklistGetLzf(const quicklistNode *node, void **data); // LZF节点