面试官问如何动态修改线程池参数,我们聊了三十分钟

快来打我* 2021-09-25 06:56 271阅读 0赞

欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我微信「java_front」一起交流学习

0 文章概述

流量洪峰是互联网生产环境经常遇到的情况,例如某个时间点进行商品抢购活动,或者某个时间点集中触发定时任务,这些场景都有可能引发流量洪峰,所以如何应对流量洪峰我们必须面对的问题。

纵向维度我们可以从代理层、WEB层、服务层、缓存层、数据层进行思考,横向维度我们可以从高频检测、缓存前置、节点冗余、服务降级等方向进行思考。本文我们从服务层动态线程池这个角度思考应对流量洪峰。

动态线程池是指我们可以根据流量的不同调节线程池某些参数,例如可以在业务低峰期调低线程数,在业务高峰期调高线程数增加处理线程从而应对流量洪峰。本文我们结合Apollo和线程池实现一个动态线程池。

如何实现动态线程池.jpeg

1 线程池基础

1.1 七个参数

我们首先回顾一下Java线程池七大参数,这对后续设置线程池参数有帮助。我们查看ThreadPoolExecutor构造函数如下:

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue,
  7. ThreadFactory threadFactory,
  8. RejectedExecutionHandler handler) {
  9. if (corePoolSize < 0 ||
  10. maximumPoolSize <= 0 ||
  11. maximumPoolSize < corePoolSize ||
  12. keepAliveTime < 0)
  13. throw new IllegalArgumentException();
  14. if (workQueue == null || threadFactory == null || handler == null)
  15. throw new NullPointerException();
  16. this.acc = System.getSecurityManager() == null ?
  17. null :
  18. AccessController.getContext();
  19. this.corePoolSize = corePoolSize;
  20. this.maximumPoolSize = maximumPoolSize;
  21. this.workQueue = workQueue;
  22. this.keepAliveTime = unit.toNanos(keepAliveTime);
  23. this.threadFactory = threadFactory;
  24. this.handler = handler;
  25. }
  26. }

(1) corePoolSize

线程池核心线程数,类比业务大厅开设的固定窗口。例如业务大厅开设2个固定窗口,那么这两个窗口不会关闭,全天都会进行业务办理

(2) workQueue

存储已提交但尚未执行的任务,类比业务大厅等候区。例如业务大厅一开门进来很多顾客,2个固定窗口进行业务办理,其他顾客到等候区等待

(3) maximumPoolSize

线程池可以容纳同时执行最大线程数,类比业务大厅最大窗口数。例如业务大厅最大窗口数是5个,业务员看到2个固定窗口和等候区都满了,可以临时增加3个窗口

(4) keepAliveTime

非核心线程数存活时间。当业务不忙时刚才新增的3个窗口需要关闭,空闲时间超过keepAliveTime空闲会被关闭

(5) unit

keepAliveTime存活时间单位

(6) threadFactory

线程工厂可以用来指定线程名

(7) handler

线程池线程数已达到maximumPoolSize且队列已满时执行拒绝策略。例如业务大厅5个窗口全部处于忙碌状态且等候区已满,业务员根据实际情况选择拒绝策略

1.2 四种拒绝策略

(1) AbortPolicy

默认策略直接抛出RejectExecutionException阻止系统正常运行

  1. /** * AbortPolicy * * @author 微信公众号「JAVA前线」 * */
  2. public class AbortPolicyTest {
  3. public static void main(String[] args) {
  4. int coreSize = 1;
  5. int maxSize = 2;
  6. int queueSize = 1;
  7. AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
  8. ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), abortPolicy);
  9. for (int i = 0; i < 100; i++) {
  10. executor.execute(new Runnable() {
  11. @Override
  12. public void run() {
  13. System.out.println(Thread.currentThread().getName() + " -> run");
  14. }
  15. });
  16. }
  17. }
  18. }

程序执行结果:

  1. pool-1-thread-1 -> run
  2. pool-1-thread-2 -> run
  3. pool-1-thread-1 -> run
  4. Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.xy.juc.threadpool.reject.AbortPolicyTest$1@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
  5. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
  6. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
  7. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
  8. at com.xy.juc.threadpool.reject.AbortPolicyTest.main(AbortPolicyTest.java:21)

(2) CallerRunsPolicy

