JAVA并发终章-核心源码调试

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

前言

**全篇高能非战斗人员请撤离非战斗人员请撤离非战斗人员请撤离**本章将会调试juc源码将整个过程串起来作为并发终章后面不会再更新并发章节有调整除外

源码调试

ReentrantLock 显式锁

前文说过除非对锁要做操作不然不建议用显式锁但是这并不代表我们可以不懂他的机制更不允许不懂他的源码
ReentrantLock实现了Lock接口。ReentrantLock中包含了Sync对象而且Sync是AQS的子类其中有之前说的公平锁和闯入锁前面贴了部分源码之前也讲过ReentrantLock是个独占锁。默然是非公平的。现在就从源码上来了解。

    public void lock() {
        sync.lock();
    }
     abstract void lock();

从这里可以看出ReentrantLock默认是使用闯入锁的

  public ReentrantLock() {
        sync = new NonfairSync();
    }

闯入锁

 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

首先闯入锁做了一次cas动作如果cas成功那么

 protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

当前线程可以获得执行权并且设置排它如果没有获得就 acquire(1);

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire()

   protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
  final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

非常简单的代码大致意思是如果state是0那么当前线程获得锁如果当前线程就是执行的线程那么状态加1,1就是传递的参数。如果不是当前线程持有锁那么返回false。

addWaiter

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

比较简单意思是加入队列并且将当前线程插入到等待队列中EXCLUSIVE是排他节点表示独占。和tryAcquire连起来就是如果获得锁失败了那么就会加入到等待队列中去排队。

acquireQueued

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            	// //获得该node的前置节点
                final Node p = node.predecessor();
                // 如果上一个是头节点那么再抢夺一次如果抢夺成功那么就设置自己为哑结点头
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

果前置节点为SIGNAL可以理解为成功意味着只需要等待其他前置节点的线程被释放并且挂起当前线程如果上一个线程被取消了状态大于0是1那么循环这个双向队列移除掉已经取消的线程如果状态是其他当前的线程设置状态SIGNAL(-1)。

parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

使用LockSupport.park挂起当前线程编程WATING状态并且检查这个线程的中断状态

selfInterrupt

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

设置中断标志位为true当线程满足两个条件阻塞状态和中断标志为ture则会抛出InterruptedException异常。

所以整个闯入锁的流程应该是lock了之后立马去争夺一次锁争夺成功了OK争夺失败了之后查看当前线程状态如果线程状态是0那么争夺如果是自己则state加1否则乖乖去排队。如果上一个节点是哑结点难么再次争夺。把自己设置为头节点否则把自己挂起。

公平锁

      final void lock() {
            acquire(1);
        }

由此可见公平锁和闯入锁的差距就是在于lock的时候闯入锁闯入了一次而公平锁直接acquire(1)进去看看。

 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

和闯入锁区别在于多了一次检测看看这次检测干了什么又在检测什么

