目录

ThasBlog

学无止境

X

Java AQS

AQS

AbstractQueuedSynchronizer, 队列同步器, 支持队列等待.
继承自AbstractOwnableSynchronizer, 不带队列的同步器, 只能有一个线程同步, 其他线程忽略.

AQS 使用 state 和 一个FIFO队列来进行线程同步. state表示申请资源的状态, 默认0, 若已被占用则小于0.
AQS 不关注申请和释放资源的方式, 只提供了操作state的方法. 在并发申请资源过程, AQS 维护各个线程获取资源的原则, 如不满足获取资源的条件, 则先将它们放入等待队列, 等待资源可以被申请了, 再唤醒等待队列中的线程.

申请和释放资源的方法需要锁自己实现:

// 尝试申请资源
boolean tryAcquire(int arg)
// 尝试释放资源
boolean tryRelease(int arg)
// 资源是否被独占
boolean isHeldExclusively()
// 尝试以共享模式申请资源
int tryAcquireShared(int arg)
// 尝试以共享模式释放资源
boolean tryReleaseShared(int arg)

资源获取流程:

public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 尝试获取锁 如果获取到, 直接退出
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 添加到阻塞队列
            selfInterrupt();
    }
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
// 如果队列已经存在, 入队
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
// 如果队列不存在, 则创建空的尾结点, 再入队
        enq(node);
        return node;
    }
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
// 如果存在前一个节点, 且它是头结点, 且本次直接获取到锁了, 则将自己设置为头结点, 拿到锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
// 一般情况下不会那么顺利的
// 判断是否需要阻塞, 如果前一个节点waitStatus=-1, 则阻塞, 否则设置为-1, 将它作为自己的唤醒人
// 这里必须要while(true) 自旋, 因为前一个节点有可能取消申请
                if (shouldParkAfterFailedAcquire(p, node) &&
// 入队和park动作并非原子性的, 所以可能在park之前抢先被其他线程读取到这个入队状态, 抢先unpark, 但是unpark是可以抵消到未来的一个park动作, 所以这里不会出现死锁问题.
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

释放资源流程:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
// 释放成功 unpark队列下一个线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

条件等待与普通的阻塞等待类似, 有一个专门的 waiter队列

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
// 添加到 waiter 队列
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
// 如果不在阻塞等待中, 直接park
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
// 如果被唤醒了 且已进入阻塞队列 则进入正常的阻塞流程
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

条件唤醒:

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
// 将第一个送到阻塞队列
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
// 全部送到阻塞队列
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

延时获取锁的方式比较简单, 用的是, 最多park这么长时间, 结束后判断能不能处理, 不能就失败出队

LockSupport.parkNanos

ReentrantLock

state=0, 代表无线程占用资源;
state=-1, 代表已有线程占用资源, exclusiveOwnerThread 存放占用资源的线程
state=-N, 代表重入的次数

NonfairSync

允许插队, 当线程获取锁时, 如果能够获取锁, 则不判断是否有线程在队列中, 直接获取, 若获取到, 则抢占, 否则进入队列. 非公平模式下队列中的线程依然是有序唤醒.

非公平锁的优势: 非公平锁模式下, 执行中的线程抢锁时, 有几率抢锁成功, 此线程无需进入队列即可连续运行, 省掉了线程上下文切换的开销; 但是队列中的线程, 则面临被唤醒然后又进入队列阻塞等待的不公平待遇. 此种模式下总的上下文切换开销并没有减少. 此种方式通过牺牲某些线程的"利益"保障了另一部分线程执行的连续性. 所以这算不上非公平锁模式的优点.
非公平锁在一定程度上可以减少死锁情况的发生. 例如:
如果一个任务分成 3 段执行, 第 1 段和第 3 段需要抢锁, 第 2 段释放锁. 抢锁超时会从头开始重试. 在公平锁模式下, 如果有多个线程都有执行这个任务, 则这些线程在执行第 3 段抢锁时都需要进入队列等待全部线程的第 1 段任务执行完成, 若此时超时, 则这些线程都需要重新进入队列抢第 1 段的锁重试, 这些线程将进入一个无限循环重试的状态.
非公平锁模式在新来的线程抢锁失败的情形下等同于公平锁模式, 但是在抢锁成功情况下, 可以解除上述的死循环的问题, 若抢锁成功, 线程可以完整执行成功退出等待队列, 同时队列长度减少, 即便有几率抢锁失败, 这个队列的等待时间也会逐渐减少, 从而概率性的退出无限循环的状态.

FairSync

不允许插队, 所有线程获取锁时, 优先判断队列中是否有其他, 有则进入队列.


标题:Java AQS
作者:thas
地址:https://thas.cc/articles/2020/12/06/1607259469668.html