目录

    phoenix云监控平台动态线程池使用文档

    1. 概述

    Phoenix 提供了一套完整的动态线程池管理方案,支持在应用运行时对线程池参数进行热更新,无需重启服务。核心能力包括:

    • 统一注册管理:所有线程池通过 ThreadPoolManager 统一注册、查询、关闭
    • 运行时指标采集:活跃线程数、任务数、利用率、拒绝任务数等
    • 动态参数热更新:核心线程数、最大线程数、队列类型/容量、拒绝策略等均可在线修改
    • Web UI 可视化配置:通过前端表单页面直观地配置线程池参数
    • WebSocket 下发:服务端通过 WebSocket 将配置变更推送至客户端应用,实时生效

    2. 核心架构

    2.1 核心类一览

    类名 所属模块 职责
    ThreadPoolManager phoenix-common-core 线程池注册表 + 动态配置入口
    MonitoredThreadPoolExecutor phoenix-common-core 可监控的普通线程池(自动注册 + 拒绝计数)
    MonitoredScheduledThreadPoolExecutor phoenix-common-core 可监控的定时线程池(自动注册 + 拒绝计数)
    ThreadPool phoenix-common-core 公共线程池工厂(CPU/IO 密集型)
    JavaThreadPool.ThreadPoolInfoDomain phoenix-common-core 线程池信息数据模型
    QueueTypeEnum phoenix-common-core 队列类型枚举 + 工厂方法
    RejectedPolicyTypeEnum phoenix-common-core 拒绝策略枚举 + SPI 扩展
    ResizableLinkedBlockingQueue phoenix-common-core 支持动态修改容量的阻塞队列
    JavaThreadPoolMessageHandler phoenix-client-core WebSocket 消息处理器,接收并应用线程池配置变更

    2.2 数据流

    Phoenix UI(Web页面)
        │  用户修改线程池参数并提交
        ▼
    Phoenix Server(服务端)
        │  封装为 JavaThreadPoolPackage
        │  通过 WebSocket 下发
        ▼
    Phoenix Client(客户端应用)
        │  JavaThreadPoolMessageHandler 接收消息
        │  调用 ThreadPoolManager.dynamicUpdateThreadPool()
        ▼
    ThreadPoolManager
        │  查找注册表中对应的 ThreadPoolExecutor
        │  依次设置各项参数
        ▼
    线程池配置生效(无需重启)
        │  配置成功后立即触发一次线程池信息上报
        ▼
    Phoenix Server 更新监控数据
    

    3. 线程池注册

    3.1 自动注册(推荐)

    使用 MonitoredThreadPoolExecutorMonitoredScheduledThreadPoolExecutor 创建线程池时,构造方法会自动调用 ThreadPoolManager.register() 完成注册。

    普通线程池

    MonitoredThreadPoolExecutor executor = new MonitoredThreadPoolExecutor(
            corePoolSize,       // 核心线程数
            maximumPoolSize,    // 最大线程数
            keepAliveTime,      // 空闲回收时间
            TimeUnit.SECONDS,   // 时间单位
            workQueue,          // 工作队列
            threadFactory,      // 线程工厂
            rejectedHandler,    // 拒绝策略
            "my-thread-pool",   // 线程池名称(全局唯一)
            true                // 是否需要管理关闭
    );
    

    定时线程池

    MonitoredScheduledThreadPoolExecutor scheduledExecutor = new MonitoredScheduledThreadPoolExecutor(
            corePoolSize,       // 核心线程数
            threadFactory,      // 线程工厂
            rejectedHandler,    // 拒绝策略
            "my-scheduled-pool",// 线程池名称(全局唯一)
            false               // 是否需要管理关闭
    );
    

    参数说明

    参数 说明
    threadPoolName 线程池名称,全局唯一,重复注册会抛出 MonitoringUniversalException
    needShutdown true:注册到 NEED_SHUTDOWN_THREAD_POOLS,应用关闭时由 ThreadPoolManager 统一优雅关闭
    false:注册到 UN_NEED_SHUTDOWN_THREAD_POOLS,仅做监控,不参与统一关闭(适合 Spring Bean 自行管理生命周期的场景)

    3.2 手动注册

    对于已有的 ThreadPoolExecutor 实例,可以手动注册:

    ThreadPoolManager.register("legacy-pool",existingExecutor, false);
    

    3.3 取消注册

    ThreadPoolManager.unregister("my-thread-pool");
    

    4. 项目中的使用示例

    4.1 公共线程池(ThreadPool 工厂)

    ThreadPool 类采用双重检查锁 + volatile 的单例模式,提供 4 种公共线程池:

    方法 线程池名称 类型
    getCommonCpuIntensiveThreadPoolExecutor() phoenix-common-cpu-intensive-pool CPU 密集型普通线程池
    getCommonIoIntensiveThreadPoolExecutor() phoenix-common-io-intensive-pool IO 密集型普通线程池
    getCommonCpuIntensiveScheduledThreadPoolExecutor() phoenix-common-cpu-intensive-scheduled-pool CPU 密集型定时线程池
    getCommonIoIntensiveScheduledThreadPoolExecutor() phoenix-common-io-intensive-scheduled-pool IO 密集型定时线程池

    线程数计算公式:Ncpu / (1 - 阻塞系数),CPU 密集型阻塞系数 = 0.2,IO 密集型阻塞系数 = 0.8。

    4.2 Spring Bean 方式(ThreadPoolConfig)

    phoenix-serverphoenix-agent 中,通过 @Bean + @Lazy 注册线程池:

    
    @Lazy
    @Bean(name = "instanceMonitorThreadPoolExecutor", destroyMethod = "shutdown")
    public MonitoredThreadPoolExecutor instanceMonitorThreadPoolExecutor() {
        return new MonitoredThreadPoolExecutor(
                (int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
                (int) (ProcessorsUtils.getAvailableProcessors() / (1 - 0.8)),
                1L, TimeUnit.HOURS,
                new LinkedBlockingQueue<>(Integer.MAX_VALUE),
                new BasicThreadFactory.Builder()
                        .namingPattern("phoenix-server-instance-monitor-pool-thread-%d")
                        .daemon(true).build(),
                new ThreadPoolExecutor.AbortPolicy(),
                "phoenix-server-instance-monitor-pool",  // 线程池名称
                false);  // needShutdown=false,由Spring容器管理关闭
    }
    

    5. 动态配置详解

    5.1 可配置参数

    通过 ThreadPoolManager.dynamicUpdateThreadPool() 方法,支持以下参数的运行时热更新:

    参数 字段名 类型 说明 约束
    核心线程数 corePoolSize Integer 线程池始终保持的最小线程数 ≥ 0,且 ≤ 最大线程数
    最大线程数 maximumPoolSize Integer 线程池允许创建的最大线程数 ≥ 1,且 ≥ 核心线程数;ScheduledThreadPoolExecutor 不支持修改(DelayedWorkQueue 无界)
    队列类型 queueType String 工作队列的实现类型 队列类型ScheduledThreadPoolExecutor 不支持变更
    队列容量 queueCapacity Long 工作队列的最大容量 ResizableLinkedBlockingQueue 支持在不切换队列类型时动态修改
    空闲回收时间 keepAliveTime Long 空闲线程的存活时间(秒) ≥ 0
    核心线程空闲回收 allowCoreThreadTimeOut Boolean 是否允许核心线程在空闲超时后被回收 开启前 keepAliveTime 必须 > 0,否则跳过设置
    拒绝策略 rejectedExecutionHandlerName String 线程池满时的拒绝处理方式 拒绝策略

    5.2 支持的队列类型

    队列通过 QueueTypeEnum 枚举管理,默认容量为 1024

    队列类型 说明 是否支持动态修改容量 备注
    ArrayBlockingQueue 基于数组的有界阻塞队列,FIFO 仅切换队列类型时 创建后容量不可变(JDK 限制)
    LinkedBlockingQueue 基于链表的有界阻塞队列 仅切换队列类型时 JDK 实现的 capacity 为 final
    LinkedBlockingDeque 基于链表的双端阻塞队列 仅切换队列类型时 支持头尾双向操作
    SynchronousQueue 不存储元素的同步传递队列 不支持 容量恒为 0,每个 put 必须等待 take
    LinkedTransferQueue 无界链表传输队列 不支持 无容量概念
    PriorityBlockingQueue 支持优先级排序的无界阻塞队列 仅切换队列类型时 任务需实现 Comparable
    ResizableLinkedBlockingQueue 可动态调整容量的链表阻塞队列 支持 推荐使用,调用 setCapacity() 即可热更新容量
    DelayedWorkQueue ScheduledThreadPoolExecutor 内部私有队列 不支持 仅展示,不可配置

    推荐:如果需要在不切换队列类型的情况下动态修改队列容量,请使用 ResizableLinkedBlockingQueue

    队列类型变更机制

    当队列类型发生变更时,ThreadPoolManager 会执行以下步骤:

    1. 通过 QueueTypeEnum.createBlockingQueue() 创建目标类型的新队列
    2. 调用 oldQueue.drainTo(tasks) 将旧队列中的待执行任务迁移到新队列
    3. 通过反射替换 ThreadPoolExecutorprivate final workQueue 字段

    当队列类型未变更,仅修改容量时:

    • 如果是 ResizableLinkedBlockingQueue:直接调用 setCapacity(int) 动态修改
    • 如果是其他类型:因 JDK 队列的 capacity 是 final 的,日志提示跳过

    5.3 支持的拒绝策略

    拒绝策略通过 RejectedPolicyTypeEnum 枚举管理,并支持 SPI 扩展

    策略名称 说明
    AbortPolicy 中止策略(JDK 默认):直接抛出 RejectedExecutionException
    CallerRunsPolicy 调用者运行策略:由提交任务的线程直接执行被拒绝的任务,起到反压效果
    DiscardOldestPolicy 丢弃最老任务策略:丢弃队列头部(最早入队)的任务,然后重新提交被拒绝的任务
    DiscardPolicy 丢弃策略:直接丢弃被拒绝的任务,不做任何处理
    RunsOldestTaskPolicy 执行最老任务策略(自定义):从队列中取出最早的任务并执行它,再将新任务放入队列
    SyncPutQueuePolicy 同步阻塞入队策略(自定义):通过 queue.put() 阻塞等待直到队列有空间
    CustomRejectedPolicy 自定义 SPI 策略:通过 Java SPI 机制加载用户自定义的拒绝策略

    自定义拒绝策略(SPI 扩展)

    1. 实现 CustomRejectedExecutionHandler 接口:
    public class MyCustomRejectedHandler implements CustomRejectedExecutionHandler {
    
        @Override
        public String getName() {
            return "MyCustomRejectedHandler";
        }
    
        @Override
        public RejectedExecutionHandler generateRejected() {
            return (r, executor) -> {
                // 自定义拒绝逻辑,例如记录日志、持久化任务等
                log.warn("任务被拒绝: {}", r);
            };
        }
    }
    
    1. META-INF/services/ 下创建 SPI 配置文件:
    文件名: com.gitee.pifeng.monitoring.common.threadpool.rejected.CustomRejectedExecutionHandler
    内容: com.example.MyCustomRejectedHandler
    
    1. 在 Web UI 配置页面中选择 CustomRejectedPolicy(自定义SPI策略) 即可使用。

    6. 监控指标

    ThreadPoolManager.getAllThreadPoolInfo() 方法采集所有注册线程池的运行时指标:

    指标 字段名 类型 说明
    线程池名称 name String 注册时指定的唯一名称
    活跃线程数 activeCount Integer 当前正在执行任务的线程数
    当前线程数 poolSize Integer 线程池中当前存活的线程总数
    核心线程数 corePoolSize Integer 配置的核心线程数
    最大线程数 maximumPoolSize Integer 配置的最大线程数
    历史峰值线程数 largestPoolSize Integer 线程池曾达到的最大线程数
    总任务数 taskCount Long 累计接收的任务数(含已完成、执行中、等待中)
    已完成任务数 completedTaskCount Long 已执行完毕的任务数
    拒绝任务数 rejectedTaskCount Long 被拒绝的任务累计数量(仅 Monitored 类型线程池支持)
    队列类型 queueType String 当前使用的阻塞队列类型
    队列大小 queueSize Integer 队列中当前等待的任务数
    队列容量 queueCapacity Long 队列的总容量
    队列剩余容量 queueRemainingCapacity Integer 队列还能容纳的任务数
    拒绝策略 rejectedExecutionHandlerName String 当前使用的拒绝策略名称
    核心线程空闲回收 allowCoreThreadTimeOut Boolean 核心线程是否在空闲时被回收
    空闲回收时间 keepAliveTime Long 空闲线程存活时间(秒)
    利用率 utilizationRate Double activeCount / poolSize,保留 4 位小数

    无界队列说明DelayedWorkQueue、无参构造的 LinkedBlockingQueue 等无界队列的 remainingCapacity() 返回 Integer.MAX_VALUE,此时 queueCapacity 统一设为 Integer.MAX_VALUE(2147483647),表示"无限制"。

    7. 线程池生命周期管理

    7.1 优雅关闭

    ThreadPoolManager.shutdownGracefully() 方法实现了优雅关闭逻辑:

    1. 调用 shutdown() 阻止新任务提交
    2. 等待 15 秒 让已提交任务完成
    3. 超时未完成则调用 shutdownNow() 强制中断
    4. 再等待 15 秒 确认关闭完成
    5. 若线程被中断,调用 shutdownNow() 并恢复中断状态

    7.2 统一关闭

    调用 ThreadPoolManager.shutdownAllGracefullyAndUnregister() 会遍历所有 needShutdown=true 的线程池,依次执行优雅关闭并取消注册。

    8. Web UI 配置页面

    配置页面 setup-instance-java-thread-pool.html 基于 Layui 框架,提供以下交互能力:

    8.1 表单字段与交互规则

    字段 控件类型 可编辑条件 校验规则
    核心线程数 number 输入框 始终可编辑 必填、数字、不能大于最大线程数
    最大线程数 number 输入框 DelayedWorkQueue 时只读 必填、数字、不能小于核心线程数
    队列类型 下拉选择框 DelayedWorkQueue 时禁用 必选
    队列容量 number 输入框 DelayedWorkQueue/SynchronousQueue/LinkedTransferQueue 时只读 必填、数字
    核心线程空闲回收 开关组件 始终可编辑 -
    空闲回收时间(秒) number 输入框 始终可编辑 必填、数字
    拒绝策略 下拉选择框 始终可编辑 必选

    8.2 联动逻辑

    • 队列类型切换时:选择 SynchronousQueueLinkedTransferQueue 后,队列容量自动设为 0 并变为只读
    • ScheduledThreadPoolExecutor 限制:当队列类型为 DelayedWorkQueue 时,最大线程数和队列类型/容量均为只读,因为 ScheduledThreadPoolExecutorDelayedWorkQueue 是内部私有队列,不支持变更

    9. 注意事项与最佳实践

    9.1 ScheduledThreadPoolExecutor 的限制

    ScheduledThreadPoolExecutor 使用内部私有的 DelayedWorkQueue(无界队列),因此:

    • 最大线程数修改无意义(由 DelayedWorkQueue 的无界特性决定,仅核心线程数生效)
    • 队列类型和容量不支持变更

    9.2 allowCoreThreadTimeOut 前置条件

    开启 allowCoreThreadTimeOut 前,keepAliveTime 必须 > 0。否则 JDK 会抛出 IllegalArgumentExceptionThreadPoolManager 已内置此校验,不满足条件时会跳过设置并输出警告日志。

    9.3 corePoolSize 与 maximumPoolSize 设置顺序

    动态修改时,若新的 corePoolSize > 当前 maximumPoolSize,需先设置 maximumPoolSize 再设置 corePoolSize,否则 JDK 会抛出 IllegalArgumentExceptionThreadPoolManager 已自动处理此问题。

    9.4 队列容量动态修改建议

    • 如果有动态调整队列容量的需求,建议创建线程池时直接使用 ResizableLinkedBlockingQueue 作为工作队列
    • 其他 JDK 内置队列的 capacityfinal 字段,无法在不切换队列类型的情况下修改容量
    • 切换队列类型涉及反射替换 ThreadPoolExecutor.workQueue 字段,存在一定风险,建议在低峰期操作

    9.5 线程池命名规范

    • 线程池名称全局唯一,重复注册会抛出异常
    • 建议使用 模块名-功能-pool 的命名规范,例如:phoenix-server-instance-monitor-pool
    • 线程命名建议使用 模块名-功能-pool-thread-%d 的格式,便于排查问题
    end
    站长头像 知录

    你一句春不晚,我就到了真江南!

    文章0
    浏览0

    文章分类

    标签云