目录

    Phoenix监控平台技术解析(五):WebSocket 客户端连接与重连机制


    Phoenix监控平台技术解析(五):WebSocket 客户端连接与重连机制

    上一篇我们深入拆解了 WebSocket 服务端的 start() 引导过程——从 SSL 证书加载到 Pipeline 七层链编排,再到连接健康巡检和优雅关闭。本篇将镜头转向客户端,走进 WebsocketClientDataExchanger 的内部世界:CAS 无锁并发保护、CountDownLatch 连接同步、指数退避重连策略、重复连接识别、DCL 双重检查锁初始化以及 SPI 消息处理器注册——一场关于"如何在不确定的网络中建立可靠连接"的技术之旅。


    一、回忆:为什么客户端不用 Netty?

    第三篇博客中我们提到过一个设计决策——服务端用 Netty,客户端用 Tyrus(JSR-356)。这不是随性之举,而是一个深思熟虑的权衡。

    服务端需要同时管理成百上千个客户端的长连接,对并发能力和内存效率有极高的要求——Netty 的 Reactor 模型和零拷贝正是为此而生。但客户端是一个 SDK,要嵌入到用户的 Java 应用中运行。如果客户端也引入 Netty,会带来两个问题:

    1. 依赖冲突:用户的项目可能已经使用了不同版本的 Netty。两个 Netty 版本共存,轻则方法签名不兼容(NoSuchMethodError),重则类加载器冲突导致应用无法启动。
    2. 依赖膨胀:Netty 的完整依赖包括 netty-codec-httpnetty-handlernetty-transport 等一系列模块,对于一个只需要维护一条 WebSocket 连接的客户端来说,这些都是不必要的重量。

    Phoenix 选择了 Tyrus——JSR-356(Java WebSocket API)的参考实现。JSR-356 是 Java EE 标准的一部分,API 极其简洁:用四个注解(@ClientEndpoint@OnOpen@OnMessage@OnClose@OnError)就能完成一个完整的 WebSocket 客户端。依赖轻量,和用户项目的技术栈几乎不会冲突。

    这个选择体现了一个重要的工程哲学:SDK 的依赖越少越好。你不能假设用户的运行环境——他们可能用 Spring Boot 3.x,可能用传统的 Tomcat WAR 部署,甚至可能是一个没有任何框架的纯 Java 程序。用标准 API 而非特定框架,是对用户最大的尊重。

    二、WebsocketClient 全景图

    先从宏观视角看看 WebsocketClient 这个类的"装备清单":

    @Slf4j
    @ClientEndpoint
    public class WebsocketClient {
        // ─── 静态共享资源 ───
        private static final ScheduledExecutorService GLOBAL_SCHEDULER = ...;  // 全局重连调度器
        private static final ClientManager SHARED_CLIENT = ClientManager.createClient();  // 全局 Tyrus 引擎
        
        // ─── 构造时确定的不可变配置 ───
        private final String serverUri;              // 服务端地址
        private final int connectTimeoutSeconds;     // 连接超时(默认5秒)
        private final int reconnectDelaySeconds;     // 重连初始间隔(默认5秒)
        private final boolean autoReconnect;         // 是否自动重连(配置开关)
        
        // ─── 运行时可变状态 ───
        private volatile Session session;            // 当前会话
        private volatile boolean connected = false;  // 连接状态
        private volatile CountDownLatch connectLatch;  // 连接同步门闩
        private volatile Consumer<String> messageHandler;  // 消息处理器
        
        // ─── 并发控制 ───
        private final AtomicBoolean connectionPending = new AtomicBoolean(false);  // CAS 连接保护
        private volatile boolean enableReconnect = true;  // 运行时重连开关
    }
    

    十几个字段,从上到下可以分为四组:静态共享资源不可变配置运行时状态并发控制。这种分组本身就是一种设计信号——静态的共享、final 的不变、volatile 的可变、AtomicBoolean 的并发保护——每个字段的修饰符都在告诉你它的生命周期和线程安全语义。

    接下来,我们逐个拆解这些字段背后的设计考量。

    三、静态共享资源:一次创建,全局复用

    3.1 全局重连调度器:GLOBAL_SCHEDULER

    private static final ScheduledExecutorService GLOBAL_SCHEDULER = 
        ThreadPoolAcquirer.getWebsocketClientReconnectScheduledThreadPoolExecutor();
    

    重连任务需要延迟执行——"5 秒后重连""30 秒后重连""5 分钟后重连"。这就需要一个 ScheduledExecutorService

    Phoenix 没有让每个 WebsocketClient 实例各自创建调度器,而是通过 ThreadPoolAcquirer 获取一个全局共享的调度线程池。ThreadPoolAcquirer 内部使用 DCL(双重检查锁)保证线程池只初始化一次:

    public static MonitoredScheduledThreadPoolExecutor getWebsocketClientReconnectScheduledThreadPoolExecutor() {
        if (websocketClientReconnectScheduledThreadPoolExecutor == null) {
            synchronized (ThreadPoolAcquirer.class) {
                if (websocketClientReconnectScheduledThreadPoolExecutor == null) {
                    websocketClientReconnectScheduledThreadPoolExecutor = new MonitoredScheduledThreadPoolExecutor(
                        (int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
                        new BasicThreadFactory.Builder()
                            .namingPattern("phoenix-websocket-client-scheduled-pool-thread-%d")
                            .daemon(true)
                            .build(),
                        new ThreadPoolExecutor.AbortPolicy(),
                        "phoenix-websocket-client-scheduled-pool", true);
                }
            }
        }
        return websocketClientReconnectScheduledThreadPoolExecutor;
    }
    

    几个值得注意的细节:

    • 线程数计算Ncpu / (1 - 0.8)——这是 I/O 密集型任务的经典公式。阻塞系数 0.8 表示线程大部分时间都在等待网络 I/O,所以需要更多线程来保持 CPU 利用率。
    • 守护线程.daemon(true) 确保这些线程不会阻止 JVM 退出。当用户的应用关闭时,重连线程会自动随主线程结束。
    • MonitoredScheduledThreadPoolExecutor:这是 Phoenix 自研的"可监控线程池"——线程池本身也被纳入了监控体系,可以在 UI 上看到它的核心线程数、活跃线程、队列深度等指标。"监控系统自身也被监控",有点套娃的味道。

    3.2 全局 Tyrus 客户端:SHARED_CLIENT

    private static final ClientManager SHARED_CLIENT = ClientManager.createClient();
    

    ClientManager 是 Tyrus 的客户端引擎,内部管理着连接所需的底层资源(线程池、ByteBuffer 池等)。如果每次 connect() 都创建一个新的 ClientManager,这些底层资源会被反复创建和销毁——既浪费内存,又增加 GC 压力。

    用一个 static finalSHARED_CLIENT,所有 WebsocketClient 实例共享同一个 Tyrus 引擎,底层的线程和缓冲区都能被复用。这和我们在第二篇博客中看到的 HTTP 连接池是同样的理念——连接资源是昂贵的,能复用就复用

    四、构造器:Fail-Fast 的参数校验

    WebsocketClient 提供了两个构造器——一个默认参数的简化版和一个全参数的完整版:

    // 简化版:默认5秒超时、5秒重连间隔、开启自动重连
    public WebsocketClient(String serverUri) {
        this(serverUri, 5, 5, true);
    }
    
    // 全参数版
    public WebsocketClient(String serverUri, int connectTimeoutSeconds, 
                            int reconnectDelaySeconds, boolean autoReconnect) {
        // 1. 地址严格校验
        Objects.requireNonNull(serverUri, "serverUri不能为空!");
        String trimmed = serverUri.trim();
        try {
            URI uri = new URI(trimmed);
            String scheme = uri.getScheme();
            if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
                throw new IllegalArgumentException("WebSocket协议必须为ws或wss!");
            }
            if (uri.getHost() == null || uri.getHost().isEmpty()) {
                throw new IllegalArgumentException("WebSocket地址缺少Host!");
            }
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("WebSocket地址格式错误!", e);
        }
        this.serverUri = trimmed;
        // 2. 参数校验并赋值
        this.connectTimeoutSeconds = connectTimeoutSeconds > 0 ? connectTimeoutSeconds : 5;
        this.reconnectDelaySeconds = reconnectDelaySeconds > 0 ? reconnectDelaySeconds : 5;
        this.autoReconnect = autoReconnect;
    }
    

    构造器做了严格的 Fail-Fast 校验

    1. 非空检查Objects.requireNonNull() 是 JDK 提供的标准方式,如果传入 null 立即抛出 NullPointerException,不留隐患。
    2. 协议校验:只接受 ws://wss:// 两种 scheme。如果传了 http://,在构造阶段就报错,而不是在连接阶段才发现协议不对。
    3. Host 校验:没有 Host 的 URI(如 ws:///path)在构造时就被拦下。
    4. 参数兜底:超时和重连间隔如果传了负数或零,默认回退到 5 秒——防御性编程,不信任调用方。

    这种"在最早的时机暴露错误"的理念,是所有健壮代码的共同特征。比起在 connect() 时才发现 URI 格式不对,在构造时就报错的调用栈更短、更容易定位问题。

    五、connect():一场精心编排的连接仪式

    connect() 方法是 WebsocketClient 的核心。看起来只有 40 多行代码,但每一行都有它存在的理由。

    5.1 快速返回:已连接就别折腾了

    public void connect() throws IOException, DeploymentException, InterruptedException {
        // 已连接则快速返回
        if (this.isConnected()) {
            if (log.isDebugEnabled()) {
                log.debug("已连接到WebSocket服务端[URI:{}],跳过重复连接!", this.serverUri);
            }
            return;
        }
    

    第一步是短路检查——如果已经连上了,直接返回。isConnected() 方法做了三重判断:

    public boolean isConnected() {
        Session sess = this.session;  // 快照引用,避免并发问题
        return this.connected && sess != null && sess.isOpen();
    }
    

    注意这里先把 this.session 读到一个局部变量 sess 中,再做后续判断。为什么不直接用 this.session?因为 sessionvolatile 的,在多线程环境下可能在两次读取之间被另一个线程置为 null——先读成非 null 通过了判空,再读时已经是 null 了,session.isOpen() 就会抛 NullPointerException。读到局部变量后,后续操作都基于同一个快照值,就不会有这个问题。

    这是一种经典的 volatile 快照读 模式,在并发编程中非常实用。

    5.2 CAS 并发保护:只允许一个线程连接

        // 防止并发连接:仅允许一个线程进入连接流程
        if (!this.connectionPending.compareAndSet(false, true)) {
            if (log.isDebugEnabled()) {
                log.debug("已有连接请求正在处理中,跳过重复连接[URI:{}]!", this.serverUri);
            }
            return;
        }
    

    想象一下:客户端启动时,心跳线程、JVM 采集线程、服务器采集线程几乎同时启动。如果它们都发现"还没连上",都去调用 connect()——三个线程同时发起三个 WebSocket 连接请求,最终服务端收到三个来自同一客户端的连接,这就乱套了。

    AtomicBoolean.compareAndSet(false, true) 是一个原子操作——它做了两件事:检查当前值是否为 false,如果是则设为 true 并返回 true;如果当前值已经是 true(说明有人已经在连接了),直接返回 false。整个过程无锁、无阻塞

    为什么不用 synchronized?因为 synchronized 会让后来的线程排队等待——等前一个连接完成后,它们还是会依次发起连接请求。而 CAS 的语义是"发现有人在连了,我就不连了"——直接放弃,不浪费时间排队。对于"防止重复连接"这个场景,CAS 比 synchronized 更合适。

    5.3 CountDownLatch:等待 onOpen 的"结业通知"

        Session newSession = null;
        try {
            // 重置门闩:确保每次 connect 使用新的同步点
            this.connectLatch = new CountDownLatch(1);
            // 发起连接(Tyrus 内部处理)
            newSession = SHARED_CLIENT.connectToServer(this, URI.create(this.serverUri));
            // 等待 onOpen 确认(或超时)
            boolean success = this.connectLatch.await(this.connectTimeoutSeconds, TimeUnit.SECONDS);
            if (!success) {
                this.safeCloseSession(newSession);
                throw new IOException(String.format(
                    "连接WebSocket服务端[URI:%s]超时(%d秒)!", this.serverUri, this.connectTimeoutSeconds));
            }
        }
    

    这里有一个容易被忽略的问题:connectToServer() 返回了,不代表 WebSocket 连接真的建立成功了。

    connectToServer() 是 Tyrus 提供的连接方法。它在底层会完成 TCP 三次握手和 WebSocket 协议升级(HTTP 101 Switching Protocols),然后返回一个 Session 对象。但在 Tyrus 的实现中,Session 对象返回时,@OnOpen 回调可能还没有执行——毕竟回调是在 Tyrus 的 I/O 线程中异步调用的。

    如果 connect() 方法在 connectToServer() 返回后就立即声明"连接成功",后续的 sendMessage() 可能在 @OnOpen 还没执行之前就尝试发送数据——此时 session 字段还是旧值,消息会发到一个过期的会话上。

    CountDownLatch 解决了这个问题:

    1. connect() 调用 connectToServer() 后,立即调用 connectLatch.await()阻塞等待
    2. 当 Tyrus 在 I/O 线程中调用 @OnOpen 回调时,回调方法执行 connectLatch.countDown()唤醒 connect()
    3. connect() 被唤醒后,可以确认 sessionconnected 已经在 @OnOpen 中被正确设置了。

    如果等了 5 秒(connectTimeoutSeconds@OnOpen 还没来,await() 返回 false——超时了。这时候要做两件事:关闭已经创建但没完成握手的 Session(防止资源泄漏),然后抛出异常。

    为什么每次 connect() 都要创建一个新的 CountDownLatch?因为 CountDownLatch 是一次性的——计数到 0 之后就不能重置了。如果复用上一次的 Latch,它的计数已经是 0,await() 会立即返回而不等待。

    5.4 异常处理与资源清理

        } catch (IOException | DeploymentException | InterruptedException e) {
            this.safeCloseSession(newSession);
            throw e;
        } catch (Exception e) {
            this.safeCloseSession(newSession);
            throw new IOException(String.format(
                "连接WebSocket服务端[URI:%s]失败,原因:%s", this.serverUri, e.getMessage()), e);
        } finally {
            // 释放连接占位,允许下次连接尝试
            this.connectionPending.set(false);
        }
    

    异常处理有两层:

    1. 已知异常IOExceptionDeploymentExceptionInterruptedException):关闭可能已创建的 Session,然后原封不动地抛出去——让调用方根据异常类型做不同的处理。
    2. 未知异常:同样关闭 Session,但包装成 IOException 再抛出——统一异常接口,简化调用方的 catch 逻辑。

    finally 块中的 connectionPending.set(false) 至关重要——无论连接成功还是失败,都要释放 CAS 占位。如果忘了这一步,connectionPending 永远停在 true,后续所有连接请求都会被直接跳过,客户端就"死"了。

    safeCloseSession() 方法的实现也很防御性:

    private void safeCloseSession(Session session) {
        if (session != null && session.isOpen()) {
            try {
                session.close();
            } catch (IOException e) {
                log.error("关闭WebSocket连接[URI:{}]时发生异常:{}", this.serverUri, e.getMessage());
            }
        }
    }
    

    先判空、再判状态、最后 try-catch——不会因为 Session 已关闭就抛异常,也不会因为关闭过程出错就影响主流程。这种"安全关闭"的写法在网络编程中非常常见。

    六、JSR-356 生命周期回调:四个注解,四段人生

    WebsocketClient@ClientEndpoint 标注,Tyrus 会通过反射发现它的四个生命周期方法,并在对应时机自动调用。

    6.1 @OnOpen:连接成功的"入学典礼"

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        this.connected = true;
        // 释放等待中的 connect() 调用
        CountDownLatch latch = this.connectLatch;
        if (latch != null) {
            latch.countDown();
        }
        log.info("成功建立WebSocket连接[URI:{}]!", this.serverUri);
    }
    

    三件事:保存 Session、标记已连接、释放 CountDownLatch。

    注意 connectLatch 的读取也用了局部快照——先读到 latch 变量,再判空和操作。虽然 connectLatchvolatile 的,但在极端并发下,close() 方法也可能访问它。用局部变量可以避免读到一个"半更新"的状态。

    6.2 @OnMessage:收到消息的"传话"

    @OnMessage
    public void onMessage(String message) {
        if (log.isDebugEnabled()) {
            log.debug("从WebSocket服务端[URI:{}]收到消息:{}", this.serverUri, message);
        }
        Consumer<String> handler = this.messageHandler;
        if (handler != null) {
            try {
                handler.accept(message);
            } catch (Exception e) {
                // 隔离异常,防止连接中断
                log.error("处理来自WebSocket服务端[URI:{}]的消息时发生异常:{}", this.serverUri, e.getMessage());
            }
        }
    }
    

    客户端不仅发数据,也要接收服务端下发的指令——比如线程池调参命令。messageHandler 是一个 Consumer<String>,通过 setMessageHandler() 在外部设置。

    这里有一个关键的设计决策:异常隔离。消息处理器的异常被 try-catch 包裹,不会向上层抛出。为什么?因为 @OnMessage 方法是在 Tyrus 的 I/O 线程中执行的——如果抛出未捕获异常,Tyrus 可能会认为这个连接出了问题,进而触发 @OnError@OnClose,导致整个连接断开。一个消息处理器的 bug 不应该影响连接的存活——所以异常必须在这里被"吃掉"。

    6.3 @OnClose:连接断开的"毕业仪式"

    @OnClose
    public void onClose(CloseReason reason) {
        this.connected = false;
        this.session = null;
        String closeReasonStr = (reason != null) ? reason.getReasonPhrase() : "Unknown (no close handshake)";
        String msg = StringUtils.isBlank(closeReasonStr) 
            ? "WebSocket连接已关闭[URI:" + this.serverUri + "]!" 
            : "WebSocket连接已关闭[URI:" + this.serverUri + "],原因:" + closeReasonStr;
        log.warn(msg);
        // 重复连接,直接结束,不再重连
        if (reason != null && reason.getCloseCode().getCode() == WebSocketCloseReasonEnums.DUPLICATE_CONNECTION.getCode()) {
            log.warn("WebSocket服务端检测到已有活跃会话,拒绝重复连接!");
            return;
        }
        // 满足条件则调度重连任务
        if (this.enableReconnect && this.autoReconnect) {
            log.info("WebSocket连接关闭,将在 {} 秒后尝试重新连接WebSocket服务端[URI:{}]...", 
                this.reconnectDelaySeconds, this.serverUri);
            GLOBAL_SCHEDULER.schedule(() -> this.attemptReconnect(), this.reconnectDelaySeconds, TimeUnit.SECONDS);
        }
    }
    

    @OnClose 是整个重连机制的触发入口。它做了三件事:

    第一,清理状态:把 connected 设为 falsesession 设为 null

    第二,重复连接识别:如果关闭原因的状态码是 4000DUPLICATE_CONNECTION),说明服务端发现了这个客户端已经有另一个活跃连接——当前连接是"多余的"。这种情况下不应该重连,否则会陷入"连上→被踢→重连→又连上→又被踢"的死循环。

    回忆一下第三篇博客中服务端 MonitoringFrameHandler 的重复连接处理逻辑:当同一个 instanceId 的客户端建立了新连接,服务端会向旧连接发送 CloseWebSocketFrame(4000, "Duplicate connection")。客户端在 @OnClose 中检测到这个特殊状态码,就知道"我是那个旧连接,我应该安静地退出,不要再折腾了"。

    第三,调度重连:如果不是重复连接,并且重连开关是开着的(enableReconnect && autoReconnect),就用 GLOBAL_SCHEDULER 调度一个延迟任务——reconnectDelaySeconds 秒后执行 attemptReconnect()

    为什么不立即重连?因为服务端可能正在重启、网络可能正在波动。立即重连大概率还是失败,白白浪费资源。等几秒钟再试,给服务端和网络一个恢复的窗口。

    6.4 @OnError:异常的"急诊室"

    @OnError
    public void onError(Throwable throwable) {
        this.connected = false;
        this.session = null;
        log.error("WebSocket连接异常[URI:{}],原因:{}", this.serverUri, throwable.getMessage());
        // 不在此处调度重连,onClose 会随后被调用并统一处理重连
    }
    

    注意最后一行注释:不在此处调度重连。为什么?

    在 JSR-356 规范中,@OnError 被调用后,@OnClose 通常会紧接着被调用(因为底层 TCP 连接大概率已经断了)。如果在 @OnError@OnClose 中都调度重连,就会出现两个重连任务同时执行的情况——两个线程同时发起连接,又回到了并发问题。

    Phoenix 的策略是:@OnError 只负责清理状态和记录日志,重连统一由 @OnClose 处理。这样重连逻辑只有一个入口,简单可靠。

    七、指数退避重连:不要在暴风雨中反复敲门

    7.1 首次重连入口

    private void attemptReconnect() {
        this.attemptReconnect(1);
    }
    

    一个简单的转发——把重试次数初始化为 1,交给真正的重连方法。

    7.2 核心重连逻辑

    private void attemptReconnect(int attempt) {
        // 三重防护:运行时开关 + 配置开关 + 当前状态
        if (!this.enableReconnect || !this.autoReconnect || this.isConnected()) {
            return;
        }
        try {
            log.info("正在尝试重新连接WebSocket服务端[URI:{}],第{}次...", this.serverUri, attempt);
            // 重用主连接逻辑
            this.connect();
        } catch (Exception e) {
            log.error("重连WebSocket服务端[URI:{}] 第{}次 失败,原因:{}", this.serverUri, attempt, e.getMessage());
            // 必须再次检查,避免禁用后仍无限重试
            if (this.enableReconnect && this.autoReconnect) {
                // 退避,上限5分钟
                long delay = Math.min(300, this.reconnectDelaySeconds * (long) Math.pow(1.5, attempt - 1));
                GLOBAL_SCHEDULER.schedule(() -> this.attemptReconnect(attempt + 1), delay, TimeUnit.SECONDS);
            }
        }
    }
    

    这段代码有几个精妙之处:

    三重防护:进入方法的第一件事就是检查三个条件——运行时开关(enableReconnect)、配置开关(autoReconnect)、当前连接状态(isConnected())。只要任意一个不满足,就直接返回。这是一种"入口防御"——在做任何实际工作之前先确认前置条件。

    重用 connect():重连时直接调用 connect() 方法,不重复实现连接逻辑。connect() 内部已经有 CAS 并发保护和 CountDownLatch 同步机制——这些防护在重连时同样有效。

    指数退避算法:重连间隔的计算公式是:

    delay = min(300, reconnectDelaySeconds × 1.5^(attempt-1))
    

    以默认配置(reconnectDelaySeconds = 5)为例:

    重试次数 计算公式 延迟(秒) 实际延迟
    第1次 5 × 1.5^0 5 5秒
    第2次 5 × 1.5^1 7.5 7秒
    第3次 5 × 1.5^2 11.25 11秒
    第4次 5 × 1.5^3 16.87 16秒
    第5次 5 × 1.5^4 25.31 25秒
    ... ... ... ...
    第N次 5 × 1.5^(N-1) ≥300 300秒(上限)

    为什么要指数退避?想象一下服务端突然宕机,同时有 100 个客户端在重连。如果每个客户端都固定 5 秒重连一次,服务端重启后的第 5 秒会同时收到 100 个连接请求——这可能直接把刚重启的服务端再次压垮。指数退避让重连间隔越来越长,100 个客户端的重连请求会被分散到不同的时间点,避免"重连风暴"。

    为什么底数是 1.5 而不是 2?因为 2 的指数增长太快——第 10 次重试就要等 2560 秒(42 分钟),对于监控场景来说太久了。1.5 的增长更温和,到第 20 次左右才达到 5 分钟上限。

    为什么上限是 5 分钟?因为恰好和服务端的 IdleStateHandler 读空闲超时(300 秒)一致——如果 5 分钟都没连上,说明问题不是暂时的,但也不能放弃,所以以 5 分钟为周期持续重试。

    重连前的二次检查:注意在 catch 块中调度下一次重连之前,又检查了一次 enableReconnect && autoReconnect。为什么?因为在重连过程中,close() 方法可能被调用(比如应用正在关闭),enableReconnect 被设为 false。如果不检查就调度,会导致关闭后还在重连——违反了 close() 的语义。

    7.3 connectWithRetry():启动时的首次连接

    public void connectWithRetry() {
        try {
            this.connect();
        } catch (Exception e) {
            if (this.enableReconnect && this.autoReconnect) {
                log.info("WebSocket连接失败,将在 {} 秒后尝试重新连接WebSocket服务端[URI:{}]...", 
                    this.reconnectDelaySeconds, this.serverUri);
                GLOBAL_SCHEDULER.schedule(() -> this.attemptReconnect(), this.reconnectDelaySeconds, TimeUnit.SECONDS);
            }
        }
    }
    

    connectWithRetry() 是对 connect() 的一层薄封装——首次连接失败时,不直接抛异常,而是启动重连流程。这个方法在 DataExchanger.run() 中被调用,用于客户端启动时的首次连接。它的语义是"尽力连,连不上就后台慢慢重试"——不会阻塞调用方。

    八、sendMessage():发送消息的防御性编程

    public void sendMessage(String message) {
        Session sess = this.session;
        if (sess == null || !this.connected || !sess.isOpen()) {
            if (log.isDebugEnabled()) {
                log.debug("未连接到WebSocket服务端[URI:{}],消息已丢弃!", this.serverUri);
            }
            return;
        }
        try {
            sess.getBasicRemote().sendText(message);
            if (log.isDebugEnabled()) {
                log.debug("已向WebSocket服务端[URI:{}]发送消息:{}", this.serverUri, message);
            }
        } catch (IOException e) {
            log.error("向WebSocket服务端[URI:{}]发送消息失败:{}", this.serverUri, e.getMessage());
            this.connected = false;
            this.safeCloseSession(sess);
        }
    }
    

    又一次看到了 volatile 快照读——Session sess = this.session。在多线程环境下,this.session 可能在判空和 sendText() 之间被 @OnClose 置为 null。用局部变量锁住引用,就不会有这个问题。

    发送失败时的处理也很讲究:不仅标记 connected = false,还主动关闭 Session。为什么要主动关闭?因为 IOException 通常意味着底层 TCP 连接已经出了问题。主动关闭可以触发 @OnClose 回调,进而触发重连流程——比被动等待 TCP 超时(可能几分钟后才感知到断开)快得多。

    另外注意:sess.getBasicRemote().sendText(message) 使用的是 BasicRemote(同步发送),而不是 AsyncRemote(异步发送)。这意味着 sendText() 会阻塞直到消息完全写入底层缓冲区。对于监控数据来说,同步发送更安全——可以及时发现发送失败并处理。

    九、close():温柔而坚定的告别

    public void close() {
        // 永久禁用重连(优先级高于 autoReconnect)
        this.enableReconnect = false;
        // 防止 connect() 永久阻塞
        CountDownLatch latch = this.connectLatch;
        if (latch != null) {
            latch.countDown();
        }
        // 安全关闭会话
        log.info("正在主动关闭WebSocket连接[URI:{}]...", this.serverUri);
        this.safeCloseSession(this.session);
        this.connected = false;
        this.session = null;
    }
    

    close() 方法做了三件事:

    第一,关闭重连总闸this.enableReconnect = false。这是一个"不可逆操作"——一旦设为 false,后续所有重连检查都会被拦下,客户端再也不会自动重连。

    第二,释放 CountDownLatch:如果 connect() 正在 connectLatch.await() 中阻塞等待,close() 调用 latch.countDown() 将其唤醒。被唤醒后 connect() 会发现 Session 已经被关了,走正常的失败处理流程。如果不释放 Latch,connect() 可能永远阻塞在那里——线程泄漏。

    第三,安全关闭 SessionsafeCloseSession() 会发送 WebSocket Close 帧给服务端,然后关闭底层 TCP 连接。这是一个"正常关闭"流程,服务端会收到 Close 帧并清理对应的客户端信息。

    十、双层重连开关:配置与运行时的分工

    WebsocketClient 设计了两个重连开关,它们的职责不同:

    字段 类型 可变性 语义
    autoReconnect final boolean 构造时确定,不可变 "这个客户端是否支持自动重连"
    enableReconnect volatile boolean 运行时可变 "当前是否允许自动重连"

    为什么需要两个开关?

    autoReconnect 是一个配置级的开关——如果创建客户端时传了 autoReconnect = false,那这个客户端从始至终都不会自动重连。它是 final 的,不可修改。

    enableReconnect 是一个运行时的开关——默认是 true。调用 close()disableReconnect() 时会被设为 false。它的作用是"关闭正在运行的重连机制"——比如应用正在优雅停机,需要阻止重连。

    两者是 AND 关系:

    if (this.enableReconnect && this.autoReconnect) {
        // 才会真正执行重连
    }
    

    只有两个开关都为 true 时才重连。这种"双层保险"的设计保证了:

    • 不想要重连功能的客户端,从构造时就永远不会重连(autoReconnect = false
    • 需要运行时关闭重连的场景,可以随时关闭(enableReconnect = false),不需要重建客户端

    十一、DataExchanger:客户端的数据交换中枢

    DataExchanger 是客户端与服务端之间的"桥梁"——它封装了 WebsocketClient 的初始化、消息处理器注册和加密发送的完整流程。

    11.1 DCL 双重检查锁:只初始化一次

    public static void run() {
        // 双重检查锁定(DCL)风格,避免 synchronized 开销
        if (started) {
            if (log.isDebugEnabled()) {
                log.debug("数据交换器已启动,跳过重复初始化!");
            }
            return;
        }
        String serverUri = ConfigLoader.getMonitoringProperties().getComm().getWebsocket().getUrl();
        if (StringUtils.isBlank(serverUri)) {
            return;
        }
        synchronized (OBJECT_LOCK) {
            // 二次检查 + 设置 started
            if (started) {
                return;
            }
            // 1. 创建客户端
            String endpoint = ConfigLoader.getMonitoringProperties().getInstance().getEndpoint();
            String instanceId = InstanceGenerator.getInstanceId();
            String uri = serverUri + "/websocket/relay/" + WebSocketBusinessTypeConstants.MONITORING 
                + "?endpoint=" + endpoint + "&instanceId=" + instanceId;
            wsClient = new WebsocketClient(uri);
            // 标记为已启动
            started = true;
            // 捕获局部快照,防止 close() 并发将 wsClient 置 null 导致 NPE
            WebsocketClient clientRef = wsClient;
            ThreadPool.getCommonIoIntensiveThreadPoolExecutor().execute(() -> {
                try {
                    // 2. 注册消息处理器
                    MulticastWebsocketMessageHandler dispatcher = new MulticastWebsocketMessageHandler();
                    ServiceLoader<IWebsocketMessageHandler> loader = ServiceLoader.load(IWebsocketMessageHandler.class);
                    for (IWebsocketMessageHandler handler : loader) {
                        dispatcher.registerHandler(handler);
                    }
                    clientRef.setMessageHandler(dispatcher::onRawMessage);
                    log.info("已注册 {}个 WebSocket消息处理器!", dispatcher.getHandlerCount());
                    // 3. 阻塞直到连接成功或超时
                    clientRef.connectWithRetry();
                } catch (Exception e) {
                    log.error("运行数据交换器失败(将依赖WebsocketClient内部重连机制):{}", e.getMessage());
                }
            });
        }
    }
    

    DCL(Double-Checked Locking)的结构我们在 ThreadPoolAcquirer 中已经见过了,但 DataExchanger 的实现有几个独特之处:

    started 标志的设置时机:注意 started = true 是在创建完 WebsocketClient 之后、连接之前就设置了。这意味着即使连接失败,也不会再次进入初始化流程。为什么?因为 WebsocketClient 有自己的重连机制——创建成功后连接失败,交给它内部重试就好了,不需要重新走 DataExchanger.run() 的初始化流程。

    局部快照 clientRef:把 wsClient 赋值给局部变量 clientRef,然后在 Lambda 中使用 clientRef 而不是 wsClient。为什么?因为异步任务执行时,close() 方法可能已经把 wsClient 置为 null 了。局部变量是 Lambda 捕获的"快照",不受外部修改影响。

    异步初始化:连接和消息处理器注册都放在 ThreadPool.getCommonIoIntensiveThreadPoolExecutor().execute(...) 中异步执行。这意味着 DataExchanger.run() 调用后会立即返回,不会阻塞 Monitor 的启动流程。

    11.2 SPI 消息处理器注册:自动发现的魔法

    MulticastWebsocketMessageHandler dispatcher = new MulticastWebsocketMessageHandler();
    ServiceLoader<IWebsocketMessageHandler> loader = ServiceLoader.load(IWebsocketMessageHandler.class);
    for (IWebsocketMessageHandler handler : loader) {
        dispatcher.registerHandler(handler);
    }
    clientRef.setMessageHandler(dispatcher::onRawMessage);
    

    客户端不仅要发数据,也要接收服务端下发的指令。那么问题来了:客户端怎么知道要注册哪些消息处理器?

    Phoenix 使用了 Java 标准的 SPI(Service Provider Interface) 机制。在 META-INF/services/ 目录下有一个以接口全限定名命名的文件:

    # 文件路径:META-INF/services/com.gitee.pifeng.monitoring.plug.core.wsclient.inf.IWebsocketMessageHandler
    # 客户端模块中的实现
    com.gitee.pifeng.monitoring.plug.core.wsclient.JavaThreadPoolMessageHandler
    

    ServiceLoader.load() 会自动扫描 classpath 下所有 JAR 包的 META-INF/services/ 目录,找到 IWebsocketMessageHandler 接口的所有实现类,并实例化它们。开发者只需要:

    1. 实现 IWebsocketMessageHandler 接口
    2. META-INF/services/ 中注册

    不需要修改 DataExchanger 的任何代码,新的处理器就会被自动发现和注册。这就是 SPI 的魅力——约定优于配置

    目前 Phoenix 注册了两个消息处理器:

    • JavaThreadPoolMessageHandler(客户端模块):处理服务端下发的线程池调参指令
    • DockerMessageHandler(代理端模块):处理服务端下发的 Docker 操作指令

    JavaThreadPoolMessageHandler 为例:

    public class JavaThreadPoolMessageHandler implements IWebsocketMessageHandler {
        @Override
        public void handleMessage(WebSocketPackage responsePackage) {
            String className = responsePackage.getClassName();
            if (!StringUtils.equals(JavaThreadPoolPackage.class.getName(), className)) {
                return;
            }
            Object payload = responsePackage.getPayload();
            JavaThreadPoolPackage javaThreadPoolPackage = (JavaThreadPoolPackage) payload;
            JavaThreadPool threadPool = javaThreadPoolPackage.getJavaThreadPool();
            List<JavaThreadPool.ThreadPoolInfoDomain> threadPoolInfoDomains = threadPool.getThreadPoolInfoDomains();
            AtomicBoolean dynamicUpdateSuccess = new AtomicBoolean(false);
            if (CollectionUtils.isNotEmpty(threadPoolInfoDomains)) {
                threadPoolInfoDomains.forEach((threadPoolInfoDomain) -> {
                    boolean success = ThreadPoolManager.dynamicUpdateThreadPool(threadPoolInfoDomain);
                    if (success) {
                        dynamicUpdateSuccess.set(true);
                    }
                });
            }
            // 存在成功,立即发送Java线程池信息
            if (dynamicUpdateSuccess.get()) {
                ThreadPoolAcquirer.getInstanceScheduledThreadPoolExecutor().execute(new JavaThreadPoolThread());
            }
        }
    }
    

    整个流程:收到服务端下发的 JavaThreadPoolPackage → 提取线程池配置 → 动态修改本地线程池参数 → 立即上报最新的线程池状态。这就是 WebSocket 双向通信的价值——运维人员在 UI 上调了参数,几秒钟内客户端就完成了调整并反馈了结果。

    11.3 MulticastWebsocketMessageHandler:多播分发器

    public class MulticastWebsocketMessageHandler {
        private final List<IWebsocketMessageHandler> handlers = Lists.newArrayList();
        
        @SneakyThrows
        public void onRawMessage(String message) {
            if (StringUtils.isBlank(message)) {
                return;
            }
            // 解密 → 反序列化 → 白名单校验
            WebSocketPackage pkg = WebSocketPackage.convert(message, DOWNSTREAM_ALLOWED_CLASS_NAMES);
            for (IWebsocketMessageHandler handler : this.handlers) {
                try {
                    handler.handleMessage(pkg);
                } catch (Exception e) {
                    // 隔离异常,防止一个处理器失败影响其他处理器
                    log.error("Websocket消息处理器执行异常,消息:{}", message, e);
                }
            }
        }
    }
    

    MulticastWebsocketMessageHandler 是一个经典的 广播器(Multicaster) 模式。它持有一个处理器列表,收到消息后依次调用每个处理器,异常相互隔离。

    注意它使用的是 下行白名单 DOWNSTREAM_ALLOWED_CLASS_NAMES

    public static final Set<String> DOWNSTREAM_ALLOWED_CLASS_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
        CommandPackage.class.getName(),
        JavaThreadPoolPackage.class.getName()
    )));
    

    只有两种类型:命令包和线程池包。这和上行白名单(9 种类型)形成了鲜明对比——上行方向(客户端→服务端)允许的类型远多于下行方向(服务端→客户端)。这是因为客户端上报的数据种类繁多(心跳、JVM、服务器、Docker……),而服务端下发给客户端的指令种类有限。上行和下行分别维护独立的白名单,严格遵循最小权限原则

    11.4 sendMessage():TOCTOU 安全的消息发送

    public static void sendMessage(WebSocketPackage requestPackage) {
        String requestPackageJsonStr = requestPackage.toJsonString();
        if (log.isDebugEnabled()) {
            log.debug("发送数据包:{}", requestPackageJsonStr);
        }
        // 将 明文JSON字符串 转换成 密文JSON字符串
        String encryptStr = MsgPayloadUtils.encryptPayload(requestPackageJsonStr);
        WebsocketClient client = wsClient;  // 读一次 volatile
        if (client == null) {
            log.warn("WebSocket客户端尚未初始化,消息已丢弃!");
            return;
        }
        client.sendMessage(encryptStr);
    }
    

    这里又出现了我们的老朋友——volatile 快照读WebsocketClient client = wsClientvolatile 变量读到局部变量 client 中,后续操作都基于这个快照。

    这解决了一个经典的并发问题——TOCTOU(Time-of-Check-to-Time-of-Use)竞态条件。如果直接用 wsClient

    // 危险的写法!
    if (wsClient != null) {       // 时刻1:检查,不为 null
        // 此时 close() 把 wsClient 置为 null
        wsClient.sendMessage(...);  // 时刻2:使用,NullPointerException!
    }
    

    在"检查"和"使用"之间的时间窗口内,另一个线程可能调用了 close()wsClient 置为 null。读到局部变量后,即使另一个线程修改了 wsClientclient 变量仍然指向原来的对象——检查和使用的是同一个引用。

    11.5 isReady():数据交换器的就绪检查

    public static boolean isReady() {
        WebsocketClient client = wsClient;
        if (client == null) {
            return false;
        }
        return client.isConnected();
    }
    

    所有的定时上报线程(心跳、JVM、服务器、线程池等)在执行任务前,都会先调用 DataExchanger.isReady() 检查连接是否就绪。如果还没连上,这次采集就直接跳过——不浪费 CPU 去采集数据,因为发不出去。

    // HeartbeatThread 中的用法
    @Override
    public void run() {
        if (!DataExchanger.isReady()) {
            return;  // 连接没好,这次不发
        }
        // 构建心跳包并发送...
    }
    

    11.6 close():关闭数据交换器

    public static void close() {
        synchronized (OBJECT_LOCK) {
            if (wsClient != null) {
                wsClient.close();
            }
            wsClient = null;
            started = false;
        }
    }
    

    close()synchronized 块中执行——和 run() 使用同一个锁对象 OBJECT_LOCK,保证初始化和关闭不会同时执行。关闭时把 started 重置为 false,理论上允许 DataExchanger 被重新启动——虽然在当前 Phoenix 的生命周期中,这种情况不会发生。

    十二、从 Monitor 启动看全链路

    把所有组件串起来,看看客户端从启动到发送第一个心跳包的完整链路:

    ┌──────────────────┐
    │ Monitor.start()  │
    └────────┬─────────┘
             │
             ├── ① 初始化加解密配置
             │
             ├── ② DataExchanger.run()
             │     │
             │     ├── 读取 WebSocket URL 配置
             │     ├── 构建带 endpoint + instanceId 的完整 URI
             │     ├── new WebsocketClient(uri)  ──── 参数校验
             │     ├── started = true
             │     └── 异步执行 ──────────────────────────────────────┐
             │                                                       │
             ├── ③ HeartbeatTaskScheduler.run()                      │
             │     └── 每30秒执行 HeartbeatThread                     │
             │          └── if (!DataExchanger.isReady()) return;  ←─┼── 连接未就绪时跳过
             │                                                       │
             ├── ④ JvmTaskScheduler.run()                            │
             │     └── 每60秒执行 JvmThread                           │
             │                                                       │
             ├── ⑤ 其他定时任务...                                    │
             │                                                       │
             └── ⑥ ShutdownHook.addShutdownHook()                    │
                                                                     │
                        ┌────────────────────────────────────────────┘
                        │ 异步线程:
                        ├── ServiceLoader 发现并注册消息处理器
                        ├── clientRef.setMessageHandler(dispatcher::onRawMessage)
                        └── clientRef.connectWithRetry()
                              │
                              ├── connect()
                              │     ├── CAS 占位
                              │     ├── new CountDownLatch(1)
                              │     ├── SHARED_CLIENT.connectToServer(this, uri)
                              │     ├── connectLatch.await(5, SECONDS)
                              │     │     │
                              │     │     │ ◀── onOpen() 回调
                              │     │     │       ├── session = session
                              │     │     │       ├── connected = true
                              │     │     │       └── connectLatch.countDown()  ──── 唤醒
                              │     │     │
                              │     └── CAS 释放
                              │
                              └── 连接成功!DataExchanger.isReady() 返回 true
                                    │
                                    └── 定时任务开始正常发送数据 ✅
    

    整个过程是异步的、非阻塞的Monitor.start() 不需要等待 WebSocket 连接成功就能返回——定时任务先启动,但通过 isReady() 检查保证在连接就绪前不会发送数据。一旦连接成功,定时任务的下一次执行就会通过 isReady() 检查,开始正常发送数据。

    十三、设计复盘:几个值得学习的模式

    "CAS vs synchronized"——选择正确的并发工具

    connect() 中使用 AtomicBoolean.compareAndSet() 而不是 synchronized,原因是语义不同:

    • synchronized 的语义是"排队等待"——后来的线程会阻塞,等前面的线程完成后继续执行。
    • CAS 的语义是"发现有人在做了,我就不做了"——后来的线程直接放弃,不浪费时间。

    对于"防止重复连接"这个场景,CAS 更合适——重复的连接请求不需要排队等待,直接跳过就好。

    但在 DataExchanger.run() 中,使用的是 synchronized——因为初始化流程中有多步操作(创建客户端、设置标志、提交异步任务),需要原子性保证。CAS 只能保护一个布尔值的翻转,无法保护多步操作的原子性。

    选择哪种工具,取决于你要保护什么。保护一个状态标志用 CAS,保护一段初始化逻辑用 synchronized——各得其所。

    "volatile 快照读"——低成本的并发安全

    isConnected()sendMessage()DataExchanger.sendMessage() 等方法中,反复出现一个模式:

    Session sess = this.session;  // 快照读
    if (sess != null && sess.isOpen()) {
        sess.sendText(message);
    }
    

    这是一种零开销的并发安全技巧——不需要锁,不需要 CAS,只需要一行赋值语句。它的原理是:volatile 保证了可见性,但不保证"两次读取得到相同值"。通过赋值到局部变量,把"多次读 volatile"变成"一次读 volatile + 多次读局部变量",消除了 TOCTOU 竞态。

    "SPI + 广播器"——无侵入的扩展机制

    DataExchanger 通过 SPI 发现消息处理器,MulticastWebsocketMessageHandler 广播消息给所有处理器。新增一种服务端下发指令的处理,只需要:

    1. 实现 IWebsocketMessageHandler 接口
    2. META-INF/services/ 中注册

    不需要修改 DataExchangerMulticastWebsocketMessageHandler 或任何现有代码。这种面向扩展开放、面向修改关闭的设计,让客户端 SDK 在不修改源码的情况下就能被扩展——对于一个要嵌入用户应用的 SDK 来说,这是非常重要的品质。

    "双层开关"——配置与运行时的分离

    autoReconnectfinal,不可变)和 enableReconnectvolatile,可变)的组合设计,让"是否支持重连"和"是否允许重连"成为两个独立的关注点。构造时决定能力,运行时控制行为——清晰、灵活、不会混淆。

    十四、小结

    本篇深入拆解了 Phoenix WebSocket 客户端的连接与重连机制。回顾核心要点:

    • 技术选型:客户端用 Tyrus(JSR-356)而非 Netty,避免依赖冲突和依赖膨胀
    • 静态共享资源:全局调度器 GLOBAL_SCHEDULER 和全局 Tyrus 引擎 SHARED_CLIENT,复用底层资源
    • 构造器:Fail-Fast 校验 URI 格式、协议、Host,在最早时机暴露错误
    • connect() 方法:CAS 无锁并发保护(connectionPending)→ CountDownLatch 连接同步(connectLatch)→ 异常处理与资源清理
    • JSR-356 回调@OnOpen 更新状态并释放 Latch,@OnMessage 异常隔离保护连接,@OnClose 识别重复连接(状态码 4000)并触发重连,@OnError 只清理不重连
    • 指数退避重连:初始 5 秒 → 1.5 倍递增 → 上限 5 分钟,三重防护(运行时开关 + 配置开关 + 连接状态)
    • 双层重连开关autoReconnect(final,配置级)+ enableReconnect(volatile,运行时级),AND 关系
    • DataExchanger:DCL 双重检查锁初始化、SPI 自动发现消息处理器、volatile 快照读防 TOCTOU、异步初始化不阻塞启动
    • 消息处理MulticastWebsocketMessageHandler 广播器 + IWebsocketMessageHandler SPI 扩展点,下行白名单独立于上行白名单

    如果说服务端的 WebSocket 实现像一座精密的工厂——Boss/Worker 双线程池、7 层 Pipeline、定时健康巡检——那客户端就像一个坚韧的旅行者:它知道自己要去哪里(serverUri),懂得在路途中保护自己(CAS、CountDownLatch、异常隔离),遇到挫折不放弃但也不蛮干(指数退避),收到"你不该来"的信号时能体面地退出(状态码 4000),在安顿好之后也能听从总部的指挥(SPI 消息处理器)。

    下一篇,我们将进入数据安全领域——深入拆解 Phoenix 的加解密体系:AES、DES、SM4 国密算法的统一抽象,看看监控数据是如何在传输过程中被加密保护的。


    项目地址
    https://gitcode.com/monitoring-platform/phoenix
    https://gitee.com/monitoring-platform/phoenix
    https://github.com/709343767/phoenix

    end
  1. 作者: 锋哥 (联系作者)
  2. 发表时间: 2026-03-30 10:17
  3. 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)
  4. 转载声明:如果是转载博主转载的文章,请附上原文链接
  5. 公众号转载:请在文末添加作者公众号二维码(公众号二维码见右边,欢迎关注)
  6. 评论

    站长头像 知录

    你一句春不晚,我就到了真江南!

    文章0
    浏览0

    文章分类

    标签云