实现一个线程池 java 雨点打透心脏的1/2处 2024-04-03 12:02 45阅读 0赞 **现在我有一个任务,希望异步执行,首先就考虑创建一个线程嘛** ### 第一版 ### package com.su.demo.test; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; /** * @ClassName Demo * @Description TODO * @Author Hangover * @Date 2022/3/30 10:32 **/ @Slf4j public class Demo { public static void main(String[] args) { FlashExecutor flashExecutor = new FlashExecutor(); for (int i = 0; i < 10; i++) { int temp = i; flashExecutor.execute(()->{ log.debug("当前线程名称:{}",Thread.currentThread().getName()); log.debug("打印当前数值:{}",temp); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } class FlashExecutor implements Executor{ @Override public void execute(Runnable command) { new Thread(command).start(); } } 就是一个任务对应创建一个线程去执行,显而易见,缺点是十分明显的。 ![197273e36a734f34a81967b470273f34.png][] ### 第二版 ### package com.su.demo; import lombok.extern.slf4j.Slf4j; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @ClassName Server * @Description TODO * @Author Hangover * @Date 2022/3/10 8:52 **/ @Slf4j public class Main { public static void main(String[] args) { /** * 任务队列容量为10,但是有一百个任务需要执行 */ FlashExecutor flashExecutor = new FlashExecutor(10); for (int i = 0; i < 100; i++) { int temp = i; flashExecutor.execute(()->{ log.debug("当前线程名称:{}",Thread.currentThread().getName()); log.debug("打印当前数值:{}",temp); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); log.info("主线程循环次数:{}",temp); } } } @Slf4j class FlashExecutor implements Executor { Worker worker; //任务队列 ArrayBlockingQueue<Runnable> taskQueue; int queueCapacity; public FlashExecutor(int queueCapacity) { this.queueCapacity = queueCapacity; taskQueue = new ArrayBlockingQueue<>(queueCapacity); worker = new Worker(null); worker.start(); } @Override public void execute(Runnable command) { //当任务队列不满的时候 if(taskQueue.size() < queueCapacity){ try { taskQueue.add(command); } catch (Exception e) { log.debug("任务队列已满,无法继续添加任务"); e.printStackTrace(); } log.debug("向任务队列中添加一个任务:{}",command); } } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { //task不为空直接执行task //task执行完毕,从任务队列里获取任务 while (true){ if(task != null ){ try{ log.debug("正在执行...{}",task); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } }else{ if(!taskQueue.isEmpty()){ try { task = taskQueue.remove(); } catch (Exception e) { log.info("任务队列为空,无法获取任务"); e.printStackTrace(); } } } } } } } 把任务丢到一个任务队列中,然后只启动一个worker线程,不断地从任务队列中获取任务,执行任务 缺点:当任务队列满了之后,会抛弃任务。 #### 小步前进,实现不会丢弃任务 #### **调用put()、take()方法** package com.su.demo; import lombok.extern.slf4j.Slf4j; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @ClassName Server * @Description TODO * @Author Hangover * @Date 2022/3/10 8:52 **/ @Slf4j public class Main { public static void main(String[] args) { /** * 任务队列容量为10,但是有一百个任务需要执行 */ FlashExecutor flashExecutor = new FlashExecutor(10); for (int i = 0; i < 100; i++) { int temp = i; flashExecutor.execute(()->{ log.debug("当前线程名称:{}",Thread.currentThread().getName()); log.debug("打印当前数值:{}",temp); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); log.info("主线程循环次数:{}",temp); } } } @Slf4j class FlashExecutor implements Executor { Worker worker; //任务队列 ArrayBlockingQueue<Runnable> taskQueue; int queueCapacity; public FlashExecutor(int queueCapacity) { this.queueCapacity = queueCapacity; taskQueue = new ArrayBlockingQueue<>(queueCapacity); worker = new Worker(null); worker.start(); } @Override public void execute(Runnable command) { try { //当任务队列满时,会进入阻塞状态 taskQueue.put(command); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("向任务队列中添加一个任务:{}",command); } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { //task不为空直接执行task //task执行完毕,从任务队列里获取任务 while (true){ if(task != null){ try{ log.debug("正在执行...{}",task); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } }else{ try { //当任务队列为空时,会阻塞 task = taskQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } ![ea3038fcca2146939854e9ae9da3257a.png][] 缺点:只有一个线程执行任务,太慢。如果有多个线程执行任务就好了。 还有就是线程一直死循环从任务队列中获取任务。 #### 小步前进 自己实现一个阻塞队列 #### package com.su.demo; import lombok.extern.slf4j.Slf4j; import java.util.*; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @ClassName Server * @Description TODO * @Author Hangover * @Date 2022/3/10 8:52 **/ @Slf4j public class Main { public static void main(String[] args) { FlashExecutor flashExecutor = new FlashExecutor(10); for (int i = 0; i < 5; i++) { int temp = i; flashExecutor.execute(()->{ log.debug("当前线程名称:{}",Thread.currentThread().getName()); log.debug("打印当前数值:{}",temp); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } @Slf4j class FlashExecutor implements Executor { Worker worker; BlockQueue<Runnable> taskQueue; public FlashExecutor(int queueCapacity) { taskQueue = new BlockQueue<>(queueCapacity); } @Override public void execute(Runnable command) { if(worker == null){ worker = new Worker(command); worker.start(); }else{ taskQueue.put(command); } } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { //task不为空直接执行task //task执行完毕,从任务队列里获取任务 while (true){ if(task != null || (task = taskQueue.take()) != null){ try{ log.debug("正在执行...{}",task); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } } } } } } class BlockQueue<T>{ private Deque<T> queue = new ArrayDeque<>(); //锁 private ReentrantLock lock = new ReentrantLock(); //两个条件变量 private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); //容量 private int capacity; public BlockQueue(int capacity) { this.capacity = capacity; } public T take(){ lock.lock(); try{ while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return queue.removeFirst(); }finally { lock.unlock(); } } public void put(T element){ lock.lock(); try{ while (queue.size() == capacity) { try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(element); emptyWaitSet.signal(); }finally { lock.unlock(); } } //获取队列大小 public int size(){ lock.lock(); try{ return queue.size(); }finally { lock.unlock(); } } } ![07d2dbe17944428499b4fcf393849a7d.png][] 这个时候只有一个线程执行任务,它说它太累了啊。短时间内处理不完太多任务,万一任务队列满了。 @Slf4j public class Main { public static void main(String[] args) { /** * 任务队列容量为10,但是有一百个任务需要执行 */ FlashExecutor flashExecutor = new FlashExecutor(10); for (int i = 0; i < 100; i++) { int temp = i; flashExecutor.execute(()->{ log.debug("当前线程名称:{}",Thread.currentThread().getName()); log.debug("打印当前数值:{}",temp); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } ### 第三版 ### 希望实现可以有多个线程同时执行任务队列中的任务 初始化时,直接启动corePoolSize个工作线程先跑着, 然后死循环不断地从任务队列中获取任务,执行任务。 package com.su.demo; import lombok.extern.slf4j.Slf4j; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @ClassName Server * @Description TODO * @Author Hangover * @Date 2022/3/10 8:52 **/ @Slf4j public class Main { public static void main(String[] args) { /** * 任务队列容量为10,但是有一百个任务需要执行 */ FlashExecutor flashExecutor = new FlashExecutor(2,10); for (int i = 0; i < 100; i++) { int temp = i; flashExecutor.execute(()->{ log.debug("当前线程名称:{},打印当前数值:{}",Thread.currentThread().getName(),temp); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); log.info("主线程循环次数:{}",temp); } } } @Slf4j class FlashExecutor implements Executor { Worker[] workers; //任务队列 ArrayBlockingQueue<Runnable> taskQueue; //队列容量 int queueCapacity; //核心线程数 int corePoolSize; public FlashExecutor(int corePoolSize,int queueCapacity) { this.queueCapacity = queueCapacity; taskQueue = new ArrayBlockingQueue<>(queueCapacity); this.corePoolSize =corePoolSize; workers = new Worker[corePoolSize]; for (int i = 0; i < this.corePoolSize; i++) { workers[i] = new Worker(null); workers[i].start(); } } @Override public void execute(Runnable command) { try { //当任务队列满时,会进入阻塞状态 taskQueue.put(command); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("向任务队列中添加一个任务:{}",command); } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { //task不为空直接执行task //task执行完毕,从任务队列里获取任务 while (true){ if(task != null){ try{ log.debug("正在执行...{}",task); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } }else{ try { //当任务队列为空时,会阻塞 task = taskQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } ### 第四版 ### package com.su.demo.test; import lombok.extern.slf4j.Slf4j; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * @ClassName Main * @Description TODO * @Author Hangover * @Date 2022/3/27 16:26 **/ @Slf4j(topic = "hang.Main") public class Main { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MICROSECONDS, 10); for (int i = 0; i < 5; i++) { int temp = i; threadPool.execute(()->{ log.debug("打印当前值:{}",temp); }); } } } @Slf4j(topic = "hang.ThreadPool") class ThreadPool{ //任务队列 private BlockQueue<Runnable> taskQueue; //线程集合 private HashSet<Worker> workers = new HashSet<>(); //核心线程数 private int coreSize; //超时时间,没有任务时, private long timeout; private TimeUnit timeUnit; public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockQueue<>(queueCapacity); } //执行任务 public void execute(Runnable task){ //任务数没有超过核心线程数,直接交给worker对象 if(workers.size() < coreSize){ Worker worker = new Worker(task); log.debug("新增 worker:{},任务:{}",worker,task); workers.add(worker); worker.start(); }else{ log.debug("加入任务队列{}",task); taskQueue.put(task); } //否则加入任务队列 } class Worker extends Thread{ private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { //task不为空直接执行task //task执行完毕,从任务队列里获取任务 while (task != null || (task = taskQueue.take()) != null){ try{ log.debug("正在执行...{}",task); task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } } synchronized (workers){ log.debug("worker被移除...{}",this); workers.remove(this); } } } } class BlockQueue<T>{ private Deque<T> queue = new ArrayDeque<>(); //锁 private ReentrantLock lock = new ReentrantLock(); //两个条件变量 private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); //容量 private int capacity; public BlockQueue(int capacity) { this.capacity = capacity; } /** * 带超时的阻塞获取 */ public T poll(long timeout, TimeUnit unit){ lock.lock(); try{ //将timeout统一转换为纳秒 long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { //虚假等待问题 if(nanos <= 0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return queue.removeFirst(); }finally { lock.unlock(); } } public T take(){ lock.lock(); try{ while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return queue.removeFirst(); }finally { lock.unlock(); } } public void put(T element){ lock.lock(); try{ while (queue.size() == capacity) { try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(element); emptyWaitSet.signal(); }finally { lock.unlock(); } } //获取队列大小 public int size(){ lock.lock(); try{ return queue.size(); }finally { lock.unlock(); } } } [197273e36a734f34a81967b470273f34.png]: https://image.dandelioncloud.cn/pgy_files/images/2024/04/03/9949cb20beab4e288e82934b1ed84e00.png [ea3038fcca2146939854e9ae9da3257a.png]: https://image.dandelioncloud.cn/pgy_files/images/2024/04/03/c33bc875cd264909bf8ce3f81721f20a.png [07d2dbe17944428499b4fcf393849a7d.png]: https://image.dandelioncloud.cn/pgy_files/images/2024/04/03/4ad25cf6c0d741b38156de93a1ffa385.png
相关 多线程实践:如何在Java中实现一个线程池? 在Java中,我们可以使用`ExecutorService`和`ThreadPoolExecutor`类来创建和管理线程池。 以下是一个简单的例子,展示了如何创建一个固定大小 叁歲伎倆/ 2024年09月11日 09:42/ 0 赞/ 16 阅读
相关 C++实现一个线程池 一、为什么使用线程池 大家都知道C++支持多线程开发,也就是支持多个任务并行运行,我们也知道线程的生命周期中包括创建、就绪、运行、阻塞、销毁等阶段,所以如果要执行的任务很 爱被打了一巴掌/ 2024年04月06日 10:13/ 0 赞/ 30 阅读
相关 实现一个线程池 java 现在我有一个任务,希望异步执行,首先就考虑创建一个线程嘛 第一版 package com.su.demo.test; import lombo 雨点打透心脏的1/2处/ 2024年04月03日 12:02/ 0 赞/ 46 阅读
相关 使用Java实现一个动态线程池 可以使用Java的Executor框架来实现动态线程池。可以使用ThreadPoolExecutor类来创建线程池。可以设置核心线程数、最大线程数、线程存活时间等参数。 示例 ╰半橙微兮°/ 2024年03月26日 09:30/ 0 赞/ 35 阅读
相关 Java实现线程池 Java实现线程池 线程池是多线程编程中常用的工具,它可以管理和复用线程,从而提高程序的性能和效率。在Java中,我们可以使用`java.util.concurrent`包下 曾经终败给现在/ 2024年03月09日 02:43/ 0 赞/ 72 阅读
相关 【Java多线程】自己实现一个简单的线程池(二) 文章目录 前言 自定义任务完成后的通知 实现原理 JDK 的实现 实现带有返回值的线程 太过爱你忘了你带给我的痛/ 2022年11月25日 10:08/ 0 赞/ 168 阅读
相关 【Java多线程】自己实现一个简单的线程池(三) 前言 上次本人是基于java1.8新特性编写了一个简单的线程池,使用了原子变量、阻塞队列、可重入锁等等新特性,内容全部收录在下面两篇文章中 [【Java多线程】自 曾经终败给现在/ 2022年09月07日 14:58/ 0 赞/ 167 阅读
相关 java 注解实现一个可配置线程池 前言 项目需要多线程执行一些Task,为了方便各个服务的使用。特意封装了一个公共工具类,下面直接撸代码: PoolConfig(线程池核心配置参数): 亦凉/ 2022年03月20日 04:59/ 0 赞/ 191 阅读
相关 C++实现一个线程池 说明 本线程池使用了互斥锁、条件变量、函数指针等常用工具。 线程池的创建与执行 threadpool\_create创建线程池时,首先分配线程池数组和任务队列数组 淡淡的烟草味﹌/ 2022年02月16日 00:43/ 0 赞/ 202 阅读
还没有评论,来说两句吧...