目录
延迟队列
应用场景
TTL+死信队列 实现延迟队列
添加配置
常量类
声明队列和交换机并绑定二者关系
编写生产消息代码
编写消费消息代码
观察效果
修改生产消息代码
观察效果
延迟队列
延迟队列(Delayed Queue),即消息被发送以后, 并不想让消费者⽴刻拿到消息, ⽽是等待特定时间后,消费者才能拿到这个消息进⾏消费.
应用场景
延迟队列的使⽤场景有很多, ⽐如:
1. 智能家居: ⽤⼾希望通过⼿机远程遥控家⾥的智能设备在指定的时间进⾏⼯作. 这时候就可以将⽤⼾指令发送到延迟队列, 当指令设定的时间到了再将指令推送到智能设备.
2. ⽇常管理: 预定会议后,需要在会议开始前⼗五分钟提醒参会⼈参加会议
3. ⽤⼾注册成功后, 7天后发送短信, 提⾼⽤⼾活跃度等
RabbitMQ本⾝没有直接⽀持延迟队列的的功能, 但是可以通过前⾯所介绍的TTL+死信队列的⽅式组合模拟出延迟队列的功能.
假设⼀个应⽤中需要将每条消息都设置为10秒的延迟, ⽣产者通过 normal_exchange 这个交换器将发送的消息存储在 normal_queue 这个队列中. 消费者订阅的并⾮是 normal_queue 这个队列, ⽽
是 dlx_queue 这个队列. 当消息从 normal_queue 这个队列中过期之后被存⼊ dlx_queue 这个
队列中,消费者就恰巧消费到了延迟10秒的这条消息.
TTL+死信队列 实现延迟队列
添加配置
spring:
application:
name: rabbit-extensions-demo
rabbitmq:
addresses: amqp://study:study@47.98.109.138:5672/extension
常量类
public class Constants {
//死信
public static final String NORMAL_QUEUE = "normal.queue";
public static final String NORMAL_EXCHANGE = "normal.exchange";
public static final String DL_QUEUE = "dl.queue";
public static final String DL_EXCHANGE= "dl.exchange";
}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;
@Configuration
public class DLConfig {
//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue(){
return QueueBuilder.durable(Constants.NORMAL_QUEUE)
.deadLetterExchange(Constants.DL_EXCHANGE)
.deadLetterRoutingKey("dlx")
.build();
}
@Bean("normalExchange")
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();
}
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
//死信交换机和队列
@Bean("dlQueue")
public Queue dlQueue(){
return QueueBuilder.durable(Constants.DL_QUEUE).build();
}
@Bean("dlExchange")
public DirectExchange dlExchange(){
return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();
}
@Bean("dlBinding")
public Binding dlBinding(@Qualifier("dlQueue") Queue queue, @Qualifier("dlExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
}
编写生产消息代码
@RequestMapping("/delay")
public String delay() {
System.out.println("delay...");
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {
message.getMessageProperties().setExpiration("10000"); //单位: 毫秒, 过期时间为10s
return message;
});
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {
message.getMessageProperties().setExpiration("30000"); //单位: 毫秒, 过期时间为30s
return message;
});
System.out.printf("%tc 消息发送成功 \n", new Date());
return "消息发送成功";
}
编写消费消息代码
import ***.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.***ponent;
import rabbitextensionsdemo.constant.Constants;
import java.util.Date;
@***ponent
public class DLListener {
@RabbitListener(queues = Constants.DL_QUEUE)
public void dlHandMessage(Message message, Channel channel) throws Exception {
//消费者逻辑
System.out.printf("[dl.queue] %tc 接收到消息: %s, deliveryTag: %d \n", new Date(), new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());
}
}
观察效果
此时我们可以看到,基于TTL+死信队列实现出来了 延迟队列 的效果,但是这样就没问题了吗?
修改生产消息代码
@RequestMapping("/delay")
public String delay() {
System.out.println("delay...");
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 30s...", message -> {
message.getMessageProperties().setExpiration("30000"); //单位: 毫秒, 过期时间为30s
return message;
});
rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE, "normal", "ttl test 10s...", message -> {
message.getMessageProperties().setExpiration("10000"); //单位: 毫秒, 过期时间为10s
return message;
});
System.out.printf("%tc 消息发送成功 \n", new Date());
return "消息发送成功";
}
观察效果
此时我们看到,设置TTL为10秒的消息居然在30秒后才进入死信队列,这是为什么呢?
是因为RabbitMQ检查消息的TTL是在消息发送给消费方的时候进行检测的,而什么时候发送给发送方又根据队头消息的TTL,所以这就是问题所在,也是TTL+死信队列实现延迟队列所存在的问题。