通讯 (WebSocket)
概述
WebSocket 通讯模块 (ruoyi-common-websocket) 提供实时双向通信与消息推送功能,支持分布式环境下的多服务实例消息分发。该模块采用现代化的架构设计,具备高性能、高可用性和企业级的安全特性。
核心特性
- 实时双向通信:基于 Spring Boot WebSocket 实现高性能的实时通信
- 多租户隔离:完整的多租户支持,确保不同租户间的数据和连接完全隔离
- 多连接支持:同一用户可在多个设备/浏览器标签页建立多个连接,避免互相挤号
- 分布式消息分发:基于 Redis 发布订阅机制实现跨服务实例消息推送
- 用户认证集成:集成 SaToken 认证框架,确保连接安全
- 智能消息路由:优先本地发送,跨实例自动路由
- 会话管理:线程安全的用户会话管理,支持三级映射结构
- 心跳检测:内置心跳机制保持连接活跃
- 权限控制:细粒度的管理员权限控制,支持超级管理员和租户管理员
架构设计
模块依赖
<dependencies>
<!-- 核心模块 - 提供基础功能支持 -->
<dependency>
<groupId>plus.ruoyi</groupId>
<artifactId>ruoyi-common-core</artifactId>
</dependency>
<!-- Redis模块 - 提供分布式消息支持 -->
<dependency>
<groupId>plus.ruoyi</groupId>
<artifactId>ruoyi-common-redis</artifactId>
</dependency>
<!-- 认证模块 - 提供用户认证支持 -->
<dependency>
<groupId>plus.ruoyi</groupId>
<artifactId>ruoyi-common-satoken</artifactId>
</dependency>
<!-- JSON模块 - 提供消息序列化支持 -->
<dependency>
<groupId>plus.ruoyi</groupId>
<artifactId>ruoyi-common-json</artifactId>
</dependency>
<!-- 租户模块 - 多租户支持 -->
<dependency>
<groupId>plus.ruoyi</groupId>
<artifactId>ruoyi-common-tenant</artifactId>
</dependency>
<!-- Spring Boot WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>多租户架构
采用三级映射结构实现多租户隔离:
租户会话映射表:
{
"000000": { // 默认租户
1001: { // 用户ID
"session_001": WebSocketSession1,
"session_002": WebSocketSession2
}
},
"100001": { // 租户1
2001: { // 用户ID
"session_003": WebSocketSession3
}
}
}多租户隔离特性
- 连接隔离:每个租户的WebSocket连接完全独立管理
- 消息隔离:消息只在租户内传递,不会跨租户泄露
- 权限隔离:租户管理员只能管理本租户的连接和用户
- 统计隔离:连接统计按租户分别计算
核心组件
1. 配置层 (Configuration)
WebSocketConfig - 主配置类
- 自动配置条件:
websocket.enabled=true - 配置WebSocket端点、拦截器、处理器
- 初始化主题监听器
WebSocketProperties - 配置属性类
websocket:
enabled: true # 是否启用WebSocket
path: "/websocket" # 服务端点路径,默认 "/websocket"
allowedOrigins: "*" # 允许跨域的源地址,默认 "*"2. 拦截器层 (Interceptor)
PlusWebSocketInterceptor - 握手拦截器
- 用户身份认证验证
- 登录状态检查
- 用户信息注入会话属性
- 支持多租户上下文验证
3. 处理器层 (Handler)
PlusWebSocketHandler - 消息处理器
- 连接建立与关闭处理
- 支持多种格式的心跳检测(简单字符串、JSON格式)
- 文本/二进制消息处理
- 传输错误处理
- 多连接场景下的精确连接管理
4. 会话管理层 (Session Management)
WebSocketSessionHolder - 会话管理器
- 三级映射结构:tenantId -> userId -> (sessionId -> WebSocketSession)
- 线程安全的会话存储 (ConcurrentHashMap)
- 支持同一用户多连接管理
- 会话添加、移除、查询
- 在线用户管理(支持全局和租户级别)
- 连接统计功能
5. 工具层 (Utilities)
WebSocketUtils - 工具类
- 消息发送(单点、批量、群发)
- 智能消息路由分发
- 跨租户消息发送(超级管理员专用)
- 全局消息广播(超级管理员专用)
- Redis发布订阅消息处理
- 权限验证集成
6. 监听器层 (Listener)
WebSocketTopicListener - 主题监听器
- 应用启动时初始化Redis订阅
- 跨服务实例消息分发处理
- 支持全局消息、租户消息、定向消息处理
- 多租户上下文动态切换
7. 管理层 (Administration)
WebSocketAdminController - 管理控制器
- 提供完整的WebSocket管理API
- 支持超级管理员和租户管理员权限控制
- 连接统计查询
- 在线用户管理
- 消息推送管理
- 连接断开管理
消息流转机制
智能分发策略
- 本地优先:优先在当前服务实例内直接发送消息给用户的所有连接
- 跨实例路由:对于不在当前实例的用户,通过Redis发布订阅机制分发到其他实例
- 租户隔离:确保消息只在指定租户内传递
- 多连接支持:向用户的所有活跃连接发送消息
消息流程图
发送消息 → 检查租户权限 → 获取目标用户连接
↓ ↓
权限验证 本地连接存在?
↓ ↓ ↓
通过检查 发送到所有连接 加入Redis队列
↓ ↓
完成 发布到主题
↓
其他实例监听
↓
租户上下文切换
↓
处理并发送到所有连接API 使用指南
配置启用
在 application.yml 中配置:
websocket:
enabled: true # 启用WebSocket功能
path: "/resource/websocket" # WebSocket端点路径
allowedOrigins: "*" # 允许的跨域源核心API
1. 消息发送API
向指定用户发送消息(发送到用户的所有连接)
// 向单个用户的所有连接发送消息
WebSocketUtils.sendMessage(userId, "Hello World");
// 向多个用户发送消息
List<Long> userIds = Arrays.asList(1001L, 1002L, 1003L);
WebSocketMessageDto message = WebSocketMessageDto.of(userIds, "批量消息内容");
WebSocketUtils.publishMessage(message);向用户的特定连接发送消息
// 向用户的特定连接发送消息
boolean success = WebSocketUtils.sendMessage(userId, sessionId, "特定连接消息");群发消息
// 向当前租户的所有在线用户发送消息
WebSocketUtils.publishAll("系统公告:服务将在10分钟后维护");
// 向所有租户的所有在线用户发送消息(需要超级管理员权限)
WebSocketUtils.publishGlobal("全局系统维护通知");
// 跨租户发送消息(需要超级管理员权限)
WebSocketUtils.publishCrossTenant("100001", Arrays.asList(2001L, 2002L), "跨租户消息");2. 会话管理API
// 检查用户是否在线(当前租户)
boolean isOnline = WebSocketSessionHolder.isUserOnline(userId);
// 获取用户的所有会话(当前租户)
Map<String, WebSocketSession> userSessions = WebSocketSessionHolder.getUserSessions(userId);
// 获取用户的特定会话
WebSocketSession session = WebSocketSessionHolder.getSession(userId, sessionId);
// 获取当前租户所有在线用户ID
Set<Long> onlineUsers = WebSocketSessionHolder.getAllUserIds();
// 获取全局所有在线用户ID(超级管理员专用)
Set<Long> globalOnlineUsers = WebSocketSessionHolder.getGlobalAllUserIds();
// 移除用户的特定会话
WebSocketSessionHolder.removeSessionById(userId, sessionId);
// 移除用户的所有会话
WebSocketSessionHolder.removeAllSessions(userId);3. 统计信息API
// 获取当前租户连接统计
WebSocketSessionHolder.ConnectionStats stats = WebSocketUtils.getConnectionStats();
// 获取全局连接统计(超级管理员专用)
WebSocketSessionHolder.ConnectionStats globalStats = WebSocketUtils.getGlobalConnectionStats();4. 消息订阅API
// 订阅WebSocket消息主题(通常在监听器中使用)
WebSocketUtils.subscribeMessage(message -> {
// 处理接收到的消息
if (message.isGlobal()) {
System.out.println("收到全局消息: " + message.getMessage());
} else if (message.isBroadcast()) {
System.out.println("收到租户广播消息: " + message.getMessage());
} else {
System.out.println("收到定向消息: " + message.getMessage());
System.out.println("目标用户: " + message.getUserIds());
}
});管理API
1. 连接统计
@RestController
@RequestMapping("/webSocket")
public class WebSocketController {
/**
* 获取连接统计信息
*/
@GetMapping("/getStats")
@SaCheckRole(value = {TenantConstants.SUPER_ADMIN_ROLE_KEY, TenantConstants.TENANT_ADMIN_ROLE_KEY}, mode = SaMode.OR)
public R<WebSocketSessionHolder.ConnectionStats> getStats() {
if (LoginHelper.isSuperAdmin()) {
// 超级管理员查看全局统计
return R.ok(WebSocketUtils.getGlobalConnectionStats());
} else {
// 租户管理员查看本租户统计
return R.ok(WebSocketUtils.getConnectionStats());
}
}
}2. 在线用户管理
/**
* 获取在线用户列表
*/
@GetMapping("/getOnlineUsers")
@SaCheckRole(value = {TenantConstants.SUPER_ADMIN_ROLE_KEY, TenantConstants.TENANT_ADMIN_ROLE_KEY}, mode = SaMode.OR)
public R<Set<Long>> getOnlineUsers(@RequestParam(required = false, defaultValue = "false") boolean global) {
if (global && LoginHelper.isSuperAdmin()) {
// 超级管理员查看全局在线用户
return R.ok(WebSocketSessionHolder.getGlobalAllUserIds());
} else {
// 查看当前租户在线用户
return R.ok(WebSocketSessionHolder.getAllUserIds());
}
}3. 消息推送管理
/**
* 向指定用户发送消息
*/
@PostMapping("/sendUserMessage")
@SaCheckRole(value = {TenantConstants.SUPER_ADMIN_ROLE_KEY, TenantConstants.TENANT_ADMIN_ROLE_KEY}, mode = SaMode.OR)
public R<Void> sendUserMessage(@RequestBody SendUserMessageRequest request) {
WebSocketUtils.publishMessage(WebSocketMessageDto.of(request.getUserId(), request.getMessage()));
return R.ok();
}
/**
* 广播消息
*/
@PostMapping("/broadcastMessage")
@SaCheckRole(value = {TenantConstants.SUPER_ADMIN_ROLE_KEY, TenantConstants.TENANT_ADMIN_ROLE_KEY}, mode = SaMode.OR)
public R<Void> broadcastMessage(@RequestBody BroadcastMessageRequest request) {
if (request.isGlobal() && LoginHelper.isSuperAdmin()) {
// 超级管理员全局群发
WebSocketUtils.publishGlobal(request.getMessage());
} else {
// 当前租户群发
WebSocketUtils.publishAll(request.getMessage());
}
return R.ok();
}业务场景集成示例
1. 实时通知推送
@Service
public class NotificationPushService {
/**
* 推送订单状态变更通知
*/
public void pushOrderStatusChange(Long userId, String orderId, String status) {
WebSocketMessage message = WebSocketMessage.create(
"order_status",
"订单状态更新",
Map.of("orderId", orderId, "status", status)
);
WebSocketUtils.sendMessage(userId, JsonUtils.toJsonString(message));
}
/**
* 推送系统维护通知(租户级别)
*/
public void pushTenantMaintenance(String content, LocalDateTime scheduleTime) {
WebSocketMessage message = WebSocketMessage.create(
"system_notice",
"系统维护通知",
Map.of("content", content, "scheduleTime", scheduleTime)
);
// 向当前租户所有在线用户推送
WebSocketUtils.publishAll(JsonUtils.toJsonString(message));
}
/**
* 推送全局系统维护通知(超级管理员专用)
*/
public void pushGlobalMaintenance(String content, LocalDateTime scheduleTime) {
if (!LoginHelper.isSuperAdmin()) {
throw new RuntimeException("无权限执行全局推送操作");
}
WebSocketMessage message = WebSocketMessage.create(
"global_notice",
"全局系统维护通知",
Map.of("content", content, "scheduleTime", scheduleTime)
);
// 向所有租户所有在线用户推送
WebSocketUtils.publishGlobal(JsonUtils.toJsonString(message));
}
}2. 多连接场景处理
@Service
public class MultiConnectionService {
/**
* 向用户的所有设备推送消息
*/
public void pushToAllDevices(Long userId, String message) {
// 获取用户的所有连接
Map<String, WebSocketSession> userSessions = WebSocketSessionHolder.getUserSessions(userId);
log.info("向用户 {} 的 {} 个连接推送消息", userId, userSessions.size());
// 发送到用户的所有连接
WebSocketUtils.sendMessage(userId, message);
}
/**
* 向用户的特定设备推送消息
*/
public boolean pushToSpecificDevice(Long userId, String sessionId, String message) {
return WebSocketUtils.sendMessage(userId, sessionId, message);
}
/**
* 获取用户的连接详情
*/
public UserConnectionInfo getUserConnectionInfo(Long userId) {
Map<String, WebSocketSession> sessions = WebSocketSessionHolder.getUserSessions(userId);
UserConnectionInfo info = new UserConnectionInfo();
info.setUserId(userId);
info.setTenantId(TenantHelper.getTenantId());
info.setOnline(WebSocketSessionHolder.isUserOnline(userId));
info.setConnectionCount(sessions.size());
info.setSessionIds(sessions.keySet());
return info;
}
/**
* 强制断开用户的某个特定连接
*/
public void disconnectSpecificSession(Long userId, String sessionId, String reason) {
// 先发送断开通知
WebSocketMessage message = WebSocketMessage.create(
"force_disconnect",
"连接被管理员断开",
Map.of("reason", reason)
);
WebSocketUtils.sendMessage(userId, sessionId, JsonUtils.toJsonString(message));
// 断开连接
WebSocketSessionHolder.removeSessionById(userId, sessionId);
}
}3. 租户隔离的消息处理
@Service
public class TenantMessageService {
/**
* 向租户内的指定角色用户发送消息
*/
public void sendToRoleUsers(String roleKey, String message) {
// 获取当前租户具有指定角色的用户ID列表
List<Long> roleUserIds = getUserIdsByRole(roleKey);
if (CollUtil.isNotEmpty(roleUserIds)) {
WebSocketMessageDto messageDto = WebSocketMessageDto.of(roleUserIds, message);
WebSocketUtils.publishMessage(messageDto);
}
}
/**
* 跨租户消息发送(超级管理员专用)
*/
public void sendCrossTenantMessage(String targetTenantId, List<Long> userIds, String message) {
if (!LoginHelper.isSuperAdmin()) {
throw new RuntimeException("无权限执行跨租户发送操作");
}
WebSocketUtils.publishCrossTenant(targetTenantId, userIds, message);
}
/**
* 获取租户连接统计对比
*/
public Map<String, Object> getTenantCompareStats() {
if (!LoginHelper.isSuperAdmin()) {
throw new RuntimeException("无权限查看全局统计");
}
Map<String, Object> result = new HashMap<>();
// 全局统计
WebSocketSessionHolder.ConnectionStats globalStats = WebSocketUtils.getGlobalConnectionStats();
result.put("global", globalStats);
// 各租户统计(需要切换租户上下文)
Map<String, WebSocketSessionHolder.ConnectionStats> tenantStats = new HashMap<>();
for (String tenantId : WebSocketSessionHolder.getAllTenantIds()) {
TenantHelper.dynamic(tenantId, () -> {
WebSocketSessionHolder.ConnectionStats stats = WebSocketUtils.getConnectionStats();
tenantStats.put(tenantId, stats);
});
}
result.put("tenants", tenantStats);
return result;
}
private List<Long> getUserIdsByRole(String roleKey) {
// 根据角色获取用户ID列表的具体实现
// 这里需要根据实际的用户角色查询逻辑实现
return Collections.emptyList();
}
}核心实现原理
多租户连接认证机制
WebSocket连接建立时的多租户认证流程:
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) {
try {
// 从请求中获取token并验证用户登录状态(包含租户信息)
LoginUser loginUser = LoginHelper.getLoginUser();
// 验证租户权限和状态
String tenantId = TenantHelper.getTenantId();
if (StringUtils.isBlank(tenantId)) {
log.error("WebSocket 认证失败: 无效的租户信息");
return false;
}
// 将用户信息存储到WebSocket会话属性中
attributes.put(LOGIN_USER, loginUser);
return true;
} catch (NotLoginException e) {
log.error("WebSocket 认证失败: {}, 无法访问系统资源", e.getMessage());
return false; // 拒绝连接
}
}多连接会话生命周期管理
连接建立:
- 通过握手拦截器验证用户身份和租户权限
- 将用户信息注入WebSocket会话属性
- 在会话管理器中注册用户会话映射(支持同一用户多个连接)
- 使用sessionId作为连接的唯一标识
- 记录连接建立日志和统计信息
消息处理:
- 从会话属性中获取用户信息
- 验证租户上下文
- 处理心跳检测(支持多种格式)
- 构造消息传输对象
- 调用消息发布机制
连接关闭:
- 从会话属性中获取用户信息
- 从会话管理器中精确移除指定会话
- 保留用户的其他活跃连接
- 清理租户映射(如果用户无连接且租户无用户)
- 记录连接关闭日志
多租户分布式消息分发机制
智能路由算法:
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
// 确保消息对象有租户ID
if (StringUtils.isBlank(webSocketMessage.getTenantId())) {
webSocketMessage.setTenantId(TenantHelper.getTenantId());
}
String targetTenantId = webSocketMessage.getTenantId();
List<Long> unsentUserIds = new ArrayList<>();
// 第一阶段:本地会话处理(在目标租户上下文中)
for (Long userId : webSocketMessage.getUserIds()) {
if (WebSocketSessionHolder.isUserOnline(userId)) {
// 用户在当前实例,发送到用户的所有连接
sendMessage(userId, webSocketMessage.getMessage());
} else {
// 用户不在当前实例,标记为待分发
unsentUserIds.add(userId);
}
}
// 第二阶段:跨实例分发
if (CollUtil.isNotEmpty(unsentUserIds)) {
WebSocketMessageDto broadcastMessage = WebSocketMessageDto.of(unsentUserIds, webSocketMessage.getMessage());
broadcastMessage.setTenantId(targetTenantId); // 保持租户信息
// 通过Redis发布订阅分发到其他实例
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info("WebSocket跨实例消息分发 - 租户: {}, 目标用户:{}", targetTenantId, unsentUserIds);
});
}
}多租户订阅处理机制:
// 监听Redis主题消息
WebSocketUtils.subscribeMessage((message) -> {
try {
// 处理全局消息(所有租户)
if (message.isGlobal()) {
handleGlobalMessage(message);
return;
}
// 获取消息的目标租户ID
String messageTenantId = message.getTenantId();
if (messageTenantId == null) {
log.warn("收到无租户ID的消息,忽略处理");
return;
}
// 在指定租户上下文中处理消息
TenantHelper.dynamic(messageTenantId, () -> {
if (CollUtil.isNotEmpty(message.getUserIds())) {
// 定向推送:向指定用户的所有连接发送消息
handleTargetedMessage(message, messageTenantId);
} else {
// 群发消息:向当前租户的所有在线用户发送消息
handleBroadcastMessage(message, messageTenantId);
}
});
} catch (Exception e) {
log.error("处理WebSocket消息异常: {}", e.getMessage(), e);
}
});线程安全保证
会话并发管理:
- 使用
ConcurrentHashMap保证线程安全的会话存储 - 三级映射的同步控制机制
- 会话添加/移除操作的原子性处理
- synchronized关键字保证消息发送的线程安全
public static void addSession(Long userId, String sessionId, WebSocketSession session) {
String tenantId = TenantHelper.getTenantId();
// 添加同步控制
synchronized (TENANT_SESSION_MAP) {
Map<Long, Map<String, WebSocketSession>> tenantUsers = TENANT_SESSION_MAP.computeIfAbsent(
tenantId, k -> new ConcurrentHashMap<>()
);
Map<String, WebSocketSession> userSessions = tenantUsers.computeIfAbsent(
userId, k -> new ConcurrentHashMap<>()
);
// 处理已存在的会话
WebSocketSession existingSession = userSessions.get(sessionId);
if (existingSession != null && existingSession.isOpen()) {
try {
existingSession.close(CloseStatus.NORMAL);
} catch (Exception e) {
log.warn("关闭旧会话异常: {}", e.getMessage());
}
}
userSessions.put(sessionId, session);
}
}消息格式标准化
统一消息格式
@Data
public class WebSocketMessage {
/**
* 消息类型:notification、chat、system、business等
*/
private String type;
/**
* 消息标题
*/
private String title;
/**
* 消息内容
*/
private Object content;
/**
* 时间戳
*/
private Long timestamp;
/**
* 发送者ID
*/
private Long senderId;
/**
* 租户ID
*/
private String tenantId;
/**
* 消息优先级
*/
private Integer priority;
/**
* 扩展数据
*/
private Map<String, Object> extra;
public static WebSocketMessage create(String type, String title, Object content) {
WebSocketMessage message = new WebSocketMessage();
message.setType(type);
message.setTitle(title);
message.setContent(content);
message.setTimestamp(System.currentTimeMillis());
message.setTenantId(TenantHelper.getTenantId());
return message;
}
}心跳消息格式支持
模块支持多种心跳消息格式:
- 简单字符串格式:
"ping" - JSON格式:
{"type":"ping","timestamp":1234567890} - 自定义格式:可扩展支持其他格式
权限控制
角色权限说明
- 超级管理员:可以查看和管理所有租户的WebSocket连接
- 租户管理员:只能查看和管理本租户的WebSocket连接
- 普通用户:只能建立和使用自己的WebSocket连接
权限控制实现
@SaCheckRole(value = {TenantConstants.SUPER_ADMIN_ROLE_KEY, TenantConstants.TENANT_ADMIN_ROLE_KEY}, mode = SaMode.OR)
public R<WebSocketSessionHolder.ConnectionStats> getStats() {
if (LoginHelper.isSuperAdmin()) {
// 超级管理员查看全局统计
return R.ok(WebSocketUtils.getGlobalConnectionStats());
} else {
// 租户管理员查看本租户统计
return R.ok(WebSocketUtils.getConnectionStats());
}
}性能优化
连接管理优化
1. 连接池配置
WebSocket连接的性能与服务器资源密切相关,需要合理配置连接参数:
# application.yml
server:
undertow:
# 工作线程数(建议为CPU核心数的2-4倍)
worker-threads: 200
# IO线程数(建议为CPU核心数)
io-threads: 8
# 直接内存缓冲区
direct-buffers: true
# 缓冲区大小
buffer-size: 1024
# WebSocket配置
websocket:
enabled: true
path: "/resource/websocket"
allowedOrigins: "*"2. 会话清理策略
系统采用懒清理策略,在以下时机清理无效连接:
/**
* 自动清理无效连接的场景
*/
public class SessionCleanupStrategy {
// 1. 消息发送失败时清理
public static void sendMessage(Long userId, String message) {
Map<String, WebSocketSession> userSessions = WebSocketSessionHolder.getUserSessions(userId);
List<String> failedSessions = new ArrayList<>();
for (Map.Entry<String, WebSocketSession> entry : userSessions.entrySet()) {
if (!sendMessage(entry.getValue(), message)) {
failedSessions.add(entry.getKey());
}
}
// 清理失败的连接
for (String sessionId : failedSessions) {
WebSocketSessionHolder.removeSessionById(userId, sessionId);
}
}
// 2. 连接关闭回调时清理
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
Long userId = getLoginUser(session).getUserId();
WebSocketSessionHolder.removeSessionById(userId, session.getId());
}
// 3. 用户登出时强制清理
public void onUserLogout(Long userId) {
WebSocketSessionHolder.removeAllSessions(userId);
}
}3. 心跳机制优化
多种心跳格式支持,客户端可根据需求选择:
/**
* 心跳处理策略
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
// 方式1:简单字符串心跳(最轻量)
if ("ping".equals(payload)) {
WebSocketUtils.sendMessage(session, "pong");
return;
}
// 方式2:JSON格式心跳(携带时间戳)
if (payload.contains("\"type\":\"ping\"")) {
JSONObject response = new JSONObject();
response.put("type", "pong");
response.put("timestamp", System.currentTimeMillis());
WebSocketUtils.sendMessage(session, response.toString());
return;
}
// 业务消息处理...
}消息发送优化
1. 批量消息合并
对于高频消息场景,可使用消息合并策略:
@Service
public class MessageBatchService {
private final ConcurrentMap<Long, List<String>> pendingMessages = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
public MessageBatchService() {
// 每100ms刷新一次待发送消息
scheduler.scheduleAtFixedRate(this::flushMessages, 100, 100, TimeUnit.MILLISECONDS);
}
/**
* 添加待发送消息
*/
public void addMessage(Long userId, String message) {
pendingMessages.computeIfAbsent(userId, k -> new CopyOnWriteArrayList<>()).add(message);
}
/**
* 刷新发送消息
*/
private void flushMessages() {
for (Map.Entry<Long, List<String>> entry : pendingMessages.entrySet()) {
Long userId = entry.getKey();
List<String> messages = entry.getValue();
if (!messages.isEmpty()) {
// 合并消息为JSON数组
JSONArray batch = new JSONArray();
messages.forEach(batch::add);
WebSocketUtils.sendMessage(userId, batch.toString());
messages.clear();
}
}
}
}2. 消息压缩
对于大消息体,可启用压缩:
/**
* 消息压缩工具
*/
public class MessageCompressor {
private static final int COMPRESS_THRESHOLD = 1024; // 1KB以上启用压缩
public static String compressIfNeeded(String message) {
if (message.length() > COMPRESS_THRESHOLD) {
// 使用Gzip压缩并Base64编码
byte[] compressed = GzipUtils.compress(message.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(compressed);
}
return message;
}
public static String decompressIfNeeded(String message) {
if (isCompressed(message)) {
byte[] decoded = Base64.getDecoder().decode(message);
return new String(GzipUtils.decompress(decoded), StandardCharsets.UTF_8);
}
return message;
}
}分布式性能优化
1. 本地优先策略
系统采用本地优先发送策略,减少Redis通信开销:
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
List<Long> userIds = webSocketMessage.getUserIds();
List<Long> unsentUserIds = new ArrayList<>();
// 第一阶段:本地会话处理(无Redis开销)
for (Long userId : userIds) {
if (WebSocketSessionHolder.isUserOnline(userId)) {
// 用户在当前实例,直接发送
sendMessage(userId, webSocketMessage.getMessage());
} else {
unsentUserIds.add(userId);
}
}
// 第二阶段:仅对不在本地的用户通过Redis分发
if (CollUtil.isNotEmpty(unsentUserIds)) {
WebSocketMessageDto broadcastMessage = WebSocketMessageDto.of(
unsentUserIds, webSocketMessage.getMessage()
);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info("跨实例消息分发 - 目标用户: {}", unsentUserIds);
});
}
}2. Redis发布订阅优化
# Redis连接池配置
spring:
data:
redis:
lettuce:
pool:
# 最大连接数
max-active: 50
# 最大空闲连接
max-idle: 20
# 最小空闲连接
min-idle: 5
# 获取连接超时时间
max-wait: 2000ms监控指标
连接统计
/**
* 定期记录连接统计信息
*/
@Scheduled(fixedRate = 60000) // 每分钟
public void logConnectionStats() {
// 全局统计
ConnectionStats globalStats = WebSocketSessionHolder.getGlobalConnectionStats();
log.info("WebSocket全局统计 - 租户数: {}, 在线用户: {}, 总连接数: {}",
globalStats.getTotalTenants(),
globalStats.getOnlineUsers(),
globalStats.getTotalConnections()
);
// 各租户统计
for (String tenantId : WebSocketSessionHolder.getAllTenantIds()) {
TenantHelper.dynamic(tenantId, () -> {
ConnectionStats stats = WebSocketSessionHolder.getConnectionStats();
log.info("租户 {} 统计 - 在线用户: {}, 连接数: {}",
tenantId, stats.getOnlineUsers(), stats.getTotalConnections());
});
}
}模块集成
与Redis模块集成
WebSocket模块依赖Redis实现分布式消息分发:
/**
* Redis发布订阅消息处理
*/
public class RedisWebSocketIntegration {
/**
* 订阅WebSocket消息主题
*/
public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
}
/**
* 发布消息到Redis主题
*/
public static void publishToRedis(WebSocketMessageDto message) {
RedisUtils.publish(WEB_SOCKET_TOPIC, message, consumer -> {
log.info("消息发布到Redis - topic: {}", WEB_SOCKET_TOPIC);
});
}
}与SaToken模块集成
WebSocket连接必须经过SaToken认证:
/**
* WebSocket握手拦截器 - SaToken集成
*/
public class PlusWebSocketInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) {
try {
// 从请求中获取token并验证用户登录状态
LoginUser loginUser = LoginHelper.getLoginUser();
// 验证租户权限和状态
String tenantId = TenantHelper.getTenantId();
if (StringUtils.isBlank(tenantId)) {
log.error("WebSocket认证失败: 无效的租户信息");
return false;
}
// 将用户信息存储到WebSocket会话属性中
attributes.put(LOGIN_USER, loginUser);
return true;
} catch (NotLoginException e) {
log.error("WebSocket认证失败: {}", e.getMessage());
return false;
}
}
}与租户模块集成
多租户隔离是WebSocket模块的核心特性:
/**
* 多租户WebSocket消息处理
*/
public class TenantWebSocketIntegration {
/**
* 在指定租户上下文中处理消息
*/
public void handleMessage(WebSocketMessageDto message) {
String messageTenantId = message.getTenantId();
// 在指定租户上下文中处理消息
TenantHelper.dynamic(messageTenantId, () -> {
if (CollUtil.isNotEmpty(message.getUserIds())) {
// 定向推送
handleTargetedMessage(message, messageTenantId);
} else {
// 群发消息
handleBroadcastMessage(message, messageTenantId);
}
});
}
/**
* 处理全局消息(所有租户)
*/
public void handleGlobalMessage(WebSocketMessageDto message) {
for (String tenantId : WebSocketSessionHolder.getAllTenantIds()) {
TenantHelper.dynamic(tenantId, () -> {
var tenantUserIds = WebSocketSessionHolder.getAllUserIds();
for (Long userId : tenantUserIds) {
WebSocketUtils.sendMessage(userId, message.getMessage());
}
});
}
}
}与日志模块集成
WebSocket操作可集成日志记录:
@Service
public class WebSocketLogService {
@Autowired
private OperLogService operLogService;
/**
* 记录WebSocket管理操作日志
*/
public void logAdminOperation(String action, Long targetUserId, String detail) {
OperLog operLog = new OperLog();
operLog.setTitle("WebSocket管理");
operLog.setBusinessType(BusinessType.OTHER.ordinal());
operLog.setMethod("WebSocket." + action);
operLog.setOperatorType(OperatorType.MANAGE.ordinal());
operLog.setOperName(LoginHelper.getUsername());
operLog.setOperParam(String.format("目标用户: %d, 详情: %s", targetUserId, detail));
operLog.setOperTime(new Date());
operLogService.insertOperlog(operLog);
}
}测试策略
单元测试
会话管理器测试
@ExtendWith(MockitoExtension.class)
class WebSocketSessionHolderTest {
@Mock
private WebSocketSession mockSession;
@BeforeEach
void setUp() {
// 模拟租户上下文
TenantHelper.setTenantId("000000");
when(mockSession.getId()).thenReturn("test-session-001");
when(mockSession.isOpen()).thenReturn(true);
}
@Test
void testAddSession() {
Long userId = 1001L;
String sessionId = "test-session-001";
// 添加会话
WebSocketSessionHolder.addSession(userId, sessionId, mockSession);
// 验证会话存在
assertTrue(WebSocketSessionHolder.isUserOnline(userId));
assertTrue(WebSocketSessionHolder.isSessionExists(userId, sessionId));
assertEquals(mockSession, WebSocketSessionHolder.getSession(userId, sessionId));
}
@Test
void testMultipleConnections() {
Long userId = 1001L;
WebSocketSession session1 = mock(WebSocketSession.class);
WebSocketSession session2 = mock(WebSocketSession.class);
when(session1.getId()).thenReturn("session-001");
when(session2.getId()).thenReturn("session-002");
when(session1.isOpen()).thenReturn(true);
when(session2.isOpen()).thenReturn(true);
// 添加多个连接
WebSocketSessionHolder.addSession(userId, "session-001", session1);
WebSocketSessionHolder.addSession(userId, "session-002", session2);
// 验证用户有2个连接
Map<String, WebSocketSession> sessions = WebSocketSessionHolder.getUserSessions(userId);
assertEquals(2, sessions.size());
}
@Test
void testRemoveSpecificSession() {
Long userId = 1001L;
WebSocketSession session1 = mock(WebSocketSession.class);
WebSocketSession session2 = mock(WebSocketSession.class);
when(session1.getId()).thenReturn("session-001");
when(session2.getId()).thenReturn("session-002");
when(session1.isOpen()).thenReturn(true);
when(session2.isOpen()).thenReturn(true);
WebSocketSessionHolder.addSession(userId, "session-001", session1);
WebSocketSessionHolder.addSession(userId, "session-002", session2);
// 移除一个连接
WebSocketSessionHolder.removeSessionById(userId, "session-001");
// 验证只剩一个连接
assertFalse(WebSocketSessionHolder.isSessionExists(userId, "session-001"));
assertTrue(WebSocketSessionHolder.isSessionExists(userId, "session-002"));
assertTrue(WebSocketSessionHolder.isUserOnline(userId));
}
@Test
void testConnectionStats() {
Long userId1 = 1001L;
Long userId2 = 1002L;
WebSocketSession session1 = mock(WebSocketSession.class);
WebSocketSession session2 = mock(WebSocketSession.class);
WebSocketSession session3 = mock(WebSocketSession.class);
when(session1.getId()).thenReturn("session-001");
when(session2.getId()).thenReturn("session-002");
when(session3.getId()).thenReturn("session-003");
WebSocketSessionHolder.addSession(userId1, "session-001", session1);
WebSocketSessionHolder.addSession(userId1, "session-002", session2);
WebSocketSessionHolder.addSession(userId2, "session-003", session3);
// 验证统计信息
WebSocketSessionHolder.ConnectionStats stats = WebSocketSessionHolder.getConnectionStats();
assertEquals(2, stats.getOnlineUsers()); // 2个用户
assertEquals(3, stats.getTotalConnections()); // 3个连接
}
}消息处理器测试
@ExtendWith(MockitoExtension.class)
class PlusWebSocketHandlerTest {
@Mock
private WebSocketSession mockSession;
@Mock
private MessageProcessor mockProcessor;
@InjectMocks
private PlusWebSocketHandler handler;
private LoginUser loginUser;
@BeforeEach
void setUp() {
loginUser = new LoginUser();
loginUser.setUserId(1001L);
loginUser.setUserType("sys_user");
Map<String, Object> attributes = new HashMap<>();
attributes.put("LOGIN_USER", loginUser);
when(mockSession.getAttributes()).thenReturn(attributes);
when(mockSession.getId()).thenReturn("test-session");
}
@Test
void testHandlePingMessage() throws Exception {
TextMessage pingMessage = new TextMessage("ping");
// 处理ping消息
handler.handleTextMessage(mockSession, pingMessage);
// 验证返回pong响应
verify(mockSession).sendMessage(argThat(msg ->
msg instanceof TextMessage && "pong".equals(((TextMessage) msg).getPayload())
));
}
@Test
void testHandleJsonMessage() throws Exception {
String jsonPayload = "{\"type\":\"chat\",\"content\":\"Hello\"}";
TextMessage message = new TextMessage(jsonPayload);
when(mockProcessor.support("chat")).thenReturn(true);
handler.handleTextMessage(mockSession, message);
verify(mockProcessor).process(eq(mockSession), eq(loginUser), eq(jsonPayload));
}
}集成测试
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = {"websocket.enabled=true"})
class WebSocketIntegrationTest {
@LocalServerPort
private int port;
private WebSocketClient client;
private WebSocketSession clientSession;
private List<String> receivedMessages = new CopyOnWriteArrayList<>();
@BeforeEach
void setUp() throws Exception {
client = new StandardWebSocketClient();
// 获取测试token
String token = getTestToken();
// 建立WebSocket连接
URI uri = new URI("ws://localhost:" + port + "/resource/websocket?token=" + token);
clientSession = client.execute(new TextWebSocketHandler() {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
receivedMessages.add(message.getPayload());
}
}, uri).get(5, TimeUnit.SECONDS);
}
@Test
void testConnectionEstablishment() {
assertTrue(clientSession.isOpen());
}
@Test
void testHeartbeat() throws Exception {
// 发送ping
clientSession.sendMessage(new TextMessage("ping"));
// 等待pong响应
Thread.sleep(500);
assertTrue(receivedMessages.contains("pong"));
}
@Test
void testMessageReceive() throws Exception {
Long testUserId = 1001L;
String testMessage = "Test notification";
// 服务端发送消息
WebSocketUtils.sendMessage(testUserId, testMessage);
// 等待消息到达
Thread.sleep(500);
assertTrue(receivedMessages.contains(testMessage));
}
@AfterEach
void tearDown() throws Exception {
if (clientSession != null && clientSession.isOpen()) {
clientSession.close();
}
}
}负载测试
@SpringBootTest
class WebSocketLoadTest {
@Test
void testMassiveConnections() throws Exception {
int connectionCount = 1000;
List<WebSocketSession> sessions = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(100);
CountDownLatch latch = new CountDownLatch(connectionCount);
AtomicInteger successCount = new AtomicInteger(0);
for (int i = 0; i < connectionCount; i++) {
final int userId = i;
executor.submit(() -> {
try {
WebSocketSession session = createConnection(userId);
if (session != null && session.isOpen()) {
sessions.add(session);
successCount.incrementAndGet();
}
} finally {
latch.countDown();
}
});
}
latch.await(60, TimeUnit.SECONDS);
log.info("连接测试结果 - 成功: {}/{}", successCount.get(), connectionCount);
assertTrue(successCount.get() >= connectionCount * 0.95); // 95%成功率
// 清理
for (WebSocketSession session : sessions) {
try {
session.close();
} catch (Exception ignored) {}
}
}
@Test
void testMessageThroughput() throws Exception {
int messageCount = 10000;
Long testUserId = 1001L;
AtomicInteger sentCount = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
for (int i = 0; i < messageCount; i++) {
WebSocketUtils.sendMessage(testUserId, "Message " + i);
sentCount.incrementAndGet();
}
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
double throughput = (double) messageCount / duration * 1000;
log.info("消息吞吐量测试 - 发送: {} 条, 耗时: {} ms, 吞吐量: {} msg/s",
messageCount, duration, throughput);
assertTrue(throughput >= 1000); // 至少1000 msg/s
}
}安全考虑
连接认证
所有WebSocket连接必须经过认证:
/**
* WebSocket连接认证策略
*/
public class WebSocketSecurityConfig {
/**
* 握手前验证Token
*/
public boolean validateConnection(ServerHttpRequest request) {
// 1. 从URL参数获取Token
String token = extractToken(request);
if (StringUtils.isBlank(token)) {
log.warn("WebSocket连接拒绝: 缺少Token");
return false;
}
// 2. 验证Token有效性
try {
StpUtil.checkLogin();
} catch (NotLoginException e) {
log.warn("WebSocket连接拒绝: Token无效 - {}", e.getMessage());
return false;
}
// 3. 验证租户状态
String tenantId = TenantHelper.getTenantId();
if (!isTenantActive(tenantId)) {
log.warn("WebSocket连接拒绝: 租户已禁用 - {}", tenantId);
return false;
}
return true;
}
}消息权限控制
/**
* 消息发送权限验证
*/
public class MessagePermissionValidator {
/**
* 验证是否可以发送全局消息
*/
public static void validateGlobalMessage() {
if (!LoginHelper.isSuperAdmin()) {
throw new ServiceException("无权限发送全局消息");
}
}
/**
* 验证是否可以跨租户发送消息
*/
public static void validateCrossTenantMessage(String targetTenantId) {
if (!LoginHelper.isSuperAdmin()) {
throw new ServiceException("无权限跨租户发送消息");
}
}
/**
* 验证是否可以向指定用户发送消息
*/
public static void validateTargetUser(Long targetUserId) {
String currentTenantId = TenantHelper.getTenantId();
String userTenantId = getUserTenantId(targetUserId);
if (!currentTenantId.equals(userTenantId) && !LoginHelper.isSuperAdmin()) {
throw new ServiceException("无权限向其他租户用户发送消息");
}
}
}防止连接滥用
/**
* 连接限制策略
*/
@Component
public class ConnectionLimitInterceptor implements HandshakeInterceptor {
// 每个用户最大连接数
private static final int MAX_CONNECTIONS_PER_USER = 10;
// 每个IP最大连接数
private static final int MAX_CONNECTIONS_PER_IP = 50;
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) {
LoginUser loginUser = LoginHelper.getLoginUser();
Long userId = loginUser.getUserId();
// 检查用户连接数限制
Map<String, WebSocketSession> userSessions = WebSocketSessionHolder.getUserSessions(userId);
if (userSessions.size() >= MAX_CONNECTIONS_PER_USER) {
log.warn("用户 {} 连接数已达上限: {}", userId, MAX_CONNECTIONS_PER_USER);
return false;
}
// 检查IP连接数限制
String clientIp = getClientIp(request);
int ipConnections = countConnectionsByIp(clientIp);
if (ipConnections >= MAX_CONNECTIONS_PER_IP) {
log.warn("IP {} 连接数已达上限: {}", clientIp, MAX_CONNECTIONS_PER_IP);
return false;
}
return true;
}
}消息内容安全
/**
* 消息内容过滤
*/
public class MessageContentFilter {
// 敏感词列表
private static final List<String> SENSITIVE_WORDS = loadSensitiveWords();
// 最大消息长度
private static final int MAX_MESSAGE_LENGTH = 10000;
/**
* 过滤消息内容
*/
public static String filterMessage(String message) {
if (StringUtils.isBlank(message)) {
return message;
}
// 长度检查
if (message.length() > MAX_MESSAGE_LENGTH) {
throw new ServiceException("消息内容过长");
}
// 敏感词过滤
String filtered = message;
for (String word : SENSITIVE_WORDS) {
filtered = filtered.replace(word, "***");
}
// XSS过滤
filtered = HtmlUtil.cleanHtmlTag(filtered);
return filtered;
}
}注意事项
连接生命周期
连接建立
- 必须通过握手拦截器验证用户身份
- 验证通过后将用户信息存入会话属性
- 使用sessionId作为连接唯一标识
消息处理
- 从会话属性中获取用户信息,不要重复查询数据库
- 心跳消息优先处理,避免阻塞业务消息
- 使用策略模式将消息路由到对应处理器
连接关闭
- 及时清理会话管理器中的连接记录
- 关闭连接时优雅处理,使用合适的关闭状态码
- 保留用户的其他活跃连接
多租户注意事项
租户上下文
- WebSocket消息处理没有自动的租户上下文
- 必须从消息中获取租户ID并手动切换上下文
- 使用
TenantHelper.dynamic()确保正确的租户隔离
跨租户操作
- 跨租户消息发送需要超级管理员权限
- 全局广播需要遍历所有租户分别发送
- 统计信息区分租户级别和全局级别
分布式部署注意事项
Redis依赖
- WebSocket分布式消息依赖Redis发布订阅
- 确保Redis连接稳定可用
- 配置合适的连接池大小
本地优先策略
- 优先在当前实例发送消息
- 仅对不在本实例的用户通过Redis分发
- 减少不必要的Redis通信开销
常见问题
1. 连接无法建立
问题原因:
- Token无效或已过期
- 租户状态异常
- 连接数超限
- WebSocket服务未启用
解决方案:
// 1. 检查WebSocket是否启用
@Value("${websocket.enabled:false}")
private boolean websocketEnabled;
// 2. 检查Token有效性
try {
StpUtil.checkLogin();
LoginUser loginUser = LoginHelper.getLoginUser();
log.info("用户认证成功: {}", loginUser.getUserId());
} catch (NotLoginException e) {
log.error("Token验证失败: {}", e.getMessage());
}
// 3. 检查连接数
Map<String, WebSocketSession> sessions = WebSocketSessionHolder.getUserSessions(userId);
log.info("当前用户连接数: {}", sessions.size());2. 消息发送失败
问题原因:
- 目标用户不在线
- 会话已关闭
- 网络异常
解决方案:
// 1. 检查用户是否在线
if (!WebSocketSessionHolder.isUserOnline(userId)) {
log.warn("用户 {} 当前不在线", userId);
// 可以考虑存储离线消息
saveOfflineMessage(userId, message);
return;
}
// 2. 发送消息并处理失败
boolean success = WebSocketUtils.sendMessage(userId, sessionId, message);
if (!success) {
log.warn("消息发送失败,会话可能已断开");
// 会话会自动清理,可以尝试发送到用户其他连接
WebSocketUtils.sendMessage(userId, message);
}3. 多实例消息不同步
问题原因:
- Redis连接异常
- 主题订阅失败
- 消息序列化问题
解决方案:
// 1. 检查Redis连接状态
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection connection = factory.getConnection();
log.info("Redis连接状态: {}", connection.ping());
// 2. 检查主题订阅状态
WebSocketUtils.subscribeMessage(message -> {
log.info("收到订阅消息: {}", message);
});
// 3. 手动触发消息分发测试
WebSocketMessageDto testMessage = WebSocketMessageDto.of(
Arrays.asList(1001L), "测试消息"
);
RedisUtils.publish(WEB_SOCKET_TOPIC, testMessage, consumer -> {
log.info("测试消息已发布");
});4. 心跳超时断连
问题原因:
- 客户端心跳间隔过长
- 服务器超时配置过短
- 网络不稳定
解决方案:
# 调整Undertow配置
server:
undertow:
# WebSocket超时时间(毫秒)
websocket-timeout: 300000 # 5分钟// 客户端心跳配置
const HEARTBEAT_INTERVAL = 30000; // 30秒
setInterval(() => {
if (websocket.readyState === WebSocket.OPEN) {
websocket.send('ping');
}
}, HEARTBEAT_INTERVAL);5. 内存泄漏
问题原因:
- 连接关闭但未清理会话
- 消息处理异常导致会话残留
- 大量僵尸连接
解决方案:
// 定期清理僵尸连接
@Scheduled(fixedRate = 300000) // 5分钟
public void cleanupZombieSessions() {
for (String tenantId : WebSocketSessionHolder.getAllTenantIds()) {
TenantHelper.dynamic(tenantId, () -> {
Set<Long> userIds = WebSocketSessionHolder.getAllUserIds();
for (Long userId : userIds) {
Map<String, WebSocketSession> sessions =
WebSocketSessionHolder.getUserSessions(userId);
for (Map.Entry<String, WebSocketSession> entry : sessions.entrySet()) {
WebSocketSession session = entry.getValue();
if (session == null || !session.isOpen()) {
WebSocketSessionHolder.removeSessionById(userId, entry.getKey());
log.info("清理僵尸会话: 用户={}, 会话={}", userId, entry.getKey());
}
}
}
});
}
}