searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

一种基于Redis的分布式锁实现方案

2023-05-25 03:22:03
231
0

## 背景

 

在分布式系统中,为了保证各个节点的数据的一致性和避免资源的竞争,我们需要使用分布式锁来保证在某个时间点上只有一个进程能够对某个资源进行访问或操作。在这篇博客中,我们将要探讨一种使用Redis实现分布式锁的技术。

 

## 原理

 

在单机环境下实现锁非常的容易,但是在分布式系统下实现就比较复杂了。其中的核心问题在于实现一个在分布式系统下可靠、高效、正确的锁机制。然而,Redis正是由于其高效、可靠、分布式特性成为了使用最广泛的分布式锁实现。

 

在Redis中,我们可以使用setnx命令来实现分布式锁。我们可以将锁的状态保存在Redis中的一个字符串类型的键中。如果Redis中不存在这个键(locked_key),那么我们就可以在这个键上加锁,如果存在这个键就说明锁已经被其他进程获取了,我们就需要等待锁的释放。

 

在加锁的过程中,我们需要给这个键设置一个存活时间,以保证如果获取锁的进程发生了故障或者意外终止的时候能够自动释放锁。我们可以使用expire命令来给这个键设置一个存活时间。

 

在释放锁的过程中,我们可以使用del命令来删除这个键,并且在其他进程获取到这个键的时候判断这个键是否存在来判断锁是否有效。

 

## 方案实现

redis配置信息:

spring:
redis:
hostAndPorts:
password:
pool:
max-active: 20 # 连接池最大活跃连接数(负值表示没有限制)
max-idle: 20 # 连接池最大空闲连接数
min-idle: 10 # 连接池最小空闲连接数


redis配置类
@Configuration
//@ConditionalOnProperty(name = "redis.enabled", havingValue = "true")
@ConditionalOnProperty(name = "spring.redis.hostAndPorts")
public class RedisConfig {



@Value("${spring.redis.hostAndPorts}")
private String hostAndPorts;

@Value("${spring.redis.password:ctgCdt@135}")
private String password;

@Value("${spring.redis.timeout:300000}")
private int timeout;

@Value("${spring.redis.pool.max-idle:30}")
private int maxIdle;

@Value("${spring.redis.pool.min-idle:0}")
private int minIdle;

@Value("${spring.redis.pool.max-wait:60000}")
private long maxWaitMillis;

@Value("${spring.redis.monitor:false}")
private Boolean isMonitor;

@Value("${spring.redis.db:1}")
private int db;

@Value("${spring.redis.salt:ctgCdt@9102}")
private String salt;

@Bean
public CtgJedisPool redisPoolFactory() throws Exception {
List<HostAndPort> hostAndPortList = new ArrayList<>();
String[] hosts = hostAndPorts.split(",");
for (String host : hosts) {
String[] split = host.split(":");
HostAndPort hp = new HostAndPort(split[0], Integer.parseInt(split[1]));
hostAndPortList.add(hp);
}

GenericObjectPoolConfig poolConfig = new JedisPoolConfig(); //线程池配置
poolConfig.setMaxTotal(maxIdle); // 最大连接数(空闲+使用中)
poolConfig.setMaxIdle(maxIdle); //最大空闲连接数
poolConfig.setMinIdle(minIdle <= 0 ? Math.max(1, maxIdle/10): minIdle); //保持的最小空闲连接数
poolConfig.setMaxWaitMillis(maxWaitMillis); //借出连接时最大的等待时间
// 设置获取连接时不进行连接验证(通过 PoolableObjectFactory.validateObject() 验证连接是否有效)
poolConfig.setTestOnBorrow(true);
// 设置退还连接时不进行连接验证(通过 PoolableObjectFactory.validateObject() 验证连接是否有效)
poolConfig.setTestOnReturn(false);
// 设置连接空闲时进行连接验证
// poolConfig.setTestWhileIdle(true);
// 设置连接被回收前的最大空闲时间
// poolConfig.setMinEvictableIdleTimeMillis(10 * 60000);
// 设置检测线程的运行时间间隔
// poolConfig.setTimeBetweenEvictionRunsMillis(60000);

CtgJedisPoolConfig config = new CtgJedisPoolConfig(hostAndPortList);
config.setDatabase(db) //分组对应的桶位
.setPassword(password) // “用户#密码”
.setPoolConfig(poolConfig) //线程池配置
.setPeriod(3000) //后台监控执行周期,毫秒
.setMonitorTimeout(1000) //后台监控ping命令超时时间,毫秒
.setSoTimeout(10000)//毫秒
.setMonitorLog(false)
.setMonitorSwitch(isMonitor);

CtgJedisPool pool = new CtgJedisPool(config); //创建连接池
return pool;
}
}



