Java线程池的使用
对于线程池的使用,应该将线程池置为单例模式,节省内存的开销,线程池线程的数量可以根据业务,内核,cup等进行评估。
常用的线程池是使用java并发包中的ThreadPoolExecutor对象。
其核心构造参数:
- 核心线程数量(corePoolSize)
- 最大线程数量(maxiNumPoolSize)
- 生存时间(keepAliveTime)
- 时间单位(TimeUnit)
- 队列大小(QueueSize)
- 可选参数:[线程工厂(ThreadFactory)、拒绝处理策略(RejectedExecutionHandler)]
注:如果不传最后两个参数,则使用默认参数
线程的开辟策略:
- 当池中正在运行的线程数(包括空闲线程)小于corePoolSize时,新建线程执行任务。
- 当池中正在运行的线程数大于等于corePoolSize时,新的任务进入任务队列等待。
- 当队列里的任务数达到上限,且线程池中正在运行的线程数小于maxiNumPoolSize,对于新加入的任务,新建线程
- 当队列里的任务数达到上限,并且池中正在运行的线程数等于maximumPoolSize,对于新加入的任务,执行拒绝策略
拒绝策略:
CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。
AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
这种策略直接抛出异常,丢弃任务。(默认策略)
DiscardPolicy:不能执行的任务将被删除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
- 自定义策略:实现RejectedExecutionHandler接口,自定义对拒绝任务的处理
可参考如下代码:
package com.lt;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolFactory {
private static ExecutorService executor;
private static ThreadPoolFactory threadPoolFactory;
/** * 核心线程数量 */
private static final int CORE_POOL_SIZE = 10;
/** * 最大线程数量 */
private static final int MAXI_NUM_POOLSIZE = 20;
/** * 线程存活时间 */
private static final long KEEP_ALIVE_TIME = 5L;
/** * 任务队列大小 */
private static final int QUEUE_SIZE = 15;
/** * 阻塞队列 */
private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(QUEUE_SIZE);
/** * 私有构造器 */
private ThreadPoolFactory(String threadName){
executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXI_NUM_POOLSIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, workQueue, new MyThreadFactory(threadName), new MyRejectedExecutionHandler());
}
/** * 线程池构建工厂 */
public static ThreadPoolFactory getInstance(String threadName){
if (null == threadPoolFactory) {
synchronized (ThreadPoolFactory.class) {
if (null == threadPoolFactory) {
threadPoolFactory = new ThreadPoolFactory(threadName);
}
}
}
return threadPoolFactory;
}
public ExecutorService getThreadPoolExecutor(){
return executor;
}
/** * 内部类,线程工厂,用于创造线程池所需要的线程 */
class MyThreadFactory implements ThreadFactory {
private String threadName;
private final AtomicInteger threadNumber = new AtomicInteger(1);
public MyThreadFactory (String threadName) {
this.threadName = threadName;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, threadName + "-" + threadNumber.getAndIncrement());
thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("Exception: " + e.getMessage());
}
});
return thread;
}
}
/** * 内部类,拒绝任务后的请求处理 */
class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("ThreadPool over");
}
}
}
还没有评论,来说两句吧...