Lock: 队列同步器AbstractQueuedSynchronizer

概述

队列同步器(AbstractQueuedSynchronizer,AQS),是用来构建或其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

队列同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解:

锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理线程的排队等待与唤醒等底层操作。

队列同步器的设计是基于模板方法模式的,使用时需要继承同步器并重写指定的方法,随后将同步器组合在自定义的同步组件中,并调用同步器提供的模板方法,这些模板方法会调用重写的方法。

队列同步器的模板方法基本上分为3类:独占式获取和释放同步状态、共享式获取和释放同步状态、查询同步队列中的线程情况。

AQS的功能可以分为两类:独占功能和共享功能,它的所有子类中,要么实现并使用了它独占功能的API,要么使用了共享锁的功能,而不会同时使用两套API,即便是它最有名的子类ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的。

队列同步器的实现分析

节点的waitStatus

  • CANCELLED:因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收;
  • SIGNAL:表示这个结点的继任结点被阻塞了,到时需要通知它;
  • CONDITION:表示这个结点在条件队列中,因为等待某个条件而被阻塞;
  • PROPAGATE:使用在共享模式头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播;
  • 0:None of the above,新结点会处于这种状态。

同步队列

队列同步器依赖内部的同步队列(一个FIFO的双向队列)来完成同步状态的管理

当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node),并将其加入同步队列,同时会阻塞当前线程;当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点用来保存获取同步状态失败的线程引用等待状态以及前驱节点和后继节点

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点在获取同步状态成功时会将自己设置为首节点。

独占式同步状态获取和释放

通过调用acquire(int arg)方法独占式(同一时刻只有一个线程成功获取同步状态)地获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。

acquire

1
2
3
4
5
6
7
8
/**
* Acquires in exclusive mode, ignoring interrupts.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

上述代码主要逻辑:

  1. 同步状态获取
    首先调用自定义同步器实现tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态
  2. 节点构造
    如果同步状态获取失败,则构造同步节点,独占式的节点。
  3. 加入同步队列
    节点构造完成后,通过addWaiter(Node noe)方法加入到同步队列的尾部。
  4. 在同步队列中自旋
    最后调用acquireQueued(Node node, int arg)方法,使得该节点以“死循环”的方式获取同步状态。

addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Creates and enqueues node for current thread and given mode.
*/k
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 快速尝试在尾部添加
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update)。注意,此时可能发生竞争,如果有另外一个线程在两个if之间抢先更新的队列的尾节点,CAS操作将会失败,这时会调用enq方法,继续试图将node放入队列。

如果使用线程不安全的LinkedList来维护节点之间的关系,那么并发的时候,LinkedList将难以保证Node的正确添加。

enq

enq(Node noe)方法中,同步器通过“死循环”来保证节点的正确添加,在死循环中只有通过CAS将节点设置为尾节点之后,当前线程才能从该方法返回,否则当前线程不断尝试。

enq方法将并发添加节点的请求通过CAS变得串行化了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Inserts node into queue, initializing if necessary.
* 由于这里存在多线程并发问题,使用死循环保证node能够添加到链表中,因此enq本身是线程安全的
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

acquireQueued

节点进入同步队列后,接下来的任务就是监视队列,等待获取资源。

