上一篇我们拆解了 HTTP 通道的完整实现——从连接池、压缩加密到 AOP 透明切面。本篇聚焦 WebSocket 通道的宏观架构,看看 Phoenix 如何在这条长连接链路上编排消息封装、业务路由、事件分发与集群感知——服务端 Netty 的启动细节和客户端 Tyrus 的连接重连机制,将分别在后续两篇中深入展开。
一、HTTP 够用了,为什么还要 WebSocket?
上一篇结尾我们留了个悬念:HTTP 通道是"基本款",WebSocket 通道才是"性能款"。
先回忆一下 HTTP 通道的工作方式:客户端每 30 秒发一次心跳包,每 60 秒发一次 JVM 信息,如果还开了服务器采集、线程池采集……每一次上报都要走一遍"建连 → 发请求 → 等响应 → 关连接"的流程。虽然有连接池可以复用 TCP 连接,但 HTTP 本质上是请求-响应模型——客户端不问,服务端不说。
这在监控场景下有两个明显的不舒服:
第一,服务端想"主动找你",做不到。 比如运维人员在 UI 上点了"修改线程池核心线程数",服务端需要把这个指令下发给对应的客户端应用。用 HTTP 怎么做?只能等客户端下一次心跳来的时候"捎带"回去——延迟不可控,实现也别扭。
第二,每次上报都有协议开销。 HTTP 请求头动辄几百字节,对于心跳这种只有几十字节有效载荷的场景,协议开销比数据本身还大。虽然 keep-alive 避免了 TCP 握手的开销,但 HTTP 头部的冗余在高频场景下还是很浪费。
WebSocket 就是为这种场景而生的:一次握手,长期持有,双向通信。连接建立之后,客户端和服务端可以随时互发消息,没有额外的 HTTP 头部开销,也不需要轮询。
从 Phoenix 的源码中可以清晰地看到这个演进过程——几乎每个定时上报线程里都有一段被注释掉的 HTTP 代码:
// 改成用 WebSocket,弃用 HTTP
// String result = Sender.send(UrlConstants.HEARTBEAT_URL, heartbeatPackage.toJsonString());
WebSocketPackage requestPackage = new WebSocketPackage();
requestPackage.setClassName(HeartbeatPackage.class.getName());
requestPackage.setPayload(heartbeatPackage);
DataExchanger.sendMessage(requestPackage);
高频的定时上报(心跳、JVM、服务器、线程池、网络设备、Docker)全部迁移到了 WebSocket;而低频的操作指令(告警上报、异常上报、配置刷新等)仍然保留 HTTP。两条通道各司其职,而不是一刀切地替换。
二、先看全局:WebSocket 通道的角色分布
在深入代码之前,先梳理一下 WebSocket 通道涉及的关键角色和它们的关系:
┌──────────────────────┐
│ phoenix-client │ 客户端 SDK(集成到你的 Java 应用中)
│ ┌────────────────┐ │
│ │ WebsocketClient│ │ 基于 Tyrus(JSR-356),不依赖 Spring
│ │ (Tyrus/JSR356)│ │ 自动重连 + 指数退避
│ └───────┬────────┘ │
│ │ ws(s):// │
└──────────┼───────────┘
│
▼
┌──────────────────────┐
│ phoenix-agent │ 代理端(WebSocket 中继站)
│ ┌──────────────────┐ │
│ │RelayWebSocket │ │ 基于 JSR-356 @ServerEndpoint
│ │ Endpoint │ │ 下游连接 → 中转 → 上游连接
│ └───────┬──────────┘ │
│ │ ws(s):// │
└─────────┼────────────┘
│
▼
┌──────────────────────┐
│ phoenix-server │ 服务端(WebSocket 终点站)
│ ┌──────────────────┐ │
│ │ WebSocketServer │ │ 基于 Netty,高性能 NIO
│ │ (Netty) │ │ Boss/Worker 双线程池
│ └──────────────────┘ │
└──────────────────────┘
注意到了吗?服务端用 Netty,客户端用 Tyrus(JSR-356)。这不是随意的选择:
- 服务端需要同时处理成百上千个客户端的长连接,对性能和并发能力要求极高——Netty 的 Reactor 模型正是为此而生。
- 客户端 SDK 需要能在任何 Java 程序中运行,不能引入 Netty 的重量级依赖(避免和用户项目中的 Netty 版本冲突)——用标准的 JSR-356 WebSocket API + Tyrus 作为实现,既轻量又兼容。
而代理端作为"中继站",同时扮演着服务端和客户端两个角色:对下游(客户端)来说它是 WebSocket 服务端(使用 SpringBoot 内嵌的 JSR-356 支持),对上游(phoenix-server)来说它是 WebSocket 客户端。
三、服务端概览:Netty 构建的 WebSocket 引擎
Phoenix 的 WebSocket 服务端核心是 WebSocketServer 类,基于 Netty 的 Reactor 主从线程模型 构建:
- bossGroup(1 个线程):只负责接受新的 TCP 连接,就像餐厅门口的迎宾。
- workerGroup(默认 CPU 核心数 × 2 个线程):负责已建立连接上的所有 I/O 操作——读数据、写数据、编解码、业务处理。
每一个新连接都会被 WebSocketServerInitializer 装配一条有序的 7 层 Pipeline 处理链:
| 层级 | Handler | 职责 |
|---|---|---|
| ① | SslHandler(可选) | WSS 模式下的 TLS 加解密 |
| ② | HttpServerCodec | HTTP 编解码(WebSocket 握手阶段使用) |
| ③ | HttpObjectAggregator(10MB) | HTTP 消息聚合,应对大型数据包 |
| ④ | WebSocketServerCompressionHandler | WebSocket 帧级压缩(permessage-deflate) |
| ⑤ | IdleStateHandler(300 秒) | 5 分钟无入站数据触发空闲事件 |
| ⑥ | WebSocketServerProtocolHandler | 自动处理握手升级(HTTP → WebSocket)和控制帧 |
| ⑦ | WebSocketSimpleChannelInboundHandler | Phoenix 业务入口 |
数据像水一样流过这条 Pipeline——从字节流到 HTTP,从 HTTP 升级到 WebSocket,经过压缩、空闲检测,最终到达业务处理层。
最后一层 WebSocketSimpleChannelInboundHandler 做了一件巧妙的事:通过 InvokerHolder 机制把 Netty 世界和 Spring 世界连接起来,最终会被转发给 Spring 容器中的 IWebSocketFrameHandler。为什么需要这座"桥"?因为 Netty Handler 是在每次新连接时 new 出来的,它不是 Spring Bean,无法直接注入 Service——InvokerHolder 把调用转发给 Spring 容器管理的 Bean,解决了这个问题。
空闲连接的处理也值得一提:5 分钟内没有收到任何入站数据(包括 Ping/Pong),服务端会发送一个 CloseWebSocketFrame(状态码 1001,Going Away),由 WebSocketServerProtocolHandler 自动完成后续的关闭流程——这比粗暴地 ctx.close() 优雅得多。
此外,服务端还有一个 60 秒一轮的连接健康巡检任务:清理已失效的"幽灵连接",并在集群模式下刷新集群存储中的客户端信息。
下一篇《Netty WebSocket服务端启动与初始化流程》将深入展开
WebSocketServer.start()的完整引导过程、Boss/Worker 线程池配置、SSL 上下文构建(buildSslContext())、Pipeline 每一层的设计考量、连接健康巡检机制以及优雅关闭策略。
四、WebSocketFrameHandler——业务路由的枢纽
前面说到消息最终会被转发给 Spring 容器中的 IWebSocketFrameHandler。在服务端,它的实现类是 WebSocketFrameHandler——一个精心设计的业务路由分发器。
@Component
@ConditionalOnProperty(name = "ws.server.enable", havingValue = "true")
public class WebSocketFrameHandler implements IWebSocketFrameHandler {
public static final AttributeKey<String> WS_BUSINESS_TYPE = AttributeKey.valueOf("ws_business_type");
private final Map<String, IWebSocketBusinessHandler> businessHandlers;
public WebSocketFrameHandler(@Autowired List<IWebSocketBusinessHandler> handlers) {
this.businessHandlers = Maps.newHashMap();
for (IWebSocketBusinessHandler handler : handlers) {
this.businessHandlers.put(handler.businessType().toLowerCase(), handler);
}
}
}
构造函数通过 Spring 注入所有 IWebSocketBusinessHandler 的实现类,构建一个 业务类型 → 处理器 的映射表。目前 Phoenix 有两种业务类型:
monitoring:监控数据上报(心跳、JVM、服务器、线程池等)arthas:在线 Java 诊断(基于 Alibaba Arthas)
当 WebSocket 握手完成时,WebSocketFrameHandler 从 URI 路径的最后一段提取业务类型,然后委托给对应的处理器:
@Override
public void userEventTriggered(...) {
if (evt instanceof HandshakeComplete) {
HandshakeComplete handshake = (HandshakeComplete) evt;
UriComponents uriComponents = UriComponentsBuilder.fromUriString(handshake.requestUri()).build();
List<String> pathSegments = uriComponents.getPathSegments();
if (CollectionUtils.isEmpty(pathSegments)) {
ctx.fireUserEventTriggered(evt);
return;
}
// URI 最后一段就是业务类型,例如 /phoenix/websocket/relay/monitoring
String webSocketBusinessType = pathSegments.get(pathSegments.size() - 1).toLowerCase();
IWebSocketBusinessHandler handler = this.businessHandlers.get(webSocketBusinessType);
if (handler != null) {
handler.handle(simpleChannelInboundHandler, ctx, handshake, uriComponents);
// 把业务类型存到 Channel 属性中,后续消息处理时用
ctx.channel().attr(WS_BUSINESS_TYPE).set(webSocketBusinessType);
} else {
ctx.writeAndFlush(new CloseWebSocketFrame(1008, "不支持的业务类型:" + webSocketBusinessType));
}
} else {
// 非握手事件,继续向 Pipeline 下游传播
ctx.fireUserEventTriggered(evt);
}
}
这个设计的扩展性非常好:新增一种 WebSocket 业务,只需实现 IWebSocketBusinessHandler 接口并注册为 Spring Bean,路由逻辑一行代码都不用改。握手完成后的业务类型被存储到了 Netty Channel 的属性中(AttributeKey),后续收到数据帧时可以直接取出来路由到正确的处理器。
五、MonitoringFrameHandler——监控业务的"接待员"
MonitoringFrameHandler 是监控数据业务的核心处理器。它负责三件大事:客户端注册、消息接收和消息下发。
5.1 握手后的客户端注册
当一个监控客户端通过 WebSocket 连接上来时,handle() 方法被调用:
@Override
public void handle(WebSocketSimpleChannelInboundHandler handler, ChannelHandlerContext ctx,
HandshakeComplete handshake, UriComponents uriComponents) {
MultiValueMap<String, String> parameters = uriComponents.getQueryParams();
String endpoint = parameters.getFirst("endpoint"); // client/agent
String instanceId = parameters.getFirst("instanceId"); // 应用实例唯一ID
// ... 参数校验 ...
// 构造全局唯一的客户端标识
final String websocketClientId = WebsocketClientIdGenerator.generate(endpoint, instanceId);
// 检查是否已有相同标识的客户端在线(防止重复注册)
Optional<WebSocketClientInfo> socketClientInfo = this.webSocketServer.findClient(websocketClientId);
if (socketClientInfo.isPresent()) {
// 旧连接存在!先关闭旧的,再注册新的
// ... 重复连接处理逻辑 ...
}
// 没有重复,直接注册
this.doRegister(ctx, handshake, websocketClientId);
}
客户端连接时通过 URL 查询参数携带 endpoint(端点类型)和 instanceId(实例ID),服务端据此生成一个全局唯一的客户端标识。
重复连接的处理值得特别关注。在网络不稳定的环境下,客户端可能已经重连成功了,但旧连接还没有完全关闭——这时候就会出现"同一个客户端有两个连接"的情况。Phoenix 的处理策略是:
- 向旧连接发送 Close 帧(自定义状态码 4000,表示"重复连接")
- 用
AtomicBoolean+ 超时保护(5秒)确保新连接只被注册一次 - 优先等待旧连接正常关闭后再注册新连接;如果 5 秒内旧连接没有关闭,强制注册
这种"双保险"设计(优先路径 + 超时兜底)在分布式系统中非常常见——你永远不能假设网络一定会按预期工作。客户端收到 4000 状态码后会识别并跳过重连——这部分将在第五篇详细展开。
5.2 消息接收与分发
客户端通过 WebSocket 发来的监控数据,最终会到达 onMessageReceived:
@Override
public void onMessageReceived(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
if (frame == null || !frame.isFinalFragment()) { return; }
String message = frame.text();
if (StringUtils.isBlank(message)) { return; }
// 将原始 "密文 WebSocket 消息JSON字符串" 解析并转换为 "WebSocketPackage 数据包"
WebSocketPackage pkg = WebSocketPackage.convert(message, UPSTREAM_ALLOWED_CLASS_NAMES);
// 委托给 WebSocket 消息分发器
this.dispatcher.dispatch(ctx, pkg);
}
几个重要的细节:
isFinalFragment()检查:WebSocket 协议支持消息分片传输(一条大消息拆成多个帧发送)。这里只处理完整的最终帧,避免拼接半截消息。- 白名单校验:
UPSTREAM_ALLOWED_CLASS_NAMES是一个Set<String>,只允许特定类型的数据包被反序列化——这是一道重要的安全防线,防止恶意客户端发送可能触发反序列化漏洞的类名。 - 密文解析:
WebSocketPackage.convert()内部会调用MsgPayloadUtils.decryptPayload()解密——没错,WebSocket 通道上的数据同样是加密传输的,和 HTTP 通道共享同一套加密体系。
5.3 服务端主动下发消息
这是 WebSocket 相对于 HTTP 的杀手级能力。MonitoringFrameHandler 提供了两种下发方式:
// 异步发送——Fire and forget
public void sendMsgToClient(String websocketClientId, WebSocketPackage requestPackage) {
ChannelHandlerContext ctx = this.validateAndResolveContext(websocketClientId, requestPackage);
if (ctx == null) { return; }
String encryptStr = MsgPayloadUtils.encryptPayload(requestPackage.toJsonString());
ctx.writeAndFlush(new TextWebSocketFrame(encryptStr)).addListener(future -> {
if (!future.isSuccess()) {
log.warn("向客户端[{}]发送消息失败!", websocketClientId, future.cause());
}
});
}
// 同步发送——等到发完或超时
public void sendMsgToClientSync(String websocketClientId, WebSocketPackage requestPackage,
long timeout, TimeUnit unit) {
ChannelHandlerContext ctx = this.validateAndResolveContext(websocketClientId, requestPackage);
// ... 校验 ...
String encryptStr = MsgPayloadUtils.encryptPayload(requestPackage.toJsonString());
ChannelFuture future = ctx.writeAndFlush(new TextWebSocketFrame(encryptStr));
boolean completed = future.await(timeout, unit);
if (!completed) {
throw new WebSocketException("发送消息超时!");
}
}
举个实际场景:运维人员在 UI 上修改了某个应用的线程池核心线程数。这个操作通过 HTTP 请求到达服务端后,服务端需要把新配置推送给对应的客户端应用。通过 sendMsgToClient(),服务端可以立刻把指令送达——不需要等下一次心跳周期。
validateAndResolveContext() 方法做了三重防护:
- 参数非空校验
- 从
clientInfoMap中查找客户端 - 检查 Channel 是否仍然活跃(
ctx.channel().isActive())
如果 Channel 已经不活跃了,说明连接实际上已经断了,还会主动清理掉这个"僵尸"客户端。
六、WebSocketPackage——WebSocket 时代的"数据信封"
在 HTTP 通道中,每种数据包都有自己的 URL 路由(/heartbeat/accept-heartbeat-package、/jvm/accept-jvm-package 等)。但在 WebSocket 通道上,所有消息都走同一条连接——怎么区分这条消息是心跳包还是 JVM 信息包?
Phoenix 设计了 WebSocketPackage 来解决这个问题:
public class WebSocketPackage extends AbstractSuperBean {
private String className; // 负载数据对应的 Java 类全限定名
private Object payload; // 实际数据
}
只有两个字段,简洁到极致。className 告诉接收方"这个包里装的是什么类型的数据",payload 就是数据本身。这就像一个快递箱上贴的标签——标签写着"心跳包",里面装的就是 HeartbeatPackage 对象。
客户端发送时的用法也很直观:
// 构建心跳数据包
HeartbeatPackage heartbeatPackage = this.clientPackageConstructor.structureHeartbeatPackage();
// 装进 WebSocketPackage 信封
WebSocketPackage requestPackage = new WebSocketPackage();
requestPackage.setClassName(HeartbeatPackage.class.getName());
requestPackage.setPayload(heartbeatPackage);
// 发送
DataExchanger.sendMessage(requestPackage);
接收方通过 WebSocketPackage.convert() 方法反序列化时,有一个重要的安全设计——白名单校验:
public static WebSocketPackage convert(String jsonMessage, Set<String> allowedClassNames)
throws ClassNotFoundException {
String decryptStr = MsgPayloadUtils.decryptPayload(jsonMessage);
JSONObject root = JSON.parseObject(decryptStr);
String className = root.getString("className");
// 只允许白名单中的类被反序列化
if (!allowedClassNames.contains(className)) {
throw new WebSocketException("拒绝反序列化未授权的类:" + className);
}
WebSocketPackage pkg = new WebSocketPackage();
pkg.setClassName(className);
pkg.setPayload(root.getObject("payload", Class.forName(className)));
return pkg;
}
为什么要白名单?因为 Class.forName() + 反序列化是 Java 安全领域的经典攻击面。如果不加限制,攻击者可以构造一个 className 为 java.lang.Runtime 的消息包,利用反序列化漏洞执行任意代码。Phoenix 的白名单(WebSocketPkgPayloadWhitelistConstants)明确列出了所有允许的类型——心跳包、JVM 包、服务器包、告警包等——任何不在白名单中的类名直接拒绝。而且上行和下行分别有各自独立的白名单,严格遵循最小权限原则。
七、WebSocketMessageDispatcher——事件驱动的消息分发
服务端收到 WebSocketPackage 后,怎么把它交给正确的业务处理器?Phoenix 用了一套 Spring Event 事件驱动 的分发机制。
@Component
@RequiredArgsConstructor
public class WebSocketMessageDispatcher {
private final ApplicationEventPublisher eventPublisher;
// 路由表:className → 事件创建工厂
private static final Map<String, IWebSocketEventFactory> EVENT_FACTORY_MAP = new HashMap<>();
static {
register(AlarmPackage.class, (ctx, payload) -> new AlarmEvent(ctx, (AlarmPackage) payload));
register(DockerPackage.class, (ctx, payload) -> new DockerEvent(ctx, (DockerPackage) payload));
register(ExceptionPackage.class, (ctx, payload) -> new ExceptionEvent(ctx, (ExceptionPackage) payload));
register(HeartbeatPackage.class, (ctx, payload) -> new HeartbeatEvent(ctx, (HeartbeatPackage) payload));
register(JvmPackage.class, (ctx, payload) -> new JvmEvent(ctx, (JvmPackage) payload));
register(JavaThreadPoolPackage.class, (ctx, payload) -> new JavaThreadPoolEvent(ctx, (JavaThreadPoolPackage) payload));
register(NetworkDevicePackage.class, (ctx, payload) -> new NetworkDeviceEvent(ctx, (NetworkDevicePackage) payload));
register(OfflinePackage.class, (ctx, payload) -> new OfflineEvent(ctx, (OfflinePackage) payload));
register(ServerPackage.class, (ctx, payload) -> new ServerEvent(ctx, (ServerPackage) payload));
}
public void dispatch(ChannelHandlerContext ctx, WebSocketPackage pkg) {
String className = pkg.getClassName();
IWebSocketEventFactory factory = EVENT_FACTORY_MAP.get(className);
if (factory == null) {
log.warn("收到未注册的WebSocket消息类型,className:{}", className);
return;
}
ApplicationEvent event = factory.create(ctx, pkg.getPayload());
this.eventPublisher.publishEvent(event);
}
}
这个设计堪称教科书级的开闭原则实践:
- 注册阶段:在
static块中,为每种数据包类型注册一个事件创建工厂(Lambda 表达式),目前共 9 种类型 - 分发阶段:根据
className查表,创建对应的 Spring 事件并发布 - 处理阶段:对应的
@EventListener方法在 Spring 容器中监听事件并处理
新增一种数据包类型时,只需:
- 在
static块中加一行register(...) - 写一个新的 Event 类和对应的
@EventListener处理方法
分发逻辑一行不改。
和 HTTP 通道的 Controller + AOP 切面相比,WebSocket 通道的消息处理走的是完全不同的路径——不经过 Spring MVC 的 Controller 层,而是通过 Netty Handler → InvokerHolder → Spring Event 这条链路。但底层的加解密逻辑(MsgPayloadUtils)是复用的。
八、客户端概览:Tyrus 的轻量之道
为什么不用 Netty 做客户端?
phoenix-client-core 的设计目标是能在任何 Java 程序中运行。如果客户端也用 Netty,会引入一大堆依赖(netty-codec-http、netty-handler、netty-transport 等),而且如果用户的项目中已经用了不同版本的 Netty——恭喜,你将获得一场精彩的 NoSuchMethodError 表演。
Phoenix 选择了 Tyrus——一个 JSR-356(Java WebSocket API)的参考实现。JSR-356 是 Java EE 标准的一部分,API 极其简洁:
@ClientEndpoint
public class WebsocketClient {
@OnOpen
public void onOpen(Session session) { ... }
@OnMessage
public void onMessage(String message) { ... }
@OnClose
public void onClose(CloseReason reason) { ... }
@OnError
public void onError(Throwable throwable) { ... }
}
四个注解,四个回调方法,一目了然。
核心能力一览
WebsocketClient 围绕连接可靠性做了大量设计:
| 能力 | 实现手段 | 要点 |
|---|---|---|
| 并发连接保护 | AtomicBoolean CAS 操作 |
防止多线程同时发起连接,无锁、高效 |
| 连接同步 | CountDownLatch |
等待 @OnOpen 回调确认连接真正建立 |
| 指数退避重连 | 初始 5 秒 → 1.5 倍递增 → 上限 5 分钟 | 避免服务端故障时的重连风暴 |
| 重复连接识别 | 自定义状态码 4000 | 收到后不触发重连,防止"连上→被踢→重连"死循环 |
| 双层重连开关 | autoReconnect(不可变)+ enableReconnect(运行时可变) |
close() 关闭总闸后彻底停止重连 |
| 共享连接引擎 | 全局 SHARED_CLIENT(Tyrus ClientManager) |
复用底层连接资源 |
DataExchanger——客户端的数据交换中枢
DataExchanger 是客户端 WebSocket 通信的统一入口,封装了连接初始化、消息处理器注册和加密发送的完整流程。核心设计包括:
- DCL 双重检查锁 保证单次初始化
- SPI 机制(
ServiceLoader)自动发现和注册消息处理器——客户端不仅发数据,也要接收服务端下发的指令(比如线程池调参) - 加密发送 与 HTTP 通道共享同一套加密基础设施(
MsgPayloadUtils) volatile可见性 保证多线程安全读取客户端引用,sendMessage中先读到局部变量再判空,避免 TOCTOU 竞态条件
第五篇《Netty WebSocket客户端连接与重连机制》将深入展开
WebsocketClient的连接流程源码、CAS 并发保护细节、CountDownLatch 同步机制、指数退避重连策略实现、重复连接的客户端处理,以及DataExchanger的 DCL 初始化和 SPI 消息处理器注册。
九、代理端中继:WebSocket 的"驿站"
在 Phoenix 的架构中,代理端(agent)扮演着"驿站"角色——客户端的消息先到代理端,代理端再转发给服务端。HTTP 通道中是 RestTemplate 转发,WebSocket 通道中则是 Session 配对中继。
代理端同时暴露了一个 @ServerEndpoint 接收下游连接,和一个 UpstreamClientEndpoint 连接上游:
@ServerEndpoint("/websocket/relay/{subPath}")
public class RelayWebSocketEndpoint {
@OnOpen
public void onOpen(Session downstreamSession, @PathParam("subPath") String subPath) {
// 1. 检查连接数限制(最大1000个)
if (WebSocketRelayHelper.isConnectionLimitReached(SESSION_CACHE, CACHE_MAXIMUM_SIZE)) {
WebSocketUtils.safeClose(downstreamSession, ...);
return;
}
// 2. 构建上游 URL
String upstreamUrl = WebSocketUtils.buildUrl(UPSTREAM_BASE_URL + "/websocket/relay", subPath, downstreamSession);
// 3. 连接上游
UpstreamClientEndpoint clientEndpoint = new UpstreamClientEndpoint(...);
Session upstreamSession = container.connectToServer(clientEndpoint, URI.create(upstreamUrl));
// 4. 建立下游 ↔ 上游的配对关系
WebSocketRelayHelper.establishPairing(..., downstreamSessionId, upstreamSessionId, ...);
}
}
核心思路是**"对偶配对"**:每个下游 Session 对应一个上游 Session,消息从一头进来,原封不动(或稍加处理)从另一头出去。配对关系通过两个 ConcurrentMap 维护——DOWNSTREAM_TO_UPSTREAM 和 UPSTREAM_TO_DOWNSTREAM——双向映射,方便从任意一端找到对端。
代理端在中继监控业务(subPath = "monitoring")的消息时,会在两个方向都做一件额外的事:添加链路信息。
以上行方向(下游→上游,RelayWebSocketEndpoint.onMessage())为例:
if (WebSocketBusinessTypeConstants.MONITORING.equals(subPath)) {
WebSocketPackage pkg = WebSocketPackage.convert(message, UPSTREAM_ALLOWED_CLASS_NAMES);
AbstractSuperPackage superPackage = (AbstractSuperPackage) pkg.getPayload();
// 添加链路信息:记录"经过了代理端"
superPackage.setChain(AGENT_PACKAGE_CONSTRUCTOR.getChain(superPackage));
pkg.setPayload(superPackage);
message = MsgPayloadUtils.encryptPayload(pkg.toJsonString());
}
下行方向(上游→下游,UpstreamClientEndpoint.onMessage())也有同样的逻辑,使用的是下行白名单 DOWNSTREAM_ALLOWED_CLASS_NAMES。这就是第一篇博客中提到的 chain 字段——数据在流转过程中,双向都会记录下自己的行经路线(client → agent → server),后续被用来构建服务拓扑图。
Session 缓存使用了 Guava 的 Cache,配置了 30 分钟过期(expireAfterAccess)和最大 1000 个连接的限制。过期时自动触发 removalListener 回调,安全关闭对应的 Session。此外,onOpen 中配置的 MAX_IDLE_TIMEOUT(10 分钟)也会让 WebSocket 容器在连接空闲时主动关闭——多层清理机制确保不会出现资源泄漏。
十、配置体系
WebSocket 通道的配置分为两部分:服务端配置和客户端配置。
服务端配置(application.yml):
ws:
server:
enable: true # 是否开启 WebSocket 服务
host: 0.0.0.0 # 绑定地址
port: 16001 # WebSocket 端口(和 HTTP 端口独立)
path: /phoenix # WebSocket 路径
client-connect-host: # 集群部署时的外部连接地址
ssl:
enabled: false
key-store: classpath:keystore.jks
key-store-password: changeit
key-store-type: JKS
key-alias: myalias
注意 ws.server.enable 这个开关——通过 @ConditionalOnProperty 注解,WebSocket 功能可以被完全关闭。如果你只想用 HTTP 通道,不需要改一行代码,改个配置就行。
客户端配置(monitoring.properties 或 application.yml):
# 方式一:直接配置 WebSocket URL
monitoring.comm.websocket.url=ws://127.0.0.1:12000/phoenix-agent
# 方式二:只配置 HTTP URL,客户端自动推导 WebSocket URL
monitoring.comm.http.url=http://127.0.0.1:12000/phoenix-agent
Phoenix 的客户端有一个贴心的设计:如果没有显式配置 WebSocket URL,它会自动从 HTTP URL 推导——把 http:// 替换为 ws://,https:// 替换为 wss://。不过有一个前提条件:上游服务不能是直连 phoenix-server(因为 server 的 WebSocket 端口和 HTTP 端口是独立的)。只有通过代理端(agent)连接时,WebSocket 才能从 HTTP URL 自动推导。
十一、双通道对比:全景图
把 HTTP 和 WebSocket 两条通道放在一起对比:
| 维度 | HTTP 通道 | WebSocket 通道 |
|---|---|---|
| 通信模式 | 请求-响应,短连接 | 全双工,长连接 |
| 服务端实现 | Spring MVC Controller | Netty Handler + Spring Event |
| 客户端实现 | Apache HttpClient | Tyrus(JSR-356) |
| 加解密 | AOP 切面(RequestBodyAdvice) |
手动调用 MsgPayloadUtils |
| 路由方式 | URL 路径(/heartbeat/accept-...) |
WebSocketPackage.className |
| 消息分发 | Controller 方法直接处理 | WebSocketMessageDispatcher 事件驱动 |
| 服务端下推 | 不支持 | sendMsgToClient()、sendMsgToClientSync() |
| 代理端中继 | RestTemplate 转发 | Session 配对透传 |
| 重试机制 | HttpClient 内置 3 次 / @Retryable |
指数退避自动重连 |
| 典型场景 | 告警上报、配置刷新、数据库操作 | 心跳、JVM、服务器、线程池等高频上报 |
两者在数据安全层面完全一致——都使用 MsgPayloadUtils 进行压缩/加密,都传输 CiphertextPackage 格式的密文。差异主要在传输层和消息分发层。
十二、画一张完整的 WebSocket 消息流转图
以客户端通过代理端发送心跳包到服务端为例:
┌──────────────┐ ┌──────────────────┐ ┌────────────────────┐
│phoenix-client│ │ phoenix-agent │ │ phoenix-server │
│ │ │ (中继站) │ │ (终点站) │
└──────┬───────┘ └────────┬─────────┘ └──────────┬─────────┘
│ │ │
│ ① 构造 HeartbeatPackage │ │
│ ② 封装为 WebSocketPackage │ │
│ className=HeartbeatPackage │ │
│ ③ DataExchanger.sendMessage() │ │
│ toJsonString → encryptPayload │ │
│ │ │
│─── ws TextFrame (密文) ────────▶│ │
│ │ │
│ ④ RelayWebSocketEndpoint.onMessage() │
│ 解密 → 添加链路信息 → 重新加密 │
│ │ │
│ │── ws TextFrame (密文) ───────▶ │
│ │ │
│ │ ⑤ WebSocketSimpleChannel │
│ │ InboundHandler.channelRead0 │
│ │ ⑥ WebSocketFrameHandler │
│ │ 路由到 monitoring 处理器 │
│ │ ⑦ MonitoringFrameHandler │
│ │ .onMessageReceived() │
│ │ ⑧ WebSocketPackage.convert() │
│ │ 解密 → 白名单校验 → 反序列化 │
│ │ ⑨ WebSocketMessageDispatcher │
│ │ .dispatch() │
│ │ ⑩ eventPublisher │
│ │ .publishEvent(HeartbeatEvent) │
│ │ ⑪ @EventListener 处理心跳 │
│ │ 写入 MONITOR_INSTANCE 表 │
如果是直连模式(不经过代理端),去掉中间的中继环节,消息从客户端直达服务端。但注意:直连服务端时走的是 Netty 的 WebSocket 端口(如 16001),而不是 HTTP 端口(如 16000)。
十三、集群部署支持
当 Phoenix 以集群模式部署多个 server 实例时,同一个客户端可能连接到不同的 server 节点。这时候就需要知道"客户端 A 连接在哪个 server 上"——IWebSocketClusterStore 接口就是为此而生的。
public interface IWebSocketClusterStore {
void addClient(String clientId, WebSocketClientClusterInfo info, long expire, TimeUnit timeUnit);
WebSocketClientClusterInfo findClient(String clientId);
void removeClient(String clientId);
Collection<String> allClientIds();
Map<String, WebSocketClientClusterInfo> clientInfo(String appName);
}
Phoenix 默认提供了基于 Caffeine 缓存 的实现:
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "spring.cache.type", havingValue = "caffeine")
public IWebSocketClusterStore webSocketClusterStore(@Autowired CacheManager cacheManager) {
Cache cache = cacheManager.getCache(WebSocketCacheConstants.CACHE_IN_CAFFEINE_WEB_SOCKET_CLUSTER);
WebSocketInCaffeineClusterStore store = new WebSocketInCaffeineClusterStore();
store.setCache(cache);
return store;
}
在集群场景下,每个 server 节点每 60 秒刷新一次自己管理的客户端信息到集群存储中(带 1 小时过期时间)。当服务端需要向某个客户端下发指令时,先从集群存储中查到该客户端连接在哪个节点上,再通过该节点转发。
@ConditionalOnMissingBean 意味着你可以替换为自己的实现——比如基于 Redis 的集群存储,适用于跨机房部署的场景。
十四、设计复盘:几个值得学习的点
"事件驱动"——告别 if-else 地狱
如果用传统的 if-else 或 switch-case 来根据 className 分发消息,每新增一种数据包类型就要改分发逻辑。WebSocketMessageDispatcher 用工厂模式 + Spring Event,让新增类型变成了"只加不改"。
"白名单"——安全不是事后补丁
反序列化安全问题不是等出了安全漏洞再修——Phoenix 在设计之初就通过 WebSocketPkgPayloadWhitelistConstants 限制了允许反序列化的类型。上行和下行还分别有各自的白名单,最小权限原则。
"因地制宜"——技术选型不是非此即彼
服务端用 Netty(高并发刚需),客户端用 Tyrus(轻量兼容刚需),代理端用 JSR-356 标准(SpringBoot 内嵌支持)。三个角色根据各自的运行环境约束选择了最合适的技术方案,而不是"一刀切"地统一技术栈。
"渐进式迁移"——不赌命
从源码中被注释掉的 HTTP 代码可以看出,Phoenix 是逐步把高频上报从 HTTP 迁移到 WebSocket 的。每个数据类型都可以独立切换,出了问题可以单个回退。这种策略比"一夜之间全部替换"稳妥得多。
十五、小结
本篇从宏观视角拆解了 Phoenix WebSocket 通信通道的完整架构。回顾核心要点:
- 为什么要 WebSocket:全双工长连接,解决 HTTP 无法服务端推送、高频上报协议开销大的问题
- 服务端(Netty):
WebSocketServer基于 Boss/Worker 双线程池,WebSocketServerInitializer编排了 7 层 Pipeline,InvokerHolder桥接 Netty 与 Spring - 业务路由:
WebSocketFrameHandler根据 URI 路径段路由到IWebSocketBusinessHandler(monitoring/arthas),新增业务只需实现接口并注册为 Spring Bean - 消息分发:
WebSocketMessageDispatcher通过工厂模式 + Spring Event 实现开闭原则,9 种数据包类型的分发一行代码不改 - 数据模型:
WebSocketPackage(className + payload)替代了 HTTP 的 URL 路由,白名单校验防止反序列化攻击 - 客户端(Tyrus):
WebsocketClient基于 JSR-356 标准 API,支持指数退避重连、并发保护、重复连接识别;DataExchanger封装了初始化和加密发送 - 代理端中继:
RelayWebSocketEndpoint通过 Session 配对实现上下游透传,双向添加链路信息,Guava Cache 管理连接生命周期 - 集群支持:
IWebSocketClusterStore接口支持多节点部署时的客户端定位,默认提供 Caffeine 缓存实现 - 安全:WSS/SSL 支持、传输加密(共享 HTTP 通道的加密基础设施)、反序列化白名单
如果说 HTTP 通道像打电话——拨号、说话、挂断,每次通话都要重新拨号;那 WebSocket 通道就像对讲机——按下通话键就一直保持连接,随时都能说话,对方也能随时说话。对于监控这种需要持续、高频、双向通信的场景,对讲机显然比电话更合适。
下一篇,我们将把镜头拉近,深入 Netty WebSocket 服务端的启动与初始化流程——从 WebSocketServer.start() 的引导过程到 SSL 上下文构建,从 Pipeline 每一层的设计考量到连接健康巡检与优雅关闭,来一次 Netty 核心概念的沉浸式解读。
评论