摘要:
在paxos和raft一致性协议中, 选举是分多轮次进行。但是在具体项目实践中,需要谨慎处理各种细节。
本文记录redis的设计和实现
核心代码:
/* This function is called if we are a slave node and our master serving
* a non-zero amount of hash slots is in FAIL state.
*
* The goal of this function is:
* 1) To check if we are able to perform a failover, is our data updated?
* 2) Try to get elected by masters.
* 3) Perform the failover informing all the other nodes.
*/
void clusterHandleSlaveFailover(void) {
mstime_t data_age;
/* Age of the currently scheduled election. Because failover_auth_time is
 * set in the future (fixed + random + rank delay, see below), this value
 * is negative until the scheduled start is reached. */
mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
/* Strict majority of server.cluster->size. NOTE(review): size is
 * presumably the number of masters eligible to vote; confirm where it is
 * maintained, since a wrong size makes the quorum too easy or too hard. */
int needed_quorum = (server.cluster->size / 2) + 1;
int manual_failover = server.cluster->mf_end != 0 &&
server.cluster->mf_can_start;
mstime_t auth_timeout, auth_retry_time;
/* Clear the "handle failover before sleep" request; it is re-armed below
 * only on the manual-failover path. */
server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
/* Compute the failover timeout (the max time we have to send votes
* and wait for replies), and the failover retry time (the time to wait
* before trying to get voted again).
*
* Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
* Retry is two times the Timeout.
*/
auth_timeout = server.cluster_node_timeout*2;
if (auth_timeout < 2000) auth_timeout = 2000;
auth_retry_time = auth_timeout*2;
/* Pre conditions to run the function, that must be met both in case
* of an automatic or manual failover:
* 1) We are a slave.
* 2) Our master is flagged as FAIL, or this is a manual failover.
* 3) We don't have the no failover configuration set, and this is
* not a manual failover.
* 4) It is serving slots. */
if (nodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) ||
(server.cluster_slave_no_failover && !manual_failover) ||
myself->slaveof->numslots == 0)
{
/* There are no reasons to failover, so we set the reason why we
* are returning without failing over to NONE. */
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
return;
}
/* Set data_age to the number of seconds we are disconnected from
* the master. */
/* Both branches yield milliseconds (seconds * 1000) despite the comment
 * above: either since the last interaction with a connected master, or
 * since the link went down. */
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
}
/* Remove the node timeout from the data age as it is fine that we are
* disconnected from our master at least for the time it was down to be
* flagged as FAIL, that's the baseline. */
if (data_age > server.cluster_node_timeout)
data_age -= server.cluster_node_timeout;
/* Check if our data is recent enough according to the slave validity
* factor configured by the user.
*
* Check bypassed for manual failovers. */
/* A validity factor of 0 disables the staleness check entirely; otherwise
 * a replica whose data is older than ping period + factor * node timeout
 * refuses to run (and logs why) unless the operator forced the failover. */
if (server.cluster_slave_validity_factor &&
data_age >
(((mstime_t)server.repl_ping_slave_period * 1000) +
(server.cluster_node_timeout * server.cluster_slave_validity_factor)))
{
if (!manual_failover) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
return;
}
}
/* If the previous failover attempt timedout and the retry time has
* elapsed, we can setup a new one. */
if (auth_age > auth_retry_time) {
/* random() here only spreads competing replicas over a 500ms window to
 * reduce split votes; it is jitter, not a security boundary. */
server.cluster->failover_auth_time = mstime() +
500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
random() % 500; /* Random delay between 0 and 500 milliseconds. */
server.cluster->failover_auth_count = 0;
server.cluster->failover_auth_sent = 0;
server.cluster->failover_auth_rank = clusterGetSlaveRank();
/* We add another delay that is proportional to the slave rank.
* Specifically 1 second * rank. This way slaves that have a probably
* less updated replication offset, are penalized. */
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
/* However if this is a manual failover, no delay is needed. */
if (server.cluster->mf_end) {
server.cluster->failover_auth_time = mstime();
server.cluster->failover_auth_rank = 0;
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
}
serverLog(LL_WARNING,
"Start of election delayed for %lld milliseconds "
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset());
/* Now that we have a scheduled election, broadcast our offset
* to all the other slaves so that they'll updated their offsets
* if our offset is better. */
clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
return;
}
/* It is possible that we received more updated offsets from other
* slaves for the same master since we computed our election delay.
* Update the delay if our rank changed.
*
* Not performed if this is a manual failover. */
/* The rank can only grow here (delay is only ever added); a rank that
 * improved does not shorten the already scheduled delay. */
if (server.cluster->failover_auth_sent == 0 &&
server.cluster->mf_end == 0)
{
int newrank = clusterGetSlaveRank();
if (newrank > server.cluster->failover_auth_rank) {
long long added_delay =
(newrank - server.cluster->failover_auth_rank) * 1000;
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_auth_rank = newrank;
serverLog(LL_WARNING,
"Replica rank updated to #%d, added %lld milliseconds of delay.",
newrank, added_delay);
}
}
/* Return ASAP if we can't still start the election. */
if (mstime() < server.cluster->failover_auth_time) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
return;
}
/* Return ASAP if the election is too old to be valid. */
/* Once expired, nothing is retried until auth_age exceeds auth_retry_time
 * on a later call, which re-enters the scheduling branch above. */
