1.简介AbstractQueuedSynchronizer (抽象队列同步器,以下简称 AQS)出现在 JDK 1.5 中,由大师 Doug Lea 所创作。AQS 是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。除此之外,我们还可以基于 AQS,定制出我们所需要的同步器。
AQS 的使用方式通常都是通过内部类继承 AQS 实现同步功能,通过继承 AQS,可以简化同步器的实现。如前面所说,AQS 是很多同步器实现的基础框架。弄懂 AQS 对理解 Java 并发包里的组件大有裨益,这也是我学习 AQS 并写出这篇文章的缘由。另外,需要说明的是,AQS 本身并不是很好理解,细节很多。在看的过程中药有一定的耐心,做好看多遍的准备。好了,其他的就不多说了,开始进入正题吧。
2.原理概述在 AQS 内部,通过维护一个FIFO 队列
来管理多线程的排队工作。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。大致示意图如下:
当头结点释放同步状态后,且后继节点对应的线程被阻塞,此时头结点
线程将会去唤醒后继节点线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将自己设为头结点。大致示意图如下:
3.重要方法介绍本节将介绍三组重要的方法,通过使用这三组方法即可实现一个同步组件。
第一组方法是用于访问/设置同步状态的,如下:
方法 说明 int getState() 获取同步状态 void setState() 设置同步状态 boolean compareAndSetState(int expect, int update) 通过 CAS 设置同步状态
第二组方需要由同步组件覆写。如下:
方法 说明 boolean tryAcquire(int arg) 独占式获取同步状态 boolean tryRelease(int arg) 独占式释放同步状态 int tryAcquireShared(int arg) 共享式获取同步状态 boolean tryReleaseShared(int arg) 共享式私房同步状态 boolean isHeldExclusively() 检测当前线程是否获取独占锁
第三组方法是一组模板方法,同步组件可直接调用。如下:
方法 说明 void acquire(int arg) 独占式获取同步状态,该方法将会调用 tryAcquire 尝试获取同步状态。获取成功则返回,获取失败,线程进入同步队列等待。 void acquireInterruptibly(int arg) 响应中断版的 acquire boolean tryAcquireNanos(int arg,long nanos) 超时+响应中断版的 acquire void acquireShared(int arg) 共享式获取同步状态,同一时刻可能会有多个线程获得同步状态。比如读写锁的读锁就是就是调用这个方法获取同步状态的。 void acquireSharedInterruptibly(int arg) 响应中断版的 acquireShared boolean tryAcquireSharedNanos(int arg,long nanos) 超时+响应中断版的 acquireShared boolean release(int arg) 独占式释放同步状态 boolean releaseShared(int arg) 共享式释放同步状态
上面列举了一堆方法,看似繁杂。但稍微理一下,就会发现上面诸多方法无非就两大类:一类是独占式获取和释放共享状态,另一类是共享式获取和释放同步状态。至于这两类方法的实现细节,我会在接下来的章节中讲到,继续往下看吧。
4.源码分析 4.1 节点结构在并发的情况下,AQS 会将未获取同步状态的线程将会封装成节点,并将其放入同步队列尾部。同步队列中的节点除了要保存线程,还要保存等待状态。不管是独占式还是共享式,在获取状态失败时都会用到节点类。所以这里我们要先看一下节点类的实现,为后面的源码分析进行简单铺垫。源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
4.2 独占模式分析 4.2.1 获取同步状态独占式获取同步状态时通过 acquire 进行的,下面来分析一下该方法的源码。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private void setHead (Node node) { head = node; node.thread = null ; node.prev = null ; } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } private void cancelAcquire (Node node) { if (node == null ) return ; node.thread = null ; Node pred = node.prev; while (pred.waitStatus > 0 ) node.prev = pred = pred.prev; Node predNext = pred.next; node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null ); } else { int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null ) { Node next = node.next; if (next != null && next.waitStatus <= 0 ) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; } } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
到这里,独占式获取同步状态的分析就讲完了。如果仅分析获取同步状态的大致流程,那么这个流程并不难。但若深入到细节之中,还是需要思考思考。这里对独占式获取同步状态的大致流程做个总结,如下:
调用 tryAcquire 方法尝试获取同步状态 获取成功,直接返回 获取失败,将线程封装到节点中,并将节点入队 入队节点在 acquireQueued 方法中自旋获取同步状态 若节点的前驱节点是头节点,则再次调用 tryAcquire 尝试获取同步状态 获取成功,当前节点将自己设为头节点并返回 获取失败,可能再次尝试,也可能会被阻塞。这里简单认为会被阻塞。 上面的步骤对应下面的流程图:
上面流程图参考自《Java并发编程》第128页图 5-5,这里进行了重新绘制,并做了一定的修改。
4.2.2 释放同步状态相对于获取同步状态,释放同步状态的过程则要简单的多,这里简单罗列一下步骤:
调用 tryRelease(arg) 尝试释放同步状态 根据条件判断是否应该唤醒后继线程 就两个步骤,下面看一下源码分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
4.3 共享模式分析与独占模式不同,共享模式下,同一时刻会有多个线程获取共享同步状态。共享模式是实现读写锁中的读锁、CountDownLatch 和 Semaphore 等同步组件的基础,搞懂了,再去理解一些共享同步组件就不难了。
4.3.1 获取同步状态共享类型的节点获取共享同步状态后,如果后继节点也是共享类型节点,当前节点则会唤醒后继节点。这样,多个节点线程即可同时获取共享同步状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
到这里,共享模式下获取同步状态的逻辑就分析完了,不过我这里只做了简单分析。相对于独占式获取同步状态,共享式的情况更为复杂。独占模式下,只有一个节点线程可以成功获取同步状态,也只有获取已同步状态节点线程才可以释放同步状态。但在共享模式下,多个共享节点线程可以同时获得同步状态,在一些线程获取同步状态的同时,可能还会有另外一些线程正在释放同步状态。所以,共享模式更为复杂。这里我的脑力跟不上了,没法面面俱到的分析,见谅。
最后说一下共享模式下获取同步状态的大致流程,如下:
获取共享同步状态 若获取失败,则生成节点,并入队 如果前驱为头结点,再次尝试获取共享同步状态 获取成功则将自己设为头结点,如果后继节点是共享类型的,则唤醒 若失败,将节点状态设为 SIGNAL,再次尝试。若再次失败,线程进入等待状态 4.3.2 释放共享状态释放共享状态主要逻辑在 doReleaseShared 中,doReleaseShared 上节已经分析过,这里就不赘述了。共享节点线程在获取同步状态和释放同步状态时都会调用 doReleaseShared,所以 doReleaseShared 是多线程竞争集中的地方。
1 2 3 4 5 6 7 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
5.PROPAGATE 状态存在的意义AQS 的节点有几种不同的状态,这个在 4.1 节介绍过。在这几个状态中,PROPAGATE 的用途可能是最不好理解的。网上包括一些书籍关于该状态的叙述基本都是一句带过,也就是 PROPAGATE 字面意义,即向后传播唤醒动作。至于怎么传播,鲜有资料说明过。不过,好在最终我还是找到了一篇详细叙述了 PROPAGATE 状态的文章。在博客园上,博友 活在夢裡 在他的文章 AbstractQueuedSynchronizer源码解读 对 PROPAGATE,以及其他的一些细节进行了说明,很有深度。在钦佩之余,不由得感叹作者思考的很深入。在征得他的同意后,我将在本节中引用他文章中对 PROPAGATE 状态说明的部分,并进行一定的补充说明。这里感谢作者 活在夢裡 的精彩分享,若不参考他的文章,我的这篇文章内容会比较空洞。好了,其他的不多说了,继续往下分析。
在本节中,将会说明两个个问题,如下:
PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的? 引入 PROPAGATE 状态是为了解决什么问题? 这两个问题将会在下面两节中分别进行说明。
5.1 利用 PROPAGATE 传播唤醒动作PROPAGATE 状态是用来传播唤醒动作的,那么它是在哪里进行传播的呢?答案是在setHeadAndPropagate
方法中,这里再来看看 setHeadAndPropagate 方法的实现:
1 2 3 4 5 6 7 8 9 10 11 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
大家注意看 setHeadAndPropagate 方法中那个长长的判断语句,其中有一个条件是h.waitStatus < 0
,当 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,这个条件就会成立。那么 PROPAGATE 状态是在何时被设置的呢?答案是在doReleaseShared
方法中,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {...} else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } ... } }
再回到 setHeadAndPropagate 的实现,该方法既然引入了h.waitStatus < 0
这个条件,就意味着仅靠条件propagate > 0
判断是否唤醒后继节点线程的机制是不充分的。至于为啥不充分,请继续往看下看。
5.2 引入 PROPAGATE 所解决的问题PROPAGATE 的引入是为了解决一个 BUG – JDK-6801020 ,复现这个 BUG 的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import java.util.concurrent.Semaphore;public class TestSemaphore { private static Semaphore sem = new Semaphore(0 ); private static class Thread1 extends Thread { @Override public void run () { sem.acquireUninterruptibly(); } } private static class Thread2 extends Thread { @Override public void run () { sem.release(); } } public static void main (String[] args) throws InterruptedException { for (int i = 0 ; i < 10000000 ; i++) { Thread t1 = new Thread1(); Thread t2 = new Thread1(); Thread t3 = new Thread2(); Thread t4 = new Thread2(); t1.start(); t2.start(); t3.start(); t4.start(); t1.join(); t2.join(); t3.join(); t4.join(); System.out.println(i); } } }
根据 BUG 的描述消息可知 JDK 6u11,6u17 两个版本受到影响。那么,接下来再来看看引起这个 BUG 的代码 – JDK 6u17 中 setHeadAndPropagate 和 releaseShared 两个方法源码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void setHeadAndPropagate (Node node, int propagate) { setHead(node); if (propagate > 0 && node.waitStatus != 0 ) { Node s = node.next; if (s == null || s.isShared()) unparkSuccessor(node); } } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
下面来简单说明 TestSemaphore 这个类的逻辑。这个类持有一个数值为 0 的信号量对象,并创建了4个线程,线程 t1 和 t2 用于获取信号量,t3 和 t4 则是调用 release() 方法释放信号量。在一般情况下,TestSemaphore 这个类的代码都可以正常执行。但当有极端情况出现时,可能会导致同步队列挂掉。这里演绎一下这个极端情况,考虑某次循环时,队列结构如下:
时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为0
,并唤醒线程 t1。此时信号量数值为1。 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回0
。然后线程 t1 被切换,暂停运行。 时刻3:线程 t4 调用 releaseShared 方法,因 head 的状态为0
,所以 t4 不会调用 unparkSuccessor 方法。 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。但因为 propagate = 0,线程 t1 无法调用 unparkSuccessor 唤醒线程 t2,t2 面临无线程唤醒的情况。因为 t2 无法退出等待状态,所以 t2.join 会阻塞主线程,导致程序挂住。 下面再来看一下修复 BUG 后的代码,根据 BUG 详情页显示,该 BUG 在 JDK 1.7 中被修复。这里找一个 JDK 7 较早版本(JDK 7u10)的代码看一下,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
在按照上面的代码演绎一下逻辑,如下:
时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为0
,并唤醒线程t1。此时信号量数值为1。 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回0
。然后线程 t1 被切换,暂停运行。 时刻3:线程 t4 调用 releaseShared 方法,检测到h.waitStatus = 0
,t4 将头节点等待状态由0
设为PROPAGATE(-3)
。 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。因 propagate = 0,propagate > 0
条件不满足。而 h.waitStatus = PROPAGATE(-3)
,所以条件h.waitStatus < 0
成立。进而,线程 t1 可以唤醒线程 t2,完成唤醒动作的传播。 到这里关于状态 PROPAGATE 的内容就讲完了。最后,简单总结一下本章开头提的两个问题。
问题一:PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的? 答:PROPAGATE 状态用在 setHeadAndPropagate。当头节点状态被设为 PROPAGATE 后,后继节点成为新的头结点后。若 propagate > 0
条件不成立,则根据条件h.waitStatus < 0
成立与否,来决定是否唤醒后继节点,即向后传播唤醒动作。
问题二:引入 PROPAGATE 状态是为了解决什么问题? 答:引入 PROPAGATE 状态是为了解决并发释放信号量 所导致部分请求信号量的线程无法被唤醒的问题。
声明: 本章内容是在博友 活在夢裡 的文章 AbstractQueuedSynchronizer源码解读 基础上,进行了一定的补充说明。本章所参考的观点已经过原作者同意,为避免抄袭嫌疑,特此声明。
6.总结到这里,本文就差不多结束了。如果大家从头看到尾,到这里就可以放松一下了。写到这里,我也可以放松一下了。这篇文章总共花费了我12天的空闲时间,确实不容易。本来我只打算讲一下基本原理,但知道后来看到本文多次推荐的那篇文章。那篇文章给我的第一感觉是,作者很厉害。第二感觉是,我也要写出一篇较为深入的 AQS 分析文章。虽然写出来也不能怎么样,水平也不会因此提高多少,也不会去造个类似的轮子。但是写完后,确实感觉很有成就感。本文的最后,来说一下如何学习 AQS 原理。AQS 的大致原理不是很难理解,所以一开始不建议纠结细节,应该先弄懂它的大致原理。在此基础上,再去分析一些细节,分析细节时,要从多线程的角度去考虑。比如,有点地方 CAS 失败后要重试,有的不用重试。总体来说 AQS 的大致原理容易理解,细节部分比较复杂。很多细节要在脑子里演绎一遍,好好思考才能想通,有点烧脑。另外因为文章篇幅的问题,关于 AQS ConditionObject 部分的分析将会放在下一篇文章中进行。
最后,再向 AQS 的作者 Doug Lea 致以崇高的敬意。仅尽力弄懂 AQS 的原理都很难了,可想而知,实现 AQS 的难度有多大。
限于本人的能力,加之深入分析 AQS 本身就比较有难度。所以文中难免会有错误出现,如果不慎翻车,请见谅。也欢迎在评论区指明这些错误,感谢。
参考