Java Concurrent Programming: Concurrent Process Control, AQS Principles, and Related Source Code Analysis

Reprinted from: juejin.im/post/684490...

Java concurrent programming

Code on GitHub: github.com/imyiren/con...

  1. How many ways are there to create a thread?
  2. How to start and stop a thread correctly: best practices and source code analysis
  3. Understanding Object's wait, notify, notifyAll and Thread's sleep, yield, join through multiple examples
  4. Understanding thread attributes and how to handle exceptions in child threads
  5. Thread safety and performance issues in multithreading
  6. Principles and application of the JMM (Java Memory Model) in concurrency
  7. An in-depth look at deadlocks and how to resolve them
  8. Analysis of the use and composition of thread pools
  9. One article to understand the usage and internals of ThreadLocal
  10. Classification and characteristics of Locks in JUC, explained in detail (with examples and source code)
  11. Using the Atomic classes in JUC, with analysis of the CAS-related source code
  12. The principles of ConcurrentHashMap and CopyOnWriteArrayList, analyzed with source code
  13. Concurrent process control, AQS principles, and related source code analysis

0. Main content

  • The article is divided into two parts:
  1. The first part covers the usage of the various concurrent process control tools, with examples
  2. The second part first analyzes the composition and principles of AQS, then walks through the source code logic of CountDownLatch, Semaphore, and others

P.S. This is a long article.

1. Concurrent process control

1.1 What is concurrent process control

  • Concurrent process control means making threads cooperate with each other to complete tasks in the way the business logic requires
  • For example: make thread A wait for thread B to finish before it carries on with its own work

1.2 Tools for concurrent process control

| Class | Role | Notes |
|---|---|---|
| Semaphore | Semaphore: coordinates threads by controlling the number of "permits" | A thread can continue to run only if it holds a permit |
| CyclicBarrier | Cyclic barrier: threads wait until the specified number of threads has arrived, then all move on to the next task | Suitable for scenarios where threads wait for each other's results to be ready |
| Phaser | Similar to CyclicBarrier, but the count is variable | Added in Java 7 |
| CountDownLatch | Also a counting-based wait: the action is triggered when the count reaches 0 | Not reusable |
| Exchanger | Lets two threads exchange objects at a suitable point | For two threads working on different instances of the same class that need to swap data |
| Condition | Controls the waiting and waking of threads | An upgraded version of Object.wait() |
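
Exchanger is listed in the table but has no example later in the article, so here is a minimal sketch of the idea. The class name ExchangerSketch and the helper swap are made up for illustration; only java.util.concurrent.Exchanger and its exchange method come from the JDK.

```java
import java.util.concurrent.Exchanger;

public class ExchangerSketch {
    /**
     * Two threads meet at the exchanger and swap their values.
     * Returns {value received by the caller, value received by the helper thread}.
     */
    static String[] swap(String fromCaller, String fromHelper) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];

        Thread helper = new Thread(() -> {
            try {
                // Blocks until the other thread also calls exchange()
                received[1] = exchanger.exchange(fromHelper);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        helper.start();

        received[0] = exchanger.exchange(fromCaller);
        helper.join(); // join makes the helper's write to received[1] visible
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] result = swap("data-from-main", "data-from-helper");
        System.out.println("main received:   " + result[0]);
        System.out.println("helper received: " + result[1]);
    }
}
```

Each call to exchange blocks until the partner thread arrives, so the exchanger doubles as a rendezvous point for exactly two threads.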

2. CountDownLatch

2.1 Function

  • A concurrent process control tool: threads wait until a count that we set has been reached before performing certain tasks

2.2 Main methods

  • CountDownLatch(int count): the only constructor; the parameter count is the value to count down from
  • await(): the calling thread is suspended and continues only once the count has reached zero
  • countDown(): decrements the count by 1; when it reaches 0, the waiting threads are woken up

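2.3 Usage 1: One thread waits for several threads to finish

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author yiren
 */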
public class CountDownLatchExample01 {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger integer = new AtomicInteger(1);
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName()+ " produce ....");
                    TimeUnit.SECONDS.sleep(1);
                    integer.incrementAndGet();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        System.out.println(Thread.currentThread().getName() + " waiting....");
        latch.await();
        System.out.println(Thread.currentThread().getName() + " finished!");
        System.out.println(Thread.currentThread().getName() + " num: " +  integer.get());
        executorService.shutdown();
    }
}
  
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-3 produce ....
main waiting....
pool-1-thread-4 produce ....
pool-1-thread-5 produce ....
main finished!
main num: 6

Process finished with exit code 0
  

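2.4 Usage 2: Several threads wait for one signal

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author yiren
 */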
public class CountDownLatchExample02 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 5; i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " ready!");
                try {
                    latch.await();
                    System.out.println(Thread.currentThread().getName()+ " produce ....");
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    latch.countDown();
                }
            });
        }
        Thread.sleep(10);
        System.out.println(Thread.currentThread().getName() + " ready!");
        latch.countDown();
        System.out.println(Thread.currentThread().getName() + " go!");

        executorService.shutdown();
    }
}
  
pool-1-thread-1 ready!
pool-1-thread-4 ready!
pool-1-thread-3 ready!
pool-1-thread-2 ready!
pool-1-thread-5 ready!
main ready!
main go!
pool-1-thread-1 produce ....
pool-1-thread-2 produce ....
pool-1-thread-5 produce ....
pool-1-thread-3 produce ....
pool-1-thread-4 produce ....

Process finished with exit code 0
  

2.5 Notes

  • CountDownLatch does not have to wait indefinitely: await also accepts a timeout, after which the thread wakes up and continues even if the count has not reached zero

  • boolean await(long timeout, TimeUnit unit)
      
  • CountDownLatch cannot be reused. If you need to count again, use CyclicBarrier or create a new CountDownLatch
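
The timed await can be sketched as follows. The class name LatchTimeoutSketch and the helper waitFor are hypothetical; the 100 ms timeout is an arbitrary value chosen for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {
    /** Waits up to timeoutMillis; returns true only if the count reached zero in time. */
    static boolean waitFor(CountDownLatch latch, long timeoutMillis) throws InterruptedException {
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);

        latch.countDown();                      // only one of the two expected events happens
        boolean reached = waitFor(latch, 100);  // gives up after the timeout, count is still 1
        System.out.println("reached zero in time: " + reached + ", count: " + latch.getCount());

        latch.countDown();                      // the second event arrives
        System.out.println("reached zero in time: " + waitFor(latch, 100));
    }
}
```

The boolean return value is the only way to tell a timeout apart from a normal release, so callers should check it rather than assume the count reached zero.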

3. Semaphore

3.1 Semaphore function

  • Semaphore can be used to limit or manage the use of a limited number of resources

  • A semaphore maintains a count of permits. A thread can acquire a permit, which decrements the count by one, and can release a permit, which increments it by one. Once all permits have been handed out, other threads that want one must wait until some thread releases a permit.

3.2 Semaphore use

  1. Initialize the Semaphore with the number of permits
  2. Call acquire() or acquireUninterruptibly() before the code that requires a permit
  3. After the task has run, call release() to return the permit

3.3 Main methods

  • Semaphore(int permits, boolean fair): sets the number of permits and whether to use a fair policy.
    • If true is passed, waiting threads are placed in a FIFO queue and served in order.
  • acquire(): requests a permit; responds to interruption
  • acquireUninterruptibly(): requests a permit; cannot be interrupted
  • tryAcquire(): checks whether a permit is free right now and returns true if it got one; an overload takes a timeout so that the thread waits for up to that long.
  • release(): releases a permit
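
As a small illustration of the non-blocking variants in the list above, the sketch below calls tryAcquire on a semaphore with a single permit. The class name SemaphoreTryAcquireSketch and the helper probe are invented for this example, and the 100 ms timeout is arbitrary.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTryAcquireSketch {
    /** Tries three times to take a permit and reports the outcome of each attempt. */
    static boolean[] probe(Semaphore semaphore) throws InterruptedException {
        boolean first = semaphore.tryAcquire();   // succeeds if a permit is free right now
        boolean second = semaphore.tryAcquire();  // returns immediately, never blocks
        boolean timed = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS); // waits up to 100 ms
        return new boolean[] {first, second, timed};
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(1, true);
        boolean[] results = probe(semaphore);
        // With a single permit only the first attempt can succeed
        System.out.println(results[0] + " " + results[1] + " " + results[2]);

        semaphore.release(); // hand back the permit taken by the first attempt
        System.out.println("available: " + semaphore.availablePermits());
    }
}
```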

3.4 Case demonstration

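import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @author yiren
 */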
public class SemaphoreExample01 {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 8; i++) {
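            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName()+" start to get permit");
                try {
                    // Acquire outside the try/finally below, so that a permit
                    // is released only if it was actually obtained
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                }
                try {
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + " " + LocalDateTime.now() +" finished!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });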
        }
        executorService.shutdown();
    }
}
  
pool-1-thread-1 start to get permit
pool-1-thread-4 start to get permit
pool-1-thread-3 start to get permit
pool-1-thread-2 start to get permit
pool-1-thread-5 start to get permit
pool-1-thread-6 start to get permit
pool-1-thread-7 start to get permit
pool-1-thread-8 start to get permit
pool-1-thread-3 2020-02-21T19:54:47.392 finished!
pool-1-thread-1 2020-02-21T19:54:47.392 finished!
pool-1-thread-4 2020-02-21T19:54:47.392 finished!
pool-1-thread-6 2020-02-21T19:54:49.396 finished!
pool-1-thread-2 2020-02-21T19:54:49.396 finished!
pool-1-thread-5 2020-02-21T19:54:49.396 finished!
pool-1-thread-8 2020-02-21T19:54:51.401 finished!
pool-1-thread-7 2020-02-21T19:54:51.401 finished!

Process finished with exit code 0
  

3.5 Points to note

  • The number of permits acquired and released should match. Both acquire and release accept a count argument. If the counts acquired and released are inconsistent, the program can easily end up with bugs. This is not absolute, but unless there is a special business need, keep the acquire and release counts the same
  • Pay attention to the fairness setting when initializing Semaphore; setting it to true is generally reasonable. If queue jumping is severe, some threads may stay blocked indefinitely
  • A permit does not have to be released by the thread that acquired it: thread A can acquire a permit and thread B can release it.

4. Condition interface

4.1 Function

  • When thread A needs to wait for some task or resource, it can call condition.await(), which puts it into a blocked state.

  • Another thread B then obtains the resource or completes the task and calls condition.signal() or signalAll() to notify thread A so that it continues executing

  • This is similar to object.wait(), notify(), and notifyAll()

  • If several threads are waiting, signal() wakes up the one that has waited the longest

  • With ReentrantLock we can create a new Condition directly. See the following example

4.2 Case demonstration

  • Common usage
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author yiren
 */
public class ConditionExample01 {
    private static ReentrantLock lock = new ReentrantLock();
    private static Condition condition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(() -> {
            task1();
        });
        Thread thread2 = new Thread(() -> {
            task2();
        });
        thread1.start();
        Thread.sleep(100);
        thread2.start();
    }

    private static void task1() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " start await()");
            condition.await();
            System.out.println(Thread.currentThread().getName() + " await finished!");
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private static void task2() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " start signal()");
            Thread.sleep(1000);
            condition.signal();
            System.out.println(Thread.currentThread().getName() + " signal finished!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
  
Thread-0 start await()
Thread-1 start signal()
Thread-1 signal finished!
Thread-0 await finished!

Process finished with exit code 0
  
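  • Producer-consumer model
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author yiren
 */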
public class ConditionExample02 {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionExample02 conditionDemo2 = new ConditionExample02();
        Producer producer = conditionDemo2.new Producer();
        Consumer consumer = conditionDemo2.new Consumer();
        producer.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        System.out.println("Queue is empty, waiting for data");
                        try {
                            notEmpty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    notFull.signalAll();
                    System.out.println("Took one element from the queue, " + queue.size() + " elements remaining");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        System.out.println("Queue is full, waiting for free space");
                        try {
                            notFull.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    notEmpty.signalAll();
                    System.out.println("Added one element to the queue, free slots remaining: " + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}
  
  • The two conditions above serve as the notification channel for "queue full" and "queue empty", letting producers and consumers communicate.

4.3 Points to note

  • We know that Lock can be regarded as an alternative to synchronized; likewise Condition replaces object.wait/notify, and the usage is almost the same.

  • await() must be called while holding the Lock, otherwise an exception is thrown. await() also releases the Lock currently held and reacquires it before returning.

  • One Lock can have multiple Conditions, which makes it more flexible
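
The locking rule for await() can be checked with a short sketch. The class ConditionLockRuleSketch and its two helper methods are hypothetical names for this example; the exception type shown is the one ReentrantLock's conditions throw when the lock is not held.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionLockRuleSketch {
    /** Returns true if await() without holding the lock is rejected. */
    static boolean awaitWithoutLockFails() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.await(); // the lock is NOT held here
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    /** Returns true if another thread can take the lock while the waiter is inside await(). */
    static boolean awaitReleasesLock() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        CountDownLatch waiterHasLock = new CountDownLatch(1);
        boolean[] ready = {false}; // guarded by lock

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                waiterHasLock.countDown();
                while (!ready[0]) {    // loop guards against spurious wakeups
                    condition.await(); // releases the lock while waiting
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        waiterHasLock.await();

        // The waiter owns the lock until it enters await(); then we can get in
        boolean acquired = lock.tryLock(2, TimeUnit.SECONDS);
        if (acquired) {
            try {
                ready[0] = true;
                condition.signal();
            } finally {
                lock.unlock();
            }
        } else {
            waiter.interrupt(); // do not leave the waiter blocked forever
        }
        waiter.join();
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("await without lock rejected: " + awaitWithoutLockFails());
        System.out.println("lock free during await:      " + awaitReleasesLock());
    }
}
```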

5. CyclicBarrier

5.1 Function

  • CyclicBarrier (cyclic barrier) is very similar to CountDownLatch: both can block a group of threads
  • When several threads each complete part of a task and the results need to be combined at the end, we can use CyclicBarrier. A thread that finishes its part waits at the barrier; once all threads have arrived, they all continue with the remaining work
    • For example: colleagues agree to go out to dinner together, meet at the company, and set off only once everyone has arrived.
  • Note that CyclicBarrier can be reused, which is different from CountDownLatch

5.2 Case

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @author yiren
 */
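public class CyclicBarrierExample {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("All 5 threads have arrived, setting off together!");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }
    }

    static class Task implements Runnable {
        private int id;
        private CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("Thread " + id + " is heading to the meeting point");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("Thread " + id + " has arrived and is waiting for the others");
                cyclicBarrier.await();
                System.out.println("Thread " + id + " sets off");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
}

Thread 0 is heading to the meeting point
Thread 2 is heading to the meeting point
Thread 3 is heading to the meeting point
Thread 1 is heading to the meeting point
Thread 4 is heading to the meeting point
Thread 5 is heading to the meeting point
Thread 6 is heading to the meeting point
Thread 7 is heading to the meeting point
Thread 8 is heading to the meeting point
Thread 9 is heading to the meeting point
Thread 3 has arrived and is waiting for the others
Thread 9 has arrived and is waiting for the others
Thread 8 has arrived and is waiting for the others
Thread 4 has arrived and is waiting for the others
Thread 5 has arrived and is waiting for the others
All 5 threads have arrived, setting off together!
Thread 5 sets off
Thread 3 sets off
Thread 8 sets off
Thread 4 sets off
Thread 9 sets off
Thread 1 has arrived and is waiting for the others
Thread 6 has arrived and is waiting for the others
Thread 0 has arrived and is waiting for the others
Thread 7 has arrived and is waiting for the others
Thread 2 has arrived and is waiting for the others
All 5 threads have arrived, setting off together!
Thread 2 sets off
Thread 1 sets off
Thread 7 sets off
Thread 0 sets off
Thread 6 sets off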

Process finished with exit code 0

  
  • Each time five threads arrive, that batch sets off together

5.3 Difference between CountDownLatch and CyclicBarrier

  • Different targets: CountDownLatch counts events through countDown(), while CyclicBarrier counts threads through await()
  • Different reusability: once a CountDownLatch has counted down to zero it cannot be used again unless you create a new object, whereas a CyclicBarrier can be reused directly
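
The reusability difference can be sketched as follows: the same CyclicBarrier instance is tripped several times in a row, and the barrier action counts the trips. The class BarrierReuseSketch and the helper runRounds are illustrative names, not part of the original examples.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierReuseSketch {
    /** Lets `parties` threads meet at one barrier `rounds` times; returns how often it tripped. */
    static int runRounds(int parties, int rounds) throws InterruptedException {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, in the last thread to arrive
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int r = 0; r < rounds; r++) {
                        barrier.await(); // the same barrier object is reused every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return trips.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("barrier tripped " + runRounds(3, 4) + " times");
    }
}
```

A CountDownLatch would need a fresh instance for every round, because its count never goes back up after reaching zero.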

6. A deep dive into AQS, the foundation of JUC

6.1 The role and importance of AQS

  • AQS is used in CountDownLatch and the other tools. Its full name is AbstractQueuedSynchronizer, and it is an abstract class

  • Locks and the thread coordination classes above (Semaphore and so on) have a lot in common. In fact, under the hood they all extend AQS as a base class.

  • Because much of their work is similar, the JDK extracted this common logic and provides it for them to use directly, so that they can implement their own functionality without handling many low-level details.

  • Let's look at how these concurrency control tools and locks are implemented internally.

    • Semaphore
    public class Semaphore implements java.io.Serializable {
        private static final long serialVersionUID = -3222578661600680210L;
        private final Sync sync;
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 1192457210091910933L;
    
            Sync(int permits) {
                setState(permits);
            }
    	......
      
    • ReentrantLock
    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        private final Sync sync;
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    		......
      
    • CountDownLatch
    public class CountDownLatch {
    
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    		......
      
  • From the source code we can see that each of them has an inner class Sync that extends AbstractQueuedSynchronizer

  • So what is AQS used for?

    • Much of JUC is built on AQS. AQS is a framework for building locks, synchronizers, and thread coordination tools, intended to be used through subclasses. Its design is based mainly on the template method pattern.
    • Its main job is to manage the blocking and waking of threads: it implements synchronization state management and queue management of the blocked threads

6.2 The composition and internal principles of AQS

  • AbstractQueuedSynchronizer was added in JDK 1.5; it is a basic framework for building synchronizers on top of a FIFO wait queue.
  • In JDK 1.8, reentrant locks, read-write locks, countdown latches, semaphores, and others all use subclasses of AQS. Next, we look at the internal principles of AQS.
  1. The three parts of AQS

    • state: the synchronization state

    • FIFO queue: the queue that manages threads competing for the lock

    • Acquire and release methods: the methods that each tool class needs to implement

  2. state: the synchronization state

       /**
         * The synchronization state.
         */
        private volatile int state;
      
    • Its meaning is not fixed and varies by implementation. For example, in Semaphore it is the number of remaining permits, and in CountDownLatch it is the number still to be counted down. It can be regarded as a counter whose role and meaning differ from class to class.
        protected final boolean compareAndSetState(int expect, int update) {
           //See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
      
    • Updates to the state value are done with Unsafe's CAS
    • In ReentrantLock, state represents how the lock is held: it is the reentrancy count, incremented by one on each reentry. Each unlock decrements it, and when the lock is fully released the value becomes 0, indicating that no thread holds it.
  3. FIFO queue:

       /**
         * Head of the wait queue, lazily initialized.  Except for
         * initialization, it is modified only via method setHead.  Note:
         * If head exists, its waitStatus is guaranteed not to be
         * CANCELLED.
         */
        private transient volatile Node head;
    
       /**
         * Tail of the wait queue, lazily initialized.  Modified only via
         * method enq to add new wait node.
         */
        private transient volatile Node tail;
    
      
    • This queue stores the waiting threads, and AQS manages it. When multiple threads compete for a lock, those that fail to get it are placed in the queue. When the thread currently holding the lock releases it, AQS picks a thread from the queue to take the lock.
    • AQS maintains the waiting queue as a doubly linked list and manages the waiting threads in it; the head node does not represent a waiting thread but corresponds to the thread that has already acquired the lock; AQS stores the head and tail nodes of this queue.
  4. Methods of acquisition and release

    • Acquisition methods:
      • The acquire operation depends on the state variable and often blocks, for example when the lock or a permit cannot be obtained
      • In ReentrantLock, it is acquiring the lock: state+1
      • In Semaphore, it is acquire, which obtains a permit: state-1; when state==0 it blocks
      • In CountDownLatch, it is the await method, which simply waits for state==0
    • Release methods:
      • The release operation does not block
      • In ReentrantLock, it is the unlock method, which calls release(1), corresponding to state-1
      • In Semaphore, it is release, which does state+1
      • In CountDownLatch, it is the countDown method, which also does state-1
    • Normally the implementing class overrides tryAcquire and tryRelease (or their shared counterparts) to suit its own needs

6.3 Usage of AQS

  1. Decide on the coordination logic, and implement the acquire and release methods accordingly
  2. Write an inner Sync class that extends AQS
  3. Which methods to override depends on whether the synchronizer is exclusive: exclusive mode uses tryAcquire/tryRelease, shared mode uses tryAcquireShared(int acquires)/tryReleaseShared(int releases). The acquire- and release-related methods of the main class then call the methods of Sync
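
The three steps above can be sketched with a tiny one-shot gate built on AQS in shared mode. OneShotLatchSketch is a made-up class for illustration and is not part of the JDK; the encoding of state (0 means closed, 1 means open) is an assumption of this sketch.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class OneShotLatchSketch {
    // Step 2: an inner Sync class that extends AQS
    private static final class Sync extends AbstractQueuedSynchronizer {
        boolean isOpen() {
            return getState() == 1; // in this sketch: 0 = closed, 1 = open
        }

        // Step 3: shared mode, so override the *Shared variants
        @Override
        protected int tryAcquireShared(int ignored) {
            return isOpen() ? 1 : -1; // negative value: caller must queue and block
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);  // open the gate
            return true;  // true: AQS should wake the queued threads
        }
    }

    private final Sync sync = new Sync();

    // Step 1: the public acquire/release methods delegate to Sync
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void open() {
        sync.releaseShared(1);
    }

    public boolean isOpen() {
        return sync.isOpen();
    }

    public static void main(String[] args) throws InterruptedException {
        OneShotLatchSketch gate = new OneShotLatchSketch();
        Thread waiter = new Thread(() -> {
            try {
                gate.await();
                System.out.println("waiter passed the gate");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        gate.open();
        waiter.join();
    }
}
```

All queueing, parking, and waking is inherited from AQS; the subclass only decides what state means and when an acquire attempt succeeds.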

7. Source code analysis of AQS in CountDownLatch

  • Let's take CountDownLatch as an example and analyze its source code:

  • Constructor

    • We can see that internally it initializes a Sync and passes in the count value
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
      
    • Below we can see that the constructor of Sync in CountDownLatch passes the given count to setState, which stores it in AQS's state
  • Inside CountDownLatch there is a Sync class that extends AQS

        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
               //Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
      
  • The getCount() method of CountDownLatch

        public long getCount() {
            return sync.getCount();
        }
      
    • We can see that getCount actually calls getCount() of Sync, which reads state and returns it
  • The countDown() method of CountDownLatch

        public void countDown() {
            sync.releaseShared(1);
        }
      
    • We can see that it directly calls releaseShared(1) of AQS
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
      
    • releaseShared in turn calls back into tryReleaseShared, which is implemented in CountDownLatch
            protected boolean tryReleaseShared(int releases) {
               //Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
      
    • tryReleaseShared mainly decrements state by 1. If state is already 0 it returns false; otherwise it subtracts one and uses CAS to update the value safely under concurrency. It returns true only if the new value is 0.
    • When it returns true, doReleaseShared is called, which wakes up the waiting threads in the queue, that is, the threads that called await()
  • The await() method of CountDownLatch

        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
      
    • await calls sync.acquireSharedInterruptibly(1), which is implemented in AQS
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
      
    • Inside, it checks whether tryAcquireShared(arg) < 0. If the result is less than 0, the lock was not acquired, so it calls doAcquireSharedInterruptibly(arg) to join the queue

    • tryAcquireShared is implemented in Sync of CountDownLatch

            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
      
    • If the current state is 0 (that is, the count has reached 0), it returns 1, the condition in acquireSharedInterruptibly above is not satisfied, and the thread passes straight through. If state is not 0, it returns -1 and the thread has to queue, which is done by calling doAcquireSharedInterruptibly
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null;//help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
      
    • This method first calls addWaiter to wrap the current thread in a Node and append it to the tail of the queue; this Node is a node of the FIFO queue.
    • Then it enters the loop. If the node's predecessor is not head, or the acquisition attempt fails, it goes on to the subsequent checks. The important part is parkAndCheckInterrupt, shown below:
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
      
    • It calls LockSupport.park, which wraps Unsafe's native park() method to suspend the thread and put it into a blocked state
        public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }
      
    • There is no need to dig further down. We only need to know that doAcquireSharedInterruptibly puts the current thread into the wait queue and blocks it.
  • Key points of how CountDownLatch uses AQS:

    • When await() of CountDownLatch is called, it tries to acquire the shared lock; at first the lock cannot be acquired, so the thread blocks
    • The condition for acquiring it is that the counter is 0, that is, state==0.
    • Each call to countDown decrements the counter by one; only when it reaches 0 are the blocked threads woken up.
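
The behaviour of tryReleaseShared described above (decrement until 0, then do nothing) can be observed from outside through getCount(). The sketch below uses a hypothetical class LatchStateSketch and calls countDown more often than the initial count.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;

public class LatchStateSketch {
    /** Calls countDown() `calls` times and records getCount() after each call. */
    static long[] countsAfterEachCountDown(int initial, int calls) {
        CountDownLatch latch = new CountDownLatch(initial);
        long[] seen = new long[calls];
        for (int i = 0; i < calls; i++) {
            latch.countDown();          // once state is 0, tryReleaseShared returns false
            seen[i] = latch.getCount(); // getCount() simply reads the AQS state
        }
        return seen;
    }

    public static void main(String[] args) {
        // Initial count 2, four countDown() calls: the count never drops below 0
        System.out.println(Arrays.toString(countsAfterEachCountDown(2, 4))); // prints [1, 0, 0, 0]
    }
}
```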

8. Source code analysis of AQS in Semaphore

  • Since the analysis above was very detailed, this section is briefer

  • In Semaphore, state is the number of permits

  • The main operations are acquire and release; they too go through Sync's operations on state to control the blocking and waking of threads

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
  
    public void release() {
        sync.releaseShared(1);
    }
  
  • Let's look at acquire first: it calls the acquireSharedInterruptibly method already discussed above.
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
  
  • Semaphore has two implementations of Sync: NonfairSync and FairSync
  • In FairSync, tryAcquireShared contains a hasQueuedPredecessors check: if other threads are queued ahead of the current one, it returns -1, and acquireSharedInterruptibly then calls doAcquireSharedInterruptibly to enqueue and block the thread
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
  
  • NonfairSync, by contrast, directly calls nonfairTryAcquireShared in Sync

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
      
            final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
      
    • As we can see, it does not check whether any threads are already queued; it simply reads the value to decide whether enough permits remain.
  • release calls releaseShared in AQS, which in turn calls tryReleaseShared of Sync in Semaphore to decide whether blocked threads need to be woken up

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
      
  • tryReleaseShared

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)//overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
  
  • We can see that releasing the permits obtained from Semaphore adds them back to state, which is updated with CAS
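
Because release simply adds to state, the permit count can be followed with availablePermits(). The sketch below, with the invented class name SemaphoreStateSketch, also shows that an extra release raises the count above its initial value, which is why acquire and release counts should normally match.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

public class SemaphoreStateSketch {
    /** Records availablePermits() after each step of acquire/release. */
    static int[] permitTrace() {
        Semaphore semaphore = new Semaphore(2);
        int[] trace = new int[4];
        trace[0] = semaphore.availablePermits(); // initial state

        semaphore.acquireUninterruptibly();      // state - 1
        trace[1] = semaphore.availablePermits();

        semaphore.release();                     // state + 1
        trace[2] = semaphore.availablePermits();

        semaphore.release();                     // released without a matching acquire
        trace[3] = semaphore.availablePermits();
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(permitTrace())); // prints [2, 1, 2, 3]
    }
}
```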

9. Application of AQS in ReentrantLock

  • The source code will not be analyzed line by line here

  • In ReentrantLock, state is mainly the reentrancy count: locking does state+1, unlocking does state-1 and then checks whether state==0

  • In ReentrantLock three classes are related to AQS: NonfairSync, FairSync, and Sync

  • Locking and unlocking follow the logic of AQS's acquire method (the thread is put in the queue if it fails to get the lock) and release method (call the subclass's tryRelease, then wake up the thread waiting behind the head node)

  • The main difference in locking is between the fair and the unfair lock. The fair lock first checks whether other threads are queued ahead of it and takes the lock only if there are none, while the unfair lock tries to grab the lock right away regardless of its position in the queue.

  • After the source code analysis above, analyzing ReentrantLock should be straightforward. You can work through it yourself.
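
As a starting point for that exercise, the reentrancy count can be watched through getHoldCount(), which reports how many times the current thread holds the lock. The class name ReentrantStateSketch is made up for this sketch.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantStateSketch {
    /** Records the current thread's hold count while locking twice and unlocking twice. */
    static int[] holdCountTrace() {
        ReentrantLock lock = new ReentrantLock();
        int[] trace = new int[5];
        trace[0] = lock.getHoldCount(); // not held yet

        lock.lock();                    // first acquisition: state + 1
        trace[1] = lock.getHoldCount();

        lock.lock();                    // reentry by the same thread: state + 1
        trace[2] = lock.getHoldCount();

        lock.unlock();                  // state - 1, lock still held
        trace[3] = lock.getHoldCount();

        lock.unlock();                  // state back to 0, lock fully released
        trace[4] = lock.getHoldCount();
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(holdCountTrace())); // prints [0, 1, 2, 1, 0]
    }
}
```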