public final boolean hasQueuedPredecessors() {
    //读取头节点
    Node t = tail; 
   //读取尾节点
    Node h = head;
    //s是首节点h的后继节点
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

h != t当头节点和尾节点相等时才会返回false。
1.头节点和尾节点都为null空队列不需要排队。
2.头节点和尾节点不为null但是相等说明头节点和尾节点都指向一个元素表示队列中只有一个节点不需要排队。

s.thread != Thread.currentThread()
首节点的后继节点如果是自己就不需要排队不是就要排队。

如果说检测成功是首节点或者是首节点的后继节点尝试去拥有锁。后续和闯入锁一样不再赘述了。

其实闯入锁相对于公平锁多了两次先机第一次是请求的时候闯一次第二次是公平锁需要检测前面是不是首节点如果是就不抢了而闯入锁是直接闯其他一模一样。

锁的释放

 public void unlock() {
        sync.release(1);
    }
 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

如果当前线程不是独占的直接抛出异常如果状态是0释放将独占锁放掉这里有个有趣的现象如果连上两次锁再解一次实际上是解不开的。
unparkSuccessor代码过于简单。

private void unparkSuccessor(Node node) { 
	int ws = node.waitStatus;//获得head节点的状态 
	if (ws < 0) 
	compareAndSetWaitStatus(node, ws, 0);// 
	//设置head节点 状态为0 
	Node s = node.next;//得到head节点的下一个节点 
	if (s == null || s.waitStatus > 0) { //如果下一个节点为null或者status>0表示cancelled状态. 
		//通过从尾部节点开始扫描找到距离head最近的一个
		s = null; 
		for (Node t = tail; t != null && t != node; t =	t.prev) 
		if (t.waitStatus <= 0) 
		s = t; 
	} 
	if (s != null) //next节点不为空直接唤醒这个线程即可 
	LockSupport.unpark(s.thread); 
}

值得一提的是从尾部遍历实际上是怕尾部分叉。因为此时并发环境下可能会有其他线程入队。而此时如果A在cas往后插B也插此时C来释放那不是分叉了吗故而从尾部插入

tryRelease

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

如果当前线程不是独占的直接抛出异常如果状态sh

读写锁

我们刚刚对显式独占锁进行了源码的剖析现在讲一下读写锁。然后再逐渐深入AQS。

ReentrantReadWriteLock

先看类

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
      ......
}

首先他实现了ReadWriteLock接口去看看

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

这个接口比较简单就是一个读锁一个写锁。看样子是获得两个锁先方法。

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    

很简单就是new 了两个锁另外使用的默认是闯入锁。
看看 ReadLock和 WriteLock

readLock

  protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
  public void lock() {
            sync.acquireShared(1);
        }

获取共享锁参数是1看看

 public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

tryAcquireShared

protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

AQS里边只有一个state来记录锁的状态那怎么区分读锁和写锁呢这里用到了二进制。将state转化为二进制有32位左边高16位记录读锁数量包括重入的读锁数量右边低16位记录写锁数量包括重入的写锁数量。

  static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
 if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;

意思是如果有独占锁并且独占锁不是自己那么去排队把。

sharedCount

 abstract boolean readerShouldBlock();

公平锁

final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }

这个代码之前看过意思是是否是首节点或者是其后置节点。

闯入锁

final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }

非公平锁中判断是否阻塞读锁请求就是看队列中是否有等待的写锁线程。

然后再看是否读锁数量已经到达上限如果没有夺锁。

  if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } 

简单意思就是看看是不是第一个如果是把第一个读者设置为自己如果当前没有读者设置为自己。

else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }

其实就是叠加读者次数。

fullTryAcquireShared

  final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

这段代码看起来比较复杂慢慢分析其实很简单。
首先有个自旋操作。

if (exclusiveCount(c) != 0) {
...
}

如果当前独占锁的数目不是0有写锁

 if (getExclusiveOwnerThread() != current)
                        return -1;

独占锁不是自己那么失败

else if (readerShouldBlock()) {
...
}

当前读者应该被阻塞里面的代码是

  // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }

取到缓存中最后一次获取到读锁的计数器计数为 0说明当前线程没有获取到过读锁为了垃圾回收考虑干掉readHolds如果rh不是空但是count是0那么说明当前线程不是第一个读者也没有读者说明当前的根本不是读锁就去排队。继续分析后面代码。

if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");

读者数达到最大那么抛出异常。简单的。

 if (compareAndSetState(c, c + SHARED_UNIT)) {
                   if (sharedCount(c) == 0) {
                       firstReader = current;
                       firstReaderHoldCount = 1;
                   } else if (firstReader == current) {
                       firstReaderHoldCount++;
                   } else {
                       if (rh == null)
                           rh = cachedHoldCounter;
                       if (rh == null || rh.tid != getThreadId(current))
                           rh = readHolds.get();
                       else if (rh.count == 0)
                           readHolds.set(rh);
                       rh.count++;
                       cachedHoldCounter = rh; // cache for release
                   }
                   return 1;
               }
           }

