笔记:SimpMessagingTemplate 多节点转发失效

笔记:SimpMessagingTemplate 多节点转发失效

 次点击
10 分钟阅读

问题体现:消息广播机制失效

在多节点集群环境下,当目标用户连接至不同服务器实例时,消息未能通过 Redis 广播机制正确分发,导致用户接收失败。

原代码

核心问题代码如下。原代码试图通过 try-catch 捕获本地发送失败的异常,触发 Redis 广播降级策略:

private boolean dispatchMessage(Long targetId, MessagePacket packet, int retryCount) {
    String uidStr = String.valueOf(targetId);
    int attempts = MAX_ATTEMPTS - retryCount + 1;

    // 1. 活跃状态校验 (Redis)
    if (!SessionManager.isOnline(uidStr)) {
        log.warn("Target offline - UID: {}, Attempt: {}/{}", targetId, attempts, MAX_ATTEMPTS);
        
        // 离线重试机制
        if (retryCount > 0) {
            try {
                Thread.sleep(BACKOFF_DELAY);
                return dispatchMessage(targetId, packet, retryCount - 1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    try {
        // 2. 尝试本地推送
        // BUG所在:此方法为异步非阻塞,若用户不在本地,不会抛出异常,而是静默失败
        msgTemplate.convertAndSendToUser(uidStr, "/queue/notify", packet);
        log.info("Local dispatch success - UID: {}", targetId);
        return true; 
        
    } catch (Exception e) {
        // 这里的逻辑永远不会被触发,除非发生网络层面的严重错误
        log.warn("Local dispatch failed, switching to cluster broadcast - UID: {}", targetId);
        
        try {
            // 3. 降级为集群广播
            broadcastToCluster(targetId, packet);
            return true;
        } catch (Exception ex) {
            log.error("Cluster broadcast failed", ex);
            // 广播重试逻辑...
            return false;
        }
    }
}

问题分析

经排查,SimpMessagingTemplate.convertAndSendToUser 底层实现是异步非阻塞的(Fire-and-Forget)。

当目标 User Session 不在当前 JVM 内存中时,Spring Websocket 仅仅是丢弃消息或不做处理,并不会抛出 Exception。因此,代码永远不会进入 catch 块,Redis 广播逻辑彻底失效。

解决方案

方案一:添加可靠的节点判断

在发送前引入节点校验,显式判断用户是否连接在当前实例。

/**
 * 校验用户Session是否位于当前服务节点
 */
private boolean isSessionLocal(String uid) {
    try {
        String key = SESSION_CACHE_PREFIX + uid;
        String locationInfo = redisClient.opsForValue().get(key);

        if (locationInfo == null || locationInfo.isEmpty()) {
            return false;
        }

        // 格式解析: IP:PORT:SessionId
        String[] segments = locationInfo.split(":", 3);
        if (segments.length < 2) {
            log.warn("Invalid session info format: uid={}, data={}", uid, locationInfo);
            return false;
        }

        String targetNodeId = segments[0] + ":" + segments[1];
        String localNodeId = InstanceContext.getLocalId();

        boolean isLocal = localNodeId.equals(targetNodeId);
        
        if (log.isDebugEnabled()) {
             log.debug("Node check - UID: {}, Target: {}, Local: {}, IsLocal: {}", 
                uid, targetNodeId, localNodeId, isLocal);
        }

        return isLocal;

    } catch (Exception e) {
        log.error("Node check error: uid={}", uid, e);
        // 异常情况下保守返回false,触发广播兜底
        return false;
    }
}

修正后的发送逻辑:

// 修改后的逻辑
if (isSessionLocal(uidStr)) {
    // 用户在本地,直接发送
    msgTemplate.convertAndSendToUser(uidStr, destination, packet);
} else {
    // 用户在其他节点,直接走Redis广播
    broadcastToCluster(targetId, packet);
}

方案二:异步回调

如果不希望维护复杂的节点状态比对,可以考虑配置 SendResult 回调或使用 CompletableFuture

(以上原代码因涉及保密需要,因此使用AI二次处理过,如果错误请见谅)

© 本文著作权归作者所有,未经许可不得转载使用。