CountDownLatch
相比ReentranceLock,CountDownLatch的流程还是相对比较简单的,CountDownLatch也是基于AQS,它是AQS的共享功能的一个实现。
下面从源代码的实现上详解CountDownLatch。
1、CountDownLatch 构造
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // Sync 就是继承了一个AQS this.sync = new Sync(count); }
下面为CountDownLatch中Sync的部分代码片段,CountDownLatch构造器中的count最终还是传递了ASQ中的state,
所以CountDownLatch中的countDown也是对于state状态的改变。
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } }
2、CountDownLatch.countDown() 实现
先看countDown所涉及的代码
1、 public void countDown() { sync.releaseShared(1); } 2、 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //如果计数器的值为0,那么执行下面操作 doReleaseShared(); //这一步的操作主要是唤醒主线程,因为如果state不等于0的话,主线程一直是阻塞的 return true; } return false; } 3、 CountDownLatch重写的方法 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) //countDown的计数器以及到0,体现在我们程序中的,就是并行的几个任务已经执行完 return false; int nextc = c-1; //没执行一次countDown,计数器减1 if (compareAndSetState(c, nextc)) // 利用cas来更新state的状态,这里可能有并发,所以这也是用死循环更新的原因 return nextc == 0; //更新成功就返回 } } 4、 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { /至少有两个节点 int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 说明后继节点需要唤醒 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //唤醒后继节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } 5、 private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒主线程 LockSupport.unpark(s.thread); }
3、CountDownLatch.await() 实现
先看代码
1、 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } 2、 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) = 0) { // state的状态是0,说明,countDown的所有任务已经完成 setHeadAndPropagate(node, r); //主线程所在的节点设置为头节点 p.next = null; // help GC failed = false; return; //主线程结束等待 } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) //如果是非正常退出的话,取消 cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //讲等待状态设置为后继唤醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 当前线程阻塞,判断线程是否中断 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } //取消当前节点获取锁 private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; Node predNext = pred.next; //如果节点都没有被取消的话,那么这个节点和node是同一个节点 //node的后继节点取消 node.waitStatus = Node.CANCELLED; // If we are the tail, remove ourselves. // CountDownLatch 逻辑就到这里 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } }
总结
从实现上看CountDownLatch,它也是基于AQS的,await是通过轮询state的状态来判断所有的任务是否都完成。