目录

    Phoenix监控平台技术解析(三):WebSocket通信通道——基于Netty的长连接实现

    上一篇我们拆解了 HTTP 通道的完整实现——从连接池、压缩加密到 AOP 透明切面。本篇聚焦 WebSocket 通道的宏观架构,看看 Phoenix 如何在这条长连接链路上编排消息封装、业务路由、事件分发与集群感知——服务端 Netty 的启动细节和客户端 Tyrus 的连接重连机制,将分别在后续两篇中深入展开。


    一、HTTP 够用了,为什么还要 WebSocket?

    上一篇结尾我们留了个悬念:HTTP 通道是"基本款",WebSocket 通道才是"性能款"。

    先回忆一下 HTTP 通道的工作方式:客户端每 30 秒发一次心跳包,每 60 秒发一次 JVM 信息,如果还开了服务器采集、线程池采集……每一次上报都要走一遍"建连 → 发请求 → 等响应 → 关连接"的流程。虽然有连接池可以复用 TCP 连接,但 HTTP 本质上是请求-响应模型——客户端不问,服务端不说。

    这在监控场景下有两个明显的不舒服:

    第一,服务端想"主动找你",做不到。 比如运维人员在 UI 上点了"修改线程池核心线程数",服务端需要把这个指令下发给对应的客户端应用。用 HTTP 怎么做?只能等客户端下一次心跳来的时候"捎带"回去——延迟不可控,实现也别扭。

    第二,每次上报都有协议开销。 HTTP 请求头动辄几百字节,对于心跳这种只有几十字节有效载荷的场景,协议开销比数据本身还大。虽然 keep-alive 避免了 TCP 握手的开销,但 HTTP 头部的冗余在高频场景下还是很浪费。

    WebSocket 就是为这种场景而生的:一次握手,长期持有,双向通信。连接建立之后,客户端和服务端可以随时互发消息,没有额外的 HTTP 头部开销,也不需要轮询。

    从 Phoenix 的源码中可以清晰地看到这个演进过程——几乎每个定时上报线程里都有一段被注释掉的 HTTP 代码:

    // 改成用 WebSocket,弃用 HTTP
    // String result = Sender.send(UrlConstants.HEARTBEAT_URL, heartbeatPackage.toJsonString());
    WebSocketPackage requestPackage = new WebSocketPackage();
    requestPackage.setClassName(HeartbeatPackage.class.getName());
    requestPackage.setPayload(heartbeatPackage);
    DataExchanger.sendMessage(requestPackage);
    

    高频的定时上报(心跳、JVM、服务器、线程池、网络设备、Docker)全部迁移到了 WebSocket;而低频的操作指令(告警上报、异常上报、配置刷新等)仍然保留 HTTP。两条通道各司其职,而不是一刀切地替换。

    二、先看全局:WebSocket 通道的角色分布

    在深入代码之前,先梳理一下 WebSocket 通道涉及的关键角色和它们的关系:

    ┌──────────────────────┐
    │     phoenix-client   │  客户端 SDK(集成到你的 Java 应用中)
    │  ┌────────────────┐  │
    │  │ WebsocketClient│  │  基于 Tyrus(JSR-356),不依赖 Spring
    │  │  (Tyrus/JSR356)│  │  自动重连 + 指数退避
    │  └───────┬────────┘  │
    │          │ ws(s)://  │
    └──────────┼───────────┘
               │
               ▼
    ┌──────────────────────┐
    │    phoenix-agent     │  代理端(WebSocket 中继站)
    │ ┌──────────────────┐ │
    │ │RelayWebSocket    │ │  基于 JSR-356 @ServerEndpoint
    │ │   Endpoint       │ │  下游连接 → 中转 → 上游连接
    │ └───────┬──────────┘ │
    │         │ ws(s)://   │
    └─────────┼────────────┘
              │
              ▼
    ┌──────────────────────┐
    │    phoenix-server    │  服务端(WebSocket 终点站)
    │ ┌──────────────────┐ │
    │ │  WebSocketServer │ │  基于 Netty,高性能 NIO
    │ │  (Netty)         │ │  Boss/Worker 双线程池
    │ └──────────────────┘ │
    └──────────────────────┘
    

    注意到了吗?服务端用 Netty,客户端用 Tyrus(JSR-356)。这不是随意的选择:

    • 服务端需要同时处理成百上千个客户端的长连接,对性能和并发能力要求极高——Netty 的 Reactor 模型正是为此而生。
    • 客户端 SDK 需要能在任何 Java 程序中运行,不能引入 Netty 的重量级依赖(避免和用户项目中的 Netty 版本冲突)——用标准的 JSR-356 WebSocket API + Tyrus 作为实现,既轻量又兼容。

    而代理端作为"中继站",同时扮演着服务端和客户端两个角色:对下游(客户端)来说它是 WebSocket 服务端(使用 SpringBoot 内嵌的 JSR-356 支持),对上游(phoenix-server)来说它是 WebSocket 客户端。

    三、服务端概览:Netty 构建的 WebSocket 引擎

    Phoenix 的 WebSocket 服务端核心是 WebSocketServer 类,基于 Netty 的 Reactor 主从线程模型 构建:

    • bossGroup(1 个线程):只负责接受新的 TCP 连接,就像餐厅门口的迎宾。
    • workerGroup(默认 CPU 核心数 × 2 个线程):负责已建立连接上的所有 I/O 操作——读数据、写数据、编解码、业务处理。

    每一个新连接都会被 WebSocketServerInitializer 装配一条有序的 7 层 Pipeline 处理链

    层级 Handler 职责
    SslHandler(可选) WSS 模式下的 TLS 加解密
    HttpServerCodec HTTP 编解码(WebSocket 握手阶段使用)
    HttpObjectAggregator(10MB) HTTP 消息聚合,应对大型数据包
    WebSocketServerCompressionHandler WebSocket 帧级压缩(permessage-deflate)
    IdleStateHandler(300 秒) 5 分钟无入站数据触发空闲事件
    WebSocketServerProtocolHandler 自动处理握手升级(HTTP → WebSocket)和控制帧
    WebSocketSimpleChannelInboundHandler Phoenix 业务入口

    数据像水一样流过这条 Pipeline——从字节流到 HTTP,从 HTTP 升级到 WebSocket,经过压缩、空闲检测,最终到达业务处理层。

    最后一层 WebSocketSimpleChannelInboundHandler 做了一件巧妙的事:通过 InvokerHolder 机制把 Netty 世界和 Spring 世界连接起来,最终会被转发给 Spring 容器中的 IWebSocketFrameHandler。为什么需要这座"桥"?因为 Netty Handler 是在每次新连接时 new 出来的,它不是 Spring Bean,无法直接注入 Service——InvokerHolder 把调用转发给 Spring 容器管理的 Bean,解决了这个问题。

    空闲连接的处理也值得一提:5 分钟内没有收到任何入站数据(包括 Ping/Pong),服务端会发送一个 CloseWebSocketFrame(状态码 1001,Going Away),由 WebSocketServerProtocolHandler 自动完成后续的关闭流程——这比粗暴地 ctx.close() 优雅得多。

    此外,服务端还有一个 60 秒一轮的连接健康巡检任务:清理已失效的"幽灵连接",并在集群模式下刷新集群存储中的客户端信息。

    下一篇《Netty WebSocket服务端启动与初始化流程》将深入展开 WebSocketServer.start() 的完整引导过程、Boss/Worker 线程池配置、SSL 上下文构建(buildSslContext())、Pipeline 每一层的设计考量、连接健康巡检机制以及优雅关闭策略。

    四、WebSocketFrameHandler——业务路由的枢纽

    前面说到消息最终会被转发给 Spring 容器中的 IWebSocketFrameHandler。在服务端,它的实现类是 WebSocketFrameHandler——一个精心设计的业务路由分发器

    @Component
    @ConditionalOnProperty(name = "ws.server.enable", havingValue = "true")
    public class WebSocketFrameHandler implements IWebSocketFrameHandler {
    
        public static final AttributeKey<String> WS_BUSINESS_TYPE = AttributeKey.valueOf("ws_business_type");
    
        private final Map<String, IWebSocketBusinessHandler> businessHandlers;
    
        public WebSocketFrameHandler(@Autowired List<IWebSocketBusinessHandler> handlers) {
            this.businessHandlers = Maps.newHashMap();
            for (IWebSocketBusinessHandler handler : handlers) {
                this.businessHandlers.put(handler.businessType().toLowerCase(), handler);
            }
        }
    }
    

    构造函数通过 Spring 注入所有 IWebSocketBusinessHandler 的实现类,构建一个 业务类型 → 处理器 的映射表。目前 Phoenix 有两种业务类型:

    • monitoring:监控数据上报(心跳、JVM、服务器、线程池等)
    • arthas:在线 Java 诊断(基于 Alibaba Arthas)

    当 WebSocket 握手完成时,WebSocketFrameHandler 从 URI 路径的最后一段提取业务类型,然后委托给对应的处理器:

    @Override
    public void userEventTriggered(...) {
        if (evt instanceof HandshakeComplete) {
            HandshakeComplete handshake = (HandshakeComplete) evt;
            UriComponents uriComponents = UriComponentsBuilder.fromUriString(handshake.requestUri()).build();
            List<String> pathSegments = uriComponents.getPathSegments();
            if (CollectionUtils.isEmpty(pathSegments)) {
                ctx.fireUserEventTriggered(evt);
                return;
            }
            // URI 最后一段就是业务类型,例如 /phoenix/websocket/relay/monitoring
            String webSocketBusinessType = pathSegments.get(pathSegments.size() - 1).toLowerCase();
            IWebSocketBusinessHandler handler = this.businessHandlers.get(webSocketBusinessType);
            if (handler != null) {
                handler.handle(simpleChannelInboundHandler, ctx, handshake, uriComponents);
                // 把业务类型存到 Channel 属性中,后续消息处理时用
                ctx.channel().attr(WS_BUSINESS_TYPE).set(webSocketBusinessType);
            } else {
                ctx.writeAndFlush(new CloseWebSocketFrame(1008, "不支持的业务类型:" + webSocketBusinessType));
            }
        } else {
            // 非握手事件,继续向 Pipeline 下游传播
            ctx.fireUserEventTriggered(evt);
        }
    }
    

    这个设计的扩展性非常好:新增一种 WebSocket 业务,只需实现 IWebSocketBusinessHandler 接口并注册为 Spring Bean,路由逻辑一行代码都不用改。握手完成后的业务类型被存储到了 Netty Channel 的属性中(AttributeKey),后续收到数据帧时可以直接取出来路由到正确的处理器。

    五、MonitoringFrameHandler——监控业务的"接待员"

    MonitoringFrameHandler 是监控数据业务的核心处理器。它负责三件大事:客户端注册、消息接收和消息下发

    5.1 握手后的客户端注册

    当一个监控客户端通过 WebSocket 连接上来时,handle() 方法被调用:

    @Override
    public void handle(WebSocketSimpleChannelInboundHandler handler, ChannelHandlerContext ctx,
                       HandshakeComplete handshake, UriComponents uriComponents) {
        MultiValueMap<String, String> parameters = uriComponents.getQueryParams();
        String endpoint = parameters.getFirst("endpoint");      // client/agent
        String instanceId = parameters.getFirst("instanceId");   // 应用实例唯一ID
        // ... 参数校验 ...
        
        // 构造全局唯一的客户端标识
        final String websocketClientId = WebsocketClientIdGenerator.generate(endpoint, instanceId);
        
        // 检查是否已有相同标识的客户端在线(防止重复注册)
        Optional<WebSocketClientInfo> socketClientInfo = this.webSocketServer.findClient(websocketClientId);
        if (socketClientInfo.isPresent()) {
            // 旧连接存在!先关闭旧的,再注册新的
            // ... 重复连接处理逻辑 ...
        }
        // 没有重复,直接注册
        this.doRegister(ctx, handshake, websocketClientId);
    }
    

    客户端连接时通过 URL 查询参数携带 endpoint(端点类型)和 instanceId(实例ID),服务端据此生成一个全局唯一的客户端标识。

    重复连接的处理值得特别关注。在网络不稳定的环境下,客户端可能已经重连成功了,但旧连接还没有完全关闭——这时候就会出现"同一个客户端有两个连接"的情况。Phoenix 的处理策略是:

    1. 向旧连接发送 Close 帧(自定义状态码 4000,表示"重复连接")
    2. AtomicBoolean + 超时保护(5秒)确保新连接只被注册一次
    3. 优先等待旧连接正常关闭后再注册新连接;如果 5 秒内旧连接没有关闭,强制注册

    这种"双保险"设计(优先路径 + 超时兜底)在分布式系统中非常常见——你永远不能假设网络一定会按预期工作。客户端收到 4000 状态码后会识别并跳过重连——这部分将在第五篇详细展开。

    5.2 消息接收与分发

    客户端通过 WebSocket 发来的监控数据,最终会到达 onMessageReceived

    @Override
    public void onMessageReceived(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        if (frame == null || !frame.isFinalFragment()) { return; }
        String message = frame.text();
        if (StringUtils.isBlank(message)) { return; }
        // 将原始 "密文 WebSocket 消息JSON字符串" 解析并转换为 "WebSocketPackage 数据包"
        WebSocketPackage pkg = WebSocketPackage.convert(message, UPSTREAM_ALLOWED_CLASS_NAMES);
        // 委托给 WebSocket 消息分发器
        this.dispatcher.dispatch(ctx, pkg);
    }
    

    几个重要的细节:

    • isFinalFragment() 检查:WebSocket 协议支持消息分片传输(一条大消息拆成多个帧发送)。这里只处理完整的最终帧,避免拼接半截消息。
    • 白名单校验UPSTREAM_ALLOWED_CLASS_NAMES 是一个 Set<String>,只允许特定类型的数据包被反序列化——这是一道重要的安全防线,防止恶意客户端发送可能触发反序列化漏洞的类名。
    • 密文解析WebSocketPackage.convert() 内部会调用 MsgPayloadUtils.decryptPayload() 解密——没错,WebSocket 通道上的数据同样是加密传输的,和 HTTP 通道共享同一套加密体系。

    5.3 服务端主动下发消息

    这是 WebSocket 相对于 HTTP 的杀手级能力。MonitoringFrameHandler 提供了两种下发方式:

    // 异步发送——Fire and forget
    public void sendMsgToClient(String websocketClientId, WebSocketPackage requestPackage) {
        ChannelHandlerContext ctx = this.validateAndResolveContext(websocketClientId, requestPackage);
        if (ctx == null) { return; }
        String encryptStr = MsgPayloadUtils.encryptPayload(requestPackage.toJsonString());
        ctx.writeAndFlush(new TextWebSocketFrame(encryptStr)).addListener(future -> {
            if (!future.isSuccess()) {
                log.warn("向客户端[{}]发送消息失败!", websocketClientId, future.cause());
            }
        });
    }
    
    // 同步发送——等到发完或超时
    public void sendMsgToClientSync(String websocketClientId, WebSocketPackage requestPackage,
                                    long timeout, TimeUnit unit) {
        ChannelHandlerContext ctx = this.validateAndResolveContext(websocketClientId, requestPackage);
        // ... 校验 ...
        String encryptStr = MsgPayloadUtils.encryptPayload(requestPackage.toJsonString());
        ChannelFuture future = ctx.writeAndFlush(new TextWebSocketFrame(encryptStr));
        boolean completed = future.await(timeout, unit);
        if (!completed) {
            throw new WebSocketException("发送消息超时!");
        }
    }
    

    举个实际场景:运维人员在 UI 上修改了某个应用的线程池核心线程数。这个操作通过 HTTP 请求到达服务端后,服务端需要把新配置推送给对应的客户端应用。通过 sendMsgToClient(),服务端可以立刻把指令送达——不需要等下一次心跳周期。

    validateAndResolveContext() 方法做了三重防护:

    1. 参数非空校验
    2. clientInfoMap 中查找客户端
    3. 检查 Channel 是否仍然活跃(ctx.channel().isActive()

    如果 Channel 已经不活跃了,说明连接实际上已经断了,还会主动清理掉这个"僵尸"客户端。

    六、WebSocketPackage——WebSocket 时代的"数据信封"

    在 HTTP 通道中,每种数据包都有自己的 URL 路由(/heartbeat/accept-heartbeat-package/jvm/accept-jvm-package 等)。但在 WebSocket 通道上,所有消息都走同一条连接——怎么区分这条消息是心跳包还是 JVM 信息包?

    Phoenix 设计了 WebSocketPackage 来解决这个问题:

    public class WebSocketPackage extends AbstractSuperBean {
        private String className;   // 负载数据对应的 Java 类全限定名
        private Object payload;     // 实际数据
    }
    

    只有两个字段,简洁到极致。className 告诉接收方"这个包里装的是什么类型的数据",payload 就是数据本身。这就像一个快递箱上贴的标签——标签写着"心跳包",里面装的就是 HeartbeatPackage 对象。

    客户端发送时的用法也很直观:

    // 构建心跳数据包
    HeartbeatPackage heartbeatPackage = this.clientPackageConstructor.structureHeartbeatPackage();
    // 装进 WebSocketPackage 信封
    WebSocketPackage requestPackage = new WebSocketPackage();
    requestPackage.setClassName(HeartbeatPackage.class.getName());
    requestPackage.setPayload(heartbeatPackage);
    // 发送
    DataExchanger.sendMessage(requestPackage);
    

    接收方通过 WebSocketPackage.convert() 方法反序列化时,有一个重要的安全设计——白名单校验

    public static WebSocketPackage convert(String jsonMessage, Set<String> allowedClassNames)
            throws ClassNotFoundException {
        String decryptStr = MsgPayloadUtils.decryptPayload(jsonMessage);
        JSONObject root = JSON.parseObject(decryptStr);
        String className = root.getString("className");
        // 只允许白名单中的类被反序列化
        if (!allowedClassNames.contains(className)) {
            throw new WebSocketException("拒绝反序列化未授权的类:" + className);
        }
        WebSocketPackage pkg = new WebSocketPackage();
        pkg.setClassName(className);
        pkg.setPayload(root.getObject("payload", Class.forName(className)));
        return pkg;
    }
    

    为什么要白名单?因为 Class.forName() + 反序列化是 Java 安全领域的经典攻击面。如果不加限制,攻击者可以构造一个 className 为 java.lang.Runtime 的消息包,利用反序列化漏洞执行任意代码。Phoenix 的白名单(WebSocketPkgPayloadWhitelistConstants)明确列出了所有允许的类型——心跳包、JVM 包、服务器包、告警包等——任何不在白名单中的类名直接拒绝。而且上行和下行分别有各自独立的白名单,严格遵循最小权限原则。

    七、WebSocketMessageDispatcher——事件驱动的消息分发

    服务端收到 WebSocketPackage 后,怎么把它交给正确的业务处理器?Phoenix 用了一套 Spring Event 事件驱动 的分发机制。

    @Component
    @RequiredArgsConstructor
    public class WebSocketMessageDispatcher {
    
        private final ApplicationEventPublisher eventPublisher;
    
        // 路由表:className → 事件创建工厂
        private static final Map<String, IWebSocketEventFactory> EVENT_FACTORY_MAP = new HashMap<>();
    
        static {
            register(AlarmPackage.class, (ctx, payload) -> new AlarmEvent(ctx, (AlarmPackage) payload));
            register(DockerPackage.class, (ctx, payload) -> new DockerEvent(ctx, (DockerPackage) payload));
            register(ExceptionPackage.class, (ctx, payload) -> new ExceptionEvent(ctx, (ExceptionPackage) payload));
            register(HeartbeatPackage.class, (ctx, payload) -> new HeartbeatEvent(ctx, (HeartbeatPackage) payload));
            register(JvmPackage.class, (ctx, payload) -> new JvmEvent(ctx, (JvmPackage) payload));
            register(JavaThreadPoolPackage.class, (ctx, payload) -> new JavaThreadPoolEvent(ctx, (JavaThreadPoolPackage) payload));
            register(NetworkDevicePackage.class, (ctx, payload) -> new NetworkDeviceEvent(ctx, (NetworkDevicePackage) payload));
            register(OfflinePackage.class, (ctx, payload) -> new OfflineEvent(ctx, (OfflinePackage) payload));
            register(ServerPackage.class, (ctx, payload) -> new ServerEvent(ctx, (ServerPackage) payload));
        }
    
        public void dispatch(ChannelHandlerContext ctx, WebSocketPackage pkg) {
            String className = pkg.getClassName();
            IWebSocketEventFactory factory = EVENT_FACTORY_MAP.get(className);
            if (factory == null) {
                log.warn("收到未注册的WebSocket消息类型,className:{}", className);
                return;
            }
            ApplicationEvent event = factory.create(ctx, pkg.getPayload());
            this.eventPublisher.publishEvent(event);
        }
    }
    

    这个设计堪称教科书级的开闭原则实践:

    1. 注册阶段:在 static 块中,为每种数据包类型注册一个事件创建工厂(Lambda 表达式),目前共 9 种类型
    2. 分发阶段:根据 className 查表,创建对应的 Spring 事件并发布
    3. 处理阶段:对应的 @EventListener 方法在 Spring 容器中监听事件并处理

    新增一种数据包类型时,只需:

    • static 块中加一行 register(...)
    • 写一个新的 Event 类和对应的 @EventListener 处理方法

    分发逻辑一行不改。

    和 HTTP 通道的 Controller + AOP 切面相比,WebSocket 通道的消息处理走的是完全不同的路径——不经过 Spring MVC 的 Controller 层,而是通过 Netty Handler → InvokerHolder → Spring Event 这条链路。但底层的加解密逻辑(MsgPayloadUtils)是复用的。

    八、客户端概览:Tyrus 的轻量之道

    为什么不用 Netty 做客户端?

    phoenix-client-core 的设计目标是能在任何 Java 程序中运行。如果客户端也用 Netty,会引入一大堆依赖(netty-codec-http、netty-handler、netty-transport 等),而且如果用户的项目中已经用了不同版本的 Netty——恭喜,你将获得一场精彩的 NoSuchMethodError 表演。

    Phoenix 选择了 Tyrus——一个 JSR-356(Java WebSocket API)的参考实现。JSR-356 是 Java EE 标准的一部分,API 极其简洁:

    @ClientEndpoint
    public class WebsocketClient {
        @OnOpen
        public void onOpen(Session session) { ... }
        @OnMessage
        public void onMessage(String message) { ... }
        @OnClose
        public void onClose(CloseReason reason) { ... }
        @OnError
        public void onError(Throwable throwable) { ... }
    }
    

    四个注解,四个回调方法,一目了然。

    核心能力一览

    WebsocketClient 围绕连接可靠性做了大量设计:

    能力 实现手段 要点
    并发连接保护 AtomicBoolean CAS 操作 防止多线程同时发起连接,无锁、高效
    连接同步 CountDownLatch 等待 @OnOpen 回调确认连接真正建立
    指数退避重连 初始 5 秒 → 1.5 倍递增 → 上限 5 分钟 避免服务端故障时的重连风暴
    重复连接识别 自定义状态码 4000 收到后不触发重连,防止"连上→被踢→重连"死循环
    双层重连开关 autoReconnect(不可变)+ enableReconnect(运行时可变) close() 关闭总闸后彻底停止重连
    共享连接引擎 全局 SHARED_CLIENT(Tyrus ClientManager) 复用底层连接资源

    DataExchanger——客户端的数据交换中枢

    DataExchanger 是客户端 WebSocket 通信的统一入口,封装了连接初始化、消息处理器注册和加密发送的完整流程。核心设计包括:

    • DCL 双重检查锁 保证单次初始化
    • SPI 机制ServiceLoader)自动发现和注册消息处理器——客户端不仅发数据,也要接收服务端下发的指令(比如线程池调参)
    • 加密发送 与 HTTP 通道共享同一套加密基础设施(MsgPayloadUtils
    • volatile 可见性 保证多线程安全读取客户端引用,sendMessage 中先读到局部变量再判空,避免 TOCTOU 竞态条件

    第五篇《Netty WebSocket客户端连接与重连机制》将深入展开 WebsocketClient 的连接流程源码、CAS 并发保护细节、CountDownLatch 同步机制、指数退避重连策略实现、重复连接的客户端处理,以及 DataExchanger 的 DCL 初始化和 SPI 消息处理器注册。

    九、代理端中继:WebSocket 的"驿站"

    在 Phoenix 的架构中,代理端(agent)扮演着"驿站"角色——客户端的消息先到代理端,代理端再转发给服务端。HTTP 通道中是 RestTemplate 转发,WebSocket 通道中则是 Session 配对中继

    代理端同时暴露了一个 @ServerEndpoint 接收下游连接,和一个 UpstreamClientEndpoint 连接上游:

    @ServerEndpoint("/websocket/relay/{subPath}")
    public class RelayWebSocketEndpoint {
        @OnOpen
        public void onOpen(Session downstreamSession, @PathParam("subPath") String subPath) {
            // 1. 检查连接数限制(最大1000个)
            if (WebSocketRelayHelper.isConnectionLimitReached(SESSION_CACHE, CACHE_MAXIMUM_SIZE)) {
                WebSocketUtils.safeClose(downstreamSession, ...);
                return;
            }
            // 2. 构建上游 URL
            String upstreamUrl = WebSocketUtils.buildUrl(UPSTREAM_BASE_URL + "/websocket/relay", subPath, downstreamSession);
            // 3. 连接上游
            UpstreamClientEndpoint clientEndpoint = new UpstreamClientEndpoint(...);
            Session upstreamSession = container.connectToServer(clientEndpoint, URI.create(upstreamUrl));
            // 4. 建立下游 ↔ 上游的配对关系
            WebSocketRelayHelper.establishPairing(..., downstreamSessionId, upstreamSessionId, ...);
        }
    }
    

    核心思路是**"对偶配对"**:每个下游 Session 对应一个上游 Session,消息从一头进来,原封不动(或稍加处理)从另一头出去。配对关系通过两个 ConcurrentMap 维护——DOWNSTREAM_TO_UPSTREAMUPSTREAM_TO_DOWNSTREAM——双向映射,方便从任意一端找到对端。

    代理端在中继监控业务(subPath = "monitoring")的消息时,会在两个方向都做一件额外的事:添加链路信息

    以上行方向(下游→上游,RelayWebSocketEndpoint.onMessage())为例:

    if (WebSocketBusinessTypeConstants.MONITORING.equals(subPath)) {
        WebSocketPackage pkg = WebSocketPackage.convert(message, UPSTREAM_ALLOWED_CLASS_NAMES);
        AbstractSuperPackage superPackage = (AbstractSuperPackage) pkg.getPayload();
        // 添加链路信息:记录"经过了代理端"
        superPackage.setChain(AGENT_PACKAGE_CONSTRUCTOR.getChain(superPackage));
        pkg.setPayload(superPackage);
        message = MsgPayloadUtils.encryptPayload(pkg.toJsonString());
    }
    

    下行方向(上游→下游,UpstreamClientEndpoint.onMessage())也有同样的逻辑,使用的是下行白名单 DOWNSTREAM_ALLOWED_CLASS_NAMES。这就是第一篇博客中提到的 chain 字段——数据在流转过程中,双向都会记录下自己的行经路线(client → agent → server),后续被用来构建服务拓扑图。

    Session 缓存使用了 Guava 的 Cache,配置了 30 分钟过期(expireAfterAccess)和最大 1000 个连接的限制。过期时自动触发 removalListener 回调,安全关闭对应的 Session。此外,onOpen 中配置的 MAX_IDLE_TIMEOUT(10 分钟)也会让 WebSocket 容器在连接空闲时主动关闭——多层清理机制确保不会出现资源泄漏。

    十、配置体系

    WebSocket 通道的配置分为两部分:服务端配置客户端配置

    服务端配置application.yml):

    ws:
      server:
        enable: true              # 是否开启 WebSocket 服务
        host: 0.0.0.0             # 绑定地址
        port: 16001               # WebSocket 端口(和 HTTP 端口独立)
        path: /phoenix            # WebSocket 路径
        client-connect-host:      # 集群部署时的外部连接地址
        ssl:
          enabled: false
          key-store: classpath:keystore.jks
          key-store-password: changeit
          key-store-type: JKS
          key-alias: myalias
    

    注意 ws.server.enable 这个开关——通过 @ConditionalOnProperty 注解,WebSocket 功能可以被完全关闭。如果你只想用 HTTP 通道,不需要改一行代码,改个配置就行。

    客户端配置monitoring.propertiesapplication.yml):

    # 方式一:直接配置 WebSocket URL
    monitoring.comm.websocket.url=ws://127.0.0.1:12000/phoenix-agent
    
    # 方式二:只配置 HTTP URL,客户端自动推导 WebSocket URL
    monitoring.comm.http.url=http://127.0.0.1:12000/phoenix-agent
    

    Phoenix 的客户端有一个贴心的设计:如果没有显式配置 WebSocket URL,它会自动从 HTTP URL 推导——把 http:// 替换为 ws://https:// 替换为 wss://。不过有一个前提条件:上游服务不能是直连 phoenix-server(因为 server 的 WebSocket 端口和 HTTP 端口是独立的)。只有通过代理端(agent)连接时,WebSocket 才能从 HTTP URL 自动推导。

    十一、双通道对比:全景图

    把 HTTP 和 WebSocket 两条通道放在一起对比:

    维度 HTTP 通道 WebSocket 通道
    通信模式 请求-响应,短连接 全双工,长连接
    服务端实现 Spring MVC Controller Netty Handler + Spring Event
    客户端实现 Apache HttpClient Tyrus(JSR-356)
    加解密 AOP 切面(RequestBodyAdvice 手动调用 MsgPayloadUtils
    路由方式 URL 路径(/heartbeat/accept-... WebSocketPackage.className
    消息分发 Controller 方法直接处理 WebSocketMessageDispatcher 事件驱动
    服务端下推 不支持 sendMsgToClient()sendMsgToClientSync()
    代理端中继 RestTemplate 转发 Session 配对透传
    重试机制 HttpClient 内置 3 次 / @Retryable 指数退避自动重连
    典型场景 告警上报、配置刷新、数据库操作 心跳、JVM、服务器、线程池等高频上报

    两者在数据安全层面完全一致——都使用 MsgPayloadUtils 进行压缩/加密,都传输 CiphertextPackage 格式的密文。差异主要在传输层和消息分发层。

    十二、画一张完整的 WebSocket 消息流转图

    以客户端通过代理端发送心跳包到服务端为例:

    ┌──────────────┐                ┌──────────────────┐            ┌────────────────────┐
    │phoenix-client│                │  phoenix-agent   │            │  phoenix-server    │
    │              │                │  (中继站)         │            │  (终点站)           │
    └──────┬───────┘                └────────┬─────────┘            └──────────┬─────────┘
           │                                 │                                 │
           │ ① 构造 HeartbeatPackage         │                                 │
           │ ② 封装为 WebSocketPackage       │                                 │
           │   className=HeartbeatPackage    │                                 │
           │ ③ DataExchanger.sendMessage()   │                                 │
           │   toJsonString → encryptPayload │                                 │
           │                                 │                                 │
           │─── ws TextFrame (密文) ────────▶│                                  │
           │                                 │                                 │
           │              ④ RelayWebSocketEndpoint.onMessage()                 │
           │                 解密 → 添加链路信息 → 重新加密                        │
           │                                 │                                  │
           │                                 │── ws TextFrame (密文) ───────▶   │
           │                                 │                                  │
           │                                 │  ⑤ WebSocketSimpleChannel       │
           │                                 │     InboundHandler.channelRead0  │
           │                                 │  ⑥ WebSocketFrameHandler         │
           │                                 │     路由到 monitoring 处理器       │
           │                                 │  ⑦ MonitoringFrameHandler        │
           │                                 │     .onMessageReceived()         │
           │                                 │  ⑧ WebSocketPackage.convert()    │
           │                                 │     解密 → 白名单校验 → 反序列化     │
           │                                 │  ⑨ WebSocketMessageDispatcher    │
           │                                 │     .dispatch()                   │
           │                                 │  ⑩ eventPublisher                │
           │                                 │     .publishEvent(HeartbeatEvent) │
           │                                 │  ⑪ @EventListener 处理心跳         │
           │                                 │     写入 MONITOR_INSTANCE 表       │
    

    如果是直连模式(不经过代理端),去掉中间的中继环节,消息从客户端直达服务端。但注意:直连服务端时走的是 Netty 的 WebSocket 端口(如 16001),而不是 HTTP 端口(如 16000)。

    十三、集群部署支持

    当 Phoenix 以集群模式部署多个 server 实例时,同一个客户端可能连接到不同的 server 节点。这时候就需要知道"客户端 A 连接在哪个 server 上"——IWebSocketClusterStore 接口就是为此而生的。

    public interface IWebSocketClusterStore {
        void addClient(String clientId, WebSocketClientClusterInfo info, long expire, TimeUnit timeUnit);
        WebSocketClientClusterInfo findClient(String clientId);
        void removeClient(String clientId);
        Collection<String> allClientIds();
        Map<String, WebSocketClientClusterInfo> clientInfo(String appName);
    }
    

    Phoenix 默认提供了基于 Caffeine 缓存 的实现:

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = "spring.cache.type", havingValue = "caffeine")
    public IWebSocketClusterStore webSocketClusterStore(@Autowired CacheManager cacheManager) {
        Cache cache = cacheManager.getCache(WebSocketCacheConstants.CACHE_IN_CAFFEINE_WEB_SOCKET_CLUSTER);
        WebSocketInCaffeineClusterStore store = new WebSocketInCaffeineClusterStore();
        store.setCache(cache);
        return store;
    }
    

    在集群场景下,每个 server 节点每 60 秒刷新一次自己管理的客户端信息到集群存储中(带 1 小时过期时间)。当服务端需要向某个客户端下发指令时,先从集群存储中查到该客户端连接在哪个节点上,再通过该节点转发。

    @ConditionalOnMissingBean 意味着你可以替换为自己的实现——比如基于 Redis 的集群存储,适用于跨机房部署的场景。

    十四、设计复盘:几个值得学习的点

    "事件驱动"——告别 if-else 地狱

    如果用传统的 if-elseswitch-case 来根据 className 分发消息,每新增一种数据包类型就要改分发逻辑。WebSocketMessageDispatcher 用工厂模式 + Spring Event,让新增类型变成了"只加不改"。

    "白名单"——安全不是事后补丁

    反序列化安全问题不是等出了安全漏洞再修——Phoenix 在设计之初就通过 WebSocketPkgPayloadWhitelistConstants 限制了允许反序列化的类型。上行和下行还分别有各自的白名单,最小权限原则。

    "因地制宜"——技术选型不是非此即彼

    服务端用 Netty(高并发刚需),客户端用 Tyrus(轻量兼容刚需),代理端用 JSR-356 标准(SpringBoot 内嵌支持)。三个角色根据各自的运行环境约束选择了最合适的技术方案,而不是"一刀切"地统一技术栈。

    "渐进式迁移"——不赌命

    从源码中被注释掉的 HTTP 代码可以看出,Phoenix 是逐步把高频上报从 HTTP 迁移到 WebSocket 的。每个数据类型都可以独立切换,出了问题可以单个回退。这种策略比"一夜之间全部替换"稳妥得多。

    十五、小结

    本篇从宏观视角拆解了 Phoenix WebSocket 通信通道的完整架构。回顾核心要点:

    • 为什么要 WebSocket:全双工长连接,解决 HTTP 无法服务端推送、高频上报协议开销大的问题
    • 服务端(Netty)WebSocketServer 基于 Boss/Worker 双线程池,WebSocketServerInitializer 编排了 7 层 Pipeline,InvokerHolder 桥接 Netty 与 Spring
    • 业务路由WebSocketFrameHandler 根据 URI 路径段路由到 IWebSocketBusinessHandler(monitoring/arthas),新增业务只需实现接口并注册为 Spring Bean
    • 消息分发WebSocketMessageDispatcher 通过工厂模式 + Spring Event 实现开闭原则,9 种数据包类型的分发一行代码不改
    • 数据模型WebSocketPackage(className + payload)替代了 HTTP 的 URL 路由,白名单校验防止反序列化攻击
    • 客户端(Tyrus)WebsocketClient 基于 JSR-356 标准 API,支持指数退避重连、并发保护、重复连接识别;DataExchanger 封装了初始化和加密发送
    • 代理端中继RelayWebSocketEndpoint 通过 Session 配对实现上下游透传,双向添加链路信息,Guava Cache 管理连接生命周期
    • 集群支持IWebSocketClusterStore 接口支持多节点部署时的客户端定位,默认提供 Caffeine 缓存实现
    • 安全:WSS/SSL 支持、传输加密(共享 HTTP 通道的加密基础设施)、反序列化白名单

    如果说 HTTP 通道像打电话——拨号、说话、挂断,每次通话都要重新拨号;那 WebSocket 通道就像对讲机——按下通话键就一直保持连接,随时都能说话,对方也能随时说话。对于监控这种需要持续、高频、双向通信的场景,对讲机显然比电话更合适。

    下一篇,我们将把镜头拉近,深入 Netty WebSocket 服务端的启动与初始化流程——从 WebSocketServer.start() 的引导过程到 SSL 上下文构建,从 Pipeline 每一层的设计考量到连接健康巡检与优雅关闭,来一次 Netty 核心概念的沉浸式解读。

    end
  1. 作者: 锋哥 (联系作者)
  2. 发表时间: 2026-03-27 09:11
  3. 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  4. 转载声明:如果是转载博主转载的文章,请附上原文链接
  5. 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  6. 评论

    站长头像 知录

    你一句春不晚,我就到了真江南!

    文章0
    浏览0

    文章分类

    标签云