Core Java Tutorial -- Thread Pool
Java 线程池管理工作线程池,它包含一个让任务等待执行的队列。我们可以使用 ThreadPoolExecutor 在 Java 中创建线程池。
Java 线程池管理 Runnable 线程集合,并且工作相称从队列中执行 Runnable。java.util.concurrent.Executors
提供 java.util.concurrent.Executor
接口的实现来在 Java 中创建线程池。
首先我们需要一个 Runnable 类,名为 WorkerThread.java
package Thread;
public class WorkerThread implements Runnable {
private String command;
public WorkerThread(String s) {
this.command = s;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Command = " + command);
processCommand();
System.out.println(Thread.currentThread().getName() + " End.");
}
private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return this.command;
}
}
ExecutorService Example
这里是测试程序类 SimpleThreadPool.java,我们从Executors 框架创建固定线程池。
package Thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SimpleThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
Runnable worker = new WorkerThread("" + i);
executorService.execute(worker);
}
executorService.shutdown();
while (!executorService.isTerminated()) {
}
System.out.println("Finish all threads");
}
}
上述程序中,我们创建了 5 个工作线程的固定大小的线程池。然后我们向这个池提交 10 个 worker,因为池大小为 5,它将开始工作 5 个工作,其他工作将处于等待状态,只要一个工作完成,等待队列中的另一个工作将会被工作者线程拾起并执行。
pool-1-thread-2 Start. Command = 1 pool-1-thread-1 Start. Command = 0 pool-1-thread-4 Start. Command = 3 pool-1-thread-3 Start. Command = 2 pool-1-thread-5 Start. Command = 4 pool-1-thread-5 End. pool-1-thread-5 Start. Command = 5 pool-1-thread-2 End. pool-1-thread-2 Start. Command = 6 pool-1-thread-3 End. pool-1-thread-3 Start. Command = 7 pool-1-thread-4 End. pool-1-thread-4 Start. Command = 8 pool-1-thread-1 End. pool-1-thread-1 Start. Command = 9 pool-1-thread-2 End. pool-1-thread-5 End. pool-1-thread-3 End. pool-1-thread-1 End. pool-1-thread-4 End. Finish all threads
输出确认从“pool-1-thread-1”到“pool-1-thread-5”的池中有五个线程,它们负责将提交的任务执行到池中。
ThreadPoolExecutor Example
Executors 类使用 ThreadPoolExecutor 提供简单的 ExecutorService 实现,但是 ThreadPoolExecutor 提供了笔者更多的功能。我们可以指定创建 ThreadPoolExecutor 实例时将存活的线程数量,并且可以限制线程池的大小并创建自己的 RejectedExecutionHandler 实现来处理不适合工作队列的作业。
这是我们的 RejectedExecutionHandler 接口的自定义实现。
package Thread;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + " is rejected");
}
}
ThreadPoolExecutor 提供了几种方法,同故宫这些方法我们可以找出执行程序的当前状态,池大小,活动线程数和任务计数。所以我有一个监视器线程,它会在特定的时间间隔打印执行程序信息。
package Thread;
import java.util.concurrent.ThreadPoolExecutor;
public class MyMonitorThread implements Runnable {
private ThreadPoolExecutor executor;
private int seconds;
private boolean run = true;
public MyMonitorThread(ThreadPoolExecutor executor, int delay) {
this.executor = executor;
this.seconds = delay;
}
public void shutdown() {
this.run = false;
}
@Override
public void run() {
while (run) {
System.out.println(
String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, " +
"isTerminated: %s",
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.isShutdown(),
this.executor.isTerminated()));
try {
Thread.sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
使用 ThreadPoolExecutor 的线程池实现
package Thread;
import java.util.concurrent.*;
public class WorkerPool {
public static void main(String args[]) throws InterruptedException {
//RejectedExecutionHandler implementation
RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
//Get the ThreadFactory implementation to use
ThreadFactory threadFactory = Executors.defaultThreadFactory();
//creating the ThreadPoolExecutor
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new
ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);
//start the monitoring thread
MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);
Thread monitorThread = new Thread(monitor);
monitorThread.start();
//submit work to the thread pool
for (int i = 0; i < 10; i++) {
executorPool.execute(new WorkerThread("cmd" + i));
}
Thread.sleep(30000);
//shut down the pool
executorPool.shutdown();
//shut down the monitor thread
Thread.sleep(5000);
monitor.shutdown();
}
}
请注意,在初始化ThreadPoolExecutor时,我们将初始池大小保持为2,最大池大小为4,工作队列大小为2。因此,如果有4个正在运行的任务并提交了更多任务,则工作队列将只保留其中的2个,其余的部分将由RejectedExecutionHandlerImpl 处理。
输出:
[monitor] [0/2] Active: 0, Completed: 0, Task: 0, isShutdown: false, isTerminated: false
cmd6 is rejected
cmd7 is rejected
cmd8 is rejected
cmd9 is rejected
pool-1-thread-2 Start. Command = cmd1 pool-1-thread-1 Start. Command = cmd0 pool-1-thread-3 Start. Command = cmd4 pool-1-thread-4 Start. Command = cmd5 [monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-2 End. pool-1-thread-2 Start. Command = cmd2 pool-1-thread-1 End. pool-1-thread-3 End. pool-1-thread-1 Start. Command = cmd3 pool-1-thread-4 End. [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-2 End. pool-1-thread-1 End. [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
注意执行程序的活动,已完成和总完成任务计数的变化。 我们可以调用 shutdown() 方法来完成所有提交的任务的执行并终止线程池。
还没有评论,来说两句吧...