这个和上面的已经重复。不在赘述。

tryAcquireShared方法分析完了来总结一下一个读请求想要获取到共享锁先看看有没有独占的如果有那么就阻塞如果最大数目达到了报异常如果当前是第一个设为第一个读者当前不是那么就累加。如果条件没有满足执行fullTryAcquireShared表示全力再试一次里面的逻辑是判断是否有写锁占用然后如果已经被阻塞了那么确定一下是否可以再次进入如果不能就阻塞能就执行一样的逻辑。

if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);

如果不可以共享执行
doAcquireShared

  private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这个代码很简单意思是以共享模式把这个节点加入等候队列如果是前驱节点是头结点就再试一次如果不是就挂起简单的。其中有个方法

setHeadAndPropagate

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

意思是把当前节点设置为头节点如果当前节点为空或者状态不对那么获取下一个节点如果下一个节点空了释放

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

这意思是 反复获取头节点如果头结点没有变化就退出循环进去If代表这里不止一个节点如果状态为SIGNAL就唤醒后面的节点如果状态为0说明h的后继所代表的线程已经被唤醒或即将被唤醒。

那么把读锁的逻辑连起来整个过程非常清晰就是如果读锁可以占用则占用如果读锁不能占用如果前驱节点就是头节点尝试占用占用成功并且唤醒后面的节点如果不是则挂起

writelock
写锁的逻辑和独占锁一模一样不需要赘述。

读锁释放

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

doReleaseShared不在分析前面有。说一下括号里面的


        protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

AQSAbstractQueuedSynchronizer

在同步组件的实现中AQS是核心部分同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义AQS则实现了对同步状态的管理以及对阻塞线程进行排队等待通知等等一些底层的实现处理。

AQS里有一个专门描述同步状态的变量

private volatile int state;

state是一个可见的状态值一般0表示锁还没有被用1表示占用1+表示重入次数。

AQS中的东西其实上面都分析过了

// 独占锁获取
void acquire(int arg)
//与acquire方法相同但在同步队列中进行等待的时候可以检测中断
void acquireInterruptibly(int arg)
//在acquireInterruptibly基础上增加了超时等待功能在超时时间内没有获得同步状态返回false;
boolean tryAcquireNanos(int arg, long nanosTimeout)
//释放同步状态该方法会唤醒在同步队列中的下一个节点
boolean release(int arg)
//共享式获取同步状态与独占式的区别在于同一时刻有多个线程获取同步状态
void acquireShared(int arg)
//在acquireShared方法基础上增加了能响应中断的功能
void acquireSharedInterruptibly(int arg)
//在acquireSharedInterruptibly基础上增加了超时等待的功能
boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
//共享式释放同步状态
boolean releaseShared(int arg)

不在赘述。主要讲一下AQS中的队列和Condition队列

abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 7373984972572414691L;
    protected AbstractQueuedSynchronizer() {}
    static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED = 1;//节点从同步队列中取消
        static final int SIGNAL = -1;//后继节点的线程处于等待状态如果当前节点释放同步状态会通知后继节点使得后继节点的线程能够运行
        static final int CONDITION = -2;//当前节点进入等待队列中
        static final int PROPAGATE = -3;//表示下一次共享式同步状态获取将会无条件传播下去
        volatile int waitStatus //节点状态
        volatile Node prev //当前节点/线程的前驱节点
        volatile Node next; //当前节点/线程的后继节点
        volatile Thread thread;//加入同步队列的线程引用
        Node nextWaiter;//等待队列中的下一个节点
    }
    //队列的头指针
    private transient volatile Node head;
    //队列的尾指针
    private transient volatile Node tail;
    private volatile int state;
}

其实就是个双向队列里面每一个节点里有一个线程和它的状态。就这么简单。

condition

condition是一个接口。

public interface Condition {

