事件、websocket、RabbitMQ在java项目中的应用

事件、websocket、RabbitMQ在java项目中的应用

事件、websocket、RabbitMQ在java项目中的应用

本文介绍餐饮云平台中事件机制、WebSocket 与 RabbitMQ 的应用,以及业务数据在不同场景下的流转方式。

流程图

事件

在某一个业务流程中,同服务内可能需要对多个业务块进行操作,如订单操作完成后,需要记录日志、推送消息给客户端、统计等。使用event可以松耦合、易维护和便扩展。

定义event

定义订单操作event,需要有事件类型用来分流

package ***.cy.order.event;

import ***.cy.order.dto.OrderDto;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;

import java.time.LocalDateTime;

/**
 * 订单操作事件
 * 用于在订单操作(创建、更新、取消等)后通知相关组件进行后续处理
 */
@Getter
public class OrderOperationEvent extends ApplicationEvent {

    /**
     * 事件类型
     */
    private final EventTypeEnum eventType;

    /**
     * 订单信息
     */
    private final OrderDto order;

    /**
     * 事件发生时间
     */
    private final LocalDateTime eventTime;

    /**
     * 操作者ID(可选)
     */
    private final String operatorId;

    /**
     * 操作描述(可选)
     */
    private final String operationDescription;

    /**
     * 创建订单操作事件
     * 
     * @param source 事件源
     * @param eventType 事件类型
     * @param order 订单信息
     */
    public OrderOperationEvent(Object source, EventTypeEnum eventType, OrderDto order) {
        this(source, eventType, order, null, null);
    }

    /**
     * 创建订单操作事件
     * 
     * @param source 事件源
     * @param eventType 事件类型
     * @param order 订单信息
     * @param operatorId 操作者ID
     * @param operationDescription 操作描述
     */
    public OrderOperationEvent(Object source, EventTypeEnum eventType, OrderDto order, 
                              String operatorId, String operationDescription) {
        super(source);
        this.eventType = eventType;
        this.order = order;
        this.eventTime = LocalDateTime.now();
        this.operatorId = operatorId;
        this.operationDescription = operationDescription;
    }
}

监听

事件监听,在项目中定义了三个,且都是上面订单操作ev***的监听;可在同服务中的多个模块定义,哪里需要就在哪里监听:

  • UserNotificationListener: 用户通知监听器
  • StoreNotificationListener: 门店通知监听器
  • PaymentNotificationListener: 支付通知监听器
package ***.cy.order.listener;

import ***.cy.order.dto.OrderDto;
import ***.cy.order.event.EventTypeEnum;
import ***.cy.order.event.OrderOperationEvent;
import ***.cy.order.websocket.AppWebSocketEndpoint;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.***ponent;

import java.time.Instant;

/**
 * 用户通知监听器
 * 负责处理订单操作后的用户通知相关逻辑
 */
@***ponent
@Slf4j
@RequiredArgsConstructor
public class UserNotificationListener {

    /**
     * 监听订单操作事件,向用户发送通知
     * 
     * @param event 订单操作事件
     */
    @EventListener(OrderOperationEvent.class)
    public void handleOrderOperationEvent(OrderOperationEvent event) {
        // 只处理订单更新事件
        if (event.getEventType() != EventTypeEnum.ORDER_UPDATED) {
            return;
        }
        
        OrderDto order = event.getOrder();
        log.info("收到订单更新事件,准备通过WebSocket向用户发送通知,订单ID: {}, 用户ID: {}",
                 order.getId(), order.getUserId());
        
        try {
            // 构建WebSocket通知消息
            String notification = "{"
                    + "\"type\": \"order_created\","
                    + "\"orderId\": \"" + order.getId() + "\","
                    + "\"orderNumber\": \"" + order.getOrderNumber() + "\","
                    + "\"message\": \"您的订单已更新成功,订单号:" + order.getOrderNumber() + "\","
                    + "\"status\": \"PENDING\","
                    + "\"timestamp\": " + Instant.now().toEpochMilli()
                    + "}";
            
            // 通过WebSocket发送通知给用户
            AppWebSocketEndpoint.sendMessageToUser(order.getUserId(), notification);
            
            log.info("用户通知已通过WebSocket发送,订单ID: {}", order.getId());
            
        } catch (Exception e) {
            log.error("发送用户WebSocket通知失败,订单ID: {}", order.getId(), e);
            // 这里可以添加重试逻辑或失败记录
        }
    }
}

websocket

全双工、持久化的客户端与服务端通信协议,适用于需要服务端主动推送、低延迟交互的场景;如:实时订单状态推送、桌台状态同步、通知广播等。

