在构建分布式聊天系统时,我们面临着一个核心挑战:如何在多个服务器实例之间有效管理WebSocket连接,并确保消息能够准确传递到正确的机器人客户端。本文详细介绍了解决方案,包括Redis集成、跨实例消息转发、心跳机制等关键技术实现。

系统架构概述

系统采用了混合存储架构:本地内存存储活跃的WebSocket连接,Redis存储全局连接状态信息。这种设计既保证了本地访问的高性能,又实现了跨实例的状态共享。

Redis配置与连接管理

系统支持Redis单节点和集群两种部署模式。在配置文件中定义了完整的Redis配置:

# config.py
class RedisConfig:
    # Redis集群配置
    REDIS_NODES = [
        {"host": "10.7.104.44", "port": 4031},  # master
        {"host": "10.7.104.47", "port": 4032},
        {"host": "10.7.104.48", "port": 4031},  # master
        {"host": "10.7.104.44", "port": 4032},
        {"host": "10.7.104.47", "port": 4031},  # master
        {"host": "10.7.104.48", "port": 4032},
    ]
    
    # 单节点配置(兼容性保留)
    HOST = os.getenv("REDIS_HOST", "10.7.104.44")
    PORT = int(os.getenv("REDIS_PORT", "4031"))
    DB = int(os.getenv("REDIS_DB", "0"))
    
    # 认证信息
    USERNAME = os.getenv("REDIS_USERNAME", "aigc")
    PASSWORD = os.getenv("REDIS_PASSWORD", "qvQfwMfYT2A&")
    
    # 集群配置
    USE_CLUSTER = os.getenv("REDIS_USE_CLUSTER", "True").lower() == "true"
    CLUSTER_REQUIRE_FULL_COVERAGE = False
    CLUSTER_SKIP_FULL_COVERAGE_CHECK = True
    
    # WebSocket连接相关配置
    WEBSOCKET_CONNECTION_PREFIX = "ws_conn:"
    HEARTBEAT_INTERVAL = 30  # 心跳间隔(秒)
    CONNECTION_TIMEOUT = 90  # 连接超时时间(秒)

这个配置支持生产环境的Redis 6.2.17集群,包含3个主节点和对应的从节点,提供高可用性和数据分片能力。

为了确保每个服务器实例都有唯一标识,我们实现了基于主机名、端口和进程ID的实例ID生成机制:

class InstanceConfig:
    @staticmethod
    def get_instance_id():
        # 优先使用环境变量
        instance_id = os.getenv('SERVER_INSTANCE_ID')
        if instance_id:
            return instance_id
        
        # 基于主机名、端口和进程ID生成唯一标识
        hostname = socket.gethostname()
        port = os.getenv('PORT', '8000')
        pid = os.getpid()
        
        # 生成哈希值作为实例ID
        hash_input = f"{hostname}:{port}:{pid}"
        return hashlib.md5(hash_input.encode()).hexdigest()[:16]

这个设计解决了同一台机器上运行多个服务实例时的身份识别问题。

Redis管理器实现

我们创建了一个专门的Redis管理器来处理连接状态的存储和查询,支持集群和单节点两种模式:

# app/core/redis_manager.py
import aioredis
from aioredis.cluster import RedisCluster
import json
import logging
from datetime import datetime
from typing import Optional, Dict, List
from config import RedisConfig, InstanceConfig