    /**
     * Causes the current thread to wait until it is signalled or
     * {@linkplain Thread#interrupt interrupted}.
     *
     * <p>The lock associated with this {@code Condition} is atomically
     * released and the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until <em>one</em> of four things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #signal} method for this
     * {@code Condition} and the current thread happens to be chosen as the
     * thread to be awakened; or
     * <li>Some other thread invokes the {@link #signalAll} method for this
     * {@code Condition}; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread, and interruption of thread suspension is supported; or
     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
     * </ul>
     *
     * <p>In all cases, before this method can return the current thread must
     * re-acquire the lock associated with this condition. When the
     * thread returns it is <em>guaranteed</em> to hold this lock.
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * and interruption of thread suspension is supported,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared. It is not specified, in the first
     * case, whether or not the test for interruption occurs before the lock
     * is released.
     *
     * <p><b>Implementation Considerations</b>
     *
     * <p>The current thread is assumed to hold the lock associated with this
     * {@code Condition} when this method is called.
     * It is up to the implementation to determine if this is
     * the case and if not, how to respond. Typically, an exception will be
     * thrown (such as {@link IllegalMonitorStateException}) and the
     * implementation must document that fact.
     *
     * <p>An implementation can favor responding to an interrupt over normal
     * method return in response to a signal. In that case the implementation
     * must ensure that the signal is redirected to another waiting thread, if
     * there is one.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         (and interruption of thread suspension is supported)
     */
    void await() throws InterruptedException;

