Java并发工具之线程池
1. 什么是线程池?
- 线程的创建和销毁是有一定的开销的,为了减少开销(内存和垃圾回收),我们通过创建线程池来执行大量的任务,避免反复创建并销毁线程所带来的问题,如果不用线程池,一旦任务数量过多,新建线程过多可能会OOM异常,即内存溢出。
好处:
- 限制线程资源的总量,复用每一个线程,同一管理资源
- 加快响应速度,合理利用CPU和内存
适用场景:
- 服务器接收大量请求(Tomcat本身就是用线程池来实现的)
- 开发中,如果需要创建5个以上的线程,就可以用线程池
2. 创建和停止线程池?
2.1. 线程池构造函数的参数
参数 | 含义 |
---|---|
corePoolSize | 核心线程数 |
maxPoolSize | 最大线程数 |
keepAliveTime | 保持存活时间 |
workQueue | 任务存储队列 |
threadFactory | 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程 |
Handler | 由于线程池无法接受你所提交的任务的拒绝策略 |
- corePoolSize :线程完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务;
maxPoolSize:线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上线,就是最大线程数;
什么情况下扩展到maxPoolSize?
1. 如果线程数小于corePoolSize ,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务;
2. 如果线程数等于或大于corePoolSize 但是少于maxPoolSize,则将任务放入workQueue队列(指定队列的容量);
3. 如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务;
4. 如果队列已满,并在线程数大于maxPoolSize,则执行Handler拒绝策略;
总之,是否需要增加线程的判断顺序是:
corePoolSize -》workQueue-》maxPoolSize
增减线程的特点:
1. 通过corePoolSize 和maxPoolSize相同,就可以创建固定大小的线程池;
2. 线程池希望保持较少的线程数,并且只有在负载变得很大的时候才增加它;
3. 通过设置maxPoolSize为很高的值,可以允许线程池容纳任意数量的并发任务;
4. 只有在队列填满时才多于corePoolSize 的线程,所以如果你使用的是无界队列,那么线程数就不会超过corePoolSize ;
- keepAliveTime:如果线程池当前的线程数多于corePoolSize ,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止,比如:corePoolSize 为5,当前线程数为10,如果线程空闲了,就会恢复到5;
- ThreadFactory:用来创建线程,默认是非守护线程,优先级是5,一共有10个等级;
workQueue:有3种最常见的队列类型:
1. 直接交接:SynchronousQueue,本身没有容量,maxpoolSize要设置大一些;
2. 无界队列:LinkedBlockingQueue,maxpoolSize设置多大都没有用,会一直缓存新增的任务,有可能造成OOM;
3. 有界队列:ArrayBlockingQueue,队列容量指定,满了之后,就创建新的线程;
2.2 线程池应该手动创建还是自动创建
Excutors类创建线程池的方法如下:
1. newFixedThreadPool:由于传进去的LinkedBlockingQueue是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成OOM。
2. newSingleThreadExecutor:和newFixedThreadPool的原理基本一样,都采用LinkedBlockingQueue无界队列,只不过把corePoolSize 和maxPoolSize都改成了1,请求堆积的时候也会占用大量的内存。
3. newCachedThreadPool:可缓存线程池,使用SynchronousQueue直接队列,corePoolSize 为0,maxPoolSize设置为Integer.Max_Value,线程可以无限创建,但是请求过多且无法及时处理完毕,也会导致OOM。
4. newScheduledThreadPool:支持定时以及周期性任务执行的线程池,采用DelayedWorkQueue延迟队列,可以指定延迟时间执行任务,还可以以一定时间间隔来运行。
正确创建线程池的方法是手动创建:根据不同的业务场景,自己设置线程池参数,比如内存大小,线程名字等等。
2.3 线程池里的线程数量设定为多少比较合适?
- CPU密集型:比如八核处理器,那可以把corePoolSize 设置为核心数的1-2倍;
- 耗时IO型(读写文件):这种类型的CPU是不工作的,可以设置为核心数的很多倍
- 线程数=CPU核心数*(1+平均等待时间/平均工作时间)
2.4 停止线程池的正确方法
shutdown,初始化整个关闭过程,不是直接关闭,把正在执行以及队列中等待的任务执行完毕再关闭,然后拒绝新的任务;
public class StopThreadPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 50; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println("执行已经提交的线程:" +Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(1000);
executorService.shutdown();
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("继续提交线程池");
}
});
}
}
执行已经提交的线程:pool-1-thread-1
执行已经提交的线程:pool-1-thread-2
执行已经提交的线程:pool-1-thread-7
执行已经提交的线程:pool-1-thread-8
执行已经提交的线程:pool-1-thread-3
执行已经提交的线程:pool-1-thread-4
执行已经提交的线程:pool-1-thread-5
执行已经提交的线程:pool-1-thread-6
Exception in thread “main” 执行已经提交的线程:pool-1-thread-1
执行已经提交的线程:pool-1-thread-9
执行已经提交的线程:pool-1-thread-10
执行已经提交的线程:pool-1-thread-2
执行已经提交的线程:pool-1-thread-7
执行已经提交的线程:pool-1-thread-8
java.util.concurrent.RejectedExecutionException: Task tools.threadpool.StopThreadPool$2@4b67cf4d rejected from java.util.concurrent.ThreadPoolExecutor@7ea987ac[Shutting down, pool size = 10, active threads = 10, queued tasks = 26, completed tasks = 14]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at tools.threadpool.StopThreadPool.main(StopThreadPool.java:24)
执行已经提交的线程:pool-1-thread-4
执行已经提交的线程:pool-1-thread-3
执行已经提交的线程:pool-1-thread-6
执行已经提交的线程:pool-1-thread-5
可以看出,shutdown后,再次提交是被拒绝的,但是已经提交的任务得先执行完毕。
- awaitTermination,检测一段时间后线程池是否完全终止,是用来检测的;
- isShutdown,判断是否被shutdown了;
- isTerminated,判断线程池是否完全终止;
shutdownNow,立刻停止线程池,如何立刻停止:利用线程InterruptedException中断抛出异常停止线程,同时该方法会返回一个集合,包含已经放进队列中还没来得急执行的线程对象;
public class StopThreadPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 50; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println("执行已经提交的线程:" +Thread.currentThread().getName());
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断了");
}
}
});
}
Thread.sleep(1000);
executorService.shutdownNow();
// System.out.println(executorService.shutdownNow());
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("继续提交线程池");
}
});
}
}
执行已经提交的线程:pool-1-thread-3
执行已经提交的线程:pool-1-thread-8
pool-1-thread-7被中断了
pool-1-thread-6被中断了
pool-1-thread-1被中断了
pool-1-thread-4被中断了
pool-1-thread-10被中断了
pool-1-thread-9被中断了
pool-1-thread-2被中断了
pool-1-thread-5被中断了
Exception in thread “main” java.util.concurrent.RejectedExecutionException: Task tools.threadpool.StopThreadPool$2@4b67cf4d rejected from java.util.concurrent.ThreadPoolExecutor@7ea987ac[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 20]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at tools.threadpool.StopThreadPool.main(StopThreadPool.java:25)
3. 常见线程池的特点和用法?
线程池创建方法 | corePoolSize | maxPoolSize | keepAliveTime | workQueue |
---|---|---|---|---|
newFixedThreadPool | 指定 | 等于maxPoolSize | 0s | LinkedBlockingQueue |
newSingleThreadExecutor | 1 | 1 | 0s | LinkedBlockingQueue |
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue |
newScheduledThreadPool | 指定 | Integer.MAX_VALUE | 0s | DelayedWorkQueue |
4. 任务太多,怎么拒绝?
- 一旦线程池shutDown,后续的任务就被拒绝了;
- 当Executor对最大线程maxPoolSize 和工作队列容量使用ArrayBlockingQueue有限边界并饱和时候,就直接拒绝了;
4种拒绝策略:
1. AbortPolicy,抛出异常;
2. DiscardPolicy,默默抛弃,不抛出异常;
3. DiscardOldestPolicy,丢弃最老的,存在时间最久的任务给丢掉;
4. CallerRunsPolicy,如果线程池没办法处理,谁提交的任务谁去跑,比如主线程提交的任务,那么线程池饱和的情况下,就交还给主线程执行这个任务;
5. 钩子方法
每个任务执行前后,做一些事情,比如日志、统计等;
public class PauseableThreadPool extends ThreadPoolExecutor {
private final Lock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
private boolean isPaused;
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
private void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("执行中...;");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
//暂停
pauseableThreadPool.pause();
System.out.println("线程池被暂停了");
Thread.sleep(1500);
//恢复
System.out.println("线程池被恢复了");
pauseableThreadPool.resume();
}
}
6. Executor家族辨析
- Executor:接口,ExecutorService继承于Executor ;
- Executors:工具类,帮我们自动创建线程池,类似于Collections;
- ThreadPoolExecutor:线程池类,ThreadPoolExecutor继承于ExecutorService;
- 所以,Executor、ExecutorService、ThreadPoolExecutor都是线程池类型,只不过是继承关系;
7. 线程池实现任务复用的原理
相同线程执行不同任务,调用新的任务的run方法;
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
task.run();
}
}
在runWork中,拿到一个又一个的task,while循环中判断如果不为空,则执行完毕task的run方法,如果task为空,即执行完毕了一个任务,则用getTask方法去取下一个任务,直到没有任务了。
8. 线程池的5种状态
- RUNNING:接收新任务并处理排队任务,即执行execute后的状态;
- SHUTDOWN:不接受新任务,但是处理排队任务,即执行shutdown方法后的状态;
- STOP:不接受新任务,也不处理派对任务,并中断正在进行的任务,即执行shutdownNow方法后的状态;
- TIDYING:整洁,所有任务都已经终止,线程会转到TIDYING状态,并将运行terminate钩子方法;
- TERMINATED:terminate方法完成;
9. 使用线程池的注意点
- 避免任务堆积
- 避免线程数过度增加
- 排查线程泄漏
还没有评论,来说两句吧...