分布式公平锁,解决的是资源独占问题和公平性问题,即资源在同一时刻只能有一个线程访问,而且等待时间最长的线程优先访问。分布式公平锁的实现原理,是在分布式锁的基础上,增加一个等待队列,没抢到锁的线程只能在队列中等待,而锁被释放之后会唤醒队列中第一个线程。
用MySQL来实现分布式锁,网上已经有比较通用的实现方案,即用一个表来存储锁信息,通过乐观锁的方式来抢锁。
所以这里的难点是锁被释放后,如何去唤醒队列中的第一个线程,因为分布式系统中各个线程可能存在于不同的JVM。
基于MySQL的方案就是,在表中增加一个字段来记录各个等待线程的排队信息,
流程:
- 当一个线程尝试获取锁的时候,如果锁已经被占用,那么该线程需要将自己加入到排队信息中,
- 然后该线程要不断轮询这个锁信息,
- 发现锁被释放后,需要检查自己是否队列中的第一个,如果是,则获得锁,否则继续等待
下面直接上代码
接口类:
public interface DistributedLock {
/**
* 锁默认过期时间,30秒
*/
static final long DEFAULT_EXPIRE_SECONDS = 30;
/**
*
* @param lockName
*/
default boolean lockByName(String lockName) {
return lockByName(lockName, DEFAULT_EXPIRE_SECONDS * 1000);
}
/**
*
* @param lockName
* @param effectTime 预计占用时间,毫秒,超过这个时间锁将会过期而被其他线程获取
*/
boolean lockByName(String lockName, long effectTime);
/**
* @param lockName
* @param timeout 超时时间,毫秒
* @param fair 是否公平锁
* @return
*/
default boolean tryLockByName(String lockName, long timeout, boolean fair) throws InterruptedException {
return tryLockByName(lockName, timeout, DEFAULT_EXPIRE_SECONDS * 1000, fair);
}
/**
* @param lockName
* @param timeout 超时时间,毫秒, 超过这个时间仍未获取到锁则返回false
* @param effectTime 预计占用时间,毫秒,超过这个时间锁将会过期而被其他线程获取
* @param fair 是否公平锁
* @return
*/
boolean tryLockByName(String lockName, long timeout, long effectTime, boolean fair) throws InterruptedException;
void unLockByName(String lockName);
}
实现类:
@Slf4j
public class DbDistributedLock implements DistributedLock {
private OptimisticLockMapper mapper;
private String idPrefix;
private static final String seperator = ",";
private static final int fairQueueCapacity = 20;
public DbDistributedLock(OptimisticLockMapper mapper) {
this.mapper = mapper;
}
@PostConstruct
private void init() {
idPrefix = IpUtil.getLocalHostIp() + "_";
}
/**
* 获取线程排队标识,IP+线程名
*
* @return
*/
private String getQueueId() {
return idPrefix + Thread.currentThread().getName();
}
@Override
public void unLockByName(String lockName) {
OptimisticLock lock = mapper.selectByResource(lockName);
if (lock == null)
return;
String fairQueue = lock.getFairQueue();
Integer versionNo = lock.getVersionNo();
if (StringUtils.isEmpty(fairQueue)) {
int deleteCount = mapper.deleteByResource(lockName, versionNo);
if (deleteCount == 1) {
log.info("锁释放成功, resource:【{}】", lockName);
} else {
// 锁释放失败(可能有别的线程排队),重来一次
unLockByName(lockName);
}
} else {
String queueStr = getQueueId() + seperator;
if (!fairQueue.startsWith(queueStr))
// 可能是锁过期导致排队信息被删
return;
String newFairQueue = fairQueue.substring(queueStr.length());
int updateCount = 0;
if (newFairQueue.isEmpty())
// 如果已经没人排队,则删除
updateCount = mapper.deleteByResource(lockName, versionNo);
else
// 如果仍有人排队,则更新队列
updateCount = mapper.updateFairQueuByIdAndVersion(newFairQueue, versionNo + 1, lock.getId(), versionNo);
if (updateCount == 1) {
log.info("锁释放成功, resource:【{}】", lockName);
} else {
// 锁释放失败(可能有别的线程排队),重来一次
unLockByName(lockName);
}
}
}
@Override
public boolean lockByName(String lockName, long effectTime) {
return lockByName(lockName, effectTime, false);
}
private boolean lockByName(String lockName, long effectTime, boolean fair) {
OptimisticLock lock = null;
try {
lock = mapper.selectByResource(lockName);
} catch (Exception e) {
log.warn("获取锁信息失败", e);
return false;
}
long now = System.currentTimeMillis();
if (lock == null) {
lock = new OptimisticLock();
lock.setResource(lockName);
lock.setCreateTime(now);
lock.setVersionNo(0);
lock.setExpireTime(now + effectTime);
if (fair) {
lock.setFairQueue(getQueueId() + seperator);
}
try {
int insertCount = mapper.insert(lock);
if (insertCount == 1) {
log.info("获取锁成功, resource:【{}】", lockName);
return true;
} else {
log.debug("获取锁失败, resource:【{}】", lockName);
return false;
}
} catch (Exception e) {
log.debug("获取锁失败并发生异常, resource:【{}】, 异常信息【{}】", lockName, e.getMessage());
return false;
}
} else {
// 当锁非空时,判断是否过期,若过期,通过乐观锁,争抢锁资源
Long expiredTime = lock.getExpireTime();
int originalVersion = lock.getVersionNo();
// 锁未过期
if (expiredTime >= now) {
if (fair) {
return checkOrQueue(lock, effectTime);
} else {
log.debug("锁被占用, resource:【{}】", lockName);
return false;
}
}
// 锁已过期,更新锁
int newVersion = originalVersion + 1;
// 是否空队列(前面没人排队,或前面超时)
boolean emptyQueue = !fair;
String newFairQueue = null;
if (fair) {
String fairQueue = lock.getFairQueue();
// 去掉锁拥有者的排队记录
if (StringUtils.isNotEmpty(fairQueue)) {
newFairQueue = fairQueue.substring(fairQueue.indexOf(seperator) + 1);
}
else {
newFairQueue = "";
emptyQueue = true;
}
// 如果当前线程未排队,加上当前线程的排队信息
if (fairQueue != null && !fairQueue.contains(seperator + getQueueId() + seperator))
newFairQueue += getQueueId() + seperator;
}
try {
int updateCount = mapper.updateExpireTimeByIdAndVersion(now + effectTime, newVersion, lock.getId(),
originalVersion, newFairQueue);
if (updateCount == 1 && emptyQueue) {
log.info("原锁已超时, 本次进程成功获取锁, resource:【{}】, 新锁版本:【{}】", lockName, newVersion);
return true;
} else {
log.info("原锁已超时, 但本次进程未能成功获取锁, resource:【{}】, 原锁版本:【{}】", lockName, originalVersion);
return false;
}
} catch (Exception e) {
log.debug("原锁已超时, 但本次进程获取锁失败并发生异常, resource:【{}】, 异常信息【{}】", lockName, e.getMessage());
return false;
}
}
}
private boolean checkOrQueue(OptimisticLock lock, long effectTime) {
String fairQueue = lock.getFairQueue();
try {
if (fairQueue != null && fairQueue.startsWith(getQueueId() + seperator)) {
// 如果是公平锁,而且处于队列首位,则表示已经抢到锁,需要更新锁过期时间
OptimisticLock update = new OptimisticLock();
update.setExpireTime(System.currentTimeMillis() + effectTime);
update.setId(lock.getId());
mapper.updateById(update);
log.info("获取公平锁成功, resource:【{}】", lock.getResource());
return true;
} else if (fairQueue != null && fairQueue.contains(seperator + getQueueId() + seperator)) {
// 如果是公平锁,而且处于队列中,则表示抢锁失败,但已经排队
return false;
} else {
// 如果是公平锁,而且不在队列中,则表示抢锁失败,需要排队
if (fairQueue == null)
fairQueue = "";
// 如果队列已满,则不允许排队
if (count(fairQueue, seperator) > fairQueueCapacity) {
return false;
}
String newFairQueue = fairQueue + getQueueId() + seperator;
Integer versionNo = lock.getVersionNo();
int updateCount = mapper.updateFairQueuByIdAndVersion(newFairQueue, versionNo + 1, lock.getId(),
versionNo);
if (updateCount == 1)
log.info("获取公平锁失败但排队成功, resource:【{}】", lock.getResource());
// 无需判断是否排队成功,若不成功,则只能下次再排队
return false;
}
} catch (Exception e) {
log.error("获取公平锁异常,resource: 【{}】", lock.getResource(), e);
return false;
}
}
/**
* 计算content中包含几个target
*
* @param content
* @param target
* @return
*/
private static int count(String content, String target) {
int count = 0;
for (int i = 0; i < content.length(); i++) {
int t = content.indexOf(target, i);
if (i == t) {
count++;
}
}
return count;
}
// 不能加事务
@Override
public boolean tryLockByName(String lockName, long timeout, long effectTime, boolean fair)
throws InterruptedException {
if (timeout <= 0)
return lockByName(lockName, effectTime, fair);
long total = 0;
boolean got = false;
int offset = 250;
while (total < timeout && !(got = lockByName(lockName, effectTime, fair))) {
TimeUnit.MILLISECONDS.sleep(offset);
total += offset;
}
return got;
}
}