2 minutes
delayqueue使用及原理解刨
delayqueue使用及原理解刨
2018年8月31日
在大体评估了需要进行延迟消费的队列长度之后,非常机智的选择使用了DelayQueue,而不是使用Schedule轮询。
顺路给大家安利一波DelayQueue的使用以及原理。
Overview
通过我们可以发现,这是一个来自 java.util.concurrent 包里的 Queue,并且是一个 BlockingQueue,可以非常有效的用于(生产者-消费者)延迟消费的场景下。
分两部分说,先说说如何使用,然后再解刨一下原理。
.
第一部分:使用
Step1. 为准备放入 Queue 的元素实现 Delayed 接口
我们需要为准备放入 DelayQueue 的元素实现 Delayed 接口,首先我们来定一个用来放入 Queue 的元素就叫 DelayObject.
public class DelayObject implements Delayed {
// 模拟携带的数据内容
private String data;
// 元素开始消费的时间
private long startTime;
// 提供一个两个参数的构造器,以 data 为携带的内容,delayInMilliseconds 为延迟的时长。
public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}
// 实现 Delayed 接口的 getDelay() 方法,该方法返回还有多少时间这个元素将可以被消费。
// 这里通过 TimeUnit.convert() 方法返回正确时间单位的剩余时间。
// 当该方法返回 0 或 负数 时,该元素才可以被从队列中取出。
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
// 需要实现 compareTo() 方法,因为元素在 DelayQueue 中将按照过期时间顺序存储。
// 最先过期的元素排在头部,过期时间最长的元素排在尾部。
@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
}
Step2. DelayQueue 的生产者和消费者
实现生产者和消费者用来测试我们的DelayQueue,示例代码如下:
生产者示例:
public class DelayQueueProducer implements Runnable {
// DelayQueue队列
private BlockingQueue<DelayObject> queue;
// 生产者生产元素的数量
private Integer numberOfElementsToProduce;
// 每条记录延时的时间
private Integer delayOfEachProducedMessageMilliseconds;
// 省略Constructor
// 当 run 起来之后,第500毫秒向队列中放入一个元素
@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object = new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
消费者示例:
public class DelayQueueConsumer implements Runnable {
// DelayQueue队列
private BlockingQueue<DelayObject> queue;
// 消费者消费元素的数量
private Integer numberOfElementsToTake;
// 实际消费数量记数器(后边有用)
public AtomicInteger numberOfConsumedElements = new AtomicInteger();
// 省略Constructor
@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Step3. 来一个测试,拉出来溜溜
这里我们创建了生产者和消费者两个线程,生产者每500ms向队列里扔一个元素, 最后检查消费者是否消费了2个元素。
@Test
public void givenDelayQueue_whenProduceElement
_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// Executor
ExecutorService executor = Executors.newFixedThreadPool(2);
// DelayQueue
BlockingQueue<DelayObject> queue = new DelayQueue<>();
// 生产元素的数量
int numberOfElementsToProduce = 2;
// 每个元素Delay的时间
int delayOfEachProducedMessageMilliseconds = 500;
// 消息者线程
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
// 生产者线程
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
// 启动线程
executor.submit(producer);
executor.submit(consumer);
// 运行 5 秒钟
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
// 消息的数量正确
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}
输出的结果如下:
Put object: {data='2a1d6f7a-d01d-485c-b5f8-80d5e16cffdd', startTime=1535712496578}
Consumer take: {data='2a1d6f7a-d01d-485c-b5f8-80d5e16cffdd', startTime=1535712496578}
Put object: {data='3cc18f72-ca7b-47db-a14b-1b1ef162bc19', startTime=1535712497083}
Consumer take: {data='3cc18f72-ca7b-47db-a14b-1b1ef162bc19', startTime=1535712497083}
Step4. 问题:如果运行时间没有消费完
我们也来试一下,设定元素的 Delay 时间为 10s。
int numberOfElementsToProduce = 1;
// Delay 10 秒
int delayOfEachProducedMessageMilliseconds = 10_000;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
然后,我们将线程运行时间搞成 5 秒之后 terminate
executor.submit(producer);
executor.submit(consumer);
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);
敲黑板:消费的数量为 0 。
Step5. 使用总结
我们了解了 DelayQueue 来自 java.util.concurrent 包,队列中的元素需要实现 Delayed 接口。
通过DelayQueue我们可以实现延时消费。
第二部分:解刨DelayQueue
Step1. 祖谱
继承祖谱
java.lang.Object
java.util.AbstractCollection
实现的接口
Iterable
通过祖谱和接口实现,我第一步,大概知道了他有这么多功能,是一个这样的Queue
Step2. 基本实现
416 Words
2018-08-31 08:00 +0800