线程池

线程池

引用:

1. 线程池优点

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

如何创建线程池?通过ThreadPoolExecutor构造函数来创建。

通过构造方法实现

2. 线程池的重要参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
*/
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
int maximumPoolSize,//线程池的最大线程数
long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();

this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

  • keepAliveTime:当线程池中的线程数量大于 corePoolSize ,即有非核心线程(线程池中核心线程以外的线程)时,这些非核心线程空闲后不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会用到。
  • handler :拒绝策略

3. 线程池处理任务的流程

图解线程池实现原理

线程池在提交任务前,可以提前创建线程吗?

答案是可以的!ThreadPoolExecutor 提供了两个方法帮助我们在提交任务之前,完成核心线程的创建,从而实现线程池预热的效果:

  • prestartCoreThread():启动一个线程,等待任务,如果已达到核心线程数,这个方法返回 false,否则返回 true;
  • prestartAllCoreThreads():启动所有的核心线程,并返回启动成功的核心线程数。

4. 线程池的拒绝策略

  • AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理
  • DiscardPolicy:不处理新任务,直接丢弃掉
  • DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求
  • CallerRunsPolicy:调用执行者自己的线程运行任务(比如说main线程调用了execute,但线程池内线程数量不够,就让main线程自己去执行任务,有可能会导致main线程阻塞,影响正常程序执行)

5. 线程池有哪几种阻塞队列?

线程池常用阻塞队列

6. 线程池提交execute()和submit()有什么区别?

execute() 方法没有返回值,适用于不关心结果和异常的简单任务。

1
2
3
4
5
threadsPool.execute(new Runnable() {
@Override public void run() {
System.out.println("execute() 方法提交的任务");
}
});

submit() 有返回值,适用于需要获取结果或处理异常的场景。

1
2
3
4
5
6
7
Future<Object> future = executor.submit(harReturnValuetask);
try { Object s = future.get(); }
catch (InterruptedException e | ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池 executor.shutdown();
}

7. 线程池怎么关闭?

可以调用线程池的shutdownshutdownNow方法来关闭线程池。

shutdown 不会立即停止线程池,而是等待所有任务执行完毕后再关闭线程池

shutdownNow 会尝试通过一系列动作来停止线程池,包括停止接收外部提交的任务、忽略队列里等待的任务、尝试将正在跑的任务 interrupt 中断。

需要注意的是,shutdownNow 不会真正终止正在运行的任务,只是给任务线程发送 interrupt 信号,任务是否能真正终止取决于线程是否响应 InterruptedException

8. 线程池有几种状态?

有 5 种状态,它们的转换遵循严格的状态流转规则,不同状态控制着线程池的任务调度和关闭行为。

状态由 RUNNING → SHUTDOWN → STOP → TIDYING → TERMINATED 依次流转。

线程池状态切换图

  • RUNNING 状态的线程池可以接收新任务,并处理阻塞队列中的任务;
  • SHUTDOWN 状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • STOP 状态的线程池不会接收新任务,也不会处理阻塞队列中的任务,并且会尝试中断正在执行的任务;
  • TIDYING 状态表示所有任务已经终止;
  • TERMINATED 状态表示线程池完全关闭,所有线程销毁。
状态 状态码 是否接收新任务 是否执行队列中的任务 是否中断正在执行的任务
RUNNING 111 ✅ 是 ✅ 是 ❌ 否
SHUTDOWN 000 ❌ 否 ✅ 是 ❌ 否
STOP 001 ❌ 否 ❌ 否 ✅ 是
TIDYING 010 ❌ 否 ❌ 否 ❌ 否
TERMINATED 011 ❌ 否 ❌ 否 ❌ 否

状态码是线程池通过一个int类型变量来表示的,int类型变量4bit,32位,它的高3位用来表示线程池状态,而剩下的29位则用来表示线程数量,所以理论上一个线程池的最大线程数是:2^29 - 1

9. 线程池的线程数应该如何配置?

  • 对于 CPU 密集型任务,我的目标是尽量减少线程上下文切换,以优化 CPU 使用率。一般来说,核心线程数设置为处理器的核心数 或 核心数 + 1是较理想的选择。

    +1 是为了以备不时之需,如果某线程因等待系统资源而阻塞时,可以有多余的线程顶上去,不至于影响整体性能

  • 对于 IO 密集型任务,由于线程经常处于等待状态,等待 IO 操作完成,所以可以设置更多的线程来提高并发,比如说 CPU 核心数的两倍

10. 手写一个线程池

核心线程池类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package MyThreadPool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MyThreadPoolExecutor extends ThreadPoolExecutor{

private final int corePoolSize;
private final int maxPoolSize;
private final long keepAliveTime;
private final TimeUnit timeUnit;
private final BlockingQueue<Runnable> workQueue;
private final RejectedExecutionHandler handler;

private volatile boolean isShutdown;
private int currentPoolSize;


public MyThreadPoolExecutor(int corePoolSize,
int maxPoolSize,
long keepAliveTime,
TimeUnit timeUnit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
super(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, handler);
this.corePoolSize = corePoolSize;
this.maxPoolSize = maxPoolSize;
this.keepAliveTime = keepAliveTime;
this.timeUnit = timeUnit;
this.workQueue = workQueue;
this.handler = handler;
}


public void execute(Runnable task) {
if(isShutdown){
throw new IllegalStateException("ThreadPool is closed");
}

if(currentPoolSize < corePoolSize){
new Worker(task).start();
currentPoolSize++;
return;
}

if(!workQueue.offer(task)){
if(currentPoolSize < maxPoolSize){
new Worker(task).start();
currentPoolSize++;
}else{
handler.rejectedExecution(task, this);
}
}
}

public void shutdown() {
isShutdown = true;
}

// 工作线程
private class Worker extends Thread{

private Runnable task;

public Worker(Runnable task){
this.task = task;
}

@Override
public void run() {
while(task != null || (task = getTask()) != null){
try {
task.run();
} finally {
task = null;
}
}
}

private Runnable getTask(){
try {
return workQueue.poll(keepAliveTime, timeUnit);
} catch (InterruptedException e) {
return null;
}
}
}
}

拒绝策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package MyThreadPool;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class MyRejectedExecutionHandler {
// AbortPolicy 抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RuntimeException("Task " + r.toString() + " rejected from " + e.toString());
}
}

// DiscardPolicy 什么都不做
public static class DiscardPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// Do nothing
}
}

// CallerRunsPolicy 让调用执行方法的线程去执行,这里是main线程
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
}

测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package MyThreadPool;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolTest {

public static void main(String[] args) {
MyThreadPoolExecutor myThreadPool = new MyThreadPoolExecutor(
2, 4, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2), new MyRejectedExecutionHandler.CallerRunsPolicy()
);
for (int i = 0; i < 10; i++) {
final int index = i;
myThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " Task " + index + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}

myThreadPool.shutdown();
}
}

输出结果

image-20250523144408614


线程池
http://example.com/2025/05/23/线程池/
作者
Kon4tsu
发布于
2025年5月23日
许可协议