class RedisConnectionManager:
    def __init__(self):
        self.redis: Optional[aioredis.Redis] = None
        self.config = RedisConfig()
        self.instance_id = InstanceConfig.get_instance_id()
        
    async def connect(self):
        """连接到Redis服务器"""
        try:
            if self.config.USE_CLUSTER:
                # 集群模式连接
                startup_nodes = [
                    {"host": node["host"], "port": node["port"]} 
                    for node in self.config.REDIS_NODES
                ]
                
                self.redis = RedisCluster(
                    startup_nodes=startup_nodes,
                    username=self.config.USERNAME,
                    password=self.config.PASSWORD,
                    decode_responses=True,
                    skip_full_coverage_check=self.config.CLUSTER_SKIP_FULL_COVERAGE_CHECK,
                    require_full_coverage=self.config.CLUSTER_REQUIRE_FULL_COVERAGE
                )
                logging.info(f"Redis集群连接成功,节点数: {len(startup_nodes)}")
            else:
                # 单节点模式连接
                auth_string = ""
                if self.config.USERNAME and self.config.PASSWORD:
                    auth_string = f"{self.config.USERNAME}:{self.config.PASSWORD}@"
                elif self.config.PASSWORD:
                    auth_string = f":{self.config.PASSWORD}@"
                    
                redis_url = f"redis://{auth_string}{self.config.HOST}:{self.config.PORT}"
                
                self.redis = aioredis.from_url(
                    redis_url,
                    db=self.config.DB,
                    decode_responses=True
                )
                logging.info(f"Redis单节点连接成功: {self.config.HOST}:{self.config.PORT}")
                
            # 测试连接
            await self.redis.ping()
            
        except Exception as e:
            logging.error(f"Redis连接失败: {e}")
            raise
        
    def store_connection(self, robot_id: str, additional_data: Dict = None):
        """存储机器人连接信息到Redis"""
        connection_data = {
            'instance_id': self.instance_id,
            'robot_id': robot_id,
            'connected_at': datetime.now().isoformat(),
            'last_heartbeat': datetime.now().isoformat(),
            **(additional_data or {})
        }
        
        key = f"{RedisConfig.WEBSOCKET_CONNECTION_PREFIX}{robot_id}"
        self.redis_client.hset(key, mapping=connection_data)
        logging.info(f"存储机器人 {robot_id} 连接信息到Redis,实例: {self.instance_id}")
        
    def get_connection_info(self, robot_id: str) -> Optional[Dict]:
        """获取机器人连接信息"""
        key = f"{RedisConfig.WEBSOCKET_CONNECTION_PREFIX}{robot_id}"
        data = self.redis_client.hgetall(key)
        return data if data else None
        
    def remove_connection(self, robot_id: str):
        """从Redis中移除连接信息"""
        key = f"{RedisConfig.WEBSOCKET_CONNECTION_PREFIX}{robot_id}"
        result = self.redis_client.delete(key)
        if result:
            logging.info(f"从Redis中移除机器人 {robot_id} 的连接信息")
        return result
        
    def update_heartbeat(self, robot_id: str):
        """更新心跳时间戳"""
        key = f"{RedisConfig.WEBSOCKET_CONNECTION_PREFIX}{robot_id}"
        if self.redis_client.exists(key):
            self.redis_client.hset(key, 'last_heartbeat', datetime.now().isoformat())
            
    def get_all_connections(self) -> List[Dict]:
        """获取所有连接信息"""
        pattern = f"{RedisConfig.WEBSOCKET_CONNECTION_PREFIX}*"
        keys = self.redis_client.keys(pattern)
        connections = []
        
        for key in keys:
            data = self.redis_client.hgetall(key)
            if data:
                connections.append(data)
                
        return connections

# 创建全局实例
redis_manager = RedisManager()

这个Redis管理器提供了完整的连接生命周期管理功能,包括存储、查询、更新和清理。

WebSocket连接管理器

核心的WebSocket连接管理器集成了本地内存和Redis存储:

# app/external_service/websocket_channel.py
class ConnectionManager:
    def __init__(self):
        # 本地内存存储活跃连接
        self.active_connections: Dict[str, WebSocket] = {}
        self.instance_id = InstanceConfig.get_instance_id()
        
    async def connect(self, websocket: WebSocket, robot_id: str):
        """建立WebSocket连接"""
        await websocket.accept()
        self.active_connections[robot_id] = websocket
        
        # 同时存储到Redis
        redis_manager.store_connection(robot_id, {
            'status': 'connected',
            'websocket_state': 'active'
        })
        
        logging.info(f"机器人 {robot_id} 已连接到实例 {self.instance_id}")
        
    def disconnect(self, robot_id: str):
        """断开连接并清理"""
        if robot_id in self.active_connections:
            del self.active_connections[robot_id]
            
        # 从Redis中移除
        redis_manager.remove_connection(robot_id)
        logging.info(f"机器人 {robot_id} 已断开连接并清理")

跨实例消息转发机制

最复杂的部分是实现跨实例的消息转发。当目标机器人不在当前实例时,我们需要找到正确的实例并转发消息:

