并发编程--Queues

队列

队列我们会分为两种,并发阻塞队列和并发不阻塞队列. 阻塞队列常用的有

ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
DelayQueue

非阻塞队列常用的有

ConcurrentLinkedQueue
ConcurrentLinkedDeque

这些类涵盖了开发时大多数的用法,如生产者-消费者,消息传送,并发任务以及其他并发的设计
实现接口 TransferQueue,并继承 LinkedTransferQueue, 重写同步方法 transfer 也可以达到阻塞消费者的效果

并发阻塞队列 BlockingQueue

java.util.concurrent 包里的 BlockingQueue 接口表示一个线程添加和提取实例的队列。 BlockingQueue 提供了4组不同的操作来实现 insert, remove, examine。

抛异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
删除 remove() poll() take() poll(time, unit)
检查 element() peak() NONE NONE
  • 抛异常:如果试图的操作无法立即执行,抛一个异常。
  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

此外,BlockingQueue 接口有以下几个特点:

  • BlockingQueue 不接受 Null 对象, add,put,offfer 等操作Null对象会抛出异常
  • BlockingQueue 有大小的限制,
  • BlockingQueue 是线程安全的
  • BlockingQueue 被设计为主要用于生产者,消费者队列,但也支持Collection接口。不过remove(x)这样的操作是低效率的,建议少用
  • BlockingQueue 没有类似 close,shutdown的操作去禁止添加新对象,类似的功能需要子类去实现

ArrayBlockingQueue 数组阻塞队列

ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。

Simple Usage

BlockingQueue<String> queue = new ArrayBlockingQueue(1024);  
queue.put("1");
Object object = queue.take();

LinkedBlockingQueue
PriorityBlockingQueue

DelayQueue延迟队列

DelayQueue 实现了 BlockingQueue 接口。DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口,该接口定义:

public interface Delayed extends Comparable<Delayed< {  
    public long getDelay(TimeUnit timeUnit); 

    public int compareTo(@NonNull Delayed o);
}

DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。传递给 getDelay 方法的 timeUnit 实例是一个枚举类型,它表明了将要延迟的时间段。

LinkedBlockingQueue 链阻塞队列

LinkedBlockingQueue 类实现了 BlockingQueue 接口。
LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

PriorityBlockingQueue 优先级的阻塞队列

PriorityBlockingQueue 类实现了 BlockingQueue 接口。

PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。注意 PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。

同时注意,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。

SynchronousQueue 同步队列

SynchronousQueue 类实现了 BlockingQueue 接口。

SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

阻塞双端队列 BlockingDeque

java.util.concurrent 包里的 BlockingDeque 接口表示一个线程安放入和提取实例的双端队列。本小节我将给你演示如何使用 BlockingDeque。BlockingDeque 类是一个双端队列,在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。deque(双端队列) 是 “Double Ended Queue” 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。

BlockingDeque

一个线程生产元素,并把它们插入到队列的任意一端。如果双端队列已满,插入线程将被阻塞,直到一个移除线程从该队列中移出了一个元素。如果双端队列为空,移除线程将被阻塞,直到一个插入线程向该队列插入了一个新元素。

BlockingDeque 的方法

BlockingDeque 具有 4 组不同的方法用于插入、移除以及对双端队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:
method
四组不同的行为方式解释:

  • 抛异常:如果试图的操作无法立即执行,抛一个异常。
  • 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  • 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  • 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

Simple Usage
既然 BlockingDeque 是一个接口,它的实现类有:LinkedBlockingDeque,ConcurrentLinkedDeque

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();  

deque.addFirst("1");  
deque.addLast("2");  

String two = deque.takeLast();  
String one = deque.takeFirst();

Concurrent队列

ConcurrentLinkedQueue

一个没有大小限制,线程安全,使用链式结构设计的非阻塞式队列。 ConcurrentLinkedQueue也是使用FIFO的设计理念, 同其他并发队列一样,该队列也不允许元素是Null对象。使用的算法是Maged M. Michael and Michael L. Scott.设计出来的 Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms

请注意,与大多数集合不同, size方法不是总是准确的。由于这些队列的异步性质,如果当前线程需要遍历元素,而其他线程在遍历期间修改此集合,则可能会返回不准确的size结果。此外,该批量操作addAll,removeAll,retainAll, containsAll,equals和toArray 不能保证是按原子执行的。
内存一致性效果:与其他并发集合一样,在将对象放入 ConcurrentLinkedQueue happen-before 在另一个线程中从 ConcurrentLinkedQueue 访问或删除该元素。
补充:HashMap 和 ConcurrentHashMap 全解析

ConcurrentLinkedDeque

同ConcurrentLinkedQueue, 额外实现了Deque接口支持双向操作
参考: importnew