RabbitMQ - 4 ( 22000 字 RabbitMQ 入门级教程 )

RabbitMQ - 4 ( 22000 字 RabbitMQ 入门级教程 )

一: RabbitMQ 高级特性

前面主要讲解了 RabbitMQ 的概念和应用。RabbitMQ 实现了 AMQP 0-9-1 规范,并在此基础上进行了多项扩展。在 RabbitMQ 官方网站中详细介绍了其特性,我们将其中一些重要且常用的特性挑选出来进行讲解。

1.1 消息确认

生产者发送的消息在到达消费者端后,可能会出现以下几种情况:

情况 描述
消息处理成功 消费端成功处理消息,完成相应的业务逻辑。
消息处理异常 消费端在处理消息时发生异常,可能导致消息未被正确消费。

RabbitMQ 在向消费者发送消息后,会立即将该消息从队列中删除,但如果消费者处理消息时出现异常,则可能导致消息丢失。为了解决这一问题,RabbitMQ 提供了消息确认机制,用于确保消息被消费者成功接收并正确处理。在消费者订阅队列时,可以通过设置 autoAck 参数来控制消息确认机制,根据该参数的不同,消息确认机制分为两种模式。

确认模式 描述 适用场景
自动确认 当 autoAck 等于 true 时,RabbitMQ 会在消息发送后立即将其置为确认,并从内存或磁盘中删除,无论消费者是否真正消费成功。 适用于对消息可靠性要求不高的场景。
手动确认 当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式调用 Basic.Ack 命令,确认后才删除消息。 适用于对消息可靠性要求较高的场景,确保消息被正确处理后再删除。
// 创建一个 DefaultConsumer 对象,用于处理接收到的消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
   
   
    // 当有消息送达时会触发该方法
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
   
   
        // 输出接收到的消息内容,将字节数组转换为字符串
        System.out.println("接收到消息: " + new String(body));
    }
};

// 开始监听队列,指定队列名称、是否自动确认消息以及消费者对象
channel.basi***onsume(Constants.TOPIC_QUEUE_NAME1, false, consumer);

当 autoAck 参数设置为 false 时,对于 RabbitMQ 服务端而言,队列中的消息会分为两部分:一部分是尚未投递给消费者的消息,另一部分是已投递但尚未收到消费者确认的消息。如果 RabbitMQ 长时间未收到消费者的确认信号,并且该消费者已断开连接,则 RabbitMQ 会将消息重新放回队列,等待重新投递给下一个消费者,或者可能再次投递给原来的消费者。

从 RabbitMQ 的 Web 管理平台上也可以看到当前队列中 Ready 状态和 Unacked 状态的消息数

状态 描述
Ready 等待投递给消费者的消息数。
Unacked 已投递给消费者但尚未收到消费者确认信号的消息数。

1.1.1 自动确认

自动确认之前已经提到过,这里不再赘述。

1.1.2 手动确认

消费者在收到消息后,可以选择确认消息、拒绝消息或跳过消息。RabbitMQ 提供了多种确认应答方式,消费者可以通过调用其对应的 channel 方法进行操作,主要包括以下三种方式:

确认类型 描述 方法
肯定确认 RabbitMQ 知道消息已被成功处理,可以将其丢弃。 Channel.basicAck(long deliveryTag, boolean multiple)
否定确认 消费者调用方法通知 RabbitMQ 拒绝该消息,可选择是否重新入队。 Channel.basicReject(long deliveryTag, boolean requeue)
批量否定确认 如果需要批量拒绝消息,可以使用 Basic.Nack 命令,通知 RabbitMQ 拒绝多条消息,可选择是否重新入队。 Channel.basi***ack(long deliveryTag, boolean multiple,boolean requeue)

下面是参数解释:

参数 作用
deliveryTag 消息的唯一标识,用于标识 RabbitMQ 中的每条消息。它是一个由 RabbitMQ 生成的单调递增的 64 位长整型值,每个通道(Channel)独立维护,确保唯一性,消费者在确认、拒绝或重新入队消息时,必须通过对应的 deliveryTag 来标识消息。
multiple 是否启用批量操作,用于减少网络流量。在消息确认或拒绝时,如果设置为 true,则会批量操作所有小于或等于指定 deliveryTag 的消息。如果设置为 false,则仅对当前指定的 deliveryTag 消息进行操作。
requeue 表示拒绝这条消息后如何处理,控制被拒绝的消息是否重新入队,用于消息的再投递。true: 消息会重新进入队列,等待被其他消费者或同一消费者再次消费。false: 消息会直接从队列中移除,不再被重新投递。适用于无法处理的消息或需要丢弃的场景。

我们将基于 SpringBoot 演示消息的确认机制。与直接使用 RabbitMQ Java Client 库相比,Spring-AMQP 对消息确认机制的使用方式有所不同,但是也提供了三种策略来实现消息确认。

