目录

    Phoenix监控平台技术解析(四):Netty WebSocket 服务端启动与初始化流程

    上一篇我们宏观地拆解了 WebSocket 通道的架构设计——从 Boss/Worker 双线程池到 7 层 Pipeline 处理链。本篇将镜头拉近,深入 WebSocketServer.start() 的完整引导过程,逐行剖析 SSL 上下文构建、Pipeline 编排细节、连接健康巡检机制以及优雅关闭策略——来一次 Netty 核心概念的沉浸式解读。


    一、从 Spring Boot 的生命周期说起

    在 Phoenix 监控平台的 WebSocketServerConfiguration 配置类中,有一个精心设计的 Bean 定义:

    @Bean(initMethod = "start", destroyMethod = "stop")
    @ConditionalOnMissingBean
    public WebSocketServer webSocketServer(@Autowired(required = false) IWebSocketClusterStore clusterStore) {
        WebSocketServer webSocketServer = new WebSocketServer();
        webSocketServer.setHost(this.webSocketProperties.getServer().getHost());
        webSocketServer.setPort(this.webSocketProperties.getServer().getPort());
        webSocketServer.setSsl(this.webSocketProperties.getServer().getSsl());
        webSocketServer.setPath(this.webSocketProperties.getServer().getPath());
        webSocketServer.setClientConnectHost(this.webSocketProperties.getServer().getClientConnectHost());
        if (clusterStore != null) {
            webSocketServer.setClusterStore(clusterStore);
        }
        return webSocketServer;
    }
    

    注意这两个属性:initMethod = "start"destroyMethod = "stop"

    这意味着什么?当 Spring 容器启动时,创建完 WebSocketServer 这个 Bean 后,会自动调用它的 start() 方法;当 Spring 容器关闭时,会自动调用它的 stop() 方法。这是一种经典的生命周期回调模式——把组件的初始化和销毁逻辑封装在 Bean 自身中,Spring 负责在合适的时机触发。

    所以,整个 WebSocket 服务端的启动流程,就从 start() 方法开始。

    二、启动流程全景图

    先上一张完整的时序图,再来逐行拆解:

    ┌──────────────┐          ┌─────────────────┐                         ┌──────────────────────┐
    │Spring 容器    │          │  WebSocketServer│                         │   ServerBootstrap    │
    └──────┬───────┘          └────────┬────────┘                         └──────────┬───────────┘
           │                           │                                             │
           │ ① initMethod="start"      │                                             │
           ├──────────────────────────▶│                                             │
           │                           │                                             │
           │                           │ ② TimeInterval timer()                      │
           │                           │ 开始计时                                      │
           │                           │                                             │
           │                           │ ③ buildSslContext()                         │
           │                           │ (如果启用 SSL)                                │
           │                           │                                             │
           │                           │ ④ new ServerBootstrap()                     │
           │                           ├──────────────────────────———————————————────▶│
           │                           │                                              │
           │                           │ ⑤ group(bossGroup, workerGroup)              │
           │                           │ ⑥ channel(NioServerSocketChannel.class)      │
           │                           │ ⑦ handler(LoggingHandler)                    │
           │                           │ ⑧ childHandler(WebSocketServerInitializer)   │
           │                           ├─────────────────────────——————————————————──▶│
           │                           │                                              │
           │                           │ ⑨ bind(port).sync()                          │
           │                           ├───────────────────────────————————————————───▶│
           │                           │                                               │
           │                           │ ⑩ scheduleWithFixedDelay(                     │
           │                           │     cleanupAndRefreshCluster,                 │
           │                           │     60, 60, SECONDS)                          │
           │                           │                                               │
           │                           │ ⓫ log("WebSocket 服务端启动完成")                │
           │                           │                                                │
           │ ✅ start() 完成           │                                                │
           │◀──────────────────────────│                                                │
           │                           │                                                │
    

    2.1 第一步:计时器与 SSL 上下文构建

    public void start() throws Exception {
        // 计时器
        TimeInterval timer = DateUtil.timer();
    
        SslContext sslCtx = null;
        if (this.ssl != null && this.ssl.isEnabled()) {
            // 构建 SSL 上下文
            sslCtx = this.buildSslContext();
        }
        // ...后续代码
    }
    

    这里用到了 Hutool 工具包的 TimeInterval——一个轻量级的计时器。它的作用很简单:记录启动耗时,最后打印日志时用上。对于这种“只想简单计个时”的场景,Hutool 的工具类比 JDK 原生的 System.currentTimeMillis() 更语义化。

    SSL 上下文的构建是可选的——只有当配置文件中启用了 ws.server.ssl.enabled=true 时才会执行。让我们先跳过 SSL 细节,先看主流程。

    2.2 第二步:ServerBootstrap 的装备清单

    ServerBootstrap b = new ServerBootstrap();
    b.group(this.bossGroup, this.workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new WebSocketServerInitializer(this, sslCtx));
    

    这段代码是 Netty 服务端启动的标准姿势。让我们逐个拆解每个方法的含义:

    new ServerBootstrap():这是 Netty 服务端的启动引导类。你可以把它理解为一个“装备装配器”——通过链式调用,一步步给服务端装备各种组件。

    .group(this.bossGroup, this.workerGroup):设置两个 EventLoopGroup。还记得第三篇博客中提到的 Reactor 模型吗?

    • bossGroup:只负责“接客”——接受新的 TCP 连接。就像餐厅门口的迎宾员,只负责把客人带进包厢,不管点菜上菜。
    • workerGroup:负责“服务”——处理已建立连接上的所有 I/O 操作。就像餐厅的服务员,负责点菜、上菜、结账等全套服务。

    这两个线程池在 WebSocketServer 类定义时就初始化了:

    private EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("monitoring-websocket-server-boss", true));
    private EventLoopGroup workerGroup = new NioEventLoopGroup(new DefaultThreadFactory("monitoring-websocket-server-worker", true));
    

    注意 bossGroup 的大小是1——因为.accept() 操作是单线程的,一个线程就够了。而 workerGroup 默认是CPU 核心数 × 2(Netty 的默认策略),因为 I/O 操作是并发的,需要更多线程来处理。

    .channel(NioServerSocketChannel.class):指定 Channel 的实现类。NioServerSocketChannel 是基于 Java NIO 的服务器端 Channel 实现——非阻塞 I/O,高并发场景下的性能利器。

    .handler(new LoggingHandler(LogLevel.INFO)):为 bossGroup 设置一个日志处理器。当有新的 TCP 连接建立时,会打印 INFO 级别的日志。这对于调试和监控非常有用——谁连进来了,什么时候连的,一目了然。

    .childHandler(new WebSocketServerInitializer(this, sslCtx)):为 workerGroup 设置子 Channel 的初始化器。注意这里是 childHandler 而不是 handler——前者用于 workerGroup(处理已建立的连接),后者用于 bossGroup(处理接受连接的 ServerSocketChannel)。

    WebSocketServerInitializer 是一个 ChannelInitializer,它的作用是:每当有一个新连接建立,就为这个连接初始化一条专属的 Pipeline 处理链。这就是第三篇博客中提到的“7 层 Pipeline”的来源。

    2.3 第三步:绑定端口与同步等待

    if (StringUtils.isBlank(this.host)) {
        this.channel = b.bind(this.port).sync().channel();
    } else {
        this.channel = b.bind(this.host, this.port).sync().channel();
    }
    

    这里有一个分支判断:如果 host 为空,则绑定到 0.0.0.0(监听所有网卡);否则绑定到指定的 IP 地址。

    .bind(this.port):异步操作,立即返回一个 ChannelFuture 对象。这个 Future 代表着“绑定操作的未来结果”。

    .sync():阻塞当前线程,直到绑定操作完成。为什么要阻塞?因为 WebSocket 服务端必须启动成功后,才能继续后续的启动流程——如果绑定失败(比如端口被占用),整个应用应该启动失败,而不是带着一个没启动的 WebSocket 继续跑。

    .channel():获取绑定成功后的 Channel 对象。这个 Channel 就是服务端的“总开关”——后续关闭服务时,就是通过关闭这个 Channel 来停止接收新连接。

    2.4 第四步:定时清理任务

    // 定时检查客户端信息:清理失效连接 + 刷新集群存储
    this.workerGroup.scheduleWithFixedDelay(this::cleanupAndRefreshCluster, 60, 60, TimeUnit.SECONDS);
    

    这行代码提交了一个每 60 秒执行一次的定时任务。注意这里用的是 workerGroup.scheduleWithFixedDelay()——为什么不用 JDK 的 ScheduledExecutorService

    答案很简单:复用现有的线程池资源workerGroup 本身就是一个 EventLoopGroup,它内部已经管理好了线程调度。用它来执行定时任务,不需要额外创建线程,节省系统资源。

    这个定时任务做两件大事:

    1. 清理失效连接:移除掉那些 TCP 连接已经断开但还没来得及从 Map 中删除的“幽灵客户端”
    2. 刷新集群存储:在集群部署模式下,定期刷新客户端连接到哪个 server 节点的信息

    关于这个定时任务的详细实现,后面会单独展开。

    2.5 第五步:启动完成日志

    // 时间差(毫秒)
    String betweenDay = timer.intervalPretty();
    log.info("WebSocket服务端启动:{}:{},耗时:{}", StringUtils.defaultIfBlank(this.host, "0.0.0.0"), this.port, betweenDay);
    

    最后一句日志,宣告启动完成。timer.intervalPretty() 会返回一个格式化的耗时字符串,比如“1 秒 234 毫秒”。

    到这里,整个 start() 方法执行完毕。

    三、SSL 上下文构建:WSS 模式的核心

    如果启用了 WSS(WebSocket Secure),那么 buildSslContext() 方法会在 start() 方法的第一步被调用。这个方法有 38 行代码,但信息密度很高。

    3.1 证书加载:从 classpath 到 KeyStore

    String keyStorePath = this.ssl.getKeyStore();
    String certPassword = this.ssl.getKeyStorePassword();
    String keyStoreType = this.ssl.getKeyStoreType();
    String keyAlias = this.ssl.getKeyAlias();
    
    // 以类路径开头
    String prefixClassPath = "classpath:";
    if (StringUtils.startsWith(keyStorePath, prefixClassPath)) {
        keyStorePath = StringUtils.removeStart(keyStorePath, prefixClassPath);
    }
    
    @Cleanup
    InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream(keyStorePath);
    if (inputStream == null) {
        throw new NotFoundConfigFileException("WebSocket无法加载证书文件: " + keyStorePath);
    }
    

    首先从配置对象中读取 SSL 相关参数:

    • keyStore:证书文件路径,支持 classpath: 前缀
    • keyStorePassword:密钥库密码
    • keyStoreType:密钥库类型(如 JKS、PKCS12)
    • keyAlias:密钥别名(可选)

    然后处理 classpath: 前缀——这是 Spring 生态中的常见约定,表示证书文件放在项目的 src/main/resources 目录下。去掉前缀后,通过类加载器读取证书文件的输入流。

    这里用到了 Lombok 的 @Cleanup 注解——这是 Lombok 提供的自动资源管理功能,相当于在 finally 块中自动调用 inputStream.close(),避免资源泄漏。比手动写 try-finally 简洁得多。

    3.2 KeyStore 加载与 KeyManagerFactory 初始化

    char[] passwordChars = certPassword.toCharArray();
    KeyStore ks = KeyStore.getInstance(keyStoreType);
    ks.load(inputStream, passwordChars);
    
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    if (StringUtils.isNotBlank(keyAlias)) {
        KeyStore.Entry entry = ks.getEntry(keyAlias, new KeyStore.PasswordProtection(passwordChars));
        if (entry == null) {
            throw new IllegalArgumentException("WebSocket的Keystore中未找到别名: " + keyAlias);
        }
        // 创建一个只包含指定 alias 的 KeyStore 视图(更安全)
        KeyStore singleEntryKs = KeyStore.getInstance(keyStoreType);
        // 初始化空 keystore
        singleEntryKs.load(null, null);
        singleEntryKs.setEntry(keyAlias, entry, new KeyStore.PasswordProtection(passwordChars));
        kmf.init(singleEntryKs, passwordChars);
    } else {
        kmf.init(ks, passwordChars);
    }
    

    这段代码做了以下几件事:

    Step 1:加载 KeyStore

    使用 JDK 的 KeyStore API 加载证书文件。KeyStore 是 Java 中用于存储证书和密钥的数据库——可以理解为一个“保险箱”,里面装着服务器的公钥证书和私钥。

    Step 2:创建 KeyManagerFactory

    KeyManagerFactory 是用于创建 KeyManager 的工厂类。KeyManager 的作用是在 SSL 握手期间管理密钥材料——简单来说,就是向客户端证明“我是我”。

    Step 3:按别名提取单个证书(可选)

    如果配置了 keyAlias,则从 KeyStore 中提取出这一个证书条目,然后创建一个新的、只包含这一个证书的 KeyStore 视图。

    为什么要多此一举?最小权限原则。原始的 KeyStore 可能包含多个证书条目(比如开发环境为了测试方便,可能把多个服务的证书都放在一个 JKS 文件中)。如果直接把整个 KeyStore 传给 KeyManagerFactory,理论上所有这些证书都能被用于 SSL 握手——但实际上只需要其中一个。

    创建一个只包含目标证书的“视图 KeyStore”,就像给 KeyManager 戴了一副“聚焦眼镜”——让它只能看到需要的那个证书,其他证书视而不见。这是一种安全最佳实践。

    3.3 构建 SslContext

    final SslContext sslCtx = SslContextBuilder.forServer(kmf).build();
    log.info("WebSocket启用SSL,证书文件:{},密钥别名:{}", keyStorePath, StringUtils.defaultIfBlank(keyAlias, "未指定"));
    return sslCtx;
    

    最后一步最简单——使用 Netty 的 SslContextBuilder 构建 SslContext

    这里有一个被注释掉的旧代码:

    // SSLContext sslContext = SSLContext.getInstance("TLS");
    // sslContext.init(kmf.getKeyManagers(), null, null);
    // SelfSignedCertificate ssc = new SelfSignedCertificate();
    // final SslContext sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).keyManager(kmf).build();
    

    这是开发过程中留下的“历史遗迹”——早期可能尝试过不同的实现方案,后来发现直接用 SslContextBuilder.forServer(kmf) 最简洁,就把旧代码注释掉了。这种“注释掉的代码”在源码中很常见,记录了作者的思考轨迹。

    3.4 SslHandler:Pipeline 的第一道关卡

    构建好的 SslContext 会被传递给 WebSocketServerInitializer,然后在 initChannel() 方法中被用来创建 SslHandler

    if (this.sslCtx != null) {
        pipeline.addLast(this.sslCtx.newHandler(ch.alloc()));
    }
    

    SslHandler 是 Netty 中对 SSL/TLS 协议的实现——它负责所有的加解密工作。作为 Pipeline 的第一层(如果启用了 SSL),所有进入的字节流都会先经过它解密,所有出去的字节流都会先经过它加密。

    想象一下:客户端发来的加密数据,像水流一样流过 Pipeline——第一层 SslHandler 把它解密成明文,后续的 HTTP 编解码器、WebSocket 协议处理器才能理解里面的内容。反过来,服务端要发送的数据,也要先经过 SslHandler 加密,才能在网络上传输。

    四、Pipeline 七层处理链:数据的奇幻漂流

    第三篇博客中提到了 7 层 Pipeline,现在来看看它们是怎么被添加到 Pipeline 中的。

    4.1 第①层:SslHandler(可选)

    if (this.sslCtx != null) {
        pipeline.addLast(this.sslCtx.newHandler(ch.alloc()));
    }
    

    职责:WSS 模式下的 TLS 加解密。

    位置:Pipeline 的最前端(如果存在)。

    工作原理:所有入站数据先被解密成明文,所有出站数据先被加密成密文。

    4.2 第②层:HttpServerCodec

    pipeline.addLast(new HttpServerCodec());
    

    职责:HTTP 协议的编解码器。

    为什么需要它:WebSocket 协议的握手阶段使用的是 HTTP 协议。客户端发起 WebSocket 连接时,实际上发送的是一个特殊的 HTTP 请求:

    GET /phoenix/websocket/relay/monitoring?endpoint=client&instanceId=xxx HTTP/1.1
    Host: localhost:16001
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
    Sec-WebSocket-Version: 13
    

    HttpServerCodec 的作用就是把这样的 HTTP 请求从字节流解码成 HttpRequest 对象,同时也把服务端的 HttpResponse 编码成字节流。

    内部结构HttpServerCodec 其实是一个组合处理器,内部包含了:

    • HttpRequestDecoder:解码 HTTP 请求
    • HttpResponseEncoder:编码 HTTP 响应

    4.3 第③层:HttpObjectAggregator

    pipeline.addLast(new HttpObjectAggregator(WebSocketConfigConstants.MAX_HTTP_CONTENT_LENGTH));
    

    职责:HTTP 消息聚合器。

    为什么需要它:HTTP 消息可能被分成多个部分传输。比如一个大的 HTTP POST 请求,可能被拆分成:

    • HttpRequest(请求头)
    • 多个 HttpContent(数据块)

    HttpObjectAggregator 的作用就是把这些碎片聚合成一个完整的 FullHttpRequestFullHttpResponse,方便后续处理。

    参数含义MAX_HTTP_CONTENT_LENGTH 默认是10MB——允许聚合的最大消息体大小。如果超过这个限制,会抛出 TooLongFrameException。这是一个保护机制——防止恶意客户端发送超大数据导致内存溢出。

    4.4 第④层:WebSocketServerCompressionHandler

    pipeline.addLast(new WebSocketServerCompressionHandler());
    

    职责:WebSocket 帧级压缩(permessage-deflate)。

    压缩什么:WebSocket 数据帧(TextWebSocketFrameBinaryWebSocketFrame)。

    压缩算法:DEFLATE(和 Gzip 同宗同源)。

    为什么不是 Gzip:WebSocket 协议有自己的压缩扩展标准——permessage-deflate。它在 WebSocket 握手阶段协商是否启用压缩,然后对每个数据帧进行压缩。相比于 HTTP 层面的 Gzip,WebSocket 压缩更轻量、更实时。

    4.5 第⑤层:IdleStateHandler

    pipeline.addLast(new IdleStateHandler(300, 0, 0));
    

    职责:空闲检测。

    参数含义(readerIdleTime, writerIdleTime, allIdleTime) ——分别指定读空闲、写空闲、全空闲的超时时间(单位:秒)。这里只设置了 readerIdleTime=300,意味着5 分钟内没有收到任何入站数据就会触发 ReaderIdle 事件。

    触发时机:当连接超过 5 分钟没有任何入站数据(包括 Ping/Pong 控制帧、业务消息),IdleStateHandler会向Pipeline下游抛出一个IdleStateEvent事件。

    下游处理:这个事件会被 WebSocketSimpleChannelInboundHandler.userEventTriggered() 捕获,然后发送 Close 帧关闭连接。

    4.6 第⑥层:WebSocketServerProtocolHandler

    pipeline.addLast(new WebSocketServerProtocolHandler(
        this.webSocketServer.getPath(),  // WebSocket 路径,如 "/phoenix"
        null,                             // 子协议(可选)
        true,                             // 允许 WebSocket 扩展(如 permessage-deflate)
        WebSocketConfigConstants.MAX_HTTP_CONTENT_LENGTH, // 最大帧大小
        false,                            // 不允许掩码不匹配
        true,                             // 路径前缀匹配
        10000L                            // 握手超时时间(毫秒)
    ));
    

    职责:这是整个 Pipeline 中最核心的处理器之一,负责两件事:

    1. WebSocket 握手升级:把 HTTP 连接升级为 WebSocket 连接
    2. 控制帧处理:自动处理 Ping、Pong、Close 等控制帧

    握手升级流程

    当客户端发起 WebSocket 握手请求时,WebSocketServerProtocolHandler 会:

    1. 验证请求头中的 Upgrade: websocketConnection: Upgrade
    2. 计算 Sec-WebSocket-Accept(基于客户端的 Sec-WebSocket-Key
    3. 返回 101 Switching Protocols 响应
    4. 完成后,HTTP 协议正式升级为 WebSocket 协议

    控制帧处理

    WebSocket 协议定义了三种控制帧:

    • Ping:心跳探测,对方收到后要回复 Pong
    • Pong:对 Ping 的响应
    • Close:关闭连接

    WebSocketServerProtocolHandler 会自动响应 Ping 帧(回复 Pong),也会处理 Close 帧(发送 Close 响应并关闭连接)。这样开发者就不需要手动处理这些底层细节了。

    4.7 第⑦层:WebSocketSimpleChannelInboundHandler

    pipeline.addLast(new WebSocketSimpleChannelInboundHandler());
    

    职责:Phoenix 业务的入口。

    处理的帧类型

    由于上游的 WebSocketServerProtocolHandler 已经消费了 Ping 帧(自动回复 Pong)和 Close 帧,实际到达 channelRead0 的帧类型主要是:

    • TextWebSocketFrame:文本消息(业务数据)
    • BinaryWebSocketFrame:二进制消息
    • PongWebSocketFrame:Pong 帧(心跳响应)

    核心方法

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        Invoker invoker = InvokerHolder.getInvoker(IWebSocketFrameHandler.class, "channelRead0");
        if (invoker == null) {
            return;
        }
        invoker.invoke(this, ctx, frame);
    }
    

    这里出现了一个关键角色:InvokerHolder。它是一个桥接器,把 Netty 世界和 Spring 世界连接起来。

    为什么需要这座桥?因为 WebSocketSimpleChannelInboundHandler 是在每次新连接时 new出来的——它不是 Spring Bean,无法直接注入 Service。InvokerHolder通过反射机制,找到 Spring 容器中实现了IWebSocketFrameHandler 接口的 Bean(实际是 WebSocketFrameHandler),然后把调用转发给它。

    这就好比:Netty Handler 是一个前台接待员,Spring Bean 是后台业务专家。客户(数据)来了,前台接待员自己不处理,而是打电话叫后台专家来处理。InvokerHolder 就是这个电话系统。

    五、连接健康巡检:每 60 秒的大扫除

    start() 方法的最后,提交了一个定时任务:

    this.workerGroup.scheduleWithFixedDelay(this::cleanupAndRefreshCluster, 60, 60, TimeUnit.SECONDS);
    

    这个任务每 60 秒执行一次,做两件大事:清理失效连接刷新集群存储

    5.1 清理失效连接

    // 移除掉已经失去连接的客户端
    this.clientInfoMap.entrySet().removeIf(e -> !e.getValue().getChannelHandlerContext().channel().isActive());
    this.clientConnectionInfoMap.entrySet().removeIf(e -> !e.getValue().getChannelHandlerContext().channel().isActive());
    

    这两行代码使用了 Java 8 的 removeIf() 方法——一种函数式的集合过滤方式。它的意思是:遍历 Map 中的所有条目,如果某个客户端的 Channel 已经不活跃了(!channel.isActive()),就从 Map 中移除它

    什么是“不活跃的 Channel”?有几种情况:

    1. 客户端主动关闭:客户端调用了 session.close(),TCP 连接正常关闭
    2. 网络异常断开:网线被拔了、WiFi 断了,但服务端还没收到 FIN 包
    3. 空闲超时关闭:超过 5 分钟无通信,被 IdleStateHandler 检测到并关闭
    4. 重复连接关闭:同一个客户端建立了新连接,旧连接被关闭

    这些情况下,Channel 的 isActive() 方法会返回 false。定时清理任务会把它们从内存中清除,避免 Map 无限膨胀。

    5.2 刷新集群存储

    // 更新集群key信息
    if (this.clusterStore != null && this.clientConnectHost != null) {
        for (Map.Entry<String, WebSocketClientInfo> entry : this.clientInfoMap.entrySet()) {
            // 因为有个过期时间,所有定时添加能覆盖之前的,也就刷新了过期时间
            this.clusterStore.addClient(entry.getKey(), new WebSocketClientClusterInfo(entry.getValue(), this.clientConnectHost), 60 * 60, TimeUnit.SECONDS);
        }
    }
    

    这段代码只在集群部署模式下执行。什么是集群模式?当有多个 phoenix-server实例同时运行时,同一个客户端可能连接到不同的 server 节点。这时候就需要知道“客户端 A 连接在哪个 server 上”。

    IWebSocketClusterStore 就是用来存储这种映射关系的接口。它的实现可以是:

    • Caffeine 缓存:单机内存存储,适用于单节点或共享内存的集群
    • Redis 缓存:分布式存储,适用于跨机房部署

    addClient() 方法的第三个参数是过期时间(60 分钟)。这意味着集群存储中的每条记录都有保质期——如果客户端 60 分钟内没有刷新自己的信息,这条记录就会自动过期被删除。

    定时任务每 60 秒刷新一次——相当于告诉集群存储:“我还活着,这些客户端还在我这里”。这种设计避免了“客户端已经断开,但集群存储中还认为它在线”的问题。

    六、优雅关闭:好聚好散

    和启动流程对应的是关闭流程——stop() 方法。

    public void stop() {
        try {
            if (this.channel != null) {
                this.channel.close().sync();
            }
            this.bossGroup.shutdownGracefully().sync();
            this.workerGroup.shutdownGracefully().sync();
            log.info("WebSocket服务端优雅关闭:{}:{}", StringUtils.defaultIfBlank(this.host, "0.0.0.0"), this.port);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("优雅关闭WebSocket服务端时被中断!", e);
        }
    }
    

    关闭顺序很有讲究:

    Step 1:关闭服务端 Channel

    this.channel.close().sync();
    

    这一步会关闭服务端的 ServerSocketChannel——不再接受新的 TCP 连接。但已经建立的连接仍然正常工作。

    Step 2:关闭 bossGroup

    this.bossGroup.shutdownGracefully().sync();
    

    shutdownGracefully() 是 Netty 提供的优雅关闭方法。它会:

    1. 停止接受新任务
    2. 等待已提交的任务执行完成
    3. 释放所有资源(线程、Channel 等)

    Step 3:关闭 workerGroup

    this.workerGroup.shutdownGracefully().sync();
    

    同样的优雅关闭流程。由于 workerGroup 管理着所有已建立连接的 I/O 操作,关闭它会断开所有客户端连接。

    优雅在哪里?对比一下粗暴的 System.exit(0)

    • 优雅关闭:给正在处理中的请求一个完成的机会,比如正在写入一半的数据可以写完
    • 粗暴退出:立即杀死进程,所有正在进行的操作都会被中断,可能导致数据不一致

    InterruptedException 的处理也值得注意:

    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        log.warn("优雅关闭WebSocket服务端时被中断!", e);
    }
    

    捕获 InterruptedException 后,第一件事是恢复中断状态——Thread.currentThread().interrupt()。这是一个重要的编程习惯:当你捕获到中断异常但不打算立即终止时,应该重新设置中断标志,让上层调用者知道“我被中断了”。

    七、设计复盘:几个值得学习的细节

    “线程池复用”的智慧

    注意这个细节:定时清理任务用的是 workerGroup.scheduleWithFixedDelay()——复用了 I/O 线程池来执行定时任务。

    为什么不单独创建一个 ScheduledExecutorService?答案是资源复用。Netty 的 EventLoopGroup 内部已经维护好了线程调度机制,用它来执行定时任务,不需要额外创建线程,节省系统资源。

    这种“一物多用”的设计思想在 Netty 中随处可见——比如 ChannelPipeline 既处理 I/O 事件,也处理定时任务;EventLoop 既是 I/O 线程,也是定时任务调度器。

    “最小权限原则”在 SSL 中的应用

    buildSslContext() 方法中,如果配置了 keyAlias,会创建一个只包含该证书的“视图 KeyStore”:

    KeyStore singleEntryKs = KeyStore.getInstance(keyStoreType);
    singleEntryKs.load(null, null);
    singleEntryKs.setEntry(keyAlias, entry, new KeyStore.PasswordProtection(passwordChars));
    kmf.init(singleEntryKs, passwordChars);
    

    这不是多此一举,而是安全最佳实践。想象一下:如果你的 KeyStore 中有 10 个证书,但只有 1 个用于 WebSocket。如果不创建视图,理论上所有 10 个证书都能被用于 SSL 握手——这就扩大了攻击面。

    创建视图 KeyStore,就像给 KeyManager 戴了一副“聚焦眼镜”——让它只能看到需要的那个证书。即使攻击者 somehow 控制了 KeyManager,也只能拿到这一个证书,其他证书安然无恙。

    “函数式过滤”的现代 Java 风格

    清理失效连接时,使用了 Java 8 的 removeIf() 方法:

    this.clientInfoMap.entrySet().removeIf(e -> !e.getValue().getChannelHandlerContext().channel().isActive());
    

    对比一下传统的迭代器写法:

    for (Iterator<Map.Entry<String, WebSocketClientInfo>> it = clientInfoMap.entrySet().iterator(); it.hasNext(); ) {
        Map.Entry<String, WebSocketClientInfo> entry = it.next();
        if (!entry.getValue().getChannelHandlerContext().channel().isActive()) {
            it.remove();
        }
    }
    

    高下立判。removeIf() 不仅代码简洁,而且内部做了优化(批量操作、减少锁竞争)。这是现代 Java 应该推崇的写法。

    “桥接模式”解耦 Netty 与 Spring

    WebSocketSimpleChannelInboundHandler通过InvokerHolder 把调用转发给 Spring 容器中的IWebSocketFrameHandler实现类。

    这是一种桥接模式——把两个独立的框架(Netty 和 Spring)连接起来,而不需要它们互相依赖。Netty Handler 不知道 Spring 的存在,Spring Bean 也不知道 Netty 的存在,InvokerHolder 在中间穿针引线。

    这种设计的好处是:关注点分离。Netty 只管网络通信,Spring 只管业务逻辑,两者通过一个薄薄的桥接层协作。

    八、小结

    本篇深入拆解了 Phoenix WebSocket 服务端的启动与初始化流程。回顾核心要点:

    • 生命周期管理:通过 Spring Boot 的 initMethod/destroyMethod 机制,自动调用 start()/stop() 方法
    • 启动流程:计时 → SSL 上下文构建(可选)→ ServerBootstrap 装备 → 绑定端口 → 定时任务 → 启动完成
    • Boss/Worker 线程池:bossGroup(1 线程)负责接受连接,workerGroup(CPU 核心数×2)负责 I/O 操作
    • SSL 上下文构建:从 classpath 加载证书 → KeyStore → KeyManagerFactory → SslContextBuilder,支持按别名提取单个证书(最小权限原则)
    • Pipeline 七层链:SslHandler → HttpServerCodec → HttpObjectAggregator → WebSocketServerCompressionHandler → IdleStateHandler → WebSocketServerProtocolHandler → WebSocketSimpleChannelInboundHandler
    • 连接健康巡检:每 60 秒清理失效连接 + 刷新集群存储,使用 removeIf() 函数式过滤
    • 优雅关闭:关闭 Channel → shutdownGracefully() bossGroup → shutdownGracefully() workerGroup,捕获 InterruptedException 并恢复中断状态

    如果说第三篇博客是“宏观架构”,那本篇就是“微观实现”。从 Spring 的生命周期回调,到 Netty 的 ServerBootstrap 引导;从 SSL 证书的加载细节,到 Pipeline 每一层的设计考量;从定时清理任务的函数式写法,到优雅关闭的中断处理——我们走了一遍完整的沉浸式解读。

    下一篇,我们将把镜头转向客户端,深入 WebsocketClient 的连接与重连机制——CAS 并发保护、CountDownLatch 同步、指数退避算法、重复连接识别,以及 DataExchanger 的 DCL 双重检查锁初始化。敬请期待!


    项目地址
    https://gitcode.com/monitoring-platform/phoenix
    https://gitee.com/monitoring-platform/phoenix
    https://github.com/709343767/phoenix

    end
  1. 作者: 锋哥 (联系作者)
  2. 发表时间: 2026-03-30 10:16
  3. 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  4. 转载声明:如果是转载博主转载的文章,请附上原文链接
  5. 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  6. 评论

    站长头像 知录

    你一句春不晚,我就到了真江南!

    文章0
    浏览0

    文章分类

    标签云