关于CyclicBarrier与CountDownLatch的源码比较-CountDownLatch 源码解读

前言

本人的观点也不一定正确,仅供读者参考。

CountDownLatch

我们先来读下CountDownLatch这个类的注释:

1
2
3
4
/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
**/

此处说明了其使用场景允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。这里有两个关键点:等待一组操作完成。这里要强调的是,等待并不意味着线程一定挂起,一组操作完成并不意味着其中一个操作所在的线程就会结束,这是两码事。
接着来看第二段注释:

1
2
3
4
5
6
7
8
9
/**
*<p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
* The {@link #await await} methods block until the current count reaches
* zero due to invocations of the {@link #countDown} method, after which
* all waiting threads are released and any subsequent invocations of
* {@link #await await} return immediately. This is a one-shot phenomenon
* -- the count cannot be reset. If you need a version that resets the
* count, consider using a {@link CyclicBarrier}.
**/

从此处可以知道,CountDownLatch用给定的count进行初始化。 调用await方法会产生阻塞,直到当前计数count由于调用countDown方法而减至零,此后所有等待的线程被释放,并且后续无论是哪个线程再次进行await调用都会立即返回,不会产生其他动作。 也就是说,这是一次性使用的工具,其计数无法重置。 如果你需要重置计数的版本,请考虑使用CyclicBarrier。
这里,我们可以结合下源码来进一步解读,我们首先会看到,CountDownLatch只定义了一个private final Sync sync;字段,其是final类型,一旦赋值就不可变。

CountDownLatch的初始化

我们先来说CountDownLatch的初始化:

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

可以看到,这里主要还是创建了一个Sync实例,而这也是这个类的核心所在,它是一个针对于CountDownLatch而专门设计的一个实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

其主要还是利用AQSvolatile字段state来进行状态的控制,这也是我们可以进行CAS操作的核心所在。

共享与独占的区别

我们在前面知道,调用await方法会产生阻塞,那么这里我们就来看下await:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// java.util.concurrent.CountDownLatch#await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

这里,我们看到了Shared,我们仔细追寻,在AQS的的内部类node中,有定义字段EXCLUSIVESHARED这俩就代表了两种情况的使用,独占和共享。其主要还是针对于资源的使用情况来讲的,前者,是对资源,这里就是这个state状态值,单个线程独占这个资源,不为0,不放弃。后者主要是将state状态值共享出来,几个线程都可以操作。而两者应用最大的区别就在于tryAcquiretryAcquireShared的实现。这里,我并不会对ReentrantLock中的tryAcquire进行讲解。其他地方基本一致,差别点就在于addWaiter(Node.XXX)传入的类型不同,acquireQueueddoAcquireSharedInterruptibly实现思路大致相仿,只是会根据自己实际实现略作调整。这里,我们就专门针对于CountDownLatch所涉及到的进行解读。
题外话:我们通过知道独占与共享的设计区别,我们就可以很轻松的设计出属于自己的一些特有逻辑的实现,主要还是在于我们首先确定api选型,然后重写相应重点方法即可。

acquireSharedInterruptibly

acquireSharedInterruptibly方法名称可以知道,其是可打断的,而且每一个调用await正常来讲都是在一个独立的线程中的,那么这个独立的线程在整个过程中都有可能被打断掉。
我们参考上面CountDownLatch中SynctryAcquireShared实现,状态不为0就进入doAcquireSharedInterruptibly方法中去,这个方法就是,首先先构造个节点,这个节点有绑定当前所在线程,然后让你进个队列,接着,我们的任务就是无限循环找我们前置节点到底是不是头节点,是的话,就再试着获取下状态值,当看到大于0了,对于CountDownLatch中Sync里的实现就是1,那就进入setHeadAndPropagate(node, r);:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) /*之前的节点设定类型在这里就用上了*/
doReleaseShared();
}
}

前面的都能看懂,这里要强调的是,因为你是Shared,还有一点我们需要思考的是,什么时候才会发生tryAcquireShared(1)>0 (这里的参数1在CountDownLatch中SynctryAcquireShared实现里没有什么意义)?就是在状态值为0的时候,也就是产生释放的时候,即调用java.util.concurrent.CountDownLatch#countDown将状态值减为0的时候,然后激活头节点,所以我们这里首先释放的其实就是头节点,那读者可能会有疑问,那pre节点是什么,这也是我要强调的,pre节点并不一定是头节点,但是头节点的pre节点绝对就是自身,
下面我将三者的源码给出,可以很轻易的看到,假如是头节点,那么在for循环下,就再进行一次其pre节点的设定,初次设定的时候头尾都是自身。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
//java.util.concurrent.locks.AbstractQueuedSynchronizer.Node#predecessor
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// java.util.concurrent.locks.AbstractQueuedSynchronizer#initializeSyncQueue
/**
* Initializes head and tail fields on first contention.
*/
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}

至此,我们知道,在CountDownLatch作释放为0的时候,会率先激活头节点,然后后面的逻辑就是依次将自己设定成头节点,并将自身节点的线程状态由需要SIGNAL变为0,即属于正常运行状态,这样,我们方便在unparkSuccessor方法中激活下一个节点的所绑定的线程,而当下一个节点为空或者这个节点的线程状态标识位大于0也就是CANCELLED的时候,这里就可以根据最后一个节点的来获取线程还未激活的最靠前的那个节点,接下来就是激活这个节点的线程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}

最后,我们再次回到doAcquireSharedInterruptibly中,这里,我们来看其在最初调用await方法时候所进行的动作:

1
2
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();

这是for循环最后所进行的一个操作,if判断里,前者设定了该node所绑定线程需要进行singal的标志位的设定,接着对其所属线程进行线程挂起操作。代码如下,省的大家找了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted.
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

对于shouldParkAfterFailedAcquire里面的代码,这里需要我给大家解惑的是,在我已确定我要挂起的情况下,因为当我是头节点的情况下,tryAcquireShared返回的是-1,何况后面非头结点直接进入这个if语句中。但是,这个await方法的调用可能前后很快,第一次设定状态的时候依然会返回一次false,并不会进行线程挂起,所以就需要那个do while语句来判断waitStatus标志位,这样,我们就可以找到最靠近头结点的那个未将标志位设定singal的那个节点所在。
对于parkAndCheckInterrupt,我们关心的是LockSupport.park(this);

1
2
3
4
5
6
7
//java.util.concurrent.locks.LockSupport#park(java.lang.Object)
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}

这里首先将线程和所传对象进行setBlocker绑定,告诉我们这里是因为谁而线程挂起的,方便一但出现异常,我们好通过日志确认,然后进行挂起,在挂起结束后就解除标记对象。

至此关于CountDownLatch涉及完毕。

本文配套分享视频地址:

http://v.youku.com/v_show/id_XMzU5Nzc5NjI0NA==.html?spm=a2h3j.8428770.3416059.1

您的支持将鼓励我继续创作!