动态调整线程池配置

Posted by Mark on April 20, 2020

简述

生产环境有遇到过需要动态调整线程池配置的需求, 参考美团技术团队发表的文章《Java线程池实现原理及其在美团业务中的实践》, 自己撸了一版, 实践的过程中也遇到一些细节问题, 以此记录.

知识储备

如果对线程池使用不太了解的话, 强烈建议先看看上面提到的美团技术团队发表的文章. 请注意, 请注意, 是强烈建议.

如果有兴趣的话可以看看下面几个类的源码,

调整线程池大小

调整线程池大小这个实现是非常简单的, 就是利用 ThreadPoolExecutor 的实例方法 setCorePoolSize, setMaximumPoolSize 去调整ThreadPoolExecutor实例的核心和最大线程池大小. 需要提一下的是 setCorePoolSize 在JDK 8是有Bug的, JDK 8 实现代码如下

1
2
3
4
5
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    // 省略无关代码
}

可以看到设置 corePoolSize 的时候并没有和 maximumPoolSize 进行比较, 如果直接使用的话, 是允许设置corePoolSize大于maximumPoolSize的, 这就和我们创建线程池的认知产生了误差. 如果直接使用, 后面升级了JDK的话, 会出现报错的情况.

此Bug在2014-05-21日 JDK 9 版本已经修复. JDK 9 修复代码如下, 仅仅是多加了一个判断条件 maximumPoolSize < corePoolSize

1
2
3
4
5
public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    // 省略无关代码
}

所以我们这里为了兼容以后升级JDK版本, 不用再去修改代码, 修改corePoolSizemaximumPoolSize的时候需要增加判断, 伪代码如下

1
2
3
4
5
6
7
8
9
10
public void resetThreadPoolSize() {
    if (newCorePoolSize > executor.getMaximumPoolSize()) {
        executor.setMaximumPoolSize(newMaximumPoolSize);
        // After jdk8 will throw exception if greater than the maximum pool size
        executor.setCorePoolSize(newCorePoolSize);
    } else {
        executor.setCorePoolSize(newCorePoolSize);
        executor.setMaximumPoolSize(newMaximumPoolSize);
    }
}

Bug Details: ThreadPoolExecutor’s setCorePoolSize method allows corePoolSize > maxPoolSize

Change Log: http://hg.openjdk.java.net/jdk9/jdk9/jdk/rev/eb6f07ec4509

调整线程池队列容量

不同的场景使用不同的线程池, 我们有一些需求会使用到fixed thread pool, 也就是固定线程池大小, 固定容量的线程池. 譬如: 写日志. 低延迟, 可以容忍丢失.

Executors.newFixedThreadPool()默认实现队列使用的是LinkedBlockingQueue, 这是一个可选容量的阻塞队列, 我们通常都是需要设置 capacity 的, 为了防止队列长度过大导致OOM, 因为默认容量是 231-1. LinkedBlockingQueue的 capacity 如下

1
2
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

看代码我们知道capacity是加了final关键字的, 初始化后无法改变, 如果我们想要修改它, 就必须自己实现一个可变容量的阻塞队列.

这里我们的实现方式参考rabbitmq-java-client项目中的 VariableLinkedBlockingQueue.java 容量修改的实现方法如下

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * Set a new capacity for the queue. cao'c
 * @param capacity the new capacity for the queue
 */
public void setCapacity(int capacity) {
    final int oldCapacity = this.capacity;
    this.capacity = capacity;
    final int size = count.get();
    if (capacity > size && size >= oldCapacity) {
        signalNotFull();
    }
}

根据代码我们可以知道, 如果是增加容量超过当前队列大小, 则需要通知之前因为队列满而阻塞的调用, 使其成功执行. 另外要注意的就是如果队列容量由大变小, put/offer/take/poll/remove/clear/drainTo里面涉及到== capacity的判断条件需要改为 >= capacity.

总体设计

核心代码梳理完成后, 最终加上分布式配置+监控埋点+实时报警去修改线程池实例参数, 设计如下图

arch design

监控报警

发布测试环境后, 监控效果图如下, 当然你可以根据自己的实际情况去设置合理的报警阈值

thread monitor

总结

业务场景中碰到过太多次因为调整线程池需要重新发布应用的情况, 所以动态调整线程池配置对开发来说是非常有必要的, 另外开发对应用线程池资源的使用情况也需要监控和报警, 这样一旦出现类似的故障, 就可以第一时间处理, 而不是无从下手, 或者需要重新发布应用导致故障影响范围增加.

优化

通过字节码注入方式实现, 而不在需要封装一个Jar包的形式供客户端使用. 做到让客户端使用无感知, 这是我们优化的方向.

如有错误, 欢迎评论指正或者联系我, 我会及时修正。