任务回退给调用者自己运行

  1. /** * CallerRunsPolicy * * @author 微信公众号「JAVA前线」 * */
  2. public class CallerRunsPolicyTest {
  3. public static void main(String[] args) {
  4. int coreSize = 1;
  5. int maxSize = 2;
  6. int queueSize = 1;
  7. CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
  8. ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), callerRunsPolicy);
  9. for (int i = 0; i < 10; i++) {
  10. executor.execute(new Runnable() {
  11. @Override
  12. public void run() {
  13. System.out.println(Thread.currentThread().getName() + " -> run");
  14. }
  15. });
  16. }
  17. }
  18. }

程序执行结果:

  1. main -> run
  2. pool-1-thread-1 -> run
  3. pool-1-thread-2 -> run
  4. pool-1-thread-1 -> run
  5. main -> run
  6. main -> run
  7. pool-1-thread-2 -> run
  8. pool-1-thread-1 -> run
  9. main -> run
  10. pool-1-thread-2 -> run

(3) DiscardOldestPolicy

抛弃队列中等待最久的任务不会抛出异常

  1. /** * DiscardOldestPolicy * * @author 微信公众号「JAVA前线」 * */
  2. public class DiscardOldestPolicyTest {
  3. public static void main(String[] args) {
  4. int coreSize = 1;
  5. int maxSize = 2;
  6. int queueSize = 1;
  7. DiscardOldestPolicy discardOldestPolicy = new ThreadPoolExecutor.DiscardOldestPolicy();
  8. ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardOldestPolicy);
  9. for (int i = 0; i < 10; i++) {
  10. executor.execute(new Runnable() {
  11. @Override
  12. public void run() {
  13. System.out.println(Thread.currentThread().getName() + " -> run");
  14. }
  15. });
  16. }
  17. }
  18. }

程序执行结果:

  1. pool-1-thread-1 -> run
  2. pool-1-thread-2 -> run
  3. pool-1-thread-1 -> run

(4) DiscardPolicy

直接丢弃任务不会抛出异常

  1. /** * DiscardPolicy * * @author 微信公众号「JAVA前线」 * */
  2. public class DiscardPolicyTest {
  3. public static void main(String[] args) {
  4. int coreSize = 1;
  5. int maxSize = 2;
  6. int queueSize = 1;
  7. DiscardPolicy discardPolicy = new ThreadPoolExecutor.DiscardPolicy();
  8. ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(queueSize), Executors.defaultThreadFactory(), discardPolicy);
  9. for (int i = 0; i < 10; i++) {
  10. executor.execute(new Runnable() {
  11. @Override
  12. public void run() {
  13. System.out.println(Thread.currentThread().getName() + " -> run");
  14. }
  15. });
  16. }
  17. }
  18. }

程序执行结果:

  1. pool-1-thread-1 -> run
  2. pool-1-thread-2 -> run
  3. pool-1-thread-1 -> run

1.3 修改参数

如果初始化线程池完成后,我们是否可以修改线程池某些参数呢?答案是可以。我们选择线程池提供的四个修改方法进行源码分析。

(1) setCorePoolSize

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. public void setCorePoolSize(int corePoolSize) {
  3. if (corePoolSize < 0)
  4. throw new IllegalArgumentException();
  5. // 新核心线程数减去原核心线程数
  6. int delta = corePoolSize - this.corePoolSize;
  7. // 新核心线程数赋值
  8. this.corePoolSize = corePoolSize;
  9. // 如果当前线程数大于新核心线程数
  10. if (workerCountOf(ctl.get()) > corePoolSize)
  11. // 中断空闲线程
  12. interruptIdleWorkers();
  13. // 如果需要新增线程则通过addWorker增加工作线程
  14. else if (delta > 0) {
  15. int k = Math.min(delta, workQueue.size());
  16. while (k-- > 0 && addWorker(null, true)) {
  17. if (workQueue.isEmpty())
  18. break;
  19. }
  20. }
  21. }
  22. }

(2) setMaximumPoolSize

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. public void setMaximumPoolSize(int maximumPoolSize) {
  3. if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
  4. throw new IllegalArgumentException();
  5. this.maximumPoolSize = maximumPoolSize;
  6. // 如果当前线程数量大于新最大线程数量
  7. if (workerCountOf(ctl.get()) > maximumPoolSize)
  8. // 中断空闲线程
  9. interruptIdleWorkers();
  10. }
  11. }

