农场主的黑科技.

并发操作合集-7.并发容器:BlockingQueue

字数统计: 3.6k阅读时长: 22 min
2018/11/04 Share

🍤 并发操作合集系列 目录

🍕 并发操作合集系列 源代码

这一篇文章也会介绍一个并发容器,BlockingQueue。

BlockingQueue

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
  • 超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

Java里的阻塞队列

JDK7提供了7个阻塞队列。分别是

  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。

我们可以使用以下代码创建一个公平的阻塞队列:

1
ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。

SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。

DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将DelayQueue运用在以下应用场景:

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  • 定时任务调度。使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,从比如TimerQueue就是使用DelayQueue实现的。

队列中的Delayed必须实现compareTo来指定元素的顺序。

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法。如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。生产者会一直阻塞直到所添加到队列的元素被某一个消费者消费

tryTransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。

对于带有时间限制的tryTransfer(E e, long timeout, TimeUnit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,以First单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是Jdk的bug,使用时还是用带有First和Last后缀的方法更清楚。在初始化LinkedBlockingDeque时可以初始化队列的容量,用来防止其再扩容时过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。

关于LinkedBlockingDeque的工作窃取示例可以参考这篇文章

示例:再看发送者-接受者问题

在之前2.状态转换方法一章中,用wait/notify写过一个发送者-接受者(其实本质就是生产者消费者问题)的例子。这次我们用BlokingQueue来实现这个问题。

Data类来实现数据包的生产者消费者操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Data {

private BlockingQueue<String> queue;

public Data(BlockingQueue queue) {
this.queue = queue;
}

public String receive() {

try {
String packet = queue.take();
System.out.println(Thread.currentThread().getName() + " receive: " + packet);
return packet+ " <======= FROM " + Thread.currentThread().getName();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
return null;

}

public void send(String packet) {

try {
System.out.println(Thread.currentThread().getName() + " send: " + packet);
queue.put(packet);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
}
}

值得注意的是,之前我们使用wait时,使用synchronize包住了receive()和send()方法,这在BlockingQueue中是不需要的,否则会产生死锁。

下面是Sender类和Receiver类,它们将对Data类进行同步操作:

Sender类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Sender implements Runnable {

protected Data data;

public Sender(Data data) {
this.data = data;
}

@Override
public void run() {
String packets[] = {
"First packet",
"Second packet",
"Third packet",
"Fourth packet",
"End"
};
for (String packet : packets) {
data.send(packet);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}

}
}

Receiver类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Receiver implements Runnable {

private Data load;
private int senderNum = 1;

public Receiver(Data load, int consumerNum) {
this.load = load;
this.senderNum = consumerNum;
}

public Receiver(Data load) {
this.load = load;
}

@Override
public void run() {
while (senderNum >= 0) {

String receivedMessage = load.receive();
if ("End".equals(receivedMessage)) senderNum--;

System.out.println("=======> " + receivedMessage);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Thread Interrupted");
}
}
}
}

一直接受数据包并输出,直到从所有线程收到“End”数据包。

LinkedBlockingQueue

用LinkedBlockingDeque实现一个BlockingQueue,LinkedBlockingDeque是创建BlockingQueue时最简单,也是最常用的创建BlockingQueue方法。

1
2
3
4
5
6
7
BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();
Data data = new Data(blockingQueue);
Thread sender = new Thread(new Sender(data), "Sender Thread");
Thread receiver = new Thread(new Receiver(data), "Receiver Thread");

sender.start();
receiver.start();

ArrayBlockingQueue

用ArrayBlockingQueue实现大小为100,并一个具备公平锁的BlockingQueue,把这个BlockingQueue传给发送者线程和接受者线程:

1
2
3
4
5
6
7
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(100, true);
Data data = new Data(blockingQueue);
Thread sender = new Thread(new Sender(data), "Sender Thread");
Thread receiver = new Thread(new Receiver(data), "Receiver Thread");

sender.start();
receiver.start();

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界队列。我们给它一个倒序的比较器,让它输出输出字母倒序后的结果。

1
2
3
4
5
6
7
8
9
//按字母排序,输出字符串倒序后的结果
BlockingQueue<String> blockingQueue = new PriorityBlockingQueue<>(100, Comparator.reverseOrder());
Data data = new Data(blockingQueue);
Thread sender = new Thread(new Sender(data), "Sender Thread");
Thread receiver = new Thread(new Receiver(data), "Receiver Thread");

sender.start();
sender.join(); //等发送方全部发送完成
receiver.start(); //输出 Third packet,Second packet, Fourth packet ... End

SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。

从下面的示例中,只有最后被放入的元素会被取到,而发送者放入的大多数数据都丢失了。

1
2
3
4
5
6
7
8
9
10
11
12
//只会取到最后被放入的元素
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
int senderNum = 3;
Data data = new Data(blockingQueue);
for (int i = 0; i < senderNum; i++) {
Thread sender = new Thread(new Sender(data), "Sender Thread" + i);
sender.start();

}
Thread receiver = new Thread(new Receiver(data, senderNum), "Receiver Thread");

receiver.start();

DelayQueue

DelayQueue中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。我们需要先把数据包包装成Delayed:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class DelayPacket implements Delayed {

private String data;
private long startTime;


DelayPacket(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}

@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) { //它是支持优先队列的,我们可以自定义它的排序方式
return (int) ( this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}

@Override
public String toString() {
return "{" + "data='" + data + '\'' + ", startTime=" + startTime + '}';
}
}

为了能发送和取到包装后的延迟数据包,我们把Data改为如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Data {
private DelayQueue<DelayPacket> delayQueue;
private final int DELAY = 3000; //3秒发送延迟

public Data(DelayQueue delayQueue) {
this.delayQueue = delayQueue;
}
public String receive() {
try {
String packet = delayQueue.take().toString();
System.out.println(Thread.currentThread().getName() + " receive: " + packet);
return packet+ " <======= FROM " + Thread.currentThread().getName();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
e.printStackTrace();
}
return null;

}
public void send(String packet) {
System.out.println(Thread.currentThread().getName() + " send: " + packet);
DelayPacket delayPacket = new DelayPacket(packet, DELAY);
delayQueue.put(delayPacket);
}
}

启动线程,接收者将在发送者发送的5秒后收到数据包

1
2
3
4
5
6
7
8
DelayQueue<DelayPacket> delayQueue = new DelayQueue<>();

DData data = new DData(delayQueue);
Thread sender = new Thread(new Sender(data), "Sender Thread");
Thread receiver = new Thread(new Receiver(data), "Receiver Thread");

sender.start();
receiver.start(); //延迟3秒后收到

LinkedTransferQueue

使用LinkedTransferQueue的transfer方法,生产者会一直阻塞直到所添加到队列的元素被某一个消费者消费。为了测试LinkedTransferQueue需要先把Data类的发送者put()的部分改为transfer()。

对Data类进行修改:

1
2
3
4
5
6
7
private TransferQueue<String> transferQueue;
...
public void send(String packet) {
...
transferQueue.transfer(packet + " FROM " + Thread.currentThread().getName());
...
}

测试LinkedTransferQueue,启动多个发送者线程,每个发送者会一直阻塞直到自己放入的元素被取走。

1
2
3
4
5
6
7
8
9
10
11
//生产者会一直阻塞直到自己放入的元素被取走
TransferQueue<String> blockingQueue = new LinkedTransferQueue<>();

TData data = new TData(blockingQueue);
for (int i = 0; i < 3; i++) {
Thread sender = new Thread(new Sender(data), "Sender Thread" + i);
sender.start();

}
Thread receiver = new Thread(new Receiver(data), "Receiver Thread");
receiver.start();

乍一看和SynchronousQueue有点相似,都是在生产者放入的元素被取走前进行进行阻塞。不同的是SynchronousQueue只会保留所有线程中最后被放入的那个元素。和SynchronousQueue不同LinkedTransferQueue不会丢失数据包。

Reference

聊聊并发(七)——Java中的阻塞队列

Guide to DelayQueue

CATALOG
  1. 1. BlockingQueue
    1. 1.1. Java里的阻塞队列
      1. 1.1.1. LinkedBlockingQueue
      2. 1.1.2. ArrayBlockingQueue
      3. 1.1.3. PriorityBlockingQueue
      4. 1.1.4. SynchronousQueue
      5. 1.1.5. DelayQueue
      6. 1.1.6. LinkedTransferQueue
      7. 1.1.7. LinkedBlockingDeque
  2. 2. 示例:再看发送者-接受者问题
    1. 2.1. LinkedBlockingQueue
    2. 2.2. ArrayBlockingQueue
    3. 2.3. PriorityBlockingQueue
    4. 2.4. SynchronousQueue
    5. 2.5. DelayQueue
    6. 2.6. LinkedTransferQueue
  3. 3. Reference