简介

阻塞队列(BlockingQueue)是一个支持阻塞的插入和移除的队列。阻塞插入即当队列满时,队列会阻塞插入元素的线程,直到队列不满;阻塞移除即当队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

分类

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现,队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。可以用于设计缓存系统和定时任务调度。
  • SynchronousQueue:一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
  • LinkedBlockingDeque:LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。

实现

JDK是使用通知模式(await()/signal())实现的阻塞队列:当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。下面是ArrayBlockingQueue的部分源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private final Condition notFull;
private final Condition notEmpty;

public ArrayBlockingQueue(int capacity, boolean fair) {
//省略其他代码
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}

private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}

生产者-消费者模式

下面通过阻塞队列来实现一个经典的生产者-消费者模式,由于已经将底层封装的很好了,所以代码十分简洁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ProducerConsumer {
private static BlockingQueue<Task> queue = new ArrayBlockingQueue<>(5);
private static AtomicInteger increTaskNo = new AtomicInteger(0);

private static class Producer implements Runnable{
@Override
public void run() {
Task task = new Task(increTaskNo.getAndIncrement());
try {
queue.put(task);
System.out.println("produce: " + task.no);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static class Consumer implements Runnable{
@Override
public void run(){
try {
Task task = queue.take();
System.out.println("consume: " + task.no);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private static class Task{
public int no;
public Task(int no) {
this.no = no;
}
}

public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(new Producer()).start();
}
for (int i = 0; i < 5; i++) {
new Thread(new Consumer()).start();
}
}
}

这里要注意,put()take()方法与输出语句不是原子的,这会导致日志的输出顺序与实际任务的入队/出队顺序不一定匹配。

参考资料