事件、websocket、RabbitMQ在java项目中的应用
本文介绍餐饮云平台中事件机制、WebSocket 与 RabbitMQ 的应用,以及业务数据在不同场景下的流转方式。
流程图
事件
在某一个业务流程中,同服务内可能需要对多个业务块进行操作,如订单操作完成后,需要记录日志、推送消息给客户端、统计等。使用event可以松耦合、易维护和便扩展。
定义event
定义订单操作event,需要有事件类型用来分流
package ***.cy.order.event;
import ***.cy.order.dto.OrderDto;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import java.time.LocalDateTime;
/**
* 订单操作事件
* 用于在订单操作(创建、更新、取消等)后通知相关组件进行后续处理
*/
@Getter
public class OrderOperationEvent extends ApplicationEvent {
/**
* 事件类型
*/
private final EventTypeEnum eventType;
/**
* 订单信息
*/
private final OrderDto order;
/**
* 事件发生时间
*/
private final LocalDateTime eventTime;
/**
* 操作者ID(可选)
*/
private final String operatorId;
/**
* 操作描述(可选)
*/
private final String operationDescription;
/**
* 创建订单操作事件
*
* @param source 事件源
* @param eventType 事件类型
* @param order 订单信息
*/
public OrderOperationEvent(Object source, EventTypeEnum eventType, OrderDto order) {
this(source, eventType, order, null, null);
}
/**
* 创建订单操作事件
*
* @param source 事件源
* @param eventType 事件类型
* @param order 订单信息
* @param operatorId 操作者ID
* @param operationDescription 操作描述
*/
public OrderOperationEvent(Object source, EventTypeEnum eventType, OrderDto order,
String operatorId, String operationDescription) {
super(source);
this.eventType = eventType;
this.order = order;
this.eventTime = LocalDateTime.now();
this.operatorId = operatorId;
this.operationDescription = operationDescription;
}
}
监听
事件监听,在项目中定义了三个,且都是上面订单操作ev***的监听;可在同服务中的多个模块定义,哪里需要就在哪里监听:
-
UserNotificationListener: 用户通知监听器 -
StoreNotificationListener: 门店通知监听器 -
PaymentNotificationListener: 支付通知监听器
package ***.cy.order.listener;
import ***.cy.order.dto.OrderDto;
import ***.cy.order.event.EventTypeEnum;
import ***.cy.order.event.OrderOperationEvent;
import ***.cy.order.websocket.AppWebSocketEndpoint;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.***ponent;
import java.time.Instant;
/**
* 用户通知监听器
* 负责处理订单操作后的用户通知相关逻辑
*/
@***ponent
@Slf4j
@RequiredArgsConstructor
public class UserNotificationListener {
/**
* 监听订单操作事件,向用户发送通知
*
* @param event 订单操作事件
*/
@EventListener(OrderOperationEvent.class)
public void handleOrderOperationEvent(OrderOperationEvent event) {
// 只处理订单更新事件
if (event.getEventType() != EventTypeEnum.ORDER_UPDATED) {
return;
}
OrderDto order = event.getOrder();
log.info("收到订单更新事件,准备通过WebSocket向用户发送通知,订单ID: {}, 用户ID: {}",
order.getId(), order.getUserId());
try {
// 构建WebSocket通知消息
String notification = "{"
+ "\"type\": \"order_created\","
+ "\"orderId\": \"" + order.getId() + "\","
+ "\"orderNumber\": \"" + order.getOrderNumber() + "\","
+ "\"message\": \"您的订单已更新成功,订单号:" + order.getOrderNumber() + "\","
+ "\"status\": \"PENDING\","
+ "\"timestamp\": " + Instant.now().toEpochMilli()
+ "}";
// 通过WebSocket发送通知给用户
AppWebSocketEndpoint.sendMessageToUser(order.getUserId(), notification);
log.info("用户通知已通过WebSocket发送,订单ID: {}", order.getId());
} catch (Exception e) {
log.error("发送用户WebSocket通知失败,订单ID: {}", order.getId(), e);
// 这里可以添加重试逻辑或失败记录
}
}
}
websocket
全双工、持久化的客户端与服务端通信协议,适用于需要服务端主动推送、低延迟交互的场景;如:实时订单状态推送、桌台状态同步、通知广播等。
本项目定义了两个端点,分别处理门店和客户端的业务:
- @ServerEndpoint(“/ws/app”) - APP端WebSocket连接端点
- @ServerEndpoint(“/ws/store”)- 门店端WebSocket连接端点
使用客户端会话管理器用于缓存和管理WebSocket客户端连接,可以实现客户端在
线状态显示、超时清理session、发送消息给客户端等;
public void addUserSession(String userId, Session session) {
SessionInfo sessionInfo = new SessionInfo(session, LocalDateTime.now(), session.getId());
userSessions.put(userId, sessionInfo);
log.info("用户会话已添加: userId={}, sessionId={}, remoteAddress={}", userId, session.getId(), session.getRequestURI());
}
/**
* 定期清理无效会话和过期会话
* 使用配置的清理间隔时间
*/
@Scheduled(fixedRateString = "${websocket.cleanup-interval:30000}")
public void cleanInvalidSessions() {
log.debug("开始清理无效会话");
// 清理无效或过期的用户会话
userSessions.entrySet().removeIf(entry -> {
SessionInfo sessionInfo = entry.getValue();
Session session = sessionInfo.getSession();
if (session == null || !session.isOpen() || sessionInfo.isExpired()) {
log.info("清理无效或过期用户会话: userId={}, sessionId={}", entry.getKey(),
sessionInfo != null ? sessionInfo.getSessionId() : "null");
return true;
}
return false;
});
// 清理无效或过期的门店会话
storeSessions.entrySet().removeIf(entry -> {
SessionInfo sessionInfo = entry.getValue();
Session session = sessionInfo.getSession();
if (session == null || !session.isOpen() || sessionInfo.isExpired()) {
log.info("清理无效或过期门店会话: storeId={}, sessionId={}", entry.getKey(),
sessionInfo != null ? sessionInfo.getSessionId() : "null");
closeSession(session);
return true;
}
return false;
});
log.debug("无效会话清理完成,当前用户会话数: {}, 门店会话数: {}",
userSessions.size(), storeSessions.size());
}
RabbitMQ
当业务需要跨服务时,可以用到消息队列来实现数据同步,如有库存管理、crm、统计、erp等服务,可在订单服务器完成业务后通过消息队列将数据同步给这些服务。
这里用到了rabbitmq,现默认每个服务都需要订单埋单后的数据,所以Exchange选amq.fanout,即Fanout Exchange(广播交换机),一条消息被多个消费者都收到。
Producer
│
▼
[Fanout Exchange]
├───> Queue A ───> Consumer A
├───> Queue B ───> Consumer B
└───> Queue C ───> Consumer C
package ***.cy.order.mq;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
/**
* 广播交换机
* 一条消息被多个消费者都收到
*/
@Configuration
public class FanoutRabbitConfig {
public static final String FANOUT_EXCHANGE = "order.fanout.exchange";
// 消费者 统计 的队列
@Bean
public Queue orderNotifyQueueStat() {
return QueueBuilder.durable("order.notify.queue.stat").build();
}
// 消费者 crm 的队列
@Bean
public Queue orderNotifyQueueCrm() {
return QueueBuilder.durable("order.notify.queue.crm").build();
}
// 消费者 库存 的队列
@Bean
public Queue orderNotifyQueueInventory() {
return QueueBuilder.durable("order.notify.queue.inventory").build();
}
// Fanout Exchange(广播交换机)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
// 将所有队列绑定到 Fanout Exchange(无需 routing key)
@Bean
public Binding bindingStat() {
return BindingBuilder.bind(orderNotifyQueueStat()).to(fanoutExchange());
}
@Bean
public Binding bindingCrm() {
return BindingBuilder.bind(orderNotifyQueueCrm()).to(fanoutExchange());
}
@Bean
public Binding bindingInventory() {
return BindingBuilder.bind(orderNotifyQueueInventory()).to(fanoutExchange());
}
}
消费端只需监听自己的队列
package ***.cy.crm;
import ***.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.***ponent;
import java.io.IOException;
/**
* 订单CRM消费者
* 负责消费CRM相关的消息并进行处理
*/
@***ponent
@Slf4j
public class OrderCrmConsumer {
@RabbitListener(queues = "order.notify.queue.crm")
public void handleOrder(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
log.info("【消费者 CRM】收到订单通知: {}", message);
channel.basicAck(tag, false);
} catch (IOException ex) {
log.error("【消费者 CRM】处理订单通知失败: {}", message, ex);
}
}
}