生产者-消费者问题-BlockingQueue实现

生产者-消费者(producer-consumer)问题,也称作有界缓冲区(bounded-buffer)问题,两个进程共享一个公共的固定大小的缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。

BlockingQueue

阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:
img
从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;
常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)
  先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。
  后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

​ 多线程环境中,通过队列可以很容易实现数据共享,比如经典的“生产者”和“消费者”模型中,通过队列可以很便利地实现两者之间的数据共享。假设我们有若干生产者线程,另外又有若干个消费者线程。如果生产者线程需要把准备好的数据共享给消费者线程,利用队列的方式来传递数据,就可以很方便地解决他们之间的数据共享问题。但如果生产者和消费者在某个时间段内,万一发生数据处理速度不匹配的情况呢?理想情况下,如果生产者产出数据的速度大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的数据处理完毕,反之亦然。然而,在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。好在此时,强大的concurrent包横空出世了,而他也给我们带来了强大的BlockingQueue。(在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒)

使用方法

放入数据:
  offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
    则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
  offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
    加入BlockingQueue,则返回失败。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
    直到BlockingQueue里面有空间再继续.
获取数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
    BlockingQueue有新的数据被加入;
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//Producer

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

public class Producer extends Thread {
private BlockingQueue<String> bq;

public Producer(BlockingQueue<String> bq) {
this.bq = bq;
}

@Override
public void run() {
try {
String string =Thread.currentThread().getName() + "'s product." ;
System.out.println(Thread.currentThread().getName() + ": I have made a product:" );
bq.put(string);
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//Consumer

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;

public class Consumer extends Thread {
private BlockingQueue<String> bq;

public Consumer(BlockingQueue<String> bq) {
this.bq = bq;
}

@Override
public void run() {
try {
String string = bq.take();
System.out.println("准备消费:" + string);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//Test

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class BlockingQueueTest {
public static void main(String[] args){
//默认大小Integer.MAX_VALUE
BlockingDeque<String> bq = new LinkedBlockingDeque<>(2);

Consumer consumer = new Consumer(bq);
Producer producer = new Producer(bq);

for (int i=0;i < 5;i++){
new Thread(producer,"producer" + (i+1)).start();
new Thread(consumer,"consumer" + (i+1)).start();
}
}
}

1
2
3
4
5
6
7
8
9
10
11
//输出
producer1: I have made a product:
producer2: I have made a product:
准备消费:producer1's product.
准备消费:producer2's product.
producer3: I have made a product:
producer4: I have made a product:
准备消费:producer3's product.
准备消费:producer4's product.
producer5: I have made a product:
准备消费:producer5's product.