确认模式 描述 特点
AcknowledgeMode.NONE 消息一旦投递给消费者,不管是否成功处理,RabbitMQ 都会自动确认并移除消息。如果消费者处理失败,消息可能会丢失。 适用于对消息可靠性要求较低的场景。
AcknowledgeMode.AUTO 默认模式。消费者在成功处理消息时会自动确认,但如果处理过程中抛出异常,则不会确认消息。 提供了一定的可靠性,但在异常情况下消息会回到队列。
AcknowledgeMode.MANUAL 手动确认模式。消费者需要在成功处理消息后显式调用 basicAck 方法确认。如果消息未被确认,RabbitMQ 会重新投递消息。 提高消息处理的可靠性,确保消息不会因处理失败而丢失。适用于高可靠性场景。

如果需要配置的话在配置文件中进行配置即可,下面演示一种确认模式的使用:

spring:
  rabbitmq:
    addresses: amqp://study:study@110.41.51.65:15673/bite
    listener:
      simple:
        acknowledge-mode: none
  1. 生产者代码:
    public static final String ACK_EXCHANGE_NAME = "ack_exchange";
    public static final String ACK_QUEUE = "ack_queue";
@Configuration
public class RabbitMQConfig {
   
   

    // 声明交换机
    @Bean("ackExchange")
    public Exchange ackExchange() {
   
   
        return ExchangeBuilder
                .topicExchange(Constant.ACK_EXCHANGE_NAME)
                .durable(true)
                .build();
    }

    // 声明队列
    @Bean("ackQueue")
    public Queue ackQueue() {
   
   
        return QueueBuilder
                .durable(Constant.ACK_QUEUE)
                .build();
    }

    // 绑定队列和交换机
    @Bean("ackBinding")
    public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,
                               @Qualifier("ackQueue") Queue queue) {
   
   
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("ack")
                .noargs();
    }
}
@RestController
@RequestMapping("/producer")
public class ProductController {
   
   

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 发送消息到指定交换机和路由键
    @RequestMapping("/ack")
    public String ack() {
   
   
        rabbitTemplate.convertAndSend(
                Constant.ACK_EXCHANGE_NAME, 
                "ack", 
                "consumer ack test..."
        );
        return "发送成功!";
    }
}
  1. 消费者代码
@***ponent
public class AckQueueListener {
   
   

    // 监听队列 Constant.ACK_QUEUE
    @RabbitListener(queues = Constant.ACK_QUEUE)
    public void listenerQueue(Message message, Channel channel) throws Exception {
   
   
        try {
   
   
            // 获取消息内容和 deliveryTag
            String messageBody = new String(message.getBody(), "UTF-8");
            long deliveryTag = message.getMessageProperties().getDeliveryTag();

            System.out.printf("接收到消息: %s, deliveryTag: %d%n", messageBody, deliveryTag);

            // 模拟处理失败
            int num = 3 / 0;

            System.out.println("处理完成");
        } catch (Exception e) {
   
   
            // 处理异常逻辑,日志记录或其他操作
            System.err.println("消息处理失败: " + e.getMessage());
            // 根据业务需求,这里可以选择是否重新入队或丢弃消息
            channel.basi***ack(deliveryTag, false, true);
        }
    }
}

1.2 持久性

前面我们讨论了如何在消费端处理消息时确保消息不丢失,但如果 RabbitMQ 服务停止或崩溃后,如何确保生产者发送的消息不丢失呢?默认情况下,RabbitMQ 在退出或崩溃时会丢失队列和消息,除非明确配置其持久化机制。RabbitMQ 的持久化包括三个部分:交换机的持久化、队列的持久化和消息的持久化。

1.2.1 交换机持久化

交换器的持久化通过在声明交换机时将 durable 参数设置为 true 来实现,这会将交换机的属性保存到服务器中。当 RabbitMQ 服务器发生意外或关闭后,重启时交换机会自动恢复,无需重新创建,相当于一直存在。如果未设置持久化,则在 RabbitMQ 重启后,交换机的元数据会丢失。对于长期使用的交换机,建议将其设置为持久化,以确保其可靠性和持久性。

ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()

1.2.2 队列的持久化

队列的持久化通过在声明队列时将 durable 参数设置为 true 来实现。如果队列未设置持久化,在 RabbitMQ 服务重启后,该队列会被删除,同时其中的消息也会丢失(队列消失,消息无处存储)。队列的持久化可以保证队列本身的元数据在异常情况下不会丢失,但不能保证队列中的消息不丢失。要确保消息的可靠性,还需将消息设置为持久化。我们之前创建的队列都是持久化队列。

QueueBuilder.durable(Constant.ACK_QUEUE).build(); // 持久化队列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build(); // 非持久化队列
转载请说明出处内容投诉
CSS教程网 » RabbitMQ - 4 ( 22000 字 RabbitMQ 入门级教程 )

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买