(3) setKeepAliveTime

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. public void setKeepAliveTime(long time, TimeUnit unit) {
  3. if (time < 0)
  4. throw new IllegalArgumentException();
  5. if (time == 0 && allowsCoreThreadTimeOut())
  6. throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
  7. long keepAliveTime = unit.toNanos(time);
  8. // 新超时时间减去原超时时间
  9. long delta = keepAliveTime - this.keepAliveTime;
  10. this.keepAliveTime = keepAliveTime;
  11. // 如果新超时时间小于原超时时间
  12. if (delta < 0)
  13. // 中断空闲线程
  14. interruptIdleWorkers();
  15. }
  16. }

(4) setRejectedExecutionHandler

  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
  3. if (handler == null)
  4. throw new NullPointerException();
  5. // 设置拒绝策略
  6. this.handler = handler;
  7. }
  8. }

现在我们知道线程池系统上述调整参数的方法,但仅仅分析到此是不够的,因为如果没有动态调整参数的方法,那么每次修改必须重新发布才可以生效,那么有没有方法不用发布就可以动态调整线程池参数呢?

2 Apollo配置中心

2.1 核心原理

Apollo是携程框架部门研发的分布式配置中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。Apollo开源地址如下:

  1. https://github.com/ctripcorp/apollo

我们在使用配置中心时第一步用户在配置中心修改配置项,第二步配置中心通知Apollo客户端有配置更新,第三步Apollo客户端从配置中心拉取最新配置,更新本地配置并通知到应用,官网基础模型图如下:

01 基础结构.jpg

配置中心配置项发生变化客户端如何感知呢?分为推和拉两种方式。推依赖客户端和服务端保持了一个长连接,发生数据变化时服务端推送信息给客户端,这就是长轮询机制。拉依赖客户端定时从配置中心服务端拉取应用最新配置,这是一个fallback机制。官网客户端设计图如下:

02 客户端设计.jpg

本文重点分析配置更新推送方式,我们首先看官网服务端设计图:

03 服务端设计.jpg

ConfigService模块提供配置的读取推送等功能,服务对象是Apollo客户端。AdminService模块提供配置的修改发布等功能,服务对象是Portal模块即管理界面。需要说明Apollo并没有引用消息中间件,图中发送异步消息是指ConfigService定时扫描异步消息数据表:

04 推送方式.jpg

