delayqueue使用及原理解刨


2018年8月31日

在大体评估了需要进行延迟消费的队列长度之后,非常机智的选择使用了DelayQueue,而不是使用Schedule轮询。

顺路给大家安利一波DelayQueue的使用以及原理。

Overview

通过官方文档我们可以发现,这是一个来自 java.util.concurrent 包里的 Queue,并且是一个 BlockingQueue,可以非常有效的用于(生产者-消费者)延迟消费的场景下。

分两部分说,先说说如何使用,然后再解刨一下原理。

.

第一部分:使用

Step1. 为准备放入 Queue 的元素实现 Delayed 接口

我们需要为准备放入 DelayQueue 的元素实现 Delayed 接口,首先我们来定一个用来放入 Queue 的元素就叫 DelayObject.

public class DelayObject implements Delayed {

	// 模拟携带的数据内容
    private String data;   
    // 元素开始消费的时间 
    private long startTime;

	// 提供一个两个参数的构造器,以 data 为携带的内容,delayInMilliseconds 为延迟的时长。 
    public DelayObject(String data, long delayInMilliseconds) {
        this.data = data;
        this.startTime = System.currentTimeMillis() + delayInMilliseconds;
    }

    // 实现 Delayed 接口的 getDelay() 方法,该方法返回还有多少时间这个元素将可以被消费。
    // 这里通过 TimeUnit.convert() 方法返回正确时间单位的剩余时间。
    // 当该方法返回 0 或 负数 时,该元素才可以被从队列中取出。
    @Override
	public long getDelay(TimeUnit unit) {
	    long diff = startTime - System.currentTimeMillis();
	    return unit.convert(diff, TimeUnit.MILLISECONDS);
	}

	// 需要实现 compareTo() 方法,因为元素在 DelayQueue 中将按照过期时间顺序存储。
	// 最先过期的元素排在头部,过期时间最长的元素排在尾部。
	@Override
	public int compareTo(Delayed o) {
	    return Ints.saturatedCast(
	      this.startTime - ((DelayObject) o).startTime);
	}
}

Step2. DelayQueue 的生产者和消费者

实现生产者和消费者用来测试我们的DelayQueue,示例代码如下:

生产者示例:

public class DelayQueueProducer implements Runnable {
  
  	// DelayQueue队列
    private BlockingQueue<DelayObject> queue;
    // 生产者生产元素的数量
    private Integer numberOfElementsToProduce;
    // 每条记录延时的时间
    private Integer delayOfEachProducedMessageMilliseconds;
 
 	// 省略Constructor
 
 	// 当 run 起来之后,第500毫秒向队列中放入一个元素
    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToProduce; i++) {
            DelayObject object = new DelayObject(
                UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
            System.out.println("Put object: " + object);

            try {
                queue.put(object);
                Thread.sleep(500);
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

消费者示例:

public class DelayQueueConsumer implements Runnable {

	// DelayQueue队列
    private BlockingQueue<DelayObject> queue;
    // 消费者消费元素的数量
    private Integer numberOfElementsToTake;
    // 实际消费数量记数器(后边有用)
    public AtomicInteger numberOfConsumedElements = new AtomicInteger();

    // 省略Constructor
 
    @Override
    public void run() {
        for (int i = 0; i < numberOfElementsToTake; i++) {
            try {
                DelayObject object = queue.take();
                numberOfConsumedElements.incrementAndGet();
                System.out.println("Consumer take: " + object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Step3. 来一个测试,拉出来溜溜

这里我们创建了生产者和消费者两个线程,生产者每500ms向队列里扔一个元素, 最后检查消费者是否消费了2个元素。

@Test
public void givenDelayQueue_whenProduceElement
  _thenShouldConsumeAfterGivenDelay() throws InterruptedException {

    // Executor
    ExecutorService executor = Executors.newFixedThreadPool(2); 
    // DelayQueue
    BlockingQueue<DelayObject> queue = new DelayQueue<>();
    // 生产元素的数量
    int numberOfElementsToProduce = 2;
    // 每个元素Delay的时间
    int delayOfEachProducedMessageMilliseconds = 500;

    // 消息者线程
    DelayQueueConsumer consumer = new DelayQueueConsumer(
      queue, numberOfElementsToProduce);
    // 生产者线程
    DelayQueueProducer producer = new DelayQueueProducer(
      queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
 
    // 启动线程
    executor.submit(producer);
    executor.submit(consumer);
 
    // 运行 5 秒钟
    executor.awaitTermination(5, TimeUnit.SECONDS);
    executor.shutdown();
  
  	// 消息的数量正确
    assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}

输出的结果如下:

Put object: {data='2a1d6f7a-d01d-485c-b5f8-80d5e16cffdd', startTime=1535712496578}
Consumer take: {data='2a1d6f7a-d01d-485c-b5f8-80d5e16cffdd', startTime=1535712496578}
Put object: {data='3cc18f72-ca7b-47db-a14b-1b1ef162bc19', startTime=1535712497083}
Consumer take: {data='3cc18f72-ca7b-47db-a14b-1b1ef162bc19', startTime=1535712497083}

Step4. 问题:如果运行时间没有消费完

我们也来试一下,设定元素的 Delay 时间为 10s。

int numberOfElementsToProduce = 1; 
// Delay 10 秒
int delayOfEachProducedMessageMilliseconds = 10_000; 

DelayQueueConsumer consumer = new DelayQueueConsumer(
  queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
  queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

然后,我们将线程运行时间搞成 5 秒之后 terminate

executor.submit(producer);
executor.submit(consumer);
 
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
assertEquals(consumer.numberOfConsumedElements.get(), 0);

敲黑板:消费的数量为 0 。

Step5. 使用总结

我们了解了 DelayQueue 来自 java.util.concurrent 包,队列中的元素需要实现 Delayed 接口。

通过DelayQueue我们可以实现延时消费。

第二部分:解刨DelayQueue

Step1. 祖谱

继承祖谱

java.lang.Object java.util.AbstractCollection java.util.AbstractQueue java.util.concurrent.DelayQueue

实现的接口

Iterable, Collection, BlockingQueue, Queue

通过祖谱和接口实现,我第一步,大概知道了他有这么多功能,是一个这样的Queue

Step2. 基本实现