On the core of Java concurrency (two)

On the core of Java concurrency (two)

review

In the previous Java Concurrency On the core we probably learned Lockand synchronizedcommon ground, and then briefly summarize below:

  • LockA main counter is the custom, to take advantage of CAStheir atomicity operation, and synchronizedis c++ hotspotrealized monitor (we did not see the specific, we can not say)
  • Both are reentrant (recursion, intermodulation, loop), and their essence is to maintain a countable counter. When other threads access the locked object, it will judge whether the counter is 0.
  • Theoretically speaking, both are blocking, because when the thread is holding the lock, if it cannot get it, the final result can only wait (provided that the ultimate goal of the thread is to acquire the lock) the read-write lock is separated into two locks , So different

For example: thread A holds the monitor of an object. When other threads access the object, they find that the monitor is not 0, so they can only block the suspension or join the waiting queue, and wait for the monitor to exit after thread A finishes processing. Set to 0. While thread A is processing tasks, other threads will either cyclically visit the monitor or keep blocking waiting for thread A to wake up. If it doesn't work, it's really like I said, giving up the lock competition and processing other tasks. But should not be able to handle other tasks, the task is processed halfway, and then go back to grab the lock after being notified by thread A

Fair lock and unfair lock

Do not share counter

        // 
        public final void acquire(int arg) {
        // 
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
        }
        
        // 
        final void lock() {
            // 
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1); // 
        }
        
        //  Acquire  
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 
            int c = getState();
            if (c == 0) {
                //    !hasQueuedPredecessors()  
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)  //overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
        
        /**
         * @return {@code true} if there is a queued thread preceding the// 
         *         current thread, and {@code false} if the current thread
         *         is at the head of the queue or the queue is empty// 
         * @since 1.7
         */
        public final boolean hasQueuedPredecessors() {
            //The correctness of this depends on head being initialized
            //before tail and on head.next being accurate if the current
            //thread is first in queue.
            Node t = tail; //Read fields in reverse initialization order
            Node h = head;
            Node s;
            // 
            // 
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
        
        // 
        final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; //help GC
                    failed = false;
                    return interrupted;
                }
                // 
                //  park  
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1; // 
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;  //  park
        /** waitStatus value to indicate thread is waiting on condition */ //
        static final int CONDITION = -2; // 
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3; // 
        
        // 
        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;
 

Read lock and write lock (shared lock and exclusive lock)

Read lock: shared counter

Write lock: do not share counter

        // 
        //  16   16  
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count. */ // 
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count. */ // 
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
        
        /**
         * A counter for per-thread read hold counts.  
         * Maintained as a ThreadLocal; cached in cachedHoldCounter.
         */
        static final class HoldCounter {
            int count;          //initially 0
            //Use id, not reference, to avoid garbage retention
            final long tid = LockSupport.getThreadId(Thread.currentThread()); //  id
        }
        
    // 
    protected final int tryAcquireShared(int unused) {
            //ReentrantReadWriteLock ReadLock  
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            // 
            // 
            // 
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            // 
            int r = sharedCount(c);
            //   
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                //  16  
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null ||
                        rh.tid != LockSupport.getThreadId(current))
                        //cachedHoldCounter  
                        //readHolds  ThreadLocalHoldCounter  ThreadLocal  cachedHoldCounter
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            // 
            // 
             /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            return fullTryAcquireShared(current);
        }
        
    //   
    protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail. 
             *  
             *  
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)// 
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.// 
             */
            Thread current = Thread.currentThread();
            int c = getState();
            // 
            int w = exclusiveCount(c);
            if (c != 0) {
                //(Note: if c != 0 and w == 0 then shared count != 0)
                // 
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                // 
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //Reentrant acquire  
                setState(c + acquires);
                return true;
            }
            // 
            //state   0 
            //  false  CAS
            //  true  return false;
            //  false  CAS
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            // 
            setExclusiveOwnerThread(current);
            return true;
        }    
    
    // 
     protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            // 
            //firstReader  
            if (firstReader == current) {
                //assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null ||
                    rh.tid != LockSupport.getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    //Releasing the read lock has no effect on readers,
                    //but it may allow waiting writers to proceed if
                    //both read and write locks are now free.
                    return nextc == 0;
            }
        }
 

Conclusion

The "lock" implementation of fair lock and unfair lock is based on the fact CASthat fairness is based on the internally maintained Nodelinked list

The read-write lock can be roughly understood as two states of read and write, so the design here is similar to the state of a thread pool. However, the read count can be shared by multiple reader threads (excluding write locks), and each read thread maintains its own read count. In the case of a write lock, the write count is exclusive and all other threads are excluded.