searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

如何使用MySQL来实现分布式公平锁

2023-04-25 06:39:25
42
0

分布式公平锁,解决的是资源独占问题和公平性问题,即资源在同一时刻只能有一个线程访问,而且等待时间最长的线程优先访问。分布式公平锁的实现原理,是在分布式锁的基础上,增加一个等待队列,没抢到锁的线程只能在队列中等待,而锁被释放之后会唤醒队列中第一个线程。

用MySQL来实现分布式锁,网上已经有比较通用的实现方案,即用一个表来存储锁信息,通过乐观锁的方式来抢锁。

所以这里的难点是锁被释放后,如何去唤醒队列中的第一个线程,因为分布式系统中各个线程可能存在于不同的JVM。

基于MySQL的方案就是,在表中增加一个字段来记录各个等待线程的排队信息,

流程:

- 当一个线程尝试获取锁的时候,如果锁已经被占用,那么该线程需要将自己加入到排队信息中,

- 然后该线程要不断轮询这个锁信息,

- 发现锁被释放后,需要检查自己是否队列中的第一个,如果是,则获得锁,否则继续等待

下面直接上代码

接口类:

public interface DistributedLock {
	/**
	 * 锁默认过期时间,30秒
	 */
	static final long DEFAULT_EXPIRE_SECONDS = 30;
	
	/**
	 * 
	 * @param lockName
	 */
	default boolean lockByName(String lockName) {
		return lockByName(lockName, DEFAULT_EXPIRE_SECONDS * 1000);
	}
	
	/**
	 * 
	 * @param lockName
	 * @param effectTime 预计占用时间,毫秒,超过这个时间锁将会过期而被其他线程获取
	 */
	boolean lockByName(String lockName, long effectTime);

	/**
	 * @param lockName
	 * @param timeout  超时时间,毫秒
	 * @param fair     是否公平锁
	 * @return
	 */
	default boolean tryLockByName(String lockName, long timeout, boolean fair) throws InterruptedException {
		return tryLockByName(lockName, timeout, DEFAULT_EXPIRE_SECONDS * 1000, fair);
	}

	/**
	 * @param lockName
	 * @param timeout    超时时间,毫秒, 超过这个时间仍未获取到锁则返回false
	 * @param effectTime 预计占用时间,毫秒,超过这个时间锁将会过期而被其他线程获取
	 * @param fair       是否公平锁
	 * @return
	 */
	boolean tryLockByName(String lockName, long timeout, long effectTime, boolean fair) throws InterruptedException;

	void unLockByName(String lockName);
}

实现类:

@Slf4j
public class DbDistributedLock implements DistributedLock {

	private OptimisticLockMapper mapper;

	private String idPrefix;

	private static final String seperator = ",";

	private static final int fairQueueCapacity = 20;

	public DbDistributedLock(OptimisticLockMapper mapper) {
		this.mapper = mapper;
	}

	@PostConstruct
	private void init() {
		idPrefix = IpUtil.getLocalHostIp() + "_";
	}

	/**
	 * 获取线程排队标识,IP+线程名
	 * 
	 * @return
	 */
	private String getQueueId() {
		return idPrefix + Thread.currentThread().getName();
	}

	@Override
	public void unLockByName(String lockName) {
		OptimisticLock lock = mapper.selectByResource(lockName);
		if (lock == null)
			return;

		String fairQueue = lock.getFairQueue();
		Integer versionNo = lock.getVersionNo();
		if (StringUtils.isEmpty(fairQueue)) {
			int deleteCount = mapper.deleteByResource(lockName, versionNo);
			if (deleteCount == 1) {
				log.info("锁释放成功, resource:【{}】", lockName);
			} else {
				// 锁释放失败(可能有别的线程排队),重来一次
				unLockByName(lockName);
			}
		} else {
			String queueStr = getQueueId() + seperator;
			if (!fairQueue.startsWith(queueStr))
				// 可能是锁过期导致排队信息被删
				return;

			String newFairQueue = fairQueue.substring(queueStr.length());
			int updateCount = 0;
			if (newFairQueue.isEmpty())
				// 如果已经没人排队,则删除
				updateCount = mapper.deleteByResource(lockName, versionNo);
			else
				// 如果仍有人排队,则更新队列
				updateCount = mapper.updateFairQueuByIdAndVersion(newFairQueue, versionNo + 1, lock.getId(), versionNo);
			if (updateCount == 1) {
				log.info("锁释放成功, resource:【{}】", lockName);
			} else {
				// 锁释放失败(可能有别的线程排队),重来一次
				unLockByName(lockName);
			}
		}
	}

