问题体现:消息广播机制失效
在多节点集群环境下,当目标用户连接至不同服务器实例时,消息未能通过 Redis 广播机制正确分发,导致用户接收失败。
原代码
核心问题代码如下。原代码试图通过 try-catch 捕获本地发送失败的异常,触发 Redis 广播降级策略:
private boolean dispatchMessage(Long targetId, MessagePacket packet, int retryCount) {
String uidStr = String.valueOf(targetId);
int attempts = MAX_ATTEMPTS - retryCount + 1;
// 1. 活跃状态校验 (Redis)
if (!SessionManager.isOnline(uidStr)) {
log.warn("Target offline - UID: {}, Attempt: {}/{}", targetId, attempts, MAX_ATTEMPTS);
// 离线重试机制
if (retryCount > 0) {
try {
Thread.sleep(BACKOFF_DELAY);
return dispatchMessage(targetId, packet, retryCount - 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
try {
// 2. 尝试本地推送
// BUG所在:此方法为异步非阻塞,若用户不在本地,不会抛出异常,而是静默失败
msgTemplate.convertAndSendToUser(uidStr, "/queue/notify", packet);
log.info("Local dispatch success - UID: {}", targetId);
return true;
} catch (Exception e) {
// 这里的逻辑永远不会被触发,除非发生网络层面的严重错误
log.warn("Local dispatch failed, switching to cluster broadcast - UID: {}", targetId);
try {
// 3. 降级为集群广播
broadcastToCluster(targetId, packet);
return true;
} catch (Exception ex) {
log.error("Cluster broadcast failed", ex);
// 广播重试逻辑...
return false;
}
}
}
问题分析
经排查,SimpMessagingTemplate.convertAndSendToUser 底层实现是异步非阻塞的(Fire-and-Forget)。
当目标 User Session 不在当前 JVM 内存中时,Spring Websocket 仅仅是丢弃消息或不做处理,并不会抛出 Exception。因此,代码永远不会进入 catch 块,Redis 广播逻辑彻底失效。
解决方案
方案一:添加可靠的节点判断
在发送前引入节点校验,显式判断用户是否连接在当前实例。
/**
* 校验用户Session是否位于当前服务节点
*/
private boolean isSessionLocal(String uid) {
try {
String key = SESSION_CACHE_PREFIX + uid;
String locationInfo = redisClient.opsForValue().get(key);
if (locationInfo == null || locationInfo.isEmpty()) {
return false;
}
// 格式解析: IP:PORT:SessionId
String[] segments = locationInfo.split(":", 3);
if (segments.length < 2) {
log.warn("Invalid session info format: uid={}, data={}", uid, locationInfo);
return false;
}
String targetNodeId = segments[0] + ":" + segments[1];
String localNodeId = InstanceContext.getLocalId();
boolean isLocal = localNodeId.equals(targetNodeId);
if (log.isDebugEnabled()) {
log.debug("Node check - UID: {}, Target: {}, Local: {}, IsLocal: {}",
uid, targetNodeId, localNodeId, isLocal);
}
return isLocal;
} catch (Exception e) {
log.error("Node check error: uid={}", uid, e);
// 异常情况下保守返回false,触发广播兜底
return false;
}
}
修正后的发送逻辑:
// 修改后的逻辑
if (isSessionLocal(uidStr)) {
// 用户在本地,直接发送
msgTemplate.convertAndSendToUser(uidStr, destination, packet);
} else {
// 用户在其他节点,直接走Redis广播
broadcastToCluster(targetId, packet);
}
方案二:异步回调
如果不希望维护复杂的节点状态比对,可以考虑配置 SendResult 回调或使用 CompletableFuture。
(以上原代码因涉及保密需要,因此使用AI二次处理过,如果错误请见谅)