    /**
     * Causes the current thread to wait until it is signalled.
     *
     * <p>The lock associated with this condition is atomically
     * released and the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until <em>one</em> of three things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #signal} method for this
     * {@code Condition} and the current thread happens to be chosen as the
     * thread to be awakened; or
     * <li>Some other thread invokes the {@link #signalAll} method for this
     * {@code Condition}; or
     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
     * </ul>
     *
     * <p>In all cases, before this method can return the current thread must
     * re-acquire the lock associated with this condition. When the
     * thread returns it is <em>guaranteed</em> to hold this lock.
     *
     * <p>If the current thread's interrupted status is set when it enters
     * this method, or it is {@linkplain Thread#interrupt interrupted}
     * while waiting, it will continue to wait until signalled. When it finally
     * returns from this method its interrupted status will still
     * be set.
     *
     * <p><b>Implementation Considerations</b>
     *
     * <p>The current thread is assumed to hold the lock associated with this
     * {@code Condition} when this method is called.
     * It is up to the implementation to determine if this is
     * the case and if not, how to respond. Typically, an exception will be
     * thrown (such as {@link IllegalMonitorStateException}) and the
     * implementation must document that fact.
     */
    void awaitUninterruptibly();

    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified waiting time elapses.
     *
     * <p>The lock associated with this condition is atomically
     * released and the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until <em>one</em> of five things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #signal} method for this
     * {@code Condition} and the current thread happens to be chosen as the
     * thread to be awakened; or
     * <li>Some other thread invokes the {@link #signalAll} method for this
     * {@code Condition}; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread, and interruption of thread suspension is supported; or
     * <li>The specified waiting time elapses; or
     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
     * </ul>
     *
     * <p>In all cases, before this method can return the current thread must
     * re-acquire the lock associated with this condition. When the
     * thread returns it is <em>guaranteed</em> to hold this lock.
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * and interruption of thread suspension is supported,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared. It is not specified, in the first
     * case, whether or not the test for interruption occurs before the lock
     * is released.
     *
     * <p>The method returns an estimate of the number of nanoseconds
     * remaining to wait given the supplied {@code nanosTimeout}
     * value upon return, or a value less than or equal to zero if it
     * timed out. This value can be used to determine whether and how
     * long to re-wait in cases where the wait returns but an awaited
     * condition still does not hold. Typical uses of this method take
     * the following form:
     *
     *  <pre> {@code
     * boolean aMethod(long timeout, TimeUnit unit) {
     *   long nanos = unit.toNanos(timeout);
     *   lock.lock();
     *   try {
     *     while (!conditionBeingWaitedFor()) {
     *       if (nanos <= 0L)
     *         return false;
     *       nanos = theCondition.awaitNanos(nanos);
     *     }
     *     // ...
     *   } finally {
     *     lock.unlock();
     *   }
     * }}</pre>
     *
     * <p>Design note: This method requires a nanosecond argument so
     * as to avoid truncation errors in reporting remaining times.
     * Such precision loss would make it difficult for programmers to
     * ensure that total waiting times are not systematically shorter
     * than specified when re-waits occur.
     *
     * <p><b>Implementation Considerations</b>
     *
     * <p>The current thread is assumed to hold the lock associated with this
     * {@code Condition} when this method is called.
     * It is up to the implementation to determine if this is
     * the case and if not, how to respond. Typically, an exception will be
     * thrown (such as {@link IllegalMonitorStateException}) and the
     * implementation must document that fact.
     *
     * <p>An implementation can favor responding to an interrupt over normal
     * method return in response to a signal, or over indicating the elapse
     * of the specified waiting time. In either case the implementation
     * must ensure that the signal is redirected to another waiting thread, if
     * there is one.
     *
     * @param nanosTimeout the maximum time to wait, in nanoseconds
     * @return an estimate of the {@code nanosTimeout} value minus
     *         the time spent waiting upon return from this method.
     *         A positive value may be used as the argument to a
     *         subsequent call to this method to finish waiting out
     *         the desired time.  A value less than or equal to zero
     *         indicates that no time remains.
     * @throws InterruptedException if the current thread is interrupted
     *         (and interruption of thread suspension is supported)
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified waiting time elapses. This method is behaviorally
     * equivalent to:
     *  <pre> {@code awaitNanos(unit.toNanos(time)) > 0}</pre>
     *
     * @param time the maximum time to wait
     * @param unit the time unit of the {@code time} argument
     * @return {@code false} if the waiting time detectably elapsed
     *         before return from the method, else {@code true}
     * @throws InterruptedException if the current thread is interrupted
     *         (and interruption of thread suspension is supported)
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified deadline elapses.
     *
     * <p>The lock associated with this condition is atomically
     * released and the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until <em>one</em> of five things happens:
     * <ul>
     * <li>Some other thread invokes the {@link #signal} method for this
     * {@code Condition} and the current thread happens to be chosen as the
     * thread to be awakened; or
     * <li>Some other thread invokes the {@link #signalAll} method for this
     * {@code Condition}; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread, and interruption of thread suspension is supported; or
     * <li>The specified deadline elapses; or
     * <li>A &quot;<em>spurious wakeup</em>&quot; occurs.
     * </ul>
     *
     * <p>In all cases, before this method can return the current thread must
     * re-acquire the lock associated with this condition. When the
     * thread returns it is <em>guaranteed</em> to hold this lock.
     *
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * and interruption of thread suspension is supported,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared. It is not specified, in the first
     * case, whether or not the test for interruption occurs before the lock
     * is released.
     *
     *
     * <p>The return value indicates whether the deadline has elapsed,
     * which can be used as follows:
     *  <pre> {@code
     * boolean aMethod(Date deadline) {
     *   boolean stillWaiting = true;
     *   lock.lock();
     *   try {
     *     while (!conditionBeingWaitedFor()) {
     *       if (!stillWaiting)
     *         return false;
     *       stillWaiting = theCondition.awaitUntil(deadline);
     *     }
     *     // ...
     *   } finally {
     *     lock.unlock();
     *   }
     * }}</pre>
     *
     * <p><b>Implementation Considerations</b>
     *
     * <p>The current thread is assumed to hold the lock associated with this
     * {@code Condition} when this method is called.
     * It is up to the implementation to determine if this is
     * the case and if not, how to respond. Typically, an exception will be
     * thrown (such as {@link IllegalMonitorStateException}) and the
     * implementation must document that fact.
     *
     * <p>An implementation can favor responding to an interrupt over normal
     * method return in response to a signal, or over indicating the passing
     * of the specified deadline. In either case the implementation
     * must ensure that the signal is redirected to another waiting thread, if
     * there is one.
     *
     * @param deadline the absolute time to wait until
     * @return {@code false} if the deadline has elapsed upon return, else
     *         {@code true}
     * @throws InterruptedException if the current thread is interrupted
     *         (and interruption of thread suspension is supported)
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;

    /**
     * Wakes up one waiting thread.
     *
     * <p>If any threads are waiting on this condition then one
     * is selected for waking up. That thread must then re-acquire the
     * lock before returning from {@code await}.
     *
     * <p><b>Implementation Considerations</b>
     *
     * <p>An implementation may (and typically does) require that the
     * current thread hold the lock associated with this {@code
     * Condition} when this method is called. Implementations must
     * document this precondition and any actions taken if the lock is
     * not held. Typically, an exception such as {@link
     * IllegalMonitorStateException} will be thrown.
     */
    void signal();

    /**
     * Wakes up all waiting threads.
     *
     * <p>If any threads are waiting on this condition then they are
     * all woken up. Each thread must re-acquire the lock before it can
     * return from {@code await}.
     *
     * <p><b>Implementation Considerations</b>
     *
     * <p>An implementation may (and typically does) require that the
     * current thread hold the lock associated with this {@code
     * Condition} when this method is called. Implementations must
     * document this precondition and any actions taken if the lock is
     * not held. Typically, an exception such as {@link
     * IllegalMonitorStateException} will be thrown.
     */
    void signalAll();
}

他有一个实现类叫做ConditionObject

conditionObject是通过基于单链表的条件队列来管理等待线程的。线程在调用await方法进行等待时会释放同步状态。同时线程将会被封装到一个等待节点中并将节点置入条件队列尾部进行等待。当有线程在获取独占锁的情况下调用signal或singalAll方法时队列中的等待线程将会被唤醒重新竞争锁。另外需要说明的是一个锁对象可同时创建多个 ConditionObject 对象这意味着多个竞争同一独占锁的线程可在不同的条件队列中进行等待。在唤醒时可唤醒指定条件队列中的线程。

方法

await

   public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

一点点来分析

if (Thread.interrupted())
                throw new InterruptedException();

如果线程中断抛出异常

 Node node = addConditionWaiter();

向尾部添加

 private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

完全释放

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

众所周知wait方法和sleep方法的不同是wait的时候会释放锁这里其实也是一样的首先获取同步锁的状态值然后调用 release 释放指定数量的同步状态。

  public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

这里的代码之前都看过不在赘述意思就是释放锁释放完了就完事返回释放的状态值。

isOnSyncQueue

 final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

如果节点的状态为CONDITION 或者prev没有那么节点就不在队列上就这样去判断节点是不是在队列里。

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

 while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

如果节点不在队列则阻塞当前线程

   if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);

acquireQueued之前分析过了。
连起来整个过程就是当前线程已经中断则抛出中断异常将当前线程加入到条件队列中去释放当前线程所占用的锁并保存当前锁的状态然后等待唤醒如果说跳出了循环那么一定是被唤醒了。

signal

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

isHeldExclusively

protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

这个代码的意思是判断当前独占锁的线程是不是当前线程如果不是那就抛出异常。获取头节点对他进行唤醒操作。

 private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal

 final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

通过transferForSignal方法将first节点从条件队列转移到同步队列当中然后将first节点从条件队列当中踢出去。

串起来就是唤醒就是把condition队列的元素踢到同步队列去争夺锁

doSignalAll就是唤醒所有的条件队列里的线程。

结束

至此。JAVA并发章节全部结束后续不会再更新。
整个源码部分不算太难基本跑一遍看一遍就OK。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Java
返回列表

上一篇:WebAssembly

下一篇:SIMD性能优化