最新消息:从今天开始,做一个有好习惯的人。

AQS(AbstractQueuedSynchronized)-抽象的队列同步器

java体系 迷路的老鼠 4166浏览 2评论

抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock/Semaphore/CountDownLatch…

ReentrantLock的锁实现是基于AbstractQueuedSynchronizer实现的,其内部实现类Sync继承了AbstractQueuedSynchronizer,同时派生出两个子类NonfairSync(非公平锁)和FairSync(公平锁)。

非公平锁在每次进来时会先用CAS去占有锁,获取不到再使用独占锁,都获取不到锁才会进入队列末尾等待。公平锁会直接使用独占锁去获取锁,如果获取不到就能进去队列等待。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    public void lock() {
        sync.lock();
    }

    // FairSync 公平锁调用方式
    final void lock() {
        acquire(1); // 尝试获取独占锁
    }

    // NonfairSync 非公平锁调用方式
    final void lock() {
        if (compareAndSetState(0, 1)) // 首先判断state资源是否为0,如果恰巧为0则表明目前没有线程占用锁,则利用CAS占有锁
            setExclusiveOwnerThread(Thread.currentThread()); // 当独占锁之后则将设置exclusiveOwnerThread为当前线程
        else
            acquire(1); // 若CAS占用锁失败的话,则再尝试获取独占锁
    }

公平锁tryAcquire源码:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// FairSync 公平锁的 tryAcquire 方法
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 获取锁资源的最新内存值
        if (c == 0) { // 当state=0,说明锁资源目前还没有被任何线程被占用
            if (!hasQueuedPredecessors() && // 检查线程是否有阻塞队列
                    compareAndSetState(0, acquires)) { // 如果没有阻塞队列,则通过CAS操作获取锁资源
                setExclusiveOwnerThread(current); // 没有阻塞队列,且CAS又成功获取锁资源,则设置独占线程对象为当前线程
                return true; // 返回标志,告诉上层该线程已经获取到了锁资源
            }
        }
        // 执行到此,锁资源值不为0,说明已经有线程正在占用这锁资源
        else if (current == getExclusiveOwnerThread()) { // 既然锁已经被占用,则看看占用锁的线程是不是当前线程
            int nextc = c + acquires; // 如果占用的锁的线程是当前线程的话,则为重入锁概念,状态值做加1操作
            // int类型值小于0,是因为该int类型的state状态值溢出了,溢出了的话那得说明这个锁有多难获取啊,可能出问题了
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true; // 返回成功标志,告诉上层该线程已经获取到了锁资源
        }
        return false; // 返回失败标志,告诉上层该线程没有获取到锁资源
    }

非公平锁tryAcquire源码


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// NonfairSync 非公平锁的 tryAcquire 方法
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires); // 调用父类的非公平获取锁资源方法
    }

    // NonfairSync 非公平锁父类 Sync 类的 nonfairTryAcquire 方法    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 获取锁资源的最新内存值
        if (c == 0) { // 当state=0,说明锁资源目前还没有被任何线程被占用
            if (compareAndSetState(0, acquires)) { // 先不管三七二十一,先尝试通过CAS操作获取锁资源
                setExclusiveOwnerThread(current); // CAS一旦成功获取锁资源,则设置独占线程对象为当前线程
                return true;// 返回成功标志,告诉上层该线程已经获取到了锁资源
            }
        }
        // 执行到此,锁资源值不为0,说明已经有线程正在占用这锁资源
        else if (current == getExclusiveOwnerThread()) { // 既然锁已经被占用,则看看占用锁的线程是不是当前线程
            int nextc = c + acquires; // 如果占用的锁的线程是当前线程的话,则为重入锁概念,状态值做加1操作
            // int类型值小于0,是因为该int类型的state状态值溢出了,溢出了的话那得说明这个锁有多难获取啊,可能出问题了
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc); //
            return true; // 返回成功标志,告诉上层该线程已经获取到了锁资源
        }
        return false; // 返回失败标志,告诉上层该线程没有获取到锁资源
    }(

获取锁失败后,线程请求会放入队列等待唤醒,此处是用自旋的方式实现(死循环)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node enq(final Node node) {
        for (;;) { // 自旋的死循环操作方式
            Node t = tail;
            // 因为是自旋方式,首次链表队列tail肯定为空,但是后续链表有数据后就不会为空了
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node())) // 队列为空时,则创建一个空对象结点作为头结点,无意思,可认为傀儡结点
                    tail = head; // 空队列的话,头尾都指向同一个对象
            } else {
                // 进入 else 方法里面,说明链表队列已经有结点了
                node.prev = t;
                // 因为存在并发操作,通过CAS尝试将新加入的node结点设置为队尾结点
                if (compareAndSetTail(t, node)) {
                    // 如果node设置队尾结点成功,则将之前的旧的对象尾结点t的后继结点指向node,node的前驱结点也设置为t
                    t.next = node;
                    return t;
                }
            }

            // 如果执行到这里,说明上述两个CAS操作任何一个失败的话,该方法是不会放弃的,因为是自旋操作,再次循环继续入队
        }
    }

acquireQueued也是采用一个自旋的死循环操作方式,只有头结点才能尝试获取锁资源,其余的结点挨个挨个在那里等待修改,等待被唤醒,等待机会成为头结点;而新添加的node结点也自然逃不过如此命运,先看看是否头结点,然后再看看是否能休息。


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) { // 自旋的死循环操作方式
                final Node p = node.predecessor(); // 如果新建结点的前驱结点是头结点
                // 如果前驱结点为头结点,那么该结点则是老二,仅次于老大,也希望尝试去获取一下锁,万一头结点恰巧刚刚释放呢?希望还是要有的,万一实现了呢。。。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    // 拿到锁资源后,则该node结点升级做头结点,且设置后继结点指针为空,便于GC回收
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && // 根据前驱结点看看是否需要休息一会儿
                        parkAndCheckInterrupt()) // 阻塞操作,正常情况下,获取不到锁,代码就在该方法停止了,直到被唤醒
                    interrupted = true;

                // 如果执行到这里,说明尝试休息失败了,因为是自旋操作,所以还会再次循环继续操作判断
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

转载请注明:迷路的老鼠 » AQS(AbstractQueuedSynchronized)-抽象的队列同步器

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

网友最新评论 (2)

  1. acquireQueued:地球不爆炸我就不放假
    wa5年前 (2019-10-10)回复
    • 把地球换成CPU和内存
      迷路的老鼠5年前 (2019-10-11)回复