	@Override
	public boolean lockByName(String lockName, long effectTime) {
		return lockByName(lockName, effectTime, false);
	}

	private boolean lockByName(String lockName, long effectTime, boolean fair) {
		OptimisticLock lock = null;
		try {
			lock = mapper.selectByResource(lockName);
		} catch (Exception e) {
			log.warn("获取锁信息失败", e);
			return false;
		}
		long now = System.currentTimeMillis();
		if (lock == null) {
			lock = new OptimisticLock();
			lock.setResource(lockName);
			lock.setCreateTime(now);
			lock.setVersionNo(0);
			lock.setExpireTime(now + effectTime);
			if (fair) {
				lock.setFairQueue(getQueueId() + seperator);
			}
			try {
				int insertCount = mapper.insert(lock);
				if (insertCount == 1) {
					log.info("获取锁成功, resource:【{}】", lockName);
					return true;
				} else {
					log.debug("获取锁失败, resource:【{}】", lockName);
					return false;
				}
			} catch (Exception e) {
				log.debug("获取锁失败并发生异常, resource:【{}】, 异常信息【{}】", lockName, e.getMessage());
				return false;
			}
		} else {
			// 当锁非空时,判断是否过期,若过期,通过乐观锁,争抢锁资源
			Long expiredTime = lock.getExpireTime();
			int originalVersion = lock.getVersionNo();
			// 锁未过期
			if (expiredTime >= now) {
				if (fair) {
					return checkOrQueue(lock, effectTime);
				} else {
					log.debug("锁被占用, resource:【{}】", lockName);
					return false;
				}
			}

			// 锁已过期,更新锁
			int newVersion = originalVersion + 1;
			// 是否空队列(前面没人排队,或前面超时)
			boolean emptyQueue = !fair;
			String newFairQueue = null;
			if (fair) {
				String fairQueue = lock.getFairQueue();
				// 去掉锁拥有者的排队记录
				if (StringUtils.isNotEmpty(fairQueue)) {
					newFairQueue = fairQueue.substring(fairQueue.indexOf(seperator) + 1);
				}
				else {
					newFairQueue = "";
					emptyQueue = true;
				}
				// 如果当前线程未排队,加上当前线程的排队信息
				if (fairQueue != null && !fairQueue.contains(seperator + getQueueId() + seperator))
					newFairQueue += getQueueId() + seperator;
			}
			try {
				int updateCount = mapper.updateExpireTimeByIdAndVersion(now + effectTime, newVersion, lock.getId(),
						originalVersion, newFairQueue);
				if (updateCount == 1 && emptyQueue) {
					log.info("原锁已超时, 本次进程成功获取锁, resource:【{}】, 新锁版本:【{}】", lockName, newVersion);
					return true;
				} else {
					log.info("原锁已超时, 但本次进程未能成功获取锁, resource:【{}】, 原锁版本:【{}】", lockName, originalVersion);
					return false;
				}
			} catch (Exception e) {
				log.debug("原锁已超时, 但本次进程获取锁失败并发生异常, resource:【{}】, 异常信息【{}】", lockName, e.getMessage());
				return false;
			}
		}
	}

