The producer/consumer model in Java: BlockingQueue

Preface

You are probably familiar with the producer/consumer model, a very common resource scheduling model. The model has at least two roles: producers and consumers. A producer is only responsible for creating resources, and a consumer is only responsible for using them. Implementing a simple producer/consumer model yourself is easy enough, since it amounts to passing resources through a queue, but a hand-rolled approach has several hidden pitfalls:

  1. You must ensure that resources are visible across threads and implement thread synchronization by hand
  2. You must handle the boundary cases (queue full, queue empty) and decide on a rejection strategy
  3. You must balance throughput against thread safety

Java therefore already provides an interface and implementations that encapsulate all of this. Below is a brief analysis of the BlockingQueue interface and its commonly used implementation class, LinkedBlockingQueue.

Blocking queue

Concept

BlockingQueue is, as the name says, a blocking queue. Its class definition shows that it extends the Queue interface, so it can be used as an ordinary queue.

Since it is called a blocking queue, its operations can block, in the following two ways:

  • Inserting an element can block: when the queue is full, the thread performing the insert is blocked
  • Removing an element can block: when the queue is empty, the thread performing the removal is blocked

In this way, the relationship between producers and consumers can be coordinated easily.
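As a minimal sketch of that coordination (the class name ProducerConsumerDemo and the capacity of 2 are assumptions made for illustration), one producer and one consumer can share a small bounded queue and rely on put and take to do all the waiting:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ProducerConsumerDemo {
    /** Runs one producer and one consumer and returns the items in the order they were consumed. */
    static List<Integer> run(int items) {
        // Capacity 2 is deliberately small so the producer has to wait for the consumer.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes to the list are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Neither thread contains any explicit synchronization: the queue supplies both the waiting and the visibility guarantees.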

Interface method

BlockingQueue declares the following methods:

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);

    boolean offer(E e);

    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit) throws InterruptedException;

    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

    int drainTo(Collection<? super E> c);

    int drainTo(Collection<? super E> c, int maxElements);
}
 

These interface methods can be divided into three categories according to their functions:

  • Add elements: including add, offer, put
  • Remove elements: including remove, poll, take, drainTo
  • Get/check elements: including contains, remainingCapacity
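The inspection and bulk-removal methods are the least familiar of the three groups, so here is a small sketch of how they behave. The class name InspectAndDrainDemo and the capacity of 5 are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class InspectAndDrainDemo {
    /** Returns {remaining capacity before, elements drained, remaining capacity after}. */
    static int[] inspectAndDrain() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);

        int before = queue.remainingCapacity(); // free slots: 5 - 3
        boolean hasTwo = queue.contains(2);     // membership check, removes nothing

        List<Integer> batch = new ArrayList<>();
        int drained = queue.drainTo(batch, 2);  // moves at most 2 elements, head first

        System.out.println("contains(2)=" + hasTwo + ", batch=" + batch);
        return new int[] { before, drained, queue.remainingCapacity() };
    }

    public static void main(String[] args) {
        int[] r = inspectAndDrain();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 2 2 4
    }
}
```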

By convention, adding an element is called a put operation (even when offer is used rather than put), and removing an element is called a take operation.

The first two categories can be further divided by how they behave when the operation cannot proceed:

  • Throw an exception: add, remove
  • Return special values: offer(e), poll
  • Blocking: put(e), take
  • Exit after timeout: offer(e, time, unit), poll(time, unit)
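The first two styles are easy to see side by side on a queue of capacity 1. This is only a sketch; the class name HandlingStylesDemo is an assumption made for illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HandlingStylesDemo {
    /** Outcomes of add/offer on a full queue, then remove/poll on an empty one. */
    static String[] outcomes() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        String[] out = new String[4];

        queue.add("first"); // succeeds and fills the single slot
        try {
            queue.add("second");
            out[0] = "added";
        } catch (IllegalStateException e) {
            out[0] = "IllegalStateException"; // "throw an exception" style
        }
        out[1] = String.valueOf(queue.offer("second")); // "special value" style

        queue.clear(); // the queue is now empty
        try {
            queue.remove();
            out[2] = "removed";
        } catch (NoSuchElementException e) {
            out[2] = "NoSuchElementException";
        }
        out[3] = String.valueOf(queue.poll()); // special value for removal is null
        return out;
    }

    public static void main(String[] args) {
        // prints IllegalStateException, false, NoSuchElementException, null
        System.out.println(String.join(", ", outcomes()));
    }
}
```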

I won't explain these handling styles further; the names speak for themselves.

Implementation of blocking queue

JDK 8 provides several implementation classes of BlockingQueue. The ones we commonly use are:

  • ArrayBlockingQueue: a blocking queue backed by an array, bounded
  • LinkedBlockingQueue: a blocking queue backed by linked nodes, optionally bounded (the default capacity is Integer.MAX_VALUE)
  • PriorityBlockingQueue: a priority queue, unbounded
  • DelayQueue: a priority queue whose elements can only be taken after their delay has expired, unbounded
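The bounded/unbounded distinction can be checked through remainingCapacity. The sketch below is only an illustration; the class name CapacityDemo and the capacity of 3 are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

class CapacityDemo {
    /** Remaining capacity of four freshly created queues. */
    static int[] remainingCapacities() {
        BlockingQueue<Integer> array = new ArrayBlockingQueue<>(3);       // capacity is mandatory
        BlockingQueue<Integer> linkedDefault = new LinkedBlockingQueue<>(); // capacity defaults to Integer.MAX_VALUE
        BlockingQueue<Integer> linkedBounded = new LinkedBlockingQueue<>(3);
        BlockingQueue<Integer> priority = new PriorityBlockingQueue<>();    // always reports Integer.MAX_VALUE
        return new int[] {
            array.remainingCapacity(),
            linkedDefault.remainingCapacity(),
            linkedBounded.remainingCapacity(),
            priority.remainingCapacity()
        };
    }

    /** A priority queue hands out the smallest element first, not the oldest. */
    static int firstPolled() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.offer(3);
        queue.offer(1);
        queue.offer(2);
        return queue.poll(); // 1
    }

    public static void main(String[] args) {
        for (int capacity : remainingCapacities()) {
            System.out.println(capacity);
        }
        System.out.println(firstPolled());
    }
}
```

In practice this means put on a LinkedBlockingQueue created without a capacity will effectively never block, so a capacity should be passed whenever back-pressure on producers is wanted.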

Readers interested in the remaining implementations can explore them on their own. Let's take LinkedBlockingQueue as an example to see how Java implements a blocking queue.

Interface method

In addition to the methods declared by BlockingQueue, LinkedBlockingQueue also implements peek (declared in Queue), which returns the head element of the queue without removing it.

That covers the commonly used blocking queue methods. The following table summarizes them [1]:

Method / handling style   Throws exception   Returns special value   Blocks    Exits after timeout
Insert element            add(e)             offer(e)                put(e)    offer(e, timeout, unit)
Remove element            remove()           poll()                  take()    poll(timeout, unit)
Get element               element()          peek()                  /         /

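The element and peek methods do the same thing, returning the head without removing it. They differ only when the queue is empty: element throws an exception, while peek returns null.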

Attributes

BlockingQueue only defines the interface contract. The actual work is done by the concrete implementation class. Let's skip the intermediate AbstractQueue for now and go straight to LinkedBlockingQueue, which defines several important fields:

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    
    /** Head of the linked list (a dummy node whose item is null) */
    transient Node<E> head;
    /** Tail of the linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc. */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc. */
    private final ReentrantLock putLock = new ReentrantLock();
    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
 

Node is an ordinary queue node, as in LinkedList. We focus mainly on the last four fields, which fall into two groups: those used when inserting elements and those used when removing elements. Each group has one ReentrantLock and one Condition. ReentrantLock is a reentrant lock built on AQS [2] (if the idea of reentrancy is unfamiliar, treat it as an ordinary lock), and Condition is a concrete realization of the wait/notify pattern (think of it as a more powerful version of wait and notify).

The count field needs no explanation, and head and last clearly maintain the linked list that stores the elements, so I won't elaborate on them. What distinguishes a blocking queue from an ordinary queue is the four ReentrantLock and Condition fields. Their meaning is analyzed in depth in the following sections.

To make the following explanation easier, let's briefly introduce Condition first. Condition is an interface, and its concrete implementation class (ConditionObject) lives in AQS. For this article you only need to know three methods: await(), signal() and signalAll(). They are directly analogous to wait(), notify() and notifyAll(). The difference can be summarized loosely: wait/notify work with object locks and class locks (the monitors used by synchronized) and manipulate the set of threads waiting on that monitor, whereas await/signal work with an AQS-based lock, so they naturally manipulate a thread wait queue inside AQS.

Therefore, notEmpty maintains the queue of threads waiting to take, and notFull maintains the queue of threads waiting to put. The names can be read literally: notEmpty means "the queue is not empty", so elements can be taken, and likewise notFull means "the queue is not full", so elements can be inserted.
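To make the await/signal analogy concrete, here is a deliberately simplified bounded buffer. It is a sketch, not the JDK code: the class name SimpleBoundedBuffer is hypothetical, and it uses a single lock for both conditions, whereas LinkedBlockingQueue uses two separate locks.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical teaching class: one lock, two conditions. */
class SimpleBoundedBuffer<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // takers wait here
    private final Condition notFull = lock.newCondition();  // putters wait here

    SimpleBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting, like wait()
            }
            items.addLast(e);
            notEmpty.signal();   // like notify(), but wakes only a waiting taker
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E e = items.removeFirst();
            notFull.signal();    // wakes only a waiting putter
            return e;
        } finally {
            lock.unlock();
        }
    }

    /** Hands one element from the calling thread to a consumer thread. */
    static String handOff() {
        SimpleBoundedBuffer<String> buffer = new SimpleBoundedBuffer<>(1);
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = buffer.take(); // may wait on notEmpty until put runs
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            buffer.put("hello");
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff()); // prints hello
    }
}
```

Having two conditions is what lets signal target the right kind of waiter. With a single wait()/notify() monitor, a producer could wake another producer instead of a consumer.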

Insert element

offer(e)

First look at the offer(e) method. The source code is as follows:

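    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Fast path: the queue is full, return false without locking
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        // Acquire the put lock
        putLock.lock();
        try {
            if (count.get() < capacity) {
                // Link the node at the tail of the queue
                enqueue(node);
                // c holds the element count before the increment
                c = count.getAndIncrement();
                // Still room after this insert: wake another waiting put thread
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            // Release the put lock
            putLock.unlock();
        }
        if (c == 0)
            // The queue was empty before this insert: wake a waiting take thread
            signalNotEmpty();
        return c >= 0;
    }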
 

Most of this method is easy to follow. When adding an element is not allowed, offer returns false to the caller, much like a non-blocking communication call. The thread safety of offer is guaranteed by the put lock.

There is one interesting detail here: the final check, if (c == 0), wakes up a take operation. Many people wonder why this check is needed. In the whole method, c starts at -1, and the only statement that changes it is c = count.getAndIncrement(). getAndIncrement returns the value before the increment, so c == 0 means the queue was empty before the element was inserted. Therefore, when the first element is inserted into an empty queue, a take operation is woken immediately [3].

The whole method flow can be summarized as:

  1. Acquire the put lock
  2. Enqueue the element and increment count
  3. If the queue is still below capacity, wake up a put operation
  4. If the queue was empty before the insert, wake up a take operation at the end
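Steps 3 and 4 both hinge on getAndIncrement returning the old value. A tiny sketch makes that visible (the class name GetAndIncrementDemo and the method insertInto are hypothetical names used only for this illustration):

```java
import java.util.concurrent.atomic.AtomicInteger;

class GetAndIncrementDemo {
    /** Returns {c, count after the increment} for a queue that held sizeBefore elements. */
    static int[] insertInto(int sizeBefore) {
        AtomicInteger count = new AtomicInteger(sizeBefore);
        int c = count.getAndIncrement(); // c is the size BEFORE this insert
        return new int[] { c, count.get() };
    }

    public static void main(String[] args) {
        int[] emptyCase = insertInto(0);
        // c == 0: the queue was empty, so a waiting take thread must be woken
        System.out.println(emptyCase[0] + " -> " + emptyCase[1]); // prints 0 -> 1

        int[] nonEmptyCase = insertInto(2);
        // c == 2: takers were not waiting on an empty queue, no wake-up is needed
        System.out.println(nonEmptyCase[0] + " -> " + nonEmptyCase[1]); // prints 2 -> 3
    }
}
```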

offer(e, timeout, unit)

Striking while the iron is hot, let's look at the offer method with a timeout:

    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // Acquire the put lock, interruptibly
        putLock.lockInterruptibly();
        try {
            // Loop while the queue is full
            while (count.get() == capacity) {
                // The timeout has run out: return false
                if (nanos <= 0)
                    return false;
                // Wait on notFull for at most nanos;
                // the return value is the time still remaining
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }
 

The method is basically the same as offer(e), with two differences:

  1. The lock is acquired interruptibly, through putLock.lockInterruptibly()
  2. While the queue is full, notFull.awaitNanos(nanos) is executed in a loop, which adds the current thread to the notFull wait queue (where it waits until it can perform the put) until space appears or the timeout runs out

The rest is exactly the same as offer(e), so I won't repeat it here.
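Both outcomes of the timed offer can be observed from the outside. The following is a sketch under assumed names and timings (TimedOfferDemo, a 100 ms wait, a 5 second upper bound), not a benchmark:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedOfferDemo {
    /** Nobody frees a slot, so the offer gives up after the timeout. */
    static boolean offerWithoutConsumer() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a"); // the queue is now full
        try {
            return queue.offer("b", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** A consumer frees a slot well before the timeout, so the offer succeeds. */
    static boolean offerWhileConsumerFreesSpace() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a");
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.take(); // removes "a" and signals notFull
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            boolean accepted = queue.offer("b", 5, TimeUnit.SECONDS);
            consumer.join();
            return accepted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerWithoutConsumer());         // prints false
        System.out.println(offerWhileConsumerFreesSpace()); // prints true
    }
}
```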

add(e)

Compared with offer, the add method throws an exception instead of returning a special value when the operation is not allowed:

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
 

It is simply a thin wrapper around offer(e), so there is little to say. The one thing worth mentioning is that this method is implemented in AbstractQueue.

put(e)

The put(e) method blocks the thread when the operation is not allowed. Let's see how it is implemented:

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        
        // Acquire the put lock, interruptibly
        putLock.lockInterruptibly();
        try {
            // Same loop as in offer(e, timeout, unit), but without a timeout
            while (count.get() == capacity) {
                // Wait until another thread signals notFull
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
 

Sure enough, put(e) closely resembles offer(e, timeout, unit). There is only one difference: when the queue is full, the await has no timeout, so the thread is woken only when notFull is signalled, normally by a take operation [4], or when it is interrupted.
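The blocking can be observed through the producer thread's state. This is a sketch under assumptions (the class name BlockingPutDemo, the 10 ms polling interval and the 5 second deadline are all made up for the illustration); it relies on a thread parked in await() reporting Thread.State.WAITING.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingPutDemo {
    /** Returns true if put was seen blocked and then completed after one take. */
    static boolean putBlocksUntilTake() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        queue.offer("a"); // the queue is now full

        Thread producer = new Thread(() -> {
            try {
                queue.put("b"); // parks on notFull until a slot is free
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // Poll with a deadline so the demo cannot hang if something goes wrong.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (producer.getState() != Thread.State.WAITING && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            boolean wasBlocked = producer.getState() == Thread.State.WAITING;

            queue.take();        // removes "a" and signals notFull
            producer.join(5000); // the producer can now finish its put
            return wasBlocked && !producer.isAlive() && "b".equals(queue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(putBlocksUntilTake());
    }
}
```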

Remove element

poll()

The poll() method removes and returns the head of the queue. Its implementation is as follows:

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        // Acquire the take lock
        takeLock.lock();
        try {
            if (count.get() > 0) {
                // Unlink the head element
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    // Elements remain: wake another waiting take thread
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake a waiting put thread
        if (c == capacity)
            signalNotFull();
        return x;
    }
 

If you read offer(e) carefully, there is little left to say about poll(): it is a mirror image of offer(e). (I would like to say more, but poll() follows exactly the same flow as offer(e), with the take lock and notEmpty in place of the put lock and notFull.)
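Stripped of the conditions, the symmetry between offer and poll can be sketched as a small two-lock queue. This is a simplified illustration, not the JDK source: the class name TwoLockQueue is hypothetical, and it supports only the non-blocking offer and poll, so no signalling is needed.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical teaching class: non-blocking offer/poll with separate put and take locks. */
class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy node; head.item is always null
    private Node<E> last = head;
    private final ReentrantLock putLock = new ReentrantLock();  // guards last
    private final ReentrantLock takeLock = new ReentrantLock(); // guards head

    TwoLockQueue(int capacity) {
        this.capacity = capacity;
    }

    boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        if (count.get() == capacity) return false; // fast path, no lock
        boolean added = false;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                Node<E> node = new Node<>(e);
                last.next = node;
                last = node;
                count.getAndIncrement(); // publishes the new node to takers
                added = true;
            }
        } finally {
            putLock.unlock();
        }
        return added;
    }

    E poll() {
        if (count.get() == 0) return null; // fast path, no lock
        E x = null;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                Node<E> first = head.next;
                head = first;      // the old first node becomes the new dummy
                x = first.item;
                first.item = null;
                count.getAndDecrement();
            }
        } finally {
            takeLock.unlock();
        }
        return x;
    }

    public static void main(String[] args) {
        TwoLockQueue<String> queue = new TwoLockQueue<>(2);
        System.out.println(queue.offer("a") + " " + queue.offer("b") + " " + queue.offer("c"));
        System.out.println(queue.poll() + " " + queue.poll() + " " + queue.poll());
    }
}
```

Because producers touch only last and consumers touch only head, a put and a take can proceed at the same time. The shared AtomicInteger is the only point of contact between the two sides.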

Other

The poll(timeout, unit), take() and remove() methods mirror offer(e, timeout, unit), put() and add() respectively. There is nothing special about them, so I will skip them here.

Get element

peek()

The peek() method returns the head of the queue without removing it. Its implementation is as follows:

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        // Acquire the take lock
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
 

The flow needs no explanation. Note, however, that the method must acquire the take lock, which means no element can be removed while peek() is executing.

element()

The element() method is implemented in AbstractQueue:

    public E element() {
        E x = peek();
        if (x != null)
            return x;
        else
            throw new NoSuchElementException();
    }
 

Again, this is just a thin wrapper, this time around peek().
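The difference between the two methods only shows up on an empty queue, which the following sketch demonstrates (the class name PeekVsElementDemo is an assumption made for illustration):

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PeekVsElementDemo {
    /** Describes what peek and element do on an empty queue. */
    static String onEmptyQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        String peeked = String.valueOf(queue.peek()); // special value: null
        try {
            queue.element();
            return peeked + ", no exception";
        } catch (NoSuchElementException e) {
            return peeked + ", NoSuchElementException";
        }
    }

    /** Neither method removes the head, so the size stays the same. */
    static int sizeAfterLooking() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("a");
        queue.peek();
        queue.element();
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(onEmptyQueue());     // prints null, NoSuchElementException
        System.out.println(sizeAfterLooking()); // prints 1
    }
}
```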

Summary

This article set out to discuss BlockingQueue but spent most of its time on LinkedBlockingQueue. Still, LinkedBlockingQueue is a classic implementation of a blocking queue, and the ideas behind its methods are very important for understanding blocking queues in general. To understand blocking queues, the most important thing is to understand the locks: LinkedBlockingQueue achieves thread safety through the put lock and the take lock, together with the Condition object attached to each lock. Grasping this point is the key to understanding the whole producer/consumer model.


  1. Based on "The Art of Concurrent Programming in Java"

  2. See the article "On the AQS (abstract queue synchronizer)"

  3. Describing this as "wake up a take operation" is somewhat inaccurate. Strictly, it should be "wake up a thread that is waiting to take". I find the former easier for readers to follow, so I keep it

  4. Refers to the family of methods that behave like take, namely take/poll/remove. The same applies to "put operation"