Java线程池的使用

Love The Way You Lie 2022-04-16 04:50 281阅读 0赞

对于线程池的使用,应该将线程池置为单例模式,节省内存的开销,线程池线程的数量可以根据业务,内核,cup等进行评估。
常用的线程池是使用java并发包中的ThreadPoolExecutor对象。

其核心构造参数:

  • 核心线程数量(corePoolSize)
  • 最大线程数量(maxiNumPoolSize)
  • 生存时间(keepAliveTime)
  • 时间单位(TimeUnit)
  • 队列大小(QueueSize)
  • 可选参数:[线程工厂(ThreadFactory)、拒绝处理策略(RejectedExecutionHandler)]
    注:如果不传最后两个参数,则使用默认参数

线程的开辟策略:

  • 当池中正在运行的线程数(包括空闲线程)小于corePoolSize时,新建线程执行任务。
  • 当池中正在运行的线程数大于等于corePoolSize时,新的任务进入任务队列等待。
  • 当队列里的任务数达到上限,且线程池中正在运行的线程数小于maxiNumPoolSize,对于新加入的任务,新建线程
  • 当队列里的任务数达到上限,并且池中正在运行的线程数等于maximumPoolSize,对于新加入的任务,执行拒绝策略

拒绝策略:

  • CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

    1. if (!e.isShutdown()) {
    2. r.run();
    3. }

    }

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。

  • AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

    1. throw new RejectedExecutionException();

    }

这种策略直接抛出异常,丢弃任务。(默认策略)

  • DiscardPolicy:不能执行的任务将被删除

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

    }

这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。

  • DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

    1. if (!e.isShutdown()) {
    2. e.getQueue().poll();
    3. e.execute(r);
    4. }

    }

该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。

  • 自定义策略:实现RejectedExecutionHandler接口,自定义对拒绝任务的处理

可参考如下代码:

  1. package com.lt;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.ExecutorService;
  5. import java.util.concurrent.RejectedExecutionHandler;
  6. import java.util.concurrent.ThreadFactory;
  7. import java.util.concurrent.ThreadPoolExecutor;
  8. import java.util.concurrent.TimeUnit;
  9. import java.util.concurrent.atomic.AtomicInteger;
  10. public class ThreadPoolFactory {
  11. private static ExecutorService executor;
  12. private static ThreadPoolFactory threadPoolFactory;
  13. /** * 核心线程数量 */
  14. private static final int CORE_POOL_SIZE = 10;
  15. /** * 最大线程数量 */
  16. private static final int MAXI_NUM_POOLSIZE = 20;
  17. /** * 线程存活时间 */
  18. private static final long KEEP_ALIVE_TIME = 5L;
  19. /** * 任务队列大小 */
  20. private static final int QUEUE_SIZE = 15;
  21. /** * 阻塞队列 */
  22. private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
  23. /** * 私有构造器 */
  24. private ThreadPoolFactory(String threadName){
  25. executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXI_NUM_POOLSIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, workQueue, new MyThreadFactory(threadName), new MyRejectedExecutionHandler());
  26. }
  27. /** * 线程池构建工厂 */
  28. public static ThreadPoolFactory getInstance(String threadName){
  29. if (null == threadPoolFactory) {
  30. synchronized (ThreadPoolFactory.class) {
  31. if (null == threadPoolFactory) {
  32. threadPoolFactory = new ThreadPoolFactory(threadName);
  33. }
  34. }
  35. }
  36. return threadPoolFactory;
  37. }
  38. public ExecutorService getThreadPoolExecutor(){
  39. return executor;
  40. }
  41. /** * 内部类,线程工厂,用于创造线程池所需要的线程 */
  42. class MyThreadFactory implements ThreadFactory {
  43. private String threadName;
  44. private final AtomicInteger threadNumber = new AtomicInteger(1);
  45. public MyThreadFactory (String threadName) {
  46. this.threadName = threadName;
  47. }
  48. @Override
  49. public Thread newThread(Runnable r) {
  50. Thread thread = new Thread(r, threadName + "-" + threadNumber.getAndIncrement());
  51. thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  52. @Override
  53. public void uncaughtException(Thread t, Throwable e) {
  54. System.out.println("Exception: " + e.getMessage());
  55. }
  56. });
  57. return thread;
  58. }
  59. }
  60. /** * 内部类,拒绝任务后的请求处理 */
  61. class MyRejectedExecutionHandler implements RejectedExecutionHandler {
  62. @Override
  63. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  64. System.out.println("ThreadPool over");
  65. }
  66. }
  67. }

发表评论

表情:
评论列表 (有 0 条评论,281人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Java线使用

    Java线程池的使用 前言 众所周知,近十年来摩尔定律逐渐在很多领域已经失效了,单个`CPU`的性能已经很难满足科技的需求,所以现在很多时候会采用多核的方式来提升整

    相关 Java线使用

    对于线程池的使用,应该将线程池置为单例模式,节省内存的开销,线程池线程的数量可以根据业务,内核,cup等进行评估。 常用的线程池是使用java并发包中的ThreadPool

    相关 Java线使用

     如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。   那么有没有一种办法

    相关 Java线使用

    线程池(Thread Pool): 一种线程的使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。