阻塞队列(BlockingQueue)是一种支持额外操作的队列,这两个附加的操作是:
l 在队列为空时,获取元素的线程会等待队列变为非空。
l 当队列满时,存储元素的线程会等待队列可用。
Java提供了java.util.concurrent.BlockingQueue<E>接口以提供对阻塞队列的支持。该接口是Java Collections Framework的一个成员。
1. BlockingQueue的方法
BlockingQueue对不能立即满足的操作有不同的处理方法,这些方法共有四种形式:
l 抛出一个异常
l 返回一个特殊值,根据操作的不同可能返回null或false
l 无限期地阻塞当前线程,直到操作成功
l 在放弃之前只阻塞给定的最大时间限制。
这些方法总结如下:
方法\处理方式 |
抛出异常 |
返回特殊值 |
一直阻塞 |
超时退出 |
插入方法 |
add(e)
|
offer(e)
|
put(e)
|
offer(e,time,unit)
|
移除方法 |
remove()
|
poll()
|
take()
|
poll(time,unit)
|
检查方法 |
element()
|
peek()
|
不可用 |
不可用 |
BlockingQueue不接受null值,当尝试add
、put
或offer
一个null值时,将抛出
NullPointerException。null值有特殊的用意,用来表示poll操作失败。
BlockingQueue可以有容量限制,如果不指定容量则默认容量大小是Integer.MAX_VALUE。任何时候,只要超过了剩余容量(remainingCapacity),则新的元素不能被放入队列中。
BlockingQueue实现被设计为主要用于生产者-消费者队列,但还支持Collection接口。因此,例如,使用remove(x)可以从队列中删除任意元素。然而,这些操作通常不会非常有效地执行,并且只用于偶尔使用,例如在取消排队的消息的时候。
BlockingQueue实现是线程安全的。所有排队方法都使用内部锁或其他并发控制形式自动地实现其效果。但是,除非在实现中指定了otherwise,否则批量Collection操作addAll、containsAll、retainAll和removeAll不一定是自动执行的。因此,例如,在只添加c中的某些元素之后, addAll ( c )可能会失败(抛出一个异常)。
BlockingQueue本质上不支持任何类型的close或shutdown操作,以指示不再添加任何项。这些特性的需求和使用往往依赖于具体的实现。例如,一种常见的策略是生产者插入特殊的对象(比如end-of-stream或 poison),当被消费者采用时,这些对象将得到相应的解释。
2. BlockingQueue的使用示例
以下是一个典型的生产者-消费者场景的使用示例。注意,一个BlockingQueue可以安全地与多个生产者和多个消费者一起使用。
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}}
3. BlockingQueue的内存一致性影响
这种影响与其他并发集合一样,某个线程将数据元素放入BlockingQueue的操作,happen-before 于另一个线程访问或者删除BlockingQueue中该数据元素的操作。
3.1. happens-before的含义
happen-before规则用来描述两个操作之间的顺序关系,这两个操作可以再一个线程内,也可以不再一个线程内。此顺序并不严格意味着执行时间上的顺序,而是至前一个操作的结果要对后一个操作可见。
happens-Before关系的定义如下:
l 如果一个happens-before另一个操作,那么第一个操作的执行结果对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前
l 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按照happens-before关系来执行的结果一致,那么这种重排序并不非法。
举例来说,如果在程序执行顺序上,A先于B,并且A修改了共享变量,而B正好使用该共享变量,那么A需要happen-before B,再直白一点,就是A对共享变量的修改,需要在B执行时,对B可见。
3.2. happens-before规则
l 程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。
l 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
l volatile规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。
l 传递性:如果A happens-before B,并且B happens-before C,那么A happens-before C。
l start()规则:如果线程A执行操作ThreadB.start(),那么A线程的ThreadB.start()操作happens-before于线程B中的任意操作。
l join()规则:如果线程A执行操作ThreadB.join()并成功返回,那么线程B的任意操作happens-before于线程A从ThreadB.join()操作成功返回。
对所有这些规则的说明:A happens-before B并不意味着A一定要先在B之前发生,而是说,如果A已经发生在了B前面,那么A的操作结果一定要对B可见
4. 参考引用
本系列归档至《Java数据结构及算法实战》
《数据结构和算法基础(Java语言实现)》(柳伟卫著,北京大学出版社出版)