一: RabbitMQ 高级特性
前面主要讲解了 RabbitMQ 的概念和应用。RabbitMQ 实现了 AMQP 0-9-1 规范,并在此基础上进行了多项扩展。在 RabbitMQ 官方网站中详细介绍了其特性,我们将其中一些重要且常用的特性挑选出来进行讲解。
1.1 消息确认
生产者发送的消息在到达消费者端后,可能会出现以下几种情况:
| 情况 | 描述 |
|---|---|
| 消息处理成功 | 消费端成功处理消息,完成相应的业务逻辑。 |
| 消息处理异常 | 消费端在处理消息时发生异常,可能导致消息未被正确消费。 |
RabbitMQ 在向消费者发送消息后,会立即将该消息从队列中删除,但如果消费者处理消息时出现异常,则可能导致消息丢失。为了解决这一问题,RabbitMQ 提供了消息确认机制,用于确保消息被消费者成功接收并正确处理。在消费者订阅队列时,可以通过设置 autoAck 参数来控制消息确认机制,根据该参数的不同,消息确认机制分为两种模式。
| 确认模式 | 描述 | 适用场景 |
|---|---|---|
| 自动确认 | 当 autoAck 等于 true 时,RabbitMQ 会在消息发送后立即将其置为确认,并从内存或磁盘中删除,无论消费者是否真正消费成功。 | 适用于对消息可靠性要求不高的场景。 |
| 手动确认 | 当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式调用 Basic.Ack 命令,确认后才删除消息。 | 适用于对消息可靠性要求较高的场景,确保消息被正确处理后再删除。 |
// 创建一个 DefaultConsumer 对象,用于处理接收到的消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 当有消息送达时会触发该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 输出接收到的消息内容,将字节数组转换为字符串
System.out.println("接收到消息: " + new String(body));
}
};
// 开始监听队列,指定队列名称、是否自动确认消息以及消费者对象
channel.basi***onsume(Constants.TOPIC_QUEUE_NAME1, false, consumer);
当 autoAck 参数设置为 false 时,对于 RabbitMQ 服务端而言,队列中的消息会分为两部分:一部分是尚未投递给消费者的消息,另一部分是已投递但尚未收到消费者确认的消息。如果 RabbitMQ 长时间未收到消费者的确认信号,并且该消费者已断开连接,则 RabbitMQ 会将消息重新放回队列,等待重新投递给下一个消费者,或者可能再次投递给原来的消费者。
从 RabbitMQ 的 Web 管理平台上也可以看到当前队列中 Ready 状态和 Unacked 状态的消息数
| 状态 | 描述 |
|---|---|
| Ready | 等待投递给消费者的消息数。 |
| Unacked | 已投递给消费者但尚未收到消费者确认信号的消息数。 |
1.1.1 自动确认
自动确认之前已经提到过,这里不再赘述。
1.1.2 手动确认
消费者在收到消息后,可以选择确认消息、拒绝消息或跳过消息。RabbitMQ 提供了多种确认应答方式,消费者可以通过调用其对应的 channel 方法进行操作,主要包括以下三种方式:
| 确认类型 | 描述 | 方法 |
|---|---|---|
| 肯定确认 | RabbitMQ 知道消息已被成功处理,可以将其丢弃。 | Channel.basicAck(long deliveryTag, boolean multiple) |
| 否定确认 | 消费者调用方法通知 RabbitMQ 拒绝该消息,可选择是否重新入队。 | Channel.basicReject(long deliveryTag, boolean requeue) |
| 批量否定确认 | 如果需要批量拒绝消息,可以使用 Basic.Nack 命令,通知 RabbitMQ 拒绝多条消息,可选择是否重新入队。 | Channel.basi***ack(long deliveryTag, boolean multiple,boolean requeue) |
下面是参数解释:
| 参数 | 作用 |
|---|---|
| deliveryTag | 消息的唯一标识,用于标识 RabbitMQ 中的每条消息。它是一个由 RabbitMQ 生成的单调递增的 64 位长整型值,每个通道(Channel)独立维护,确保唯一性,消费者在确认、拒绝或重新入队消息时,必须通过对应的 deliveryTag 来标识消息。 |
| multiple | 是否启用批量操作,用于减少网络流量。在消息确认或拒绝时,如果设置为 true,则会批量操作所有小于或等于指定 deliveryTag 的消息。如果设置为 false,则仅对当前指定的 deliveryTag 消息进行操作。 |
| requeue | 表示拒绝这条消息后如何处理,控制被拒绝的消息是否重新入队,用于消息的再投递。true: 消息会重新进入队列,等待被其他消费者或同一消费者再次消费。false: 消息会直接从队列中移除,不再被重新投递。适用于无法处理的消息或需要丢弃的场景。 |
我们将基于 SpringBoot 演示消息的确认机制。与直接使用 RabbitMQ Java Client 库相比,Spring-AMQP 对消息确认机制的使用方式有所不同,但是也提供了三种策略来实现消息确认。
| 确认模式 | 描述 | 特点 |
|---|---|---|
| AcknowledgeMode.NONE | 消息一旦投递给消费者,不管是否成功处理,RabbitMQ 都会自动确认并移除消息。如果消费者处理失败,消息可能会丢失。 | 适用于对消息可靠性要求较低的场景。 |
| AcknowledgeMode.AUTO | 默认模式。消费者在成功处理消息时会自动确认,但如果处理过程中抛出异常,则不会确认消息。 | 提供了一定的可靠性,但在异常情况下消息会回到队列。 |
| AcknowledgeMode.MANUAL | 手动确认模式。消费者需要在成功处理消息后显式调用 basicAck 方法确认。如果消息未被确认,RabbitMQ 会重新投递消息。 | 提高消息处理的可靠性,确保消息不会因处理失败而丢失。适用于高可靠性场景。 |
如果需要配置的话在配置文件中进行配置即可,下面演示一种确认模式的使用:
spring:
rabbitmq:
addresses: amqp://study:study@110.41.51.65:15673/bite
listener:
simple:
acknowledge-mode: none
- 生产者代码:
public static final String ACK_EXCHANGE_NAME = "ack_exchange";
public static final String ACK_QUEUE = "ack_queue";
@Configuration
public class RabbitMQConfig {
// 声明交换机
@Bean("ackExchange")
public Exchange ackExchange() {
return ExchangeBuilder
.topicExchange(Constant.ACK_EXCHANGE_NAME)
.durable(true)
.build();
}
// 声明队列
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder
.durable(Constant.ACK_QUEUE)
.build();
}
// 绑定队列和交换机
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,
@Qualifier("ackQueue") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("ack")
.noargs();
}
}
@RestController
@RequestMapping("/producer")
public class ProductController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 发送消息到指定交换机和路由键
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(
Constant.ACK_EXCHANGE_NAME,
"ack",
"consumer ack test..."
);
return "发送成功!";
}
}
- 消费者代码
@***ponent
public class AckQueueListener {
// 监听队列 Constant.ACK_QUEUE
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listenerQueue(Message message, Channel channel) throws Exception {
try {
// 获取消息内容和 deliveryTag
String messageBody = new String(message.getBody(), "UTF-8");
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息: %s, deliveryTag: %d%n", messageBody, deliveryTag);
// 模拟处理失败
int num = 3 / 0;
System.out.println("处理完成");
} catch (Exception e) {
// 处理异常逻辑,日志记录或其他操作
System.err.println("消息处理失败: " + e.getMessage());
// 根据业务需求,这里可以选择是否重新入队或丢弃消息
channel.basi***ack(deliveryTag, false, true);
}
}
}
1.2 持久性
前面我们讨论了如何在消费端处理消息时确保消息不丢失,但如果 RabbitMQ 服务停止或崩溃后,如何确保生产者发送的消息不丢失呢?默认情况下,RabbitMQ 在退出或崩溃时会丢失队列和消息,除非明确配置其持久化机制。RabbitMQ 的持久化包括三个部分:交换机的持久化、队列的持久化和消息的持久化。
1.2.1 交换机持久化
交换器的持久化通过在声明交换机时将 durable 参数设置为 true 来实现,这会将交换机的属性保存到服务器中。当 RabbitMQ 服务器发生意外或关闭后,重启时交换机会自动恢复,无需重新创建,相当于一直存在。如果未设置持久化,则在 RabbitMQ 重启后,交换机的元数据会丢失。对于长期使用的交换机,建议将其设置为持久化,以确保其可靠性和持久性。
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
1.2.2 队列的持久化
队列的持久化通过在声明队列时将 durable 参数设置为 true 来实现。如果队列未设置持久化,在 RabbitMQ 服务重启后,该队列会被删除,同时其中的消息也会丢失(队列消失,消息无处存储)。队列的持久化可以保证队列本身的元数据在异常情况下不会丢失,但不能保证队列中的消息不丢失。要确保消息的可靠性,还需将消息设置为持久化。我们之前创建的队列都是持久化队列。
QueueBuilder.durable(Constant.ACK_QUEUE).build(); // 持久化队列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build(); // 非持久化队列