背景
基于redis sentinel和cluster的高可用能力,和redis lettuce 驱动的高可用连接,实现java 服务对redis 异常的自恢复能力并测试
对lettuce的初步了解可以看文章《初探 Redis 客户端 Lettuce》
本次是基于lettuce 5.3.7.RELEASE 版本进行实现,其他版本可能存在实现差异
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.3.7.RELEASE</version>
</dependency>
目的
使用的lettuce Redis client实现高可用链接,保证lettuce Redis client配合Redis Server(sentinel/cluster)高可用能力,具备故障恢复能力,并得出故障恢复时长。
测试场景
准备redis cluster 和sentinel 环境,可以基于redis 4.0 进行部署,cluster最小部署需要三主三从,sentinel可以三个哨兵节点+1主1从节点
高可用测试场景:
1:Redis Cluster的主动切换主,加片,减片,主动kill 主,的故障恢复时间
2:Redis Sentinel的主动切换主,主动kill 主的故障恢复时间
高可用连接实现:
sentinel
核心部分在
SENTINEL分支,需要构建 StatefulRedisMasterReplicaConnection 链接
/**
* 客户端
*/
protected RedisClient singleClient;
/**
* 同步操作
*/
protected RedisCommands sync;
/**
* 异步,不自动提交连接
*/
protected RedisAsyncCommands<String, String> pipleLine;
private String type = SINGLE;
/**
* 生成哨兵的URI
*
* @param nodes
* @param password
* @param timeOut
* @param masterId
* @return
*/
protected RedisURI getSentinelURI(String nodes, String password, Integer timeOut, String masterId) {
if (StringUtils.isEmpty(nodes)) {
logger.error("lettuce nodes config is empty.");
return null;
}
if (timeOut == null || timeOut < 500) {
timeOut = 2000;
}
Duration timeout = Duration.ofMillis(timeOut.longValue());
String[] ipPortArr = nodes.split(",", -1);
RedisURI.Builder builder = null;
for (String host : ipPortArr) {
if (StringUtils.isEmpty(host)) {
logger.error("lettuce config is wrong:" + host);
return null;
}
final String[] ipPort = host.trim().split(":");
//2个以上
if (ipPort.length >= 2) {
if (builder == null) {
builder = RedisURI.Builder.sentinel(ipPort[0], Integer.parseInt(ipPort[1]), masterId, password);
} else {
builder.withSentinel(ipPort[0], Integer.parseInt(ipPort[1]), password);
}
//sentinel的情况,不需要build 出uri
continue;
}
}
builder.withTimeout(timeout);
return builder.build();
}
protected void initSingle(String nodes, String password, Integer redisTimeout, String type, String masterId) {
if (StringUtils.isBlank(nodes)) {
logger.error("configMap is blank!");
return;
}
if (redisTimeout == null || redisTimeout < 500) {
redisTimeout = 2000;
}
if (init) {
logger.info("redis config changged by " + nodes);
}
//lettuce 链接可以复用,如果不是做事务操作,可以都用一个连接客户端
//以下取了2个连接
//同步,异步,不自动提交,pipleLine
//https://github.com/lettuce-io/lettuce-core/wiki/Master-Replica
RedisURI redisURI;
if (SENTINEL.equals(type)) {
redisURI = getSentinelURI(nodes, password, redisTimeout, masterId);
//同步链接
StatefulRedisMasterReplicaConnection<String, String> masterReplicaConnect = MasterReplica.connect(RedisClient.create(), StringCodec.UTF8, redisURI);
masterReplicaConnect.setReadFrom(ReadFrom.MASTER_PREFERRED);
sync = masterReplicaConnect.sync();
//异步的链接
StatefulRedisMasterReplicaConnection<String, String> masterReplicaAsyncConnect = MasterReplica.connect(RedisClient.create(), StringCodec.UTF8, redisURI);
masterReplicaAsyncConnect.setReadFrom(ReadFrom.MASTER_PREFERRED);
pipleLine = masterReplicaAsyncConnect.async();
//手工提交
pipleLine.setAutoFlushCommands(false);
//用以上方式链接,就不需要使用以下 client方式,满提供一下
singleClient = (RedisClient) initClient(nodes, password, redisTimeout, type, masterId);
}
init = true;
RedisClient asyncClient = RedisClient.create();
}
cluster
protected void initCluser(String nodes, String password,Integer redisTimeout)
{
if(StringUtils.isBlank(nodes))
{
logger.error("configMap is blank!");
return ;
}
if(init)
{
logger.info("redis config changged by " + nodes);
}
//lettuce 链接可以复用,如果不是做事务操作,可以都用一个连接
//客户端
clusterClient = (RedisClusterClient)initClient(nodes,password,redisTimeout,type,null);
//以下取了2个连接
//同步
sync = clusterClient.connect().sync();
//异步,不自动提交,pipleLine
//异步的一定要和同步链接区分,因为 tAutoFlushCommands 是共享的
pipleLine = clusterClient.connect().async();
pipleLine.setAutoFlushCommands(false);
init = true;
}
/**
* 初始化连接
* 如果有密码,
* 1:如果都一样,那么直接一个统一参数放入
* 2:如果每个节点都不一样,那么放在端口后面,:隔开
* redisNodes = 10.123.6.100:7000,10.123.6.100:7001,10.123.6.100:7002,10.123.6.100:7003,10.123.6.100:7004,10.123.6.100:7005
* timeOut = 1500
* maxRedirects = 3
* 哨兵
* 10.212.22.82:26379,10.212.22.83:26379,10.212.22.87:26379
* masterId:要获取的redis的master
*/
protected AbstractRedisClient initClient(String nodes, String password, Integer timeOut, String type, String masterId) {
if (StringUtils.isEmpty(nodes)) {
logger.error("lettuce nodes config is empty.");
return null;
}
String[] ipPortArr = nodes.split(",", -1);
List<RedisURI> redisURIs = new ArrayList<>();
RedisURI.Builder builder = null;
if (timeOut == null || timeOut < 500) {
timeOut = 2000;
}
Duration timeout = Duration.ofMillis(timeOut.longValue());
for (String host : ipPortArr) {
if (StringUtils.isEmpty(host)) {
logger.error("lettuce config is wrong:" + host);
return null;
}
final String[] ipPort = host.trim().split(":");
//2个以上
if (ipPort.length >= 2) {
if (SENTINEL.equals(type)) {
if (builder == null) {
builder = RedisURI.Builder.sentinel(ipPort[0], Integer.parseInt(ipPort[1]), masterId, password);
} else {
builder.withSentinel(ipPort[0], Integer.parseInt(ipPort[1]), password);
}
//sentinel的情况,不需要build 出uri
continue;
} else {
builder = RedisURI.Builder.redis(ipPort[0], Integer.parseInt(ipPort[1]));
}
}
if (StringUtils.isNotBlank(password)) {
builder.withPassword(password);
}
builder.withTimeout(timeout);
redisURIs.add(builder.build());
}
//哨兵的情况
if (redisURIs.size() == 0) {
redisURIs.add(builder.build());
}
AbstractRedisClient client = null;
//集群
if (redisURIs.size() > 1) {
client = getClusterClient(redisURIs);
}
//单节点也有代理的cluster,比如腾讯云的集群版本,还有哨兵
else {
if (CLUSTER.equals(type)) {
client = getClusterClient(redisURIs);
} else {
//哨兵也可以用这个方式获取master
client = RedisClient.create(redisURIs.get(0));
}
}
client.setDefaultTimeout(timeout);
return client;
}
private RedisClusterClient getClusterClient(List<RedisURI> redisURIs) {
//cluster 节点变动自动刷新拓扑 https://blog.csdn.net/u010046887/article/details/106948341
//开启 自适应集群拓扑刷新和周期拓扑刷新
ClusterTopologyRefreshOptions clusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
// 开启全部自适应刷新
.enableAllAdaptiveRefreshTriggers() // 开启自适应刷新,自适应刷新不开启,Redis集群变更时将会导致连接异常
// 自适应刷新超时时间(默认30秒) 适配cluster 异常恢复时间 15秒
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(30)) //默认关闭开启后时间为30秒
// 开周期刷新,因为server的 主kill 后,需要最块15秒恢复,这里采样刷新频率是 信号的2倍即可,<7.5秒一次
.enablePeriodicRefresh(Duration.ofSeconds(7)) // 默认关闭开启后时间为60秒 ClusterTopologyRefreshOptions.DEFAULT_REFRESH_PERIOD 60 .enablePeriodicRefresh(Duration.ofSeconds(2)) = .enablePeriodicRefresh().refreshPeriod(Duration.ofSeconds(2))
.build();
ClusterClientOptions clientOptions = ClusterClientOptions.builder()
.topologyRefreshOptions(clusterTopologyRefreshOptions)
// .maxRedirects(5) //默认就是5次
.build();
RedisClusterClient client = RedisClusterClient.create(redisURIs);
client.setOptions(clientOptions);
return client;
}
核心部分在 getClusterClient 方法里,需要 使用 ClusterTopologyRefreshOptions 配置自适应刷新cluster的拓扑信息,构建 RedisClusterClient。
测试代码
1:客户端初始化
600ms 超时,如果有密码,自行配置上
private String sentinelNodes = "ip1:port1,ip2:port2,ip3:port3";
private String sentinelMaster = "masterName";
private String clusterNodes = "ip1:port1,ip2:port2,ip3:port3,ip4:port4,ip5:port5,ip6:port6";
@Bean
public RedisCluster initCluserClient() {
return new RedisCluster(clusterNodes,600); }
/**
* 初始化一个Sentinel 的master client * @return
*/
@Bean
public RedisSentinel initSentinelClient() {
return new RedisSentinel(sentinelNodes,sentinelMaster,600); }
2:测试逻辑
开启2个线程,分别使用cluster和sentinel无限循环while(true)执行读写动作,其中cluster的key是随机生成,其中taskScheduler 是spring 的定时任务线程池。
@PostConstruct
public void initService() {
LOGGER.info("initService hello"); //10秒触发一次
taskScheduler.schedule(new Runnable() {
@Override
public void run() {
ApiAppBaseInfoVo baseInfoVo = new ApiAppBaseInfoVo(); baseInfoVo.setAppCode("testCode"); baseInfoVo.setName("testName"); baseInfoVo.setAppKey("key"+RandomUtils.nextInt()); baseInfoVo.setDescription("testAAA");
while(true) {
try {
long start = System.currentTimeMillis();
String Key = (RandomUtils.nextInt() % 100) + "testCluster";
String clusterRe = redisCluster.set(Key,baseInfoVo,10L);
LOGGER.info("while redisCluser set :"+clusterRe + " use:"+ (System.currentTimeMillis()-start)+" ms"); ApiAppBaseInfoVo clusterVAlue = redisCluster.get(Key,ApiAppBaseInfoVo.class);
start = System.currentTimeMillis();
LOGGER.info("while redisCluser get use:"+ (System.currentTimeMillis()-start)+" ms, value:" + clusterVAlue);
}catch (Exception e) {
LOGGER.error("redisCluser test error:"+e.getMessage()); }
} }
},new Date(System.currentTimeMillis() + 10000));
//10秒触发一次
taskScheduler.schedule(new Runnable() {
@Override
public void run() {
while(true) {
try {
long start = System.currentTimeMillis();
String clusterRe = redisSentinel.getCMD().set("testSentinel", RandomUtils.nextInt()+"-hello",
SetArgs.Builder.ex(10));
LOGGER.info("while redisSentinel set :"+clusterRe + " use:"+ (System.currentTimeMillis()-start)+" ms");
start = System.currentTimeMillis();
String sentinelVAlue = redisSentinel.get("testSentinel");
LOGGER.info("while redisSentinel get use:"+ (System.currentTimeMillis()-start)+" ms, value:" + sentinelVAlue);
}catch (Exception e) {
LOGGER.error("redisSentinel test error:" + e.getMessage()); }
}
}
},new Date(System.currentTimeMillis() + 10000));
}
正常运行,截取1秒内日志简单分析,哨兵是 1302 QPS,集群是 1372 QPS
模拟redis异常
一:redis cluster
1:切换主从
业务运行无异常,QPS正常
2:加片减片
加片/减片操作 需要人工分钟级操作,但是业务端没有发现错误日志,都正常读写
3:突发异常场景
把redis cluster 直接kill -9其中一个主节点,(15~25秒恢复)
这里依赖redis cluster serve的主从异常超时设置,这里设置的15秒,说明server端最快15秒发现主异常,进行切换。client端最快就是15秒自动恢复
二:redis sentinel
1:主动维护场景
人工操作切换主从:从日志出现异常到恢复,8秒多
2:突发异常场景
kill -9 Redis 主节点,需要25秒恢复
这里依赖redis sentinel的主从异常超时设置,这里设置的20秒,说明server端最快20秒发现主异常,进行切换。client端最快就是20秒自动 恢复
小结
redis cluster的高可用自恢复能力更好
redis cluster(超时配置15秒) | redis sentinel(超时配置20秒) | |
人工维护:主从切换 | 无异常 | 8秒+ |
人工维护:加片减片 | 无异常 | 不涉及 |
突发异常:kill -9 主 |
15~25秒(取决于redis cluster 超时配置和client刷新拓扑间隔时间配置) | 20~25秒(取决于redis sentinel 超时配置和client 刷新间隔时间配置) |