if (auth_age > auth_timeout) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
return;
}
/* Ask for votes if needed. */
if (server.cluster->failover_auth_sent == 0) {
/* Bump the epoch before soliciting votes so this election is tagged with
 * a value no earlier election used. The SAVE/FSYNC todo flags below
 * request that the new epoch is persisted; presumably so a restart
 * cannot reuse an epoch already seen by other nodes — verify. */
server.cluster->currentEpoch++;
server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
(unsigned long long) server.cluster->currentEpoch);
clusterRequestFailoverAuth();
server.cluster->failover_auth_sent = 1;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_FSYNC_CONFIG);
return; /* Wait for replies. */
}
/* Check if we reached the quorum. */
/* failover_auth_count is incremented by the vote-reply handler (not shown
 * here); it was reset to 0 when this election was scheduled. */
if (server.cluster->failover_auth_count >= needed_quorum) {
/* We have the quorum, we can finally failover the master. */
serverLog(LL_WARNING,
"Failover election won: I'm the new master.");
/* Update my configEpoch to the epoch of the election. */
/* Only ever raised, never lowered, so a stale configEpoch from an earlier
 * epoch cannot regress the node's claim. */
if (myself->configEpoch < server.cluster->failover_auth_epoch) {
myself->configEpoch = server.cluster->failover_auth_epoch;
serverLog(LL_WARNING,
"configEpoch set to %llu after successful failover",
(unsigned long long) myself->configEpoch);
}
/* Take responsibility for the cluster slots. */
clusterFailoverReplaceYourMaster();
} else {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
}
}
/* This function returns the "rank" of this instance, a slave, in the context
* of its master-slaves ring. The rank of the slave is given by the number of
* other slaves for the same master that have a better replication offset
* compared to the local one (better means, greater, so they claim more data).
*
* A slave with rank 0 is the one with the greatest (most up to date)
* replication offset, and so forth. Note that because how the rank is computed
* multiple slaves may have the same rank, in case they have the same offset.
*
* The slave rank is used to add a delay to start an election in order to
* get voted and replace a failing master. Slaves with better replication
* offsets are more likely to win. */
/* Rank of this replica among the replicas of its master: the number of
 * sibling replicas (ourselves and replicas flagged as unable to fail over
 * excluded) whose replication offset is strictly greater than ours.
 * Rank 0 means no eligible sibling holds more data; siblings with equal
 * offsets share a rank. Callers scale the election delay by this value.
 *
 * Returns 0 when there is no master (not expected to happen for callers). */
int clusterGetSlaveRank(void) {
    serverAssert(nodeIsSlave(myself));
    clusterNode *master = myself->slaveof;
    if (master == NULL) return 0; /* Never called by slaves without master. */
    long long ours = replicationGetSlaveOffset();
    int ahead_of_us = 0;
    for (int i = 0; i < master->numslaves; i++) {
        clusterNode *sibling = master->slaves[i];
        if (sibling == myself) continue;
        if (nodeCantFailover(sibling)) continue;
        if (sibling->repl_offset > ours) ahead_of_us++;
    }
    return ahead_of_us;
}
/* When a slave is turned into a master, the current replication ID
* (that was inherited from the master at synchronization time) is
* used as secondary ID up to the current offset, and a new replication
* ID is created to continue with a new replication history. */
shiftReplicationId();
/* Use the current replication ID / offset as secondary replication
* ID, and change the current one in order to start a new history.
* This should be used when an instance is switched from slave to master
* so that it can serve PSYNC requests performed using the master
* replication ID. */
/* Demote the current replication ID to the secondary slot and start a new
 * replication history. Meant for the slave-to-master switch: a replica that
 * still PSYNCs with the old master's ID can be served up to the recorded
 * secondary offset instead of being forced into a full resync. */
void shiftReplicationId(void) {
    /* Keep the inherited ID as replid2 so PSYNC requests naming it are
     * still recognised. */
    memcpy(server.replid2, server.replid, sizeof(server.replid));
    /* A replica asks for the first byte it has NOT received, so the
     * secondary ID must be valid up to master offset + 1: with 50 bytes of
     * shared history, an incoming PSYNC at offset 51 is legitimate. */
    long long valid_up_to = server.master_repl_offset + 1;
    server.second_replid_offset = valid_up_to;
    changeReplicationId();
    serverLog(LL_WARNING,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
}
分析:
关键代码在这几行
server.cluster->failover_auth_rank = clusterGetSlaveRank();
/* We add another delay that is proportional to the slave rank.
* Specifically 1 second * rank. This way slaves that have a probably
* less updated replication offset, are penalized. */
server.cluster->failover_auth_time +=
server.cluster->failover_auth_rank * 1000;
int newrank = clusterGetSlaveRank();
if (newrank > server.cluster->failover_auth_rank) {
long long added_delay =
(newrank - server.cluster->failover_auth_rank) * 1000;
server.cluster->failover_auth_time += added_delay;
server.cluster->failover_auth_rank = newrank;
/* Return ASAP if we can't still start the election. */
if (mstime() < server.cluster->failover_auth_time) {
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
return;
}
rank 是根据本 slave 所属 master 下所有 slave 的复制 offset 计算出来的: rank 越小, 数据越新。
那么不难发现,redis对于rank越大的slave, 越推迟发起投票。
同时, 从节点在升级为主节点时, 将旧主节点给从节点的复制偏移赋值给自己。这样从节点升主后, 可以继续沿用原主节点的主从复制偏移, 继续增量复制。