	private boolean checkOrQueue(OptimisticLock lock, long effectTime) {
		String fairQueue = lock.getFairQueue();
		try {
			if (fairQueue != null && fairQueue.startsWith(getQueueId() + seperator)) {
				// 如果是公平锁,而且处于队列首位,则表示已经抢到锁,需要更新锁过期时间
				OptimisticLock update = new OptimisticLock();
				update.setExpireTime(System.currentTimeMillis() + effectTime);
				update.setId(lock.getId());
				mapper.updateById(update);
				log.info("获取公平锁成功, resource:【{}】", lock.getResource());
				return true;
			} else if (fairQueue != null && fairQueue.contains(seperator + getQueueId() + seperator)) {
				// 如果是公平锁,而且处于队列中,则表示抢锁失败,但已经排队
				return false;
			} else {
				// 如果是公平锁,而且不在队列中,则表示抢锁失败,需要排队
				if (fairQueue == null)
					fairQueue = "";
				// 如果队列已满,则不允许排队
				if (count(fairQueue, seperator) > fairQueueCapacity) {
					return false;
				}
				String newFairQueue = fairQueue + getQueueId() + seperator;
				Integer versionNo = lock.getVersionNo();
				int updateCount = mapper.updateFairQueuByIdAndVersion(newFairQueue, versionNo + 1, lock.getId(),
						versionNo);
				if (updateCount == 1)
					log.info("获取公平锁失败但排队成功, resource:【{}】", lock.getResource());
				// 无需判断是否排队成功,若不成功,则只能下次再排队
				return false;
			}
		} catch (Exception e) {
			log.error("获取公平锁异常,resource: 【{}】", lock.getResource(), e);
			return false;
		}
	}

	/**
	 * 计算content中包含几个target
	 * 
	 * @param content
	 * @param target
	 * @return
	 */
	private static int count(String content, String target) {
		int count = 0;

		for (int i = 0; i < content.length(); i++) {
			int t = content.indexOf(target, i);
			if (i == t) {
				count++;
			}
		}
		return count;
	}

	// 不能加事务
	@Override
	public boolean tryLockByName(String lockName, long timeout, long effectTime, boolean fair)
			throws InterruptedException {
		if (timeout <= 0)
			return lockByName(lockName, effectTime, fair);

		long total = 0;
		boolean got = false;
		int offset = 250;
		while (total < timeout && !(got = lockByName(lockName, effectTime, fair))) {
			TimeUnit.MILLISECONDS.sleep(offset);
			total += offset;
		}
		return got;
	}

}
0条评论
0 / 1000
白龙马
14文章数
0粉丝数
白龙马
14 文章 | 0 粉丝
原创

如何使用MySQL来实现分布式公平锁

2023-04-25 06:39:25
42
0

分布式公平锁,解决的是资源独占问题和公平性问题,即资源在同一时刻只能有一个线程访问,而且等待时间最长的线程优先访问。分布式公平锁的实现原理,是在分布式锁的基础上,增加一个等待队列,没抢到锁的线程只能在队列中等待,而锁被释放之后会唤醒队列中第一个线程。

用MySQL来实现分布式锁,网上已经有比较通用的实现方案,即用一个表来存储锁信息,通过乐观锁的方式来抢锁。

所以这里的难点是锁被释放后,如何去唤醒队列中的第一个线程,因为分布式系统中各个线程可能存在于不同的JVM。

基于MySQL的方案就是,在表中增加一个字段来记录各个等待线程的排队信息,

流程:

- 当一个线程尝试获取锁的时候,如果锁已经被占用,那么该线程需要将自己加入到排队信息中,

- 然后该线程要不断轮询这个锁信息,

- 发现锁被释放后,需要检查自己是否队列中的第一个,如果是,则获得锁,否则继续等待

下面直接上代码

接口类:

public interface DistributedLock {
	/**
	 * 锁默认过期时间,30秒
	 */
	static final long DEFAULT_EXPIRE_SECONDS = 30;
	
	/**
	 * 
	 * @param lockName
	 */
	default boolean lockByName(String lockName) {
		return lockByName(lockName, DEFAULT_EXPIRE_SECONDS * 1000);
	}
	
	/**
	 * 
	 * @param lockName
	 * @param effectTime 预计占用时间,毫秒,超过这个时间锁将会过期而被其他线程获取
	 */
	boolean lockByName(String lockName, long effectTime);

	/**
	 * @param lockName
	 * @param timeout  超时时间,毫秒
	 * @param fair     是否公平锁
	 * @return
	 */
	default boolean tryLockByName(String lockName, long timeout, boolean fair) throws InterruptedException {
		return tryLockByName(lockName, timeout, DEFAULT_EXPIRE_SECONDS * 1000, fair);
	}

