详解RabbitMQ高级特性之延迟队列

目录

延迟队列

应用场景

TTL+死信队列 实现延迟队列

添加配置

常量类

声明队列和交换机并绑定二者关系

编写生产消息代码

编写消费消息代码

观察效果

修改生产消息代码

观察效果


延迟队列

延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费.

应用场景

延迟队列的使⽤场景有很多, ⽐如:

1. 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
2. ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
3. ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等

RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能.

假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并⾮是 normal_queue 这个队列, ⽽
是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存⼊ dlx_queue 这个
队列中,消费者就恰巧消费到了延迟10秒的这条消息.

TTL+死信队列 实现延迟队列
添加配置
spring:
  application:
    name: rabbit-extensions-demo
  rabbitmq:
    addresses: amqp://study:study@47.98.109.138:5672/extension
常量类
public class Constants {

    //死信
    public static final String NORMAL_QUEUE = "normal.queue";
    public static final String NORMAL_EXCHANGE = "normal.exchange";

    public static final String DL_QUEUE = "dl.queue";
    public static final String DL_EXCHANGE= "dl.exchange";
}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;

@Configuration
public class DLConfig {
    //正常的交换机和队列
    @Bean("normalQueue")
    public Queue normalQueue(){
        return QueueBuilder.durable(Constants.NORMAL_QUEUE)
                .deadLetterExchange(Constants.DL_EXCHANGE)
                .deadLetterRoutingKey("dlx")
                .build();
    }
    @Bean("normalExchange")
    public DirectExchange normalExchange(){
        return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
    }

    @Bean("normalBinding")
    public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
    }
    //死信交换机和队列
    @Bean("dlQueue")
    public Queue dlQueue(){
        return QueueBuilder.durable(Constants.DL_QUEUE).build();
    }
    @Bean("dlExchange")
    public DirectExchange dlExchange(){
        return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
    }

    @Bean("dlBinding")
    public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
    }
}
编写生产消息代码
    @RequestMapping("/delay")
    public String delay() {
        System.out.println("delay...");

        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {
            message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为10s
            return message;
        });

        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {
            message.getMessageProperties().setExpiration("30000");  //单位: 毫秒, 过期时间为30s
            return message;
        });

        System.out.printf("%tc 消息发送成功 \n", new Date());
        return "消息发送成功";
    }
编写消费消息代码
import ***.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.***ponent;
import rabbitextensionsdemo.constant.Constants;

import java.util.Date;

@***ponent
public class DLListener {

    @RabbitListener(queues = Constants.DL_QUEUE)
    public void dlHandMessage(Message message, Channel channel) throws Exception {
        //消费者逻辑
        System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
    }
}
观察效果

此时我们可以看到,基于TTL+死信队列实现出来了 延迟队列 的效果,但是这样就没问题了吗?

修改生产消息代码
    @RequestMapping("/delay")
    public String delay() {
        System.out.println("delay...");

        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {
            message.getMessageProperties().setExpiration("30000");  //单位: 毫秒, 过期时间为30s
            return message;
        });

        rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {
            message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为10s
            return message;
        });

        System.out.printf("%tc 消息发送成功 \n", new Date());
        return "消息发送成功";
    }
观察效果

此时我们看到,设置TTL为10秒的消息居然在30秒后才进入死信队列,这是为什么呢?

是因为RabbitMQ检查消息的TTL是在消息发送给消费方的时候进行检测的,而什么时候发送给发送方又根据队头消息的TTL,所以这就是问题所在,也是TTL+死信队列实现延迟队列所存在的问题。

转载请说明出处内容投诉
CSS教程网 » 详解RabbitMQ高级特性之延迟队列

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买