demo使用
private static void lock() {
//1 创建redission的config对象并配置redis服务器(此处使用singleServer)
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
//2 创建redis客户端redissionClient
RedissonClient redissonClient = Redisson.create(config);
RLock whyLock = redissonClient.getLock("readLock");
try {
whyLock.lock();//获取锁--直至获取成功 ---解读源码见代码块1开始
while (true) { //模拟业务--触发看门狗机制
TimeUnit.SECONDS.sleep(1);
}
} catch (Exception exception) {
} finally {
whyLock.unlock();
}
}
public void lock() {
try {
//leaseTime租约时间为-1(会赋值30秒),waitTime没有表示无限等待
lock(-1, null, false); //见代码块2
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();//获取当前线程id
//尝试获取锁(当返回为null时表示获取锁成功)
Long ttl = tryAcquire(-1, leaseTime, unit, threadId); //核心逻辑-->见
// lock acquired
if (ttl == null) {
return; //成功获取锁,返回
}
// 下面是锁获取失败的操作
RFuture<RedissonLockEntry> future = subscribe(threadId); //订阅当前线程的id
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) { //循环获取锁,直到获取成功
ttl = tryAcquire(-1, leaseTime, unit, threadId); //获取锁
// lock acquired
if (ttl == null) {
break; //获取锁成功,返回
}
// 获取锁失败
if (ttl >= 0) { //ttl当前锁的过期时间
try {
//此段代码相当一TimeUnit.SECONDS.sleep(ttl);用于减缓下次获取的时间
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); //
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) { //leaseTime租约时间,通过上面路径租约时间为-1
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
//当租约时间为-1时,默认使用30秒 -->核心逻辑见代码4
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> { //接口回调,
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId); //传说中的看门狗思想
}
}
});
return ttlRemainingFuture;
}
利用redis脚本,设计锁思想。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 判断锁是否存在?0表示不存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //hincrby 设置hash并指定属性为线程id的值为1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + //设置过期时间
"return nil; " + //返回null,表示获取锁成功
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + //判断锁和指定的属性是否存在?1表示存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + ////hincrby 设置hash并指定属性为线程id的值+1
"redis.call('pexpire', KEYS[1], ARGV[1]); " +//设置过期时间
"return nil; " +//返回null,表示获取锁成功
"end; " +
"return redis.call('pttl', KEYS[1]);",//返回key的过期时间,表示获取锁失败
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
//核心功能是判断业务线程是否结束
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {//
oldEntry.addThreadId(threadId); //已经设置过业务线程
} else {
entry.addThreadId(threadId);
try {
renewExpiration(); //见代码块7->看门狗的设计思想
} finally {
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return; //业务线程已结束
}
//定时任务
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
小结:常用的Redisson锁方法有6个,可根据异步与非分类如下
同步接口
RLock lock = redisson.getLock("myLock");
// 如上分析会出发看门狗机制(多线程慎用)
lock.lock();
// 10秒后自动解锁
lock.lock(10, TimeUnit.SECONDS);
// 最大尝试100秒,若100秒后没有获取到锁,则返回失败
// 10秒后自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
异步接口(如同步接口)
RLock lock = redisson.getLock("myLock");
RFuture<Void> lockFuture = lock.lockAsync();
// or acquire lock and automatically unlock it after 10 seconds
RFuture<Void> lockFuture = lock.lockAsync(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
RFuture<Boolean> lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
lockFuture.whenComplete((res, exception) -> {
// ...
lock.unlockAsync();
});