	/**
	 * @param lockName
	 * @param timeout    超时时间,毫秒, 超过这个时间仍未获取到锁则返回false
	 * @param effectTime 预计占用时间,毫秒,超过这个时间锁将会过期而被其他线程获取
	 * @param fair       是否公平锁
	 * @return
	 */
	boolean tryLockByName(String lockName, long timeout, long effectTime, boolean fair) throws InterruptedException;

	void unLockByName(String lockName);
}

实现类:

@Slf4j
public class DbDistributedLock implements DistributedLock {

	private OptimisticLockMapper mapper;

	private String idPrefix;

	private static final String seperator = ",";

	private static final int fairQueueCapacity = 20;

	public DbDistributedLock(OptimisticLockMapper mapper) {
		this.mapper = mapper;
	}

	@PostConstruct
	private void init() {
		idPrefix = IpUtil.getLocalHostIp() + "_";
	}

	/**
	 * 获取线程排队标识,IP+线程名
	 * 
	 * @return
	 */
	private String getQueueId() {
		return idPrefix + Thread.currentThread().getName();
	}

	@Override
	public void unLockByName(String lockName) {
		OptimisticLock lock = mapper.selectByResource(lockName);
		if (lock == null)
			return;

		String fairQueue = lock.getFairQueue();
		Integer versionNo = lock.getVersionNo();
		if (StringUtils.isEmpty(fairQueue)) {
			int deleteCount = mapper.deleteByResource(lockName, versionNo);
			if (deleteCount == 1) {
				log.info("锁释放成功, resource:【{}】", lockName);
			} else {
				// 锁释放失败(可能有别的线程排队),重来一次
				unLockByName(lockName);
			}
		} else {
			String queueStr = getQueueId() + seperator;
			if (!fairQueue.startsWith(queueStr))
				// 可能是锁过期导致排队信息被删
				return;

			String newFairQueue = fairQueue.substring(queueStr.length());
			int updateCount = 0;
			if (newFairQueue.isEmpty())
				// 如果已经没人排队,则删除
				updateCount = mapper.deleteByResource(lockName, versionNo);
			else
				// 如果仍有人排队,则更新队列
				updateCount = mapper.updateFairQueuByIdAndVersion(newFairQueue, versionNo + 1, lock.getId(), versionNo);
			if (updateCount == 1) {
				log.info("锁释放成功, resource:【{}】", lockName);
			} else {
				// 锁释放失败(可能有别的线程排队),重来一次
				unLockByName(lockName);
			}
		}
	}

	@Override
	public boolean lockByName(String lockName, long effectTime) {
		return lockByName(lockName, effectTime, false);
	}