@Configuration
@ConditionalOnProperty(name = {"spring.redis.hostAndPorts"})
public class RedisTemplateConf {

@Autowired
private CtgJedisPool ctgJedisPool;

@Value("${redis.redistemplate.retry-times:0}")
private int retryTimes;

private static Integer dbIndex = null;

private static final Logger logger = LoggerFactory.getLogger(RedisTemplateConf.class);

private static RedisTemplate<String,String> singleRedisTemplate = null;


public static RedisTemplate<String, String> getRedisTemplate(){
return singleRedisTemplate;
}

@Bean
@Primary
public RedisTemplate<String, String> redisTemplate(){

RedisTemplate<String, String> template = new RedisTemplate<>();
RedisConnectionFactory redisConnectionFactory = this.getRedisConnectionFactory();
template.setDefaultSerializer(new StringRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
template.afterPropertiesSet();
singleRedisTemplate = template;
return template;

}


public RedisConnectionFactory getRedisConnectionFactory() {

return new RedisConnectionFactory(){

@Override
public DataAccessException translateExceptionIfPossible(RuntimeException e) {
throw new RuntimeException("不支持translateExceptionIfPossible");
}

@Override
public RedisConnection getConnection() {

final ProxyJedis jedis;
try {
jedis = ctgJedisPool.getResource();
}catch(Exception e){
throw new RuntimeException(e);
}

if(jedis == null){
throw new RuntimeException("无法从连接池获取jedis连接");
}

if(dbIndex == null){
dbIndex = jedis.getDB().intValue();
}

if(retryTimes <= 0) {
return new JedisConnection(jedis, null, dbIndex) {
@Override
public void close() throws DataAccessException {
try {
jedis.close();
} catch (Exception e) {
logger.error("redis连接归还失败", e);
}
}

};
}else{
return new RetryCtgcacheConnection(jedis, dbIndex, retryTimes);
}
}

@Override
public RedisClusterConnection getClusterConnection() {
throw new RuntimeException("不支持ClusterConnection");
}

/**
* ctgcache不支持pipeline,所以不存在批量结果转换的情况
* @return
*/
@Override
public boolean getConvertPipelineAndTxResults() {
return false;
}

@Override
public RedisSentinelConnection getSentinelConnection() {
throw new RuntimeException("不支持SentinelConnection");
}
};

}

}


锁实现
@Service
public class DistributeLockServiceImpl implements IDistributeLockService {
private static final Logger logger = LoggerFactory.getLogger(DistributeLockServiceImpl.class);
private static final String LOCK_VALUE = "1";

private static String OK_CODE = "OK";

@Override
public Boolean tryLock(String lockKey, int expireSecond, int timeOutSecond, long sleepTime) throws Exception {
return new RedisCallable<Boolean>() {
@Override
public Boolean exception() throws Exception {
return false;
}

@Override
public Boolean action(ProxyJedis jedis) throws Exception {
final long startTime = System.currentTimeMillis();
boolean lock = false;
while (true) {
String result = jedis.setpxnx(lockKey, LOCK_VALUE, expireSecond * 1000);
if (OK_CODE.equals(result)) {
lock = true;
break;
}
if (System.currentTimeMillis() - startTime > timeOutSecond * 1000) {
lock = false;
break;
}
TimeUnit.MILLISECONDS.sleep(sleepTime);
}
return lock;
}
}.syncCall();
}

@Override
public Boolean tryLock(String lockKey, int expireSecond, int timeOutSecond) throws Exception {
return tryLock(lockKey, expireSecond, timeOutSecond, 300L);
}

@Override
public DistributeLockResponseEnum tryAtomicLock(String lockKey, int expireSecond, int timeOutSecond) {
try {
return new RedisCallable<DistributeLockResponseEnum>() {
@Override
public DistributeLockResponseEnum exception() throws Exception {
return DistributeLockResponseEnum.EXCEPTION;
}

@Override
public DistributeLockResponseEnum action(ProxyJedis jedis) throws Exception {
final long startTime = System.currentTimeMillis();
while (true) {
String result = jedis.setpxnx(lockKey, LOCK_VALUE, expireSecond * 1000);
if (OK_CODE.equals(result)) {
return DistributeLockResponseEnum.OK;
}
if (System.currentTimeMillis() - startTime > timeOutSecond * 1000) {
return DistributeLockResponseEnum.FAILED;
}
TimeUnit.MILLISECONDS.sleep(300);
}
}
}.syncCall();
} catch (Exception e) {
logger.error("try atomic lock 失败", e);
return DistributeLockResponseEnum.EXCEPTION;
}
}


@Override
public DistributeLockResponseEnum releaseLock(boolean isMyLock, String lockKey) {
try {
return new RedisCallable<DistributeLockResponseEnum>() {
@Override
public DistributeLockResponseEnum exception() throws Exception {
return DistributeLockResponseEnum.EXCEPTION;
}

@Override
public DistributeLockResponseEnum action(ProxyJedis jedis) throws Exception {
if (isMyLock) {
Long result = jedis.del(lockKey);
return result > 0 ? DistributeLockResponseEnum.OK : DistributeLockResponseEnum.FAILED;
}
return DistributeLockResponseEnum.FAILED;
}
}.syncCall();
} catch (Exception e) {
logger.error("释放锁失败", e);
return DistributeLockResponseEnum.EXCEPTION;
}
}

@Override
public DistributeLockResponseEnum renewLock(boolean isMyLock, String lockKey, int expireSecond) {
try {
return new RedisCallable<DistributeLockResponseEnum>() {
@Override
public DistributeLockResponseEnum exception() throws Exception {
return DistributeLockResponseEnum.EXCEPTION;
}

@Override
public DistributeLockResponseEnum action(ProxyJedis jedis) throws Exception {
if (isMyLock) {
jedis.setpxnx(lockKey, LOCK_VALUE, expireSecond * 1000);
}
return DistributeLockResponseEnum.FAILED;
}
}.syncCall();
} catch (Exception e) {
logger.error("续期锁失败", e);
return DistributeLockResponseEnum.EXCEPTION;
}
}

}


在这个实现中,我们首先使用setpxnx方法来尝试获取锁,如果返回false,说明锁已经被其他进程获取,我们就等再次尝试获取锁。如果返回true,说明获取锁成功,同时设置超时时间,超时仍无法获取锁则返回获取锁失败。

 

如果获取锁的线程在持有锁期间出现了异常或故障,那么在超过锁的过期时间后,Redis会自动释放锁,其他线程也就可以获取到这个锁了。

 

## 结论

 

使用Redis实现分布式锁的优点包括:

 

- 高效:Redis是内存存储,速度非常快,非常适合于高并发情况下的分布式锁实现。

- 可靠:Redis保证了数据的持久性,即使Redis出现了故障,也不会丢失加锁的信息。

- 支持分布式:Redis天生就是分布式的,可以非常容易地支持分布式锁的应用场景。

 

当然,Redis也有其不足之处。其中最大的一个问题是锁的过期时间不能太长,因为如果加锁的进程在加锁期间一直未释放锁,那么其他进程就一直无法获得这个锁。为了解决这个问题,我们可以对加锁和重试的时间进行合理的设置,并且在Redis中加入更多的监控和报警机制。

 

总的来说,对于大多数的情况下,使用Redis作为分布式锁的实现是非常可行和理想的方案。







0条评论
作者已关闭评论
吴****彬
2文章数
0粉丝数
吴****彬
2 文章 | 0 粉丝
吴****彬
2文章数
0粉丝数
吴****彬
2 文章 | 0 粉丝
原创

一种基于Redis的分布式锁实现方案

2023-05-25 03:22:03
231
0

## 背景

 

在分布式系统中,为了保证各个节点的数据的一致性和避免资源的竞争,我们需要使用分布式锁来保证在某个时间点上只有一个进程能够对某个资源进行访问或操作。在这篇博客中,我们将要探讨一种使用Redis实现分布式锁的技术。

 

## 原理

 

在单机环境下实现锁非常的容易,但是在分布式系统下实现就比较复杂了。其中的核心问题在于实现一个在分布式系统下可靠、高效、正确的锁机制。然而,Redis正是由于其高效、可靠、分布式特性成为了使用最广泛的分布式锁实现。

 

在Redis中,我们可以使用setnx命令来实现分布式锁。我们可以将锁的状态保存在Redis中的一个字符串类型的键中。如果Redis中不存在这个键(locked_key),那么我们就可以在这个键上加锁,如果存在这个键就说明锁已经被其他进程获取了,我们就需要等待锁的释放。

 

在加锁的过程中,我们需要给这个键设置一个存活时间,以保证如果获取锁的进程发生了故障或者意外终止的时候能够自动释放锁。我们可以使用expire命令来给这个键设置一个存活时间。

 

在释放锁的过程中,我们可以使用del命令来删除这个键,并且在其他进程获取到这个键的时候判断这个键是否存在来判断锁是否有效。

 

## 方案实现

redis配置信息:

spring:
redis:
hostAndPorts:
password:
pool:
max-active: 20 # 连接池最大活跃连接数(负值表示没有限制)
max-idle: 20 # 连接池最大空闲连接数
min-idle: 10 # 连接池最小空闲连接数


redis配置类
@Configuration
//@ConditionalOnProperty(name = "redis.enabled", havingValue = "true")
@ConditionalOnProperty(name = "spring.redis.hostAndPorts")
public class RedisConfig {



@Value("${spring.redis.hostAndPorts}")
private String hostAndPorts;

@Value("${spring.redis.password:ctgCdt@135}")
private String password;

@Value("${spring.redis.timeout:300000}")
private int timeout;

@Value("${spring.redis.pool.max-idle:30}")
private int maxIdle;

@Value("${spring.redis.pool.min-idle:0}")
private int minIdle;

@Value("${spring.redis.pool.max-wait:60000}")
private long maxWaitMillis;

@Value("${spring.redis.monitor:false}")
private Boolean isMonitor;

@Value("${spring.redis.db:1}")
private int db;

@Value("${spring.redis.salt:ctgCdt@9102}")
private String salt;

@Bean
public CtgJedisPool redisPoolFactory() throws Exception {
List<HostAndPort> hostAndPortList = new ArrayList<>();
String[] hosts = hostAndPorts.split(",");
for (String host : hosts) {
String[] split = host.split(":");
HostAndPort hp = new HostAndPort(split[0], Integer.parseInt(split[1]));
hostAndPortList.add(hp);
}

GenericObjectPoolConfig poolConfig = new JedisPoolConfig(); //线程池配置
poolConfig.setMaxTotal(maxIdle); // 最大连接数(空闲+使用中)
poolConfig.setMaxIdle(maxIdle); //最大空闲连接数
poolConfig.setMinIdle(minIdle <= 0 ? Math.max(1, maxIdle/10): minIdle); //保持的最小空闲连接数
poolConfig.setMaxWaitMillis(maxWaitMillis); //借出连接时最大的等待时间
// 设置获取连接时不进行连接验证(通过 PoolableObjectFactory.validateObject() 验证连接是否有效)
poolConfig.setTestOnBorrow(true);
// 设置退还连接时不进行连接验证(通过 PoolableObjectFactory.validateObject() 验证连接是否有效)
poolConfig.setTestOnReturn(false);
// 设置连接空闲时进行连接验证
// poolConfig.setTestWhileIdle(true);
// 设置连接被回收前的最大空闲时间
// poolConfig.setMinEvictableIdleTimeMillis(10 * 60000);
// 设置检测线程的运行时间间隔
// poolConfig.setTimeBetweenEvictionRunsMillis(60000);

CtgJedisPoolConfig config = new CtgJedisPoolConfig(hostAndPortList);
config.setDatabase(db) //分组对应的桶位
.setPassword(password) // “用户#密码”
.setPoolConfig(poolConfig) //线程池配置
.setPeriod(3000) //后台监控执行周期,毫秒
.setMonitorTimeout(1000) //后台监控ping命令超时时间,毫秒
.setSoTimeout(10000)//毫秒
.setMonitorLog(false)
.setMonitorSwitch(isMonitor);

CtgJedisPool pool = new CtgJedisPool(config); //创建连接池
return pool;
}
}



@Configuration
@ConditionalOnProperty(name = {"spring.redis.hostAndPorts"})
public class RedisTemplateConf {

@Autowired
private CtgJedisPool ctgJedisPool;

@Value("${redis.redistemplate.retry-times:0}")
private int retryTimes;

private static Integer dbIndex = null;

private static final Logger logger = LoggerFactory.getLogger(RedisTemplateConf.class);

private static RedisTemplate<String,String> singleRedisTemplate = null;


public static RedisTemplate<String, String> getRedisTemplate(){
return singleRedisTemplate;
}

@Bean
@Primary
public RedisTemplate<String, String> redisTemplate(){

RedisTemplate<String, String> template = new RedisTemplate<>();
RedisConnectionFactory redisConnectionFactory = this.getRedisConnectionFactory();
template.setDefaultSerializer(new StringRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
template.afterPropertiesSet();
singleRedisTemplate = template;
return template;

}


public RedisConnectionFactory getRedisConnectionFactory() {

return new RedisConnectionFactory(){

@Override
public DataAccessException translateExceptionIfPossible(RuntimeException e) {
throw new RuntimeException("不支持translateExceptionIfPossible");
}

@Override
public RedisConnection getConnection() {

final ProxyJedis jedis;
try {
jedis = ctgJedisPool.getResource();
}catch(Exception e){
throw new RuntimeException(e);
}

if(jedis == null){
throw new RuntimeException("无法从连接池获取jedis连接");
}

if(dbIndex == null){
dbIndex = jedis.getDB().intValue();
}

if(retryTimes <= 0) {
return new JedisConnection(jedis, null, dbIndex) {
@Override
public void close() throws DataAccessException {
try {
jedis.close();
} catch (Exception e) {
logger.error("redis连接归还失败", e);
}
}

};
}else{
return new RetryCtgcacheConnection(jedis, dbIndex, retryTimes);
}
}

@Override
public RedisClusterConnection getClusterConnection() {
throw new RuntimeException("不支持ClusterConnection");
}

/**
* ctgcache不支持pipeline,所以不存在批量结果转换的情况
* @return
*/
@Override
public boolean getConvertPipelineAndTxResults() {
return false;
}

@Override
public RedisSentinelConnection getSentinelConnection() {
throw new RuntimeException("不支持SentinelConnection");
}
};

}

}


锁实现
@Service
public class DistributeLockServiceImpl implements IDistributeLockService {
private static final Logger logger = LoggerFactory.getLogger(DistributeLockServiceImpl.class);
private static final String LOCK_VALUE = "1";

private static String OK_CODE = "OK";

@Override
public Boolean tryLock(String lockKey, int expireSecond, int timeOutSecond, long sleepTime) throws Exception {
return new RedisCallable<Boolean>() {
@Override
public Boolean exception() throws Exception {
return false;
}

@Override
public Boolean action(ProxyJedis jedis) throws Exception {
final long startTime = System.currentTimeMillis();
boolean lock = false;
while (true) {
String result = jedis.setpxnx(lockKey, LOCK_VALUE, expireSecond * 1000);
if (OK_CODE.equals(result)) {
lock = true;
break;
}
if (System.currentTimeMillis() - startTime > timeOutSecond * 1000) {
lock = false;
break;
}
TimeUnit.MILLISECONDS.sleep(sleepTime);
}
return lock;
}
}.syncCall();
}

@Override
public Boolean tryLock(String lockKey, int expireSecond, int timeOutSecond) throws Exception {
return tryLock(lockKey, expireSecond, timeOutSecond, 300L);
}

@Override
public DistributeLockResponseEnum tryAtomicLock(String lockKey, int expireSecond, int timeOutSecond) {
try {
return new RedisCallable<DistributeLockResponseEnum>() {
@Override
public DistributeLockResponseEnum exception() throws Exception {
return DistributeLockResponseEnum.EXCEPTION;
}

@Override
public DistributeLockResponseEnum action(ProxyJedis jedis) throws Exception {
final long startTime = System.currentTimeMillis();
while (true) {
String result = jedis.setpxnx(lockKey, LOCK_VALUE, expireSecond * 1000);
if (OK_CODE.equals(result)) {
return DistributeLockResponseEnum.OK;
}
if (System.currentTimeMillis() - startTime > timeOutSecond * 1000) {
return DistributeLockResponseEnum.FAILED;
}
TimeUnit.MILLISECONDS.sleep(300);
}
}
}.syncCall();
} catch (Exception e) {
logger.error("try atomic lock 失败", e);
return DistributeLockResponseEnum.EXCEPTION;
}
}


@Override
public DistributeLockResponseEnum releaseLock(boolean isMyLock, String lockKey) {
try {
return new RedisCallable<DistributeLockResponseEnum>() {
@Override
public DistributeLockResponseEnum exception() throws Exception {
return DistributeLockResponseEnum.EXCEPTION;
}

@Override
public DistributeLockResponseEnum action(ProxyJedis jedis) throws Exception {
if (isMyLock) {
Long result = jedis.del(lockKey);
return result > 0 ? DistributeLockResponseEnum.OK : DistributeLockResponseEnum.FAILED;
}
return DistributeLockResponseEnum.FAILED;
}
}.syncCall();
} catch (Exception e) {
logger.error("释放锁失败", e);
return DistributeLockResponseEnum.EXCEPTION;
}
}

@Override
public DistributeLockResponseEnum renewLock(boolean isMyLock, String lockKey, int expireSecond) {
try {
return new RedisCallable<DistributeLockResponseEnum>() {
@Override
public DistributeLockResponseEnum exception() throws Exception {
return DistributeLockResponseEnum.EXCEPTION;
}

@Override
public DistributeLockResponseEnum action(ProxyJedis jedis) throws Exception {
if (isMyLock) {
jedis.setpxnx(lockKey, LOCK_VALUE, expireSecond * 1000);
}
return DistributeLockResponseEnum.FAILED;
}
}.syncCall();
} catch (Exception e) {
logger.error("续期锁失败", e);
return DistributeLockResponseEnum.EXCEPTION;
}
}

}


在这个实现中,我们首先使用setpxnx方法来尝试获取锁,如果返回false,说明锁已经被其他进程获取,我们就等再次尝试获取锁。如果返回true,说明获取锁成功,同时设置超时时间,超时仍无法获取锁则返回获取锁失败。

 

如果获取锁的线程在持有锁期间出现了异常或故障,那么在超过锁的过期时间后,Redis会自动释放锁,其他线程也就可以获取到这个锁了。

 

## 结论

 

使用Redis实现分布式锁的优点包括:

 

- 高效:Redis是内存存储,速度非常快,非常适合于高并发情况下的分布式锁实现。

- 可靠:Redis保证了数据的持久性,即使Redis出现了故障,也不会丢失加锁的信息。

- 支持分布式:Redis天生就是分布式的,可以非常容易地支持分布式锁的应用场景。

 

当然,Redis也有其不足之处。其中最大的一个问题是锁的过期时间不能太长,因为如果加锁的进程在加锁期间一直未释放锁,那么其他进程就一直无法获得这个锁。为了解决这个问题,我们可以对加锁和重试的时间进行合理的设置,并且在Redis中加入更多的监控和报警机制。

 

总的来说,对于大多数的情况下,使用Redis作为分布式锁的实现是非常可行和理想的方案。







文章来自个人专栏
文章 | 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0