public class LockUtil { private static final Logger logger = LoggerFactory.getLogger(LockUtil.class); private static final byte[] data = { 0x12, 0x34 }; private static Watcher watcher = new Watcher() { public void process(WatchedEvent event) { LoggerUtils.logInfo(logger,"process : " + event.getType()); } }; private static ZooKeeper zookeeper=null; private final String root; //根节点路径 private String id; private LockNode idName; private String ownerId; private String lastChildId; private Throwable other = null; private KeeperException exception = null; private InterruptedException interrupt = null; private static final Long DEFAULT_TIMEOUT_PERIOD=1000L; private ReentrantLock reentrantLock = new ReentrantLock(); static{ try { zookeeper =new ZooKeeper("ip:port", DEFAULT_TIMEOUT_PERIOD.intValue(),watcher); } catch (IOException e) { logger.error("获取zookeeper错误"); } } public LockUtil(String root) { this.root = root; ensureExists(root); } /** * 尝试获取锁操作,阻塞式可被中断 * @throws NestableRuntimeException */ public void lock() throws InterruptedException, KeeperException, NestableRuntimeException { // 可能初始化的时候就失败了 if (exception != null) { throw exception; } if (interrupt != null) { throw interrupt; } if (other != null) { throw new NestableRuntimeException(other); } if (isOwner()) {//锁重入 return; } reentrantLock.lock(); BooleanMutex mutex = new BooleanMutex(); acquireLock(mutex); // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试 try { mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true // mutex.get(); } catch (TimeoutException e) { if (!mutex.state()) { lock(); } } if (exception != null) { throw exception; } if (interrupt != null) { throw interrupt; } if (other != null) { throw new NestableRuntimeException(other); } } /** * 尝试获取锁对象, 不会阻塞 * * @throws InterruptedException * @throws KeeperException * @throws NestableRuntimeException */ public boolean tryLock() throws KeeperException, NestableRuntimeException { // 可能初始化的时候就失败了 if (exception != null) { throw exception; } if 
(isOwner()) {//锁重入 return true; } reentrantLock.lock(); acquireLock(null); if (exception != null) { throw exception; } if (interrupt != null) { Thread.currentThread().interrupt(); } if (other != null) { throw new NestableRuntimeException(other); } return isOwner(); } /** * 释放锁对象 */ public void unlock() throws KeeperException { if (id != null) { try { zookeeper.delete(root + "/" + id, -1); reentrantLock.unlock(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (KeeperException.NoNodeException e) { // do nothing } finally { id = null; } } else { //do nothing } } private void ensureExists(final String path) { try { Stat stat = zookeeper.exists(path, false); if (stat != null) { return; } zookeeper.create(path, data,Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException e) { exception = e; } catch (InterruptedException e) { Thread.currentThread().interrupt(); interrupt = e; } } /** * 返回锁对象对应的path */ public String getRoot() { return root; } /** * 判断当前是不是锁的owner */ public boolean isOwner() { return id != null && ownerId != null && id.equals(ownerId); } /** * 返回当前的节点id */ public String getId() { return this.id; } // ===================== helper method ============================= /** * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作 */ private Boolean acquireLock(final BooleanMutex mutex) { try { do { if (id == null) {//构建当前lock的唯一标识 long sessionId = zookeeper.getSessionId(); String prefix = "x-" + sessionId + "-"; //如果第一次,则创建一个节点 String path = zookeeper.create(root + "/" + prefix, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); int index = path.lastIndexOf("/"); id = StringUtils.substring(path, index + 1); idName = new LockNode(id); } if (id != null) { List<String> names = zookeeper.getChildren(root, false); if (names.isEmpty()) { id = null;//异常情况,重新创建一个 } else { //对节点进行排序 SortedSet<LockNode> sortedNames = new TreeSet<LockNode>(); for (String name : names) { sortedNames.add(new LockNode(name)); } if 
(sortedNames.contains(idName) == false) { id = null;//清空为null,重新创建一个 continue; } //将第一个节点做为ownerId ownerId = sortedNames.first().getName(); if (mutex != null && isOwner()) { mutex.set(true);//直接更新状态,返回 return true; } else if (mutex == null) { return isOwner(); } SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName); if (!lessThanMe.isEmpty()) { //关注一下排队在自己之前的最近的一个节点 LockNode lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); //异步watcher处理 Stat stat=zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() { public void asyncProcess(WatchedEvent event) { acquireLock(mutex); } }); if (stat == null) { acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去 } } else { if (isOwner()) { mutex.set(true); } else { id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同 } } } } } while (id == null); } catch (KeeperException e) { exception = e; if (mutex != null) { mutex.set(true); } } catch (InterruptedException e) { interrupt = e; if (mutex != null) { mutex.set(true); } } catch (Throwable e) { other = e; if (mutex != null) { mutex.set(true); } } if (isOwner() && mutex != null) { mutex.set(true); } return Boolean.FALSE; }
public class LockNode implements Comparable<LockNode> { private final String name; private String prefix; private int sequence = -1; public LockNode(String name) { this.name = name; this.prefix = name; int idx = name.lastIndexOf('-'); if (idx >= 0) { this.prefix = name.substring(0, idx); try { this.sequence = Integer.parseInt(name.substring(idx + 1)); } catch (Exception e) { // ignore } } } public int compareTo(LockNode that) { int s1 = this.sequence; int s2 = that.sequence; if (s1 == -1 && s2 == -1) { return this.name.compareTo(that.name); } if (s1 == -1) { return -1; } else if (s2 == -1) { return 1; } else { return s1 - s2; } } public String getName() { return name; } public int getSequence() { return sequence; } public String getPrefix() { return prefix; } public String toString() { return name.toString(); } // ==================== hashcode & equals方法======================= @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((name == null) ? 0 : name.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } LockNode other = (LockNode) obj; if (name == null) { if (other.name != null) { return false; } } else if (!name.equals(other.name)) { return false; } return true; } }
/**
 * A boolean gate: callers of {@link #get()} wait until the value is true.
 * The value is kept in the state of an {@link AbstractQueuedSynchronizer}.
 */
public class BooleanMutex {

    private Sync sync;

    /** Creates a gate whose value is false. */
    public BooleanMutex() {
        this(Boolean.FALSE);
    }

    /**
     * Creates a gate with the given initial value.
     *
     * @param mutex initial value
     */
    public BooleanMutex(Boolean mutex) {
        sync = new Sync();
        set(mutex);
    }

    /**
     * Waits until the value is true.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public void get() throws InterruptedException {
        sync.innerGet();
    }

    /**
     * Waits until the value is true, for at most the given time.
     *
     * @param timeout maximum time to wait
     * @param unit unit of {@code timeout}
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws TimeoutException if the value is still false when the time is up
     */
    public void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long nanos = unit.toNanos(timeout);
        sync.innerGet(nanos);
    }

    /**
     * Sets the value.
     *
     * @param mutex new value
     */
    public void set(Boolean mutex) {
        if (!mutex) {
            sync.innerSetFalse();
        } else {
            sync.innerSetTrue();
        }
    }

    /** Returns the current value. */
    public boolean state() {
        return sync.innerState();
    }

    /**
     * Synchronizer behind the gate; the AQS state holds either {@code TRUE} or {@code FALSE}.
     */
    private final class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = -7828117401763700385L;

        /** State value meaning true. */
        private static final int TRUE = 1;

        /** State value meaning false. */
        private static final int FALSE = 2;

        private boolean isTrue(int state) {
            return (state & TRUE) != 0;
        }

        /**
         * Shared acquire succeeds only while the value is true; otherwise the caller is queued.
         */
        protected int tryAcquireShared(int unused) {
            if (isTrue(getState())) {
                return 1;
            }
            return -1;
        }

        /**
         * Shared release is always allowed.
         */
        protected boolean tryReleaseShared(int ignore) {
            return true;
        }

        boolean innerState() {
            return isTrue(getState());
        }

        void innerGet() throws InterruptedException {
            acquireSharedInterruptibly(0);
        }

        void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException {
            boolean passed = tryAcquireSharedNanos(0, nanosTimeout);
            if (!passed) {
                throw new TimeoutException();
            }
        }

        void innerSetTrue() {
            int current = getState();
            while (current != TRUE) {
                // CAS so that concurrent setters do not overwrite each other blindly.
                if (compareAndSetState(current, TRUE)) {
                    releaseShared(0); // wake up the waiting threads
                }
                current = getState();
            }
        }

        void innerSetFalse() {
            int current = getState();
            while (current != FALSE) {
                // CAS so that concurrent setters do not overwrite each other blindly.
                if (compareAndSetState(current, FALSE)) {
                    setState(FALSE);
                }
                current = getState();
            }
        }
    }
}
public abstract class AsyncWatcher implements Watcher { private static final int DEFAULT_POOL_SIZE = 30; private static final int DEFAULT_ACCEPT_COUNT = 60; private static ExecutorService executor = new ThreadPoolExecutor( 1, DEFAULT_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue( DEFAULT_ACCEPT_COUNT), new NamedThreadFactory( "Arbitrate-Async-Watcher"), new ThreadPoolExecutor.CallerRunsPolicy()); public void process(final WatchedEvent event) { executor.execute(new Runnable() {// 提交异步处理 @Override public void run() { asyncProcess(event); } }); } public abstract void asyncProcess(WatchedEvent event); }
/**
 * Exception that wraps another throwable as its cause.
 *
 * <p>Despite its name it extends {@link Exception}, so it is a checked exception and must be
 * declared or caught by callers.
 */
public class NestableRuntimeException extends Exception {

    /**
     * Wraps the given throwable.
     *
     * @param nest the throwable to carry as the cause
     */
    public NestableRuntimeException(Throwable nest) {
        super(nest);
    }
}