	private boolean lockByName(String lockName, long effectTime, boolean fair) {
		OptimisticLock lock = null;
		try {
			lock = mapper.selectByResource(lockName);
		} catch (Exception e) {
			log.warn("获取锁信息失败", e);
			return false;
		}
		long now = System.currentTimeMillis();
		if (lock == null) {
			lock = new OptimisticLock();
			lock.setResource(lockName);
			lock.setCreateTime(now);
			lock.setVersionNo(0);
			lock.setExpireTime(now + effectTime);
			if (fair) {
				lock.setFairQueue(getQueueId() + seperator);
			}
			try {
				int insertCount = mapper.insert(lock);
				if (insertCount == 1) {
					log.info("获取锁成功, resource:【{}】", lockName);
					return true;
				} else {
					log.debug("获取锁失败, resource:【{}】", lockName);
					return false;
				}
			} catch (Exception e) {
				log.debug("获取锁失败并发生异常, resource:【{}】, 异常信息【{}】", lockName, e.getMessage());
				return false;
			}
		} else {
			// 当锁非空时,判断是否过期,若过期,通过乐观锁,争抢锁资源
			Long expiredTime = lock.getExpireTime();
			int originalVersion = lock.getVersionNo();
			// 锁未过期
			if (expiredTime >= now) {
				if (fair) {
					return checkOrQueue(lock, effectTime);
				} else {
					log.debug("锁被占用, resource:【{}】", lockName);
					return false;
				}
			}

			// 锁已过期,更新锁
			int newVersion = originalVersion + 1;
			// 是否空队列(前面没人排队,或前面超时)
			boolean emptyQueue = !fair;
			String newFairQueue = null;
			if (fair) {
				String fairQueue = lock.getFairQueue();
				// 去掉锁拥有者的排队记录
				if (StringUtils.isNotEmpty(fairQueue)) {
					newFairQueue = fairQueue.substring(fairQueue.indexOf(seperator) + 1);
				}
				else {
					newFairQueue = "";
					emptyQueue = true;
				}
				// 如果当前线程未排队,加上当前线程的排队信息
				if (fairQueue != null && !fairQueue.contains(seperator + getQueueId() + seperator))
					newFairQueue += getQueueId() + seperator;
			}
			try {
				int updateCount = mapper.updateExpireTimeByIdAndVersion(now + effectTime, newVersion, lock.getId(),
						originalVersion, newFairQueue);
				if (updateCount == 1 && emptyQueue) {
					log.info("原锁已超时, 本次进程成功获取锁, resource:【{}】, 新锁版本:【{}】", lockName, newVersion);
					return true;
				} else {
					log.info("原锁已超时, 但本次进程未能成功获取锁, resource:【{}】, 原锁版本:【{}】", lockName, originalVersion);
					return false;
				}
			} catch (Exception e) {
				log.debug("原锁已超时, 但本次进程获取锁失败并发生异常, resource:【{}】, 异常信息【{}】", lockName, e.getMessage());
				return false;
			}
		}
	}

	private boolean checkOrQueue(OptimisticLock lock, long effectTime) {
		String fairQueue = lock.getFairQueue();
		try {
			if (fairQueue != null && fairQueue.startsWith(getQueueId() + seperator)) {
				// 如果是公平锁,而且处于队列首位,则表示已经抢到锁,需要更新锁过期时间
				OptimisticLock update = new OptimisticLock();
				update.setExpireTime(System.currentTimeMillis() + effectTime);
				update.setId(lock.getId());
				mapper.updateById(update);
				log.info("获取公平锁成功, resource:【{}】", lock.getResource());
				return true;
			} else if (fairQueue != null && fairQueue.contains(seperator + getQueueId() + seperator)) {
				// 如果是公平锁,而且处于队列中,则表示抢锁失败,但已经排队
				return false;
			} else {
				// 如果是公平锁,而且不在队列中,则表示抢锁失败,需要排队
				if (fairQueue == null)
					fairQueue = "";
				// 如果队列已满,则不允许排队
				if (count(fairQueue, seperator) > fairQueueCapacity) {
					return false;
				}
				String newFairQueue = fairQueue + getQueueId() + seperator;
				Integer versionNo = lock.getVersionNo();
				int updateCount = mapper.updateFairQueuByIdAndVersion(newFairQueue, versionNo + 1, lock.getId(),
						versionNo);
				if (updateCount == 1)
					log.info("获取公平锁失败但排队成功, resource:【{}】", lock.getResource());
				// 无需判断是否排队成功,若不成功,则只能下次再排队
				return false;
			}
		} catch (Exception e) {
			log.error("获取公平锁异常,resource: 【{}】", lock.getResource(), e);
			return false;
		}
	}

	/**
	 * 计算content中包含几个target
	 * 
	 * @param content
	 * @param target
	 * @return
	 */
	private static int count(String content, String target) {
		int count = 0;

		for (int i = 0; i < content.length(); i++) {
			int t = content.indexOf(target, i);
			if (i == t) {
				count++;
			}
		}
		return count;
	}

	// 不能加事务
	@Override
	public boolean tryLockByName(String lockName, long timeout, long effectTime, boolean fair)
			throws InterruptedException {
		if (timeout <= 0)
			return lockByName(lockName, effectTime, fair);

		long total = 0;
		boolean got = false;
		int offset = 250;
		while (total < timeout && !(got = lockByName(lockName, effectTime, fair))) {
			TimeUnit.MILLISECONDS.sleep(offset);
			total += offset;
		}
		return got;
	}

}
文章来自个人专栏
Java
14 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0