1188. Design Bounded Blocking Queue

Created: 10/10/2021

Leetcode

Implement a thread-safe bounded blocking queue that has the following methods:

  • BoundedBlockingQueue(int capacity) The constructor initializes the queue with a maximum capacity.

  • void enqueue(int element) Adds an element to the front of the queue. If the queue is full, the calling thread is blocked until the queue is no longer full.

  • int dequeue() Returns the element at the rear of the queue and removes it. If the queue is empty, the calling thread is blocked until the queue is no longer empty.

  • int size() Returns the number of elements currently in the queue.

Your implementation will be tested using multiple threads at the same time. Each thread will either be a producer thread that only makes calls to the enqueue method or a consumer thread that only makes calls to the dequeue method. The size method will be called after every test case.

Please do not use built-in implementations of bounded blocking queue as this will not be accepted in an interview.

Example 1:

Input:
1
1
["BoundedBlockingQueue","enqueue","dequeue","dequeue","enqueue","enqueue","enqueue","enqueue","dequeue"]
[[2],[1],[],[],[0],[2],[3],[4],[]]

Output:
[1,0,2,2]

Explanation:
Number of producer threads = 1
Number of consumer threads = 1

BoundedBlockingQueue queue = new BoundedBlockingQueue(2);   // initialize the queue with capacity = 2.

queue.enqueue(1);   // The producer thread enqueues 1 to the queue.
queue.dequeue();    // The consumer thread calls dequeue and returns 1 from the queue.
queue.dequeue();    // Since the queue is empty, the consumer thread is blocked.
queue.enqueue(0);   // The producer thread enqueues 0 to the queue. The consumer thread is unblocked and returns 0 from the queue.
queue.enqueue(2);   // The producer thread enqueues 2 to the queue.
queue.enqueue(3);   // The producer thread enqueues 3 to the queue.
queue.enqueue(4);   // The producer thread is blocked because the queue's capacity (2) is reached.
queue.dequeue();    // The consumer thread returns 2 from the queue. The producer thread is unblocked and enqueues 4 to the queue.
queue.size();       // 2 elements remaining in the queue. size() is always called at the end of each test case.

Example 2:

Input:
3
4
["BoundedBlockingQueue","enqueue","enqueue","enqueue","dequeue","dequeue","dequeue","enqueue"]
[[3],[1],[0],[2],[],[],[],[3]]
Output:
[1,0,2,1]

Explanation:
Number of producer threads = 3
Number of consumer threads = 4

BoundedBlockingQueue queue = new BoundedBlockingQueue(3);   // initialize the queue with capacity = 3.

queue.enqueue(1);   // Producer thread P1 enqueues 1 to the queue.
queue.enqueue(0);   // Producer thread P2 enqueues 0 to the queue.
queue.enqueue(2);   // Producer thread P3 enqueues 2 to the queue.
queue.dequeue();    // Consumer thread C1 calls dequeue.
queue.dequeue();    // Consumer thread C2 calls dequeue.
queue.dequeue();    // Consumer thread C3 calls dequeue.
queue.enqueue(3);   // One of the producer threads enqueues 3 to the queue.
queue.size();       // 1 element remaining in the queue.

Since the number of threads for producer/consumer is greater than 1, we do not know how the threads will be scheduled in the operating system, even though the input seems to imply the ordering. Therefore, any of the output [1,0,2] or [1,2,0] or [0,1,2] or [0,2,1] or [2,0,1] or [2,1,0] will be accepted.

Constraints:

  • 1 <= Number of Prdoucers <= 8

  • 1 <= Number of Consumers <= 8

  • 1 <= size <= 30

  • 0 <= element <= 20

  • The number of calls to enqueue is greater than or equal to the number of calls to dequeue.

  • At most 40 calls will be made to enque, deque, and size.

Basic Idea:

这是经典的生产者消费者多线程问题,对于一个bounded buffer,需要支持enqueue和dequeue操作,两种操作都会有多个线程同时尝试进行,当buffer为空的时候,需要block尝试dequeue操作的线程,而当buffer满的时候则要block尝试enqueue的线程。除此之外,当条件满足的时候要唤醒相应的线程。

这个问题的难点是很难通过一个简单的mutex来实现,因为即使我们将enqueue和dequeue两个方法变为synchronized,当操作条件不满足的时候(满或空)我们仍然不能有效让当前线程等待,只能用sleep来让当前线程让出mutex,而且没有办法在需要时候唤醒该线程。

解决这个问题需要考虑线程之间的同步,即要让一个线程暂停等待某种条件,然后在条件满足之后才继续运行,一般来讲可以使用Semaphore 或者 Condition variable 来处理。

Java Code:

Semaphore Solution:

使用两个semaphore分别表示当前可以dequeue和enqueue的额度,他们之间的关系满足enqueue+dequeue=Capacity ,另外需要一个mutex来保证操作w,r变量时候的原子性。

class BoundedBlockingQueue {
    
    Semaphore enqueue;
    Semaphore dequeue;
    ReentrantLock lock = new ReentrantLock();
    
    int[] q;
    int w, r;
    int capacity;
    
    
    public BoundedBlockingQueue(int capacity) {
        enqueue = new Semaphore(capacity);
        dequeue = new Semaphore(0);
        q = new int[capacity + 1];
        w = r = 0;
        this.capacity = capacity;
    }
    
    public void enqueue(int element) throws InterruptedException {
        enqueue.acquire();
        lock.lock();
        q[w++] = element;
        w %= q.length;
        lock.unlock();
        dequeue.release();
    }
    
    public int dequeue() throws InterruptedException {
        dequeue.acquire();
        lock.lock();
        int res = q[r++];
        r %= q.length;
        lock.unlock();
        enqueue.release();
        return res;
    }
    
    public int size() {
        if (w < r) return w + capacity + 1 - r;
        else return w - r;
    }
}

// 0 1 2 3 4, capacity = 4
// w=1 r=3, (3,4,0) size = 1 + 4 + 1 - 3 = 3

Last updated