Redis Cluster的reshard功能
Redis Cluster是一个分布式的redis集群,通过基于KEY进行HASH分桶,把KEY拆成16,384个不同的slot。 Redis cluter支持动态扩缩容,这时意味着需要把slot和里面的数据从一个节点迁移到另一个节点上,实现数据的重平衡(reshard)功能。
背景
Redis Cluster是一个分布式的redis集群。通过基于KEY进行HASH分桶,把KEY拆成16,384个不同的slot。 这16,384个slot会分布在所有的redis集群中。 redis cluster客户端访问时, 会通过调用cluster nodes或cluster slots的命令,查询所有slot的分布信息,再计算KEY值对应slot,往对应的节点去发送请求。
由于redis是一个纯内存的KV存储, 数据都是放在内存上。受限于内存资源的有限性,为了解决这个问题,需要把多个机器的内存组成集群来使用。 而这个集群必须具备根据实际的使用需要,能够增加或减少节点的能力。
而扩缩容节点,核心的逻辑只有2个: 增加变更节点的元数据; 从原来的节点上迁移数据到新节点上 , 即redis cluster的重平衡(reshard)功能。
使用例子
参考官方文档
redis cluster的rebalance功能,有一个官方的开源实现实现 ,
可以利用redis-cli命令行去执行:
redis-cli --cluster reshard 127.0.0.1:7000
官方源码
可以参考官方的代码
核心的源码如下:
static int clusterManagerCommandReshard(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerCheckCluster(0);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
fflush(stdout);
fprintf(stderr,
"*** Please fix your cluster problems before resharding\n");
return 0;
}
int slots = config.cluster_manager_command.slots;
if (!slots) {
while (slots <= 0 || slots > CLUSTER_MANAGER_SLOTS) {
printf("How many slots do you want to move (from 1 to %d)? ",
CLUSTER_MANAGER_SLOTS);
fflush(stdout);
char buf[6];
int nread = read(fileno(stdin),buf,6);
if (nread <= 0) continue;
int last_idx = nread - 1;
if (buf[last_idx] != '\n') {
int ch;
while ((ch = getchar()) != '\n' && ch != EOF) {}
}
buf[last_idx] = '\0';
slots = atoi(buf);
}
}
char buf[255];
char *to = config.cluster_manager_command.to,
*from = config.cluster_manager_command.from;
while (to == NULL) {
printf("What is the receiving node ID? ");
fflush(stdout);
int nread = read(fileno(stdin),buf,255);
if (nread <= 0) continue;
int last_idx = nread - 1;
if (buf[last_idx] != '\n') {
int ch;
while ((ch = getchar()) != '\n' && ch != EOF) {}
}
buf[last_idx] = '\0';
if (strlen(buf) > 0) to = buf;
}
int raise_err = 0;
clusterManagerNode *target = clusterNodeForResharding(to, NULL, &raise_err);
if (target == NULL) return 0;
list *sources = listCreate();
list *table = NULL;
int all = 0, result = 1;
if (from == NULL) {
printf("Please enter all the source node IDs.\n");
printf(" Type 'all' to use all the nodes as source nodes for "
"the hash slots.\n");
printf(" Type 'done' once you entered all the source nodes IDs.\n");
while (1) {
printf("Source node #%lu: ", listLength(sources) + 1);
fflush(stdout);
int nread = read(fileno(stdin),buf,255);
if (nread <= 0) continue;
int last_idx = nread - 1;
if (buf[last_idx] != '\n') {
int ch;
while ((ch = getchar()) != '\n' && ch != EOF) {}
}
buf[last_idx] = '\0';
if (!strcmp(buf, "done")) break;
else if (!strcmp(buf, "all")) {
all = 1;
break;
} else {
clusterManagerNode *src =
clusterNodeForResharding(buf, target, &raise_err);
if (src != NULL) listAddNodeTail(sources, src);
else if (raise_err) {
result = 0;
goto cleanup;
}
}
}
} else {
char *p;
while((p = strchr(from, ',')) != NULL) {
*p = '\0';
if (!strcmp(from, "all")) {
all = 1;
break;
} else {
clusterManagerNode *src =
clusterNodeForResharding(from, target, &raise_err);
if (src != NULL) listAddNodeTail(sources, src);
else if (raise_err) {
result = 0;
goto cleanup;
}
}
from = p + 1;
}
/* Check if there's still another source to process. */
if (!all && strlen(from) > 0) {
if (!strcmp(from, "all")) all = 1;
if (!all) {
clusterManagerNode *src =
clusterNodeForResharding(from, target, &raise_err);
if (src != NULL) listAddNodeTail(sources, src);
else if (raise_err) {
result = 0;
goto cleanup;
}
}
}
}
listIter li;
listNode *ln;
if (all) {
listEmpty(sources);
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE || n->replicate)
continue;
if (!sdscmp(n->name, target->name)) continue;
listAddNodeTail(sources, n);
}
}
if (listLength(sources) == 0) {
fprintf(stderr, "*** No source nodes given, operation aborted.\n");
result = 0;
goto cleanup;
}
printf("\nReady to move %d slots.\n", slots);
printf(" Source nodes:\n");
listRewind(sources, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *src = ln->value;
sds info = clusterManagerNodeInfo(src, 4);
printf("%s\n", info);
sdsfree(info);
}
printf(" Destination node:\n");
sds info = clusterManagerNodeInfo(target, 4);
printf("%s\n", info);
sdsfree(info);
table = clusterManagerComputeReshardTable(sources, slots);
printf(" Resharding plan:\n");
clusterManagerShowReshardTable(table);
if (!(config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_YES))
{
printf("Do you want to proceed with the proposed "
"reshard plan (yes/no)? ");
fflush(stdout);
char buf[4];
int nread = read(fileno(stdin),buf,4);
buf[3] = '\0';
if (nread <= 0 || strcmp("yes", buf) != 0) {
result = 0;
goto cleanup;
}
}
int opts = CLUSTER_MANAGER_OPT_VERBOSE;
listRewind(table, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerReshardTableItem *item = ln->value;
char *err = NULL;
result = clusterManagerMoveSlot(item->source, target, item->slot,
opts, &err);
if (!result) {
if (err != NULL) {
clusterManagerLogErr("clusterManagerMoveSlot failed: %s\n", err);
zfree(err);
}
goto cleanup;
}
}
cleanup:
listRelease(sources);
clusterManagerReleaseReshardTable(table);
return result;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
核心逻辑分析
下面是基于核心源码进行分析和说明。
1. 执行CLUSTER NODES命令,获取所有节点的信息,包括每个节点对应的redis节点的slot 信息。 下面是参考的结果:
d5060aff8ab03d3555b424a8d74d60bf7f46238c 127.0.0.1:8123@18123 myself,master - 0 1707036730000 3 connected 10923-16383
106e63c4f2cc529ee11b9d8b50abbc07b4f7b41d 127.0.0.1:6123@16123 master - 0 1707036732249 1 connected 0-5460
dc6cec6b9f566d7ca6ea9a8c704792ddb0c712e3 127.0.0.1:7123@17123 master - 0 1707036731231 2 connected 5461-10922
第一行最后的10923-16383, 说明的正是这个节点分配的slot范围。
2. 下面是核心的计算逻辑:
1. 计算reshard的迁移计划,每个节点要多少个slot迁移到新节点上
2. 循环执行每个SLOT的迁移: (对应函数clusterManagerMoveSlot)
2.1. 在目标节点执行 CLUSTER SETSLOT {slot} importing {源节点的名字} : 设置目标节点SLOT的状态importing
2.2. 在源节点执行 CLUSTER SETSLOT {slot} migrating {目标节点的名字} : 设置源节点SLOT的状态migrating
2.3. 迁移数据: 存在大量KEY的情况,会重复执行下面步骤:
2.3.1 在源节点执行CLUSTER GETKEYSINSLOT {slot} {pipeline} : 获取这个SLOT的最多{pineline}个 KEY
2.3.2 在源节点执行MIGRATE {host} {port} 0 {timeout} REPLACE AUTH {paasword} KEYS key [key ...]: 迁移数据
2.4. 在目标节点执行 CLUSTER SETSLOT {slot} node {目标节点的名字} : 设置目标节点已经拥有这个SLOT
2.5. 在源节点执行 CLUSTER SETSLOT {slot} node {源节点的名字} : 设置源节点不拥有这个SLOT
2.6. 对其他节点执行 CLUSTER SETSLOT {slot} node {目标节点的名字} 通知其他节点
3. 迁移中的slot状态说明。
slot有importing和migrating 两种特殊状态, 代表这个slot正在迁移中
A是源节点,B是迁移的目标节点,执行下面命令更新slot状态
We send B: CLUSTER SETSLOT 8 IMPORTING A
We send A: CLUSTER SETSLOT 8 MIGRATING B
这时slot 8的key依然发向节点A。如果A存在目标KEY,则直接返回。 如果KEY不存在,这时会返回转发到B上处理。
参考下面例子: 一个请求被转发了两次
127.0.0.1:6123> get 1
-> Redirected to slot [9842] located at 127.0.0.1:9123
-> Redirected to slot [9842] located at 127.0.0.1:7123
(nil)
但让服务端处理IMPORTING的slot的请求, 需要客户端执行asking命令,打开对应的开关才行。 但这个行为一般客户端(比如JEDIS)都会自动执行 ,用户不需要额外关心。
总结
Redis Cluster提供的reshard功能,结合实际的使用需求,可以在线的扩缩容redis cluster集群的容量,不需要担心期间服务的不可用。
但需要注意的是, 由于迁移期间,为了不太多影响业务的正常执行, 每次只会迁移一个slot,这必然会导致最终迁移时间边长。 再者,迁移执行的MIGRATE命令,本质是批量的dump和restore,以及可选的del命令组合。必然还是会对迁移的相关节点有一定的性能影响。 建议如果选择扩缩容,也考虑在业务压力较低的时段再进行。