Core Java Tutorial -- Thread Pool

淩亂°似流年 2022-05-28 07:21 229阅读 0赞

Java 线程池管理工作线程池,它包含一个让任务等待执行的队列。我们可以使用 ThreadPoolExecutor 在 Java 中创建线程池。

Java 线程池管理 Runnable 线程集合,并且工作相称从队列中执行 Runnable。java.util.concurrent.Executors 提供 java.util.concurrent.Executor 接口的实现来在 Java 中创建线程池。

首先我们需要一个 Runnable 类,名为 WorkerThread.java

  1. package Thread;
  2. public class WorkerThread implements Runnable {
  3. private String command;
  4. public WorkerThread(String s) {
  5. this.command = s;
  6. }
  7. @Override
  8. public void run() {
  9. System.out.println(Thread.currentThread().getName() + " Start. Command = " + command);
  10. processCommand();
  11. System.out.println(Thread.currentThread().getName() + " End.");
  12. }
  13. private void processCommand() {
  14. try {
  15. Thread.sleep(5000);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. @Override
  21. public String toString() {
  22. return this.command;
  23. }
  24. }

ExecutorService Example

这里是测试程序类 SimpleThreadPool.java,我们从Executors 框架创建固定线程池。

  1. package Thread;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class SimpleThreadPool {
  5. public static void main(String[] args) {
  6. ExecutorService executorService = Executors.newFixedThreadPool(5);
  7. for (int i = 0; i < 10; i++) {
  8. Runnable worker = new WorkerThread("" + i);
  9. executorService.execute(worker);
  10. }
  11. executorService.shutdown();
  12. while (!executorService.isTerminated()) {
  13. }
  14. System.out.println("Finish all threads");
  15. }
  16. }

上述程序中,我们创建了 5 个工作线程的固定大小的线程池。然后我们向这个池提交 10 个 worker,因为池大小为 5,它将开始工作 5 个工作,其他工作将处于等待状态,只要一个工作完成,等待队列中的另一个工作将会被工作者线程拾起并执行。

  1. pool-1-thread-2 Start. Command = 1 pool-1-thread-1 Start. Command = 0 pool-1-thread-4 Start. Command = 3 pool-1-thread-3 Start. Command = 2 pool-1-thread-5 Start. Command = 4 pool-1-thread-5 End. pool-1-thread-5 Start. Command = 5 pool-1-thread-2 End. pool-1-thread-2 Start. Command = 6 pool-1-thread-3 End. pool-1-thread-3 Start. Command = 7 pool-1-thread-4 End. pool-1-thread-4 Start. Command = 8 pool-1-thread-1 End. pool-1-thread-1 Start. Command = 9 pool-1-thread-2 End. pool-1-thread-5 End. pool-1-thread-3 End. pool-1-thread-1 End. pool-1-thread-4 End. Finish all threads

输出确认从“pool-1-thread-1”到“pool-1-thread-5”的池中有五个线程,它们负责将提交的任务执行到池中。

ThreadPoolExecutor Example

Executors 类使用 ThreadPoolExecutor 提供简单的 ExecutorService 实现,但是 ThreadPoolExecutor 提供了笔者更多的功能。我们可以指定创建 ThreadPoolExecutor 实例时将存活的线程数量,并且可以限制线程池的大小并创建自己的 RejectedExecutionHandler 实现来处理不适合工作队列的作业。

这是我们的 RejectedExecutionHandler 接口的自定义实现。

  1. package Thread;
  2. import java.util.concurrent.RejectedExecutionHandler;
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
  5. @Override
  6. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  7. System.out.println(r.toString() + " is rejected");
  8. }
  9. }

ThreadPoolExecutor 提供了几种方法,同故宫这些方法我们可以找出执行程序的当前状态,池大小,活动线程数和任务计数。所以我有一个监视器线程,它会在特定的时间间隔打印执行程序信息。

  1. package Thread;
  2. import java.util.concurrent.ThreadPoolExecutor;
  3. public class MyMonitorThread implements Runnable {
  4. private ThreadPoolExecutor executor;
  5. private int seconds;
  6. private boolean run = true;
  7. public MyMonitorThread(ThreadPoolExecutor executor, int delay) {
  8. this.executor = executor;
  9. this.seconds = delay;
  10. }
  11. public void shutdown() {
  12. this.run = false;
  13. }
  14. @Override
  15. public void run() {
  16. while (run) {
  17. System.out.println(
  18. String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, " +
  19. "isTerminated: %s",
  20. this.executor.getPoolSize(),
  21. this.executor.getCorePoolSize(),
  22. this.executor.getActiveCount(),
  23. this.executor.getCompletedTaskCount(),
  24. this.executor.getTaskCount(),
  25. this.executor.isShutdown(),
  26. this.executor.isTerminated()));
  27. try {
  28. Thread.sleep(seconds * 1000);
  29. } catch (InterruptedException e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. }

使用 ThreadPoolExecutor 的线程池实现

  1. package Thread;
  2. import java.util.concurrent.*;
  3. public class WorkerPool {
  4. public static void main(String args[]) throws InterruptedException {
  5. //RejectedExecutionHandler implementation
  6. RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
  7. //Get the ThreadFactory implementation to use
  8. ThreadFactory threadFactory = Executors.defaultThreadFactory();
  9. //creating the ThreadPoolExecutor
  10. ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new
  11. ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
  12. //start the monitoring thread
  13. MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
  14. Thread monitorThread = new Thread(monitor);
  15. monitorThread.start();
  16. //submit work to the thread pool
  17. for (int i = 0; i < 10; i++) {
  18. executorPool.execute(new WorkerThread("cmd" + i));
  19. }
  20. Thread.sleep(30000);
  21. //shut down the pool
  22. executorPool.shutdown();
  23. //shut down the monitor thread
  24. Thread.sleep(5000);
  25. monitor.shutdown();
  26. }
  27. }

请注意,在初始化ThreadPoolExecutor时,我们将初始池大小保持为2,最大池大小为4,工作队列大小为2。因此,如果有4个正在运行的任务并提交了更多任务,则工作队列将只保留其中的2个,其余的部分将由RejectedExecutionHandlerImpl 处理。

输出:

  1. [monitor] [0/2] Active: 0, Completed: 0, Task: 0, isShutdown: false, isTerminated: false
  2. cmd6 is rejected
  3. cmd7 is rejected
  4. cmd8 is rejected
  5. cmd9 is rejected
  6. pool-1-thread-2 Start. Command = cmd1 pool-1-thread-1 Start. Command = cmd0 pool-1-thread-3 Start. Command = cmd4 pool-1-thread-4 Start. Command = cmd5 [monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-2 End. pool-1-thread-2 Start. Command = cmd2 pool-1-thread-1 End. pool-1-thread-3 End. pool-1-thread-1 Start. Command = cmd3 pool-1-thread-4 End. [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-2 End. pool-1-thread-1 End. [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true

注意执行程序的活动,已完成和总完成任务计数的变化。 我们可以调用 shutdown() 方法来完成所有提交的任务的执行并终止线程池。

发表评论

表情:
评论列表 (有 0 条评论,229人围观)

还没有评论,来说两句吧...

相关阅读