async def send_personal_message(self, message: str, robot_id: str):
    """发送个人消息,支持跨实例转发"""
    # 首先检查本地连接
    if robot_id in self.active_connections:
        websocket = self.active_connections[robot_id]
        
        # 验证连接是否真实有效
        if await self.is_websocket_alive(robot_id):
            try:
                await websocket.send_text(message)
                redis_manager.update_heartbeat(robot_id)
                return True
            except Exception as e:
                logging.warning(f"发送消息到 {robot_id} 失败: {e}")
                # 清理无效连接
                self.disconnect(robot_id)
        else:
            # 本地连接失效,清理并重新查询Redis
            logging.warning(f"机器人 {robot_id} 本地连接失效,清理Redis记录")
            redis_manager.remove_connection(robot_id)
            
            # 重新查询Redis,寻找其他实例的连接
            connection_info = redis_manager.get_connection_info(robot_id)
            if connection_info and connection_info.get('instance_id') != self.instance_id:
                # 尝试跨实例转发
                return await self.forward_message_to_instance(
                    message, robot_id, connection_info.get('instance_id')
                )
            return False
    
    # 本地没有连接,查询Redis
    connection_info = redis_manager.get_connection_info(robot_id)
    if not connection_info:
        return False
        
    target_instance = connection_info.get('instance_id')
    if target_instance == self.instance_id:
        # 应该在本地但找不到,可能是状态不一致
        logging.warning(f"Redis显示机器人 {robot_id} 在当前实例,但本地未找到连接")
        redis_manager.remove_connection(robot_id)
        return False
    else:
        # 跨实例转发
        return await self.forward_message_to_instance(message, robot_id, target_instance)

WebSocket连接状态检测

由于Starlette的WebSocket类没有ping方法,我们实现了适配的连接状态检测:

async def is_websocket_alive(self, robot_id: str) -> bool:
    """检查WebSocket连接是否真实存活"""
    if robot_id not in self.active_connections:
        return False
    
    websocket = self.active_connections[robot_id]
    try:
        # 检查WebSocket状态
        if hasattr(websocket, 'client_state') and websocket.client_state.name == 'DISCONNECTED':
            logging.warning(f"WebSocket连接 {robot_id} 已断开")
            return False
        
        # 检查应用状态
        if hasattr(websocket, 'application_state'):
            if websocket.application_state.name == 'DISCONNECTED':
                logging.warning(f"WebSocket连接 {robot_id} 应用状态已断开")
                return False
        
        return True
    except Exception as e:
        logging.warning(f"WebSocket连接 {robot_id} 检测失败: {e}")
        return False

心跳机制实现

我们实现了基于时间戳的心跳机制来维护连接状态:

async def validate_and_cleanup_connections(self):
    """验证并清理无效连接"""
    current_time = datetime.now()
    expired_robots = []
    
    for robot_id in list(self.active_connections.keys()):
        connection_info = redis_manager.get_connection_info(robot_id)
        if not connection_info:
            expired_robots.append(robot_id)
            continue
            
        # 检查心跳超时
        last_heartbeat = datetime.fromisoformat(connection_info.get('last_heartbeat', ''))
        if (current_time - last_heartbeat).total_seconds() > RedisConfig.CONNECTION_TIMEOUT:
            expired_robots.append(robot_id)
            
    # 清理过期连接
    for robot_id in expired_robots:
        self.disconnect(robot_id)
        logging.info(f"清理过期连接: {robot_id}")

消息发送接口

最终,我们提供了统一的消息发送接口:

async def send_message_to_robot(robot_id: str, message_data: dict, db: Session):
    """发送消息给指定机器人"""
    try:
        message_json = json.dumps(message_data, ensure_ascii=False)
        success = await manager.send_personal_message(message_json, robot_id)
        
        if not success:
            logging.warning(f"未找到机器人 {robot_id} 的连接或发送失败")
            
            # 更新机器人状态为离线
            robot = db.query(Robot).filter(Robot.robot_id == robot_id).first()
            if robot:
                robot.status = 0  # 离线状态
                db.commit()
                
        return success
    except Exception as e:
        logging.error(f"发送消息到机器人 {robot_id} 时发生错误: {e}")
        return False