进入到一个自旋的过程,每个节点(线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire

acquireQueued()自旋中,获取资源失败时,需要调用shouldParkAfterFailedAcquire()方法检测一下是否需要暂停休息一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

独占模式下所有不使用Condition的同步器,节点的waitStatus只可能有CANCELLED、SIGNAL和0三中状态。

shouldParkAfterFailedAcquire()就是靠前驱节点判断当前线程是否应该被阻塞。
首先检测下node的前驱节点pred,如果pred状态已经被置为SIGNAL,直接返回true。否则,从node的前驱继续往前找,直到找到一个waitStatus小于等于0的节点,设置该点为node的前驱(注意:此时node与这个节点之间的节点从等待队列中被“摘下”,等待被回收了)并返回false。返回之后,上层的acquireQueued方法继续自旋,再次进入shouldParkAfterFailedAcquire方法之后,如果发现node前驱不是取消状态且waitStatus不等于SIGNAL,调用CAS函数进行注册(注意:这个操作可能失败,因此不能直接返回true,而是返回false由上层的自旋再次调用shouldParkAfterFailedAcquire直到确认注册成功)。

parkAndCheckInterrupt

1
2
3
4
5
6
7
/**
* Convenience method to park and then check if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

release

当线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态。

该方法会唤醒头结点的后继节点线程,unparkSuccessor(Node node)方法使用LockSupport来唤醒处于等待状态的线程。独占模式下,waitStatus!=0与waitStatus==-1等价(这里waitStatus不会为CANCELLED,因为已经获取资源了)。

被唤醒的线程将继续在acquireQueued的死循环中进行锁竞争,直到成功获取锁。

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

unparkSuccessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Wakes up node's successor, if one exists.
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

共享式同步状态获取和释放

共享式获取与独占式获取最主要的区别在于同一个时刻能否有多个线程同时获取到同步状态。

以文件读写为例,写操作要求对资源的独占式访问,而读操作可以是共享式访问。

acquireShared

通过调用队列同步器的acquireShared(int arg)方法可以共享式地获取同步状态。

tryAcquireShared()的返回值表示剩余资源个数,负值表示获取失败,0表示获取成功但已无剩余资源。

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

doAcquireShared

doAcquireShared(int arg)方法中将共享式节点加入到同步队列中,并进入自旋的过程。

doAcquireShared方法与acquireQueued方法相似,不同的地方在于:共享模式下成功获取资源并将head指向自己之后,要检查并试图唤醒之后的等待线程。因为共享资源可能剩余,可以被后面的等待线程获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireShared(int arg) {
// 将当前线程包装为一个共享类型的节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果新建节点的前一个节点,就是Head,说明当前节点是AQS队列中等待获取锁的第一个节点。
// 按照FIFO的原则,可以直接尝试获取锁。
int r = tryAcquireShared(arg);
if (r >= 0) {
// 成功获取锁,需要将当前节点设置为AQS队列中的第一个节点。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

setHeadAndPropagate

setHeadAndPropagate(Node node, int propagate)方法首先是更换了头结点,然后获取当前节点的后继节点,如果同样是“shared”类型的,再做一次“releaseShared”操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
* 尝试唤醒后继节点:
* propagate > 0说明许可还有能够继续被线程acquire;
* 或者 之前的head被设置为PROPAGATE(PROPAGATE可以被转换为SIGNAL)说明需要往后传递;
* 或者为null
* 并且 后继节点是共享模式或者为null。
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 唤醒后继节点
doReleaseShared();
}
}

s.isShared()的判断主要是考虑到读写锁的情况,在读写锁的使用过程中,申请写锁(独占模式)和申请读锁(共享模式)的线程可能同时存在,这个判断发现后继线程是共享模式的时唤醒它。

releaseShared

通过调用releaseShared(int arg)方法可以释放同步状态,在释放同步状态之后,将会唤醒后续处于等待状态的节点。

对于能够支持多个线程同时访问的并发组件(如Semaphore),它和独占式的主要区别在于tryReleasedShared(int arg)方法必须确保同步状态线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

doReleaseShared

doReleaseShared()是一个自旋过程。首先获取head节点h,然后检查它的waitStatus是否为SIGNAL,如果是的话,调用CAS将h的waitStatus设置为0,并调用unparkSuccessor唤醒下一个等待线程。注意,这里调用CAS方法而不是直接赋值,是因为在共享模式下,这里可能发生竞争。doReleaseShared方法可能由head节点在使用完共享资源后主动调用;也可能由刚刚“上位”的等待线程调用,在上位之后,原来的head线程已被踢出队列。

  • 第一种情况,只有刚刚释放资源的head线程调用,这时候没有竞争,waitStatus是SIGNAL,就去唤醒下个线程,是0,就重置为PROPAGATE。

  • 第二种情况,刚刚释放完资源的旧head,和刚刚上位的新head同时调用doReleaseShared()方法,这时候最新的head调用该方法时获取的头节点都是自己,若干被踢出的旧head调用该方法时获取的头节点可能是旧head,也可能是新head。这些被踢出的旧head线程也在根据自己获取的head(不管新旧)的状态进行CAS操作和unparkSuccessor操作,幸运的是,这些操作不会造成错误,只是多了一些唤醒而已(这些唤醒可能导致一个线程获得资源,也可能是一个“虚晃”)。

我们可以发现,不管head引用怎样更迭,最终新head的waitStatus都会被顺利处理。注意,可能有多个旧head同时参与这个过程,都不影响正确性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

独占式超时获取同步状态

通过调用同步器的doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到了则返回true,否则返回false。

该方法提供了传统Java同步操作(如synchronized关键字)所不具备的特性。

doAcquireNanos(int arg)方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似。但是在同步状态获取失败的处理上有所不同,如果当前线程获取同步状态失败,则判断是否超时,如果没有超时,重新计算超时间隔,然后使当前线程等待。

共享式超时获取同步状态的过程与之类似doAcquireSharedNanos(int arg, long nanosTimeout)


感谢:
http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer
http://www.infoq.com/cn/articles/java8-abstractqueuedsynchronizer
http://www.jianzhaoyang.com/go2sea/p/5618628.html