简介 阻塞队列(BlockingQueue)是一个支持阻塞的插入和移除的队列。阻塞插入即当队列满时,队列会阻塞插入元素的线程,直到队列不满;阻塞移除即当队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列提供了四种处理方法:
方法\处理方式
抛出异常
返回特殊值
一直阻塞
超时退出
插入方法
add(e)
offer(e)
put(e)
offer(e,time,unit)
移除方法
remove()
poll()
take()
poll(time,unit)
检查方法
element()
peek()
不可用
不可用
分类
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现,队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。可以用于设计缓存系统和定时任务调度。
SynchronousQueue:一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
LinkedBlockingDeque:LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。
实现 JDK是使用通知模式(await()
/signal()
)实现的阻塞队列:当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。下面是ArrayBlockingQueue
的部分源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 private final Condition notFull;private final Condition notEmpty;public ArrayBlockingQueue (int capacity, boolean fair) { notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return extract(); } finally { lock.unlock(); } } private void insert (E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); }
生产者-消费者模式 下面通过阻塞队列来实现一个经典的生产者-消费者模式,由于已经将底层封装的很好了,所以代码十分简洁:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class ProducerConsumer { private static BlockingQueue<Task> queue = new ArrayBlockingQueue<>(5 ); private static AtomicInteger increTaskNo = new AtomicInteger(0 ); private static class Producer implements Runnable { @Override public void run () { Task task = new Task(increTaskNo.getAndIncrement()); try { queue.put(task); System.out.println("produce: " + task.no); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Consumer implements Runnable { @Override public void run () { try { Task task = queue.take(); System.out.println("consume: " + task.no); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Task { public int no; public Task (int no) { this .no = no; } } public static void main (String[] args) { for (int i = 0 ; i < 5 ; i++) { new Thread(new Producer()).start(); } for (int i = 0 ; i < 5 ; i++) { new Thread(new Consumer()).start(); } } }
这里要注意,put()
与take()
方法与输出语句不是原子的,这会导致日志的输出顺序与实际任务的入队/出队顺序不一定匹配。
参考资料