在上篇文章深入理解AbstractQueuedSynchronizer(一)中,通过ReentrantLock详细介绍了AQS独占模式的实现,本文通过工具类CountDownLatch来分析一下共享功能的实现。
CountDownLatch是什么
CountDownLatch是在java1.5被引入的,跟它一起被引入的并发工具类还有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue,它们都存在于java.util.concurrent包下。CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
执行过程如下图所示:
/f65cc83b7b4664916fad5d1398a36005.png)
参考自:http://www.importnew.com/15731.html
CountDownLatch的使用
下面通过一个例子来说明一下CountDownLatch的使用,代码如下:
|
|
创建了一个初始值为3的CountDownLatch对象latch,然后创建了3个线程,每个线程执行时都会执行latch.countDown()
使计数器的值减1,而主线程在执行到latch.await()
时会等待直到计数器的值为0。输出的结果如下:
|
|
AQS共享模式的实现
CountDownLatch构造方法
CountDownLatch的构造方法如下:
|
|
传入一个参数count,CountDownLatch也使用了内部类Sync来实现,Sync继承自AQS:
|
|
这里调用了AQS类中的setState方法来设置count,AQS的state属性在上篇文章已经提到,它是AQS中的状态标识,具体的含义由子类来定义,可见这里把state定义为数量。
CountDownLatch的await方法
|
|
直接调用了AQS类中的acquireSharedInterruptibly方法。
acquireSharedInterruptibly方法
|
|
这里的tryAcquireShared方法在Sync中被重写。
CountDownLatch的tryAcquireShared方法
|
|
仅仅是根据状态来判断,如果state等于0的时候,说明计数器为0了,返回1表示成功,否则返回-1表示失败。
doAcquireSharedInterruptibly方法
|
|
这里的重点是setHeadAndPropagate方法。
setHeadAndPropagate方法
|
|
首先先将之前的head记录一下,用于下面的判断;然后设置当前节点为头节点;最后在判断是否需要唤醒。这里的propagate值是根据tryAcquireShared方法的返回值传入的,所以对于CountDownLatch来说,如果获取成功,则应该是1。
这里的if判断条件比较多,这里用了之前保存的head节点变量h来进行判断,让我疑惑的地方是,什么时候h会等于null?这个地方目前还没想明白。
如果h.waitStatus >= 0
,表示是初始状态或者是取消状态,那么当propagate <= 0
时将不唤醒节点。
获取node的下一个节点s,如果s == null || s.isShared()
则释放节点并唤醒。为什么下一个节点为null的时候也需要唤醒操作呢?仔细理解一下这句话:
The conservatism in both of these checks may cause unnecessary wake-ups, but only when there are multiple racing acquires/releases, so most need signals now or soon anyway.
这种保守的检查方式可能会引起多次不必要的线程唤醒操作,但这些情况仅存在于多线程并发的acquires/releases操作,所以大多线程数需要立即或者很快地一个信号。这个信号就是执行unpark方法。因为LockSupport在unpark的时候,相当于给了一个信号,即使这时候没有线程在park状态,之后有线程执行park的时候也会读到这个信号就不会被挂起。
在简单点说,就是线程在执行时,如果之前没有unpark操作,在执行park时会阻塞该线程;但如果在park之前执行过一次或多次unpark(unpark调用多次和一次是一样的,结果不会累加)这时执行park时并不会阻塞该线程。
所以,如果在唤醒node的时候下一个节点刚好添加到队列中,就可能避免了一次阻塞的操作。
所以这里的propagate表示传播,传播的过程就是只要成功的获取到共享所就唤醒下一个节点。
doReleaseShared方法
|
|
什么时候状态会是SIGNAL呢?回顾一下shouldParkAfterFailedAcquire方法:
|
|
当状态不为CANCEL或者是SIGNAL时,为了保险起见,这里把状态都设置成了SIGNAL,然后会再次循环进行判断是否需要阻塞。
回到doReleaseShared方法,这里为什么不直接把SIGNAL设置为PROPAGATE,而是先把SIGNAL设置为0,然后再设置为PROPAGATE呢?
原因在于unparkSuccessor方法,该方法会判断当前节点的状态是否小于0,如果小于0则将h的状态设置为0,如果在这里直接设置为PROPAGATE状态的话,则相当于多做了一次CAS操作。unparkSuccessor中的代码如下:
|
|
其实这里只判断状态为SIGNAL和0还有另一个原因,那就是当前执行doReleaseShared循环时的状态只可能为SIGNAL和0,因为如果这时没有后继节点的话,当前节点状态没有被修改,是初始的0;如果在执行setHead方法之前,这时刚好有后继节点被添加到队列中的话,因为这时后继节点判断p == head
为false,所以会执行shouldParkAfterFailedAcquire方法,将当前节点的状态设置为SIGNAL。当状态为0时设置状态为PROPAGATE成功,则判断h == head
结果为true,表示当前节点是队列中的唯一一个节点,所以直接就返回了;如果为false,则说明已经有后继节点的线程设置了head,这时不返回继续循环,但刚才获取的h已经用不到了,等待着被回收。
CountDownLatch的countDown方法
|
|
这里是调用了AQS中的releaseShared方法。
releaseShared方法
|
|
这里调用的tryReleaseShared方法是在CountDownLatch中的Sync类重写的,而doReleaseShared方法已在上文中介绍过了。
CountDownLatch中的tryReleaseShared方法
|
|
这里设置state的操作需要循环来设置以确保成功。
超时控制的await方法
对应于上文中提到的doAcquireSharedInterruptibly方法,还有一个提供了超时控制的doAcquireSharedNanos方法,代码如下:
|
|
与doAcquireSharedInterruptibly方法新增了以下功能:
- 增加了一个deadline变量表示超时的截止时间,根据当前时间与传入的nanosTimeout计算得出;
- 每次循环判断是否已经超出截止时间,即
deadline - System.nanoTime()
是否大于0,大于0表示已经超时,返回false,小于0表示还未超时; - 如果未超时通过调用shouldParkAfterFailedAcquire方法判断是否需要park,如果返回true则再判断
nanosTimeout > spinForTimeoutThreshold
,spinForTimeoutThreshold是自旋的最小阈值,这里被Doug Lea设置成了1000,表示1000纳秒,也就是说如果剩余的时间不足1000纳秒,则不需要park。
总结
本文通过CountDownLatch来分析了AQS共享模式的实现,实现方式如下:
调用await时
- 共享锁获取失败(计数器还不为0),则将该线程封装为一个Node对象放入队列中,并阻塞该线程;
- 共享锁获取成功(计数器为0),则从第一个节点开始依次唤醒后继节点,实现共享状态的传播。
调用countDown时
- 如果计数器不为0,则不释放,继续阻塞,并把state的值减1;
- 如果计数器为0,则唤醒节点,解除线程的阻塞状态。
在这里再对比一下独占模式和共享模式的相同点和不同点:
相同点
- 锁的获取和释放的判断都是由子类来实现的。
不同点
- 独占功能在获取节点之后并且还未释放时,其他的节点会一直阻塞,直到第一个节点被释放才会唤醒;
- 共享功能在获取节点之后会立即唤醒队列中的后继节点,每一个节点都会唤醒自己的后继节点,这就是共享状态的传播。
根据以上的总结可以看出,AQS不关心state具体是什么,含义由子类去定义,子类则根据该变量来进行获取和释放的判断,AQS只是维护了该变量,并且实现了一系列用来判断资源是否可以访问的API,它提供了对线程的入队和出队的操作,它还负责处理线程对资源的访问方式,例如:什么时候可以对资源进行访问,什么时候阻塞线程,什么时候唤醒线程,线程被取消后如何处理等。而子类则用来实现资源是否可以被访问的判断。