本项目定义了两个端点,分别处理门店和客户端的业务:

  • @ServerEndpoint(“/ws/app”) - APP端WebSocket连接端点
  • @ServerEndpoint(“/ws/store”)- 门店端WebSocket连接端点

使用客户端会话管理器用于缓存和管理WebSocket客户端连接,可以实现客户端在
线状态显示、超时清理session、发送消息给客户端等;

  public void addUserSession(String userId, Session session) {
        SessionInfo sessionInfo = new SessionInfo(session, LocalDateTime.now(), session.getId());
        userSessions.put(userId, sessionInfo);
        log.info("用户会话已添加: userId={}, sessionId={}, remoteAddress={}", userId, session.getId(), session.getRequestURI());
    }

 /**
     * 定期清理无效会话和过期会话
     * 使用配置的清理间隔时间
     */
    @Scheduled(fixedRateString = "${websocket.cleanup-interval:30000}")
    public void cleanInvalidSessions() {
        log.debug("开始清理无效会话");

        // 清理无效或过期的用户会话
        userSessions.entrySet().removeIf(entry -> {
            SessionInfo sessionInfo = entry.getValue();
            Session session = sessionInfo.getSession();
            
            if (session == null || !session.isOpen() || sessionInfo.isExpired()) {
                log.info("清理无效或过期用户会话: userId={}, sessionId={}", entry.getKey(), 
                         sessionInfo != null ? sessionInfo.getSessionId() : "null");
                return true;
            }
            return false;
        });

        // 清理无效或过期的门店会话
        storeSessions.entrySet().removeIf(entry -> {
            SessionInfo sessionInfo = entry.getValue();
            Session session = sessionInfo.getSession();
            
            if (session == null || !session.isOpen() || sessionInfo.isExpired()) {
                log.info("清理无效或过期门店会话: storeId={}, sessionId={}", entry.getKey(), 
                         sessionInfo != null ? sessionInfo.getSessionId() : "null");

                closeSession(session);
                return true;
            }
            return false;
        });

        log.debug("无效会话清理完成,当前用户会话数: {}, 门店会话数: {}", 
                  userSessions.size(), storeSessions.size());
    }

RabbitMQ

当业务需要跨服务时,可以用到消息队列来实现数据同步,如有库存管理、crm、统计、erp等服务,可在订单服务器完成业务后通过消息队列将数据同步给这些服务。
这里用到了rabbitmq,现默认每个服务都需要订单埋单后的数据,所以Exchange选amq.fanout,即Fanout Exchange(广播交换机),一条消息被多个消费者都收到。

Producer


[Fanout Exchange]
├───> Queue A ───> Consumer A
├───> Queue B ───> Consumer B
└───> Queue C ───> Consumer C

package ***.cy.order.mq;


import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

/**
 * 广播交换机
 * 一条消息被多个消费者都收到
 */
@Configuration
public class FanoutRabbitConfig {

    public static final String FANOUT_EXCHANGE = "order.fanout.exchange";

    // 消费者 统计 的队列
    @Bean
    public Queue orderNotifyQueueStat() {
        return QueueBuilder.durable("order.notify.queue.stat").build();
    }

    // 消费者 crm 的队列
    @Bean
    public Queue orderNotifyQueueCrm() {
        return QueueBuilder.durable("order.notify.queue.crm").build();
    }

    // 消费者 库存 的队列
    @Bean
    public Queue orderNotifyQueueInventory() {
        return QueueBuilder.durable("order.notify.queue.inventory").build();
    }

    // Fanout Exchange(广播交换机)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    // 将所有队列绑定到 Fanout Exchange(无需 routing key)
    @Bean
    public Binding bindingStat() {
        return BindingBuilder.bind(orderNotifyQueueStat()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingCrm() {
        return BindingBuilder.bind(orderNotifyQueueCrm()).to(fanoutExchange());
    }


    @Bean
    public Binding bindingInventory() {
        return BindingBuilder.bind(orderNotifyQueueInventory()).to(fanoutExchange());
    }

}

消费端只需监听自己的队列

package ***.cy.crm;

import ***.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.***ponent;

import java.io.IOException;

/**
 * 订单CRM消费者
 * 负责消费CRM相关的消息并进行处理
 */
@***ponent
@Slf4j
public class OrderCrmConsumer {

    @RabbitListener(queues = "order.notify.queue.crm")
    public void handleOrder(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
       try {
           log.info("【消费者 CRM】收到订单通知: {}", message);
           channel.basicAck(tag, false);
       } catch (IOException ex) {
           log.error("【消费者 CRM】处理订单通知失败: {}", message, ex);
       }
    }
}

完整项目demo

转载请说明出处内容投诉
CSS教程网 » 事件、websocket、RabbitMQ在java项目中的应用

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买