消息数据保存在MySQL消息表:

  1. CREATE TABLE `releasemessage` (
  2. `Id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增主键',
  3. `Message` varchar(1024) NOT NULL DEFAULT '' COMMENT '发布的消息内容',
  4. `DataChange_LastTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间',
  5. PRIMARY KEY (`Id`),
  6. KEY `DataChange_LastTime` (`DataChange_LastTime`),
  7. KEY `IX_Message` (`Message`(191))
  8. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='发布消息'

Apollo核心原理本文暂且谈到这里,后续我会写文章通过源码分析Apollo长轮询机制请继续关注。

2.2 实例分析

2.2.1 服务端安装

服务端关键步骤是导入数据库和修改端口号,具体步骤请参看官方网站:

  1. https://ctripcorp.github.io/apollo/#/zh/deployment/quick-start

启动成功后访问地址:

  1. http://localhost:8070

05 访问首页.jpg

输入用户名apollo、密码admin登录:

06 界面1.jpg

点击进入我创建myApp项目,我们看到在DEV环境、default集群、application命名空间包含一个timeout配置项,100是这个配置项的值,下面我们在应用程序读取这个配置项:

07 界面2.jpg

2.2.2 应用程序

(1) 引入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.ctrip.framework.apollo</groupId>
  4. <artifactId>apollo-client</artifactId>
  5. <version>1.7.0</version>
  6. </dependency>
  7. </dependencies>

(2) 简单实例

  1. public class GetApolloConfigTest extends BaseTest {
  2. /** * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080 * * myApp+DEV+default+application */
  3. @Test
  4. public void testGet() throws InterruptedException {
  5. Config appConfig = ConfigService.getAppConfig();
  6. while (true) {
  7. String value = appConfig.getProperty("timeout", "200");
  8. System.out.println("timeout=" + value);
  9. TimeUnit.SECONDS.sleep(1);
  10. }
  11. }
  12. }

因为上述程序是通过while(true)不断获取配置项的值,所以程序输出结果如下:

  1. timeout=100
  2. timeout=100
  3. timeout=100
  4. timeout=100
  5. timeout=100
  6. timeout=100

我们现在把配置项的值改为200程序输出结果如下:

  1. timeout=100
  2. timeout=100
  3. timeout=100
  4. timeout=100
  5. timeout=200
  6. timeout=200
  7. timeout=200

(3) 监听实例

生产环境我们一般不用while(true)监听变化,而是通过注册监听器方式感知变化信息:

  1. public class GetApolloConfigTest extends BaseTest {
  2. /** * 监听命名空间变化 * * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080 * * myApp+DEV+default+application */
  3. @Test
  4. public void testListen() throws InterruptedException {
  5. Config config = ConfigService.getConfig("application");
  6. config.addChangeListener(new ConfigChangeListener() {
  7. @Override
  8. public void onChange(ConfigChangeEvent changeEvent) {
  9. System.out.println("发生变化命名空间=" + changeEvent.getNamespace());
  10. for (String key : changeEvent.changedKeys()) {
  11. ConfigChange change = changeEvent.getChange(key);
  12. System.out.println(String.format("发生变化key=%s,oldValue=%s,newValue=%s,changeType=%s", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()));
  13. }
  14. }
  15. });
  16. Thread.sleep(1000000L);
  17. }
  18. }

我们现在把timeout值从200改为300,程序输出结果:

  1. 发生变化命名空间=application
  2. 发生变化key=timeout,oldValue=200,newValue=300,changeType=MODIFIED

3 动态线程池

现在我们把线程池和Apollo结合起来构建动态线程池,具备了上述知识编写起来并不复杂。首先我们用默认值构建一个线程池,然后线程池会监听Apollo关于相关配置项,如果相关配置有变化则刷新相关参数。第一步在Apollo配置中心设置三个线程池参数(本文没有设置拒绝策略其原理一致):

08 线程数1.jpg

第二步编写核心代码:

  1. /** * 动态线程池工厂 * * @author 微信公众号「JAVA前线」 * */
  2. @Slf4j
  3. @Component
  4. public class DynamicThreadPoolFactory {
  5. private static final String NAME_SPACE = "threadpool-config";
  6. /** 线程执行器 **/
  7. private volatile ThreadPoolExecutor executor;
  8. /** 核心线程数 **/
  9. private Integer CORE_SIZE = 10;
  10. /** 最大值线程数 **/
  11. private Integer MAX_SIZE = 20;
  12. /** 等待队列长度 **/
  13. private Integer QUEUE_SIZE = 2000;
  14. /** 线程存活时间 **/
  15. private Long KEEP_ALIVE_TIME = 1000L;
  16. /** 线程名 **/
  17. private String threadName;
  18. public DynamicThreadPoolFactory() {
  19. Config config = ConfigService.getConfig(NAME_SPACE);
  20. init(config);
  21. listen(config);
  22. }
  23. /** * 初始化 */
  24. private void init(Config config) {
  25. if (executor == null) {
  26. synchronized (DynamicThreadPoolFactory.class) {
  27. if (executor == null) {
  28. String coreSize = config.getProperty(KeysEnum.CORE_SIZE.getNodeKey(), CORE_SIZE.toString());
  29. String maxSize = config.getProperty(KeysEnum.MAX_SIZE.getNodeKey(), MAX_SIZE.toString());
  30. String keepAliveTIme = config.getProperty(KeysEnum.KEEP_ALIVE_TIME.getNodeKey(), KEEP_ALIVE_TIME.toString());
  31. BlockingQueue<Runnable> queueToUse = new LinkedBlockingQueue<Runnable>(QUEUE_SIZE);
  32. executor = new ThreadPoolExecutor(Integer.valueOf(coreSize), Integer.valueOf(maxSize), Long.valueOf(keepAliveTIme), TimeUnit.MILLISECONDS, queueToUse, new NamedThreadFactory(threadName, true), new AbortPolicyDoReport(threadName));
  33. }
  34. }
  35. }
  36. }
  37. /** * 监听器 */
  38. private void listen(Config config) {
  39. config.addChangeListener(new ConfigChangeListener() {
  40. @Override
  41. public void onChange(ConfigChangeEvent changeEvent) {
  42. log.info("命名空间发生变化={}", changeEvent.getNamespace());
  43. for (String key : changeEvent.changedKeys()) {
  44. ConfigChange change = changeEvent.getChange(key);
  45. String newValue = change.getNewValue();
  46. refreshThreadPool(key, newValue);
  47. log.info("发生变化key={},oldValue={},newValue={},changeType={}", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType());
  48. }
  49. }
  50. });
  51. }
  52. /** * 刷新线程池 */
  53. private void refreshThreadPool(String key, String newValue) {
  54. if (executor == null) {
  55. return;
  56. }
  57. if (KeysEnum.CORE_SIZE.getNodeKey().equals(key)) {
  58. executor.setCorePoolSize(Integer.valueOf(newValue));
  59. log.info("修改核心线程数key={},value={}", key, newValue);
  60. }
  61. if (KeysEnum.MAX_SIZE.getNodeKey().equals(key)) {
  62. executor.setMaximumPoolSize(Integer.valueOf(newValue));
  63. log.info("修改最大线程数key={},value={}", key, newValue);
  64. }
  65. if (KeysEnum.KEEP_ALIVE_TIME.getNodeKey().equals(key)) {
  66. executor.setKeepAliveTime(Integer.valueOf(newValue), TimeUnit.MILLISECONDS);
  67. log.info("修改活跃时间key={},value={}", key, newValue);
  68. }
  69. }
  70. public ThreadPoolExecutor getExecutor(String threadName) {
  71. return executor;
  72. }
  73. enum KeysEnum {
  74. CORE_SIZE("coreSize", "核心线程数"),
  75. MAX_SIZE("maxSize", "最大线程数"),
  76. KEEP_ALIVE_TIME("keepAliveTime", "线程活跃时间")
  77. ;
  78. private String nodeKey;
  79. private String desc;
  80. KeysEnum(String nodeKey, String desc) {
  81. this.nodeKey = nodeKey;
  82. this.desc = desc;
  83. }
  84. public String getNodeKey() {
  85. return nodeKey;
  86. }
  87. public void setNodeKey(String nodeKey) {
  88. this.nodeKey = nodeKey;
  89. }
  90. public String getDesc() {
  91. return desc;
  92. }
  93. public void setDesc(String desc) {
  94. this.desc = desc;
  95. }
  96. }
  97. }
  98. /** * 动态线程池执行器 * * @author 微信公众号「JAVA前线」 * */
  99. @Component
  100. public class DynamicThreadExecutor {
  101. @Resource
  102. private DynamicThreadPoolFactory threadPoolFactory;
  103. public void execute(String bizName, Runnable job) {
  104. threadPoolFactory.getExecutor(bizName).execute(job);
  105. }
  106. public Future<?> sumbit(String bizName, Runnable job) {
  107. return threadPoolFactory.getExecutor(bizName).submit(job);
  108. }
  109. }

第三步运行测试用例并结合VisualVM观察线程数:

  1. /** * 动态线程池测试 * * @author 微信公众号「JAVA前线」 * */
  2. public class DynamicThreadExecutorTest extends BaseTest {
  3. @Resource
  4. private DynamicThreadExecutor dynamicThreadExecutor;
  5. /** * -Dapp.id=myApp -Denv=DEV -Dapollo.cluster=default -Ddev_meta=http://localhost:8080 * * myApp+DEV+default+thread-pool */
  6. @Test
  7. public void testExecute() throws InterruptedException {
  8. while (true) {
  9. dynamicThreadExecutor.execute("bizName", new Runnable() {
  10. @Override
  11. public void run() {
  12. System.out.println("bizInfo");
  13. }
  14. });
  15. TimeUnit.SECONDS.sleep(1);
  16. }
  17. }
  18. }

visualvm1.jpg

我们在配置中心修改配置项把核心线程数设置为50,最大线程数设置为100:

10.jpg

通过VisualVM可以观察到线程数显著上升:

11.jpg

4 文章总结

本文我们首先介绍了线程池基础知识,包括七大参数和四个拒绝策略,随后我们介绍了Apollo配置中心的原理和应用,最后我们将线程池和配置中心相结合,实现了动态调整线程数的效果,希望本文对大家有所帮助。

欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时欢迎大家加我微信「java_front」一起交流学习

发表评论

表情:
评论列表 (有 0 条评论,271人围观)

还没有评论,来说两句吧...

相关阅读

    相关 线

    引言 最近恰好在组内分享线程池,又看了看四年前自己写的[线程池][Link 1]文章,一是感叹时光荏苒,二是感叹当时的理解太浅薄了,三是感叹自己这么多年依然停留在浅薄的理