前言
线程池:本质上是一种对象池,用于管理线程资源。在任务执行前,需要从线程池中拿出线程来执行。在任务执行完成之后,需要把线程放回线程池。通过线程的这种反复利用机制,可以有效地避免直接创建线程所带来的坏处。
优点:1、降低资源的消耗。线程本身是一种资源,创建和销毁线程会有CPU开销;创建的线程也会占用一定的内存;2、提高任务执行的响应速度。任务执行时,可以不必等到线程创建完之后再执行;3、提高线程的可管理性。线程不能无限制地创建,需要进行统一的分配、调优和监控。
缺点:1、频繁的线程创建和销毁会占用更多的CPU和内存;2、频繁的线程创建和销毁会对GC产生比较大的压力;3、线程太多,线程切换带来的开销将不可忽视;4、线程太少,多核CPU得不到充分利用,是一种浪费。
流程
- 判断核心线程池是否已满,如果不是,则创建线程执行任务;
- 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中;
- 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务;
- 如果线程池也满了,则按照拒绝策略对任务进行处理。
方式
入门级例子
package cn.com.codingce.juc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = ; i < 10; i++) {
executor.submit(() -> {
System.out.println("Thread id is " + Thread.currentThread().getId());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
在这个例子中,首先创建了一个固定长度为5的线程池。然后使用循环的方式往线程池中提交了10个任务,每个任务休眠1秒。在任务休眠之前,将任务所在的线程id进行打印输出。
Thread id is 11
Thread id is 13
Thread id is 12
Thread id is 15
Thread id is 14
Thread id is 11
Thread id is 13
Thread id is 15
Thread id is 14
Thread id is 12
Executors
Executors
是一个线程池工厂,提供了很多的工厂方法。
// 创建单一线程的线程池
public static ExecutorService newSingleThreadExecutor();
// 创建固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 创建带缓存的线程池
public static ExecutorService newCachedThreadPool();
// 创建定时调度的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 创建流式(fork-join)线程池
public static ExecutorService newWorkStealingPool();
newSingleThreadExecutor
创建一个单线程的线程池,若多个任务被提交到此线程池,那么会被缓存到队列(队列长度为Integer.MAX_VALUE
),可保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
private static void createSingleThreadPool() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = ; i < 10; i++) {
final int index = i;
executorService.execute(() -> {
// 获取线程名称,默认格式:pool-1-thread-1
System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
// 等待2秒
try {
sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
output
Mon Jan 02 11:49:58 CST 2023 pool-1-thread-1 0
Mon Jan 02 11:50:00 CST 2023 pool-1-thread-1 1
Mon Jan 02 11:50:02 CST 2023 pool-1-thread-1 2
Mon Jan 02 11:50:04 CST 2023 pool-1-thread-1 3
Mon Jan 02 11:50:06 CST 2023 pool-1-thread-1 4
Mon Jan 02 11:50:08 CST 2023 pool-1-thread-1 5
Mon Jan 02 11:50:10 CST 2023 pool-1-thread-1 6
Mon Jan 02 11:50:12 CST 2023 pool-1-thread-1 7
Mon Jan 02 11:50:14 CST 2023 pool-1-thread-1 8
Mon Jan 02 11:50:16 CST 2023 pool-1-thread-1 9
因为只有一个线程,所以线程名均相同,且是每隔2秒按顺序输出的。
newFixedThreadPool
创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待。和创建单一线程的线程池
类似,只是可以并行处理任务的线程数更多一些。若多个任务被提交到此线程池,会有下面的处理过程。
- 如果线程的数量未达到指定数量,则创建线程来执行任务;
- 如果线程池的数量达到了指定数量,并且有线程是空闲的,则取出空闲线程执行任务;
- 如果没有线程是空闲的,则将任务缓存到队列(队列长度为
Integer.MAX_VALUE
)。当线程空闲的时候,按照FIFO的方式进行处理
private static void createFixedThreadPool() {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = ; i < 10; i++) {
final int index = i;
executorService.execute(() -> {
// 获取线程名称,默认格式:pool-1-thread-1
System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
// 等待2秒
try {
sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
output
Mon Jan 02 11:49:10 CST 2023 pool-1-thread-2 1
Mon Jan 02 11:49:10 CST 2023 pool-1-thread-1 0
Mon Jan 02 11:49:10 CST 2023 pool-1-thread-3 2
Mon Jan 02 11:49:12 CST 2023 pool-1-thread-2 3
Mon Jan 02 11:49:12 CST 2023 pool-1-thread-3 5
Mon Jan 02 11:49:12 CST 2023 pool-1-thread-1 4
Mon Jan 02 11:49:14 CST 2023 pool-1-thread-1 6
Mon Jan 02 11:49:14 CST 2023 pool-1-thread-2 7
Mon Jan 02 11:49:14 CST 2023 pool-1-thread-3 8
Mon Jan 02 11:49:16 CST 2023 pool-1-thread-2 9
因为线程池大小是固定的,这里设置的是3个线程,所以线程名只有3个。因为线程不足会进入队列等待线程空闲,所以日志间隔2秒输出。
newCachedThreadPool
创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程。这种方式创建的线程池,核心线程池的长度为0,线程池大长度为Integer.MAX_VALUE
。由于本身使用SynchronousQueue
作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素。
private static void createCachedThreadPool() {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = ; i < 10; i++) {
final int index = i;
executorService.execute(() -> {
// 获取线程名称,默认格式:pool-1-thread-1
System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
// 等待2秒
try {
sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
output
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-8 7
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-3 2
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-1 0
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-5 4
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-9 8
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-6 5
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-2 1
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-4 3
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-7 6
Mon Jan 02 11:56:03 CST 2023 pool-1-thread-10 9
因为初始线程池没有线程,而线程不足会不断新建线程,所以线程名都是不一样的。
newScheduledThreadPool
创建一个周期性的线程池,支持定时及周期性执行任务。
private static void createScheduledThreadPool() {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
System.out.println(new Date() + " 提交任务");
for (int i = ; i < 10; i++) {
final int index = i;
executorService.schedule(() -> {
// 获取线程名称,默认格式:pool-1-thread-1
System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
// 等待2秒
try {
sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 3, TimeUnit.SECONDS);
}
}
Output
Mon Jan 02 11:59:19 CST 2023 提交任务
Mon Jan 02 11:59:22 CST 2023 pool-1-thread-1 0
Mon Jan 02 11:59:22 CST 2023 pool-1-thread-2 1
Mon Jan 02 11:59:22 CST 2023 pool-1-thread-3 2
Mon Jan 02 11:59:24 CST 2023 pool-1-thread-2 4
Mon Jan 02 11:59:24 CST 2023 pool-1-thread-1 5
Mon Jan 02 11:59:24 CST 2023 pool-1-thread-3 3
Mon Jan 02 11:59:26 CST 2023 pool-1-thread-2 6
Mon Jan 02 11:59:26 CST 2023 pool-1-thread-1 8
Mon Jan 02 11:59:26 CST 2023 pool-1-thread-3 7
Mon Jan 02 11:59:28 CST 2023 pool-1-thread-3 9
因为设置了延迟3秒,所以提交后3秒才开始执行任务。因为这里设置核心线程数为3个,而线程不足会进入队列等待线程空闲,所以日志间隔2秒输出。
newWorkStealingPool(jdk1.8新增)
创建一个含有足够多线程的线程池,来维持相应的并行级别,它会通过工作窃取的方式,使得多核的 CPU 不会闲置,总会有活着的线程让 CPU 去运行。
工作窃取概念(Work stealing):工作窃取不是什么 Java 独有的东西,.NET 的 TPL 库早就存在好几年了。所谓工作窃取,指的是闲置的线程去处理本不属于它的任务。每个处理器核,都有一个队列存储着需要完成的任务。对于多核的机器来说,当一个核对应的任务处理完毕后,就可以去帮助其他的核处理任务。
private static void createNewWorkStealingPool() {
ExecutorService forkJoin = Executors.newWorkStealingPool();
forkJoin.execute(() -> {
System.out.println("i====>" + 1 + " " + Thread.currentThread().getId());
});
forkJoin.execute(() -> {
System.out.println("i====>" + 2 + " " + Thread.currentThread().getId());
});
forkJoin.execute(() -> {
System.out.println("i====>" + 3 + " " + Thread.currentThread().getId());
});
forkJoin.execute(() -> {
System.out.println("i====>" + 4 + " " + Thread.currentThread().getId());
});
forkJoin.execute(() -> {
System.out.println("i====>" + 5 + " " + Thread.currentThread().getId());
});
}
output
i====>1 11
i====>2 11
i====>3 12
i====>4 12
i====>5 12
ThreadPoolExecutor
理论上,可以通过Executors
来创建线程池,这种方式非常简单。但正是因为简单,所以限制了线程池的功能。比如:无长度限制的队列,可能因为任务堆积导致OOM,这是非常严重的bug,应尽可能地避免。怎么避免?归根结底,还是需要通过更底层的方式来创建线程池。
ThreadPoolExecutor
提供了好几个构造方法,但是底层的构造方法却只有一个。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
这个构造方法有7个参数,逐一来进行分析。
corePoolSize
,线程池中的核心线程数;maximumPoolSize
,线程池中的大线程数;keepAliveTime
,空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁;unit
,空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等;workQueue
,等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue
类型的对象;ArrayBlockingQueue
,队列是有界的,基于数组实现的阻塞队列;LinkedBlockingQueue
,队列可以有界,也可以无界。基于链表实现的阻塞队列;SynchronousQueue
,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()
的默认队列;PriorityBlockingQueue
,带优先级的无界阻塞队列。
threadFactory
,线程工厂,可以使用它来创建一个线程;Executors
的实现使用了默认的线程工厂-DefaultThreadFactory
。它的实现主要用于创建一个线程,线程的名字为pool-{poolNum}-thread-{threadNum}
。handler
,拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理。- AbortPolicy:丢弃任务,抛运行时RejectedExecutionException异常;
- CallerRunsPolicy:在调用者线程执行任务;
- DiscardPolicy:忽视,任务直接丢弃,什么都不会发生;
- DiscardOldestPolicy:从队列中踢出先进入队列(后一个执行)的任务(旧的那个任务),再尝试执行当前任务。
线程池的执行规则如下:1、当线程数小于核心线程数时,创建线程;2、当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列;3、当线程数大于等于核心线程数,且任务队列已满。若线程数小于大线程数,创建线程。若线程数等于大线程数,抛出异常,拒绝任务。
private static void createThreadPool() {
ExecutorService executorService = new ThreadPoolExecutor(2, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<>(5, true), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = ; i < 10; i++) {
final int index = i;
executorService.execute(() -> {
// 获取线程名称,默认格式:pool-1-thread-1
System.out.println(new Date() + " " + Thread.currentThread().getName() + " " + index);
// 等待2秒
try {
sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
output
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-1 0
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-2 1
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-3 7
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-5 9
Mon Jan 02 12:34:11 CST 2023 pool-1-thread-4 8
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-4 2
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-1 3
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-2 4
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-5 5
Mon Jan 02 12:34:13 CST 2023 pool-1-thread-3 6
因为核心线程数为2,队列大小为5,存活时间1分钟,所以流程是第0-1号任务来时,陆续创建2个线程,然后第2-6号任务来时,因为无线程可用,均进入了队列等待,第7-9号任务来时,没有空闲线程,队列也满了,所以陆续又创建了3个线程。所以你会发现7-9号任务反而是先执行的。又因为各任务只需要2秒,而线程存活时间有1分钟,所以线程进行了复用,所以总共只创建了5个线程。
如何正确配置线程池的参数:CPU密集型:corePoolSize = CPU核数 + 1;IO密集型:corePoolSize = CPU核数 * 2。
提交任务的几种方式:往线程池中提交任务,主要有两种方法,execute()
和submit()
。submit()
用于提交一个需要返回果的任务。该方法返回一个Future
对象,通过调用这个对象的get()
方法,就能获得返回结果。get()
方法会一直阻塞,直到返回结果返回。另外,也可以使用它的重载方法get(long timeout, TimeUnit unit)
,这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException
。
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Long> future = executor.submit(() -> {
System.out.println("task is executed");
return System.currentTimeMillis();
});
System.out.println("task execute time is: " + future.get());
}
output
task is executed
task execute time is: 1672634764296
线程池监控:
- ThreadPoolExecutor自带:
long getTaskCount()
:获取已经执行或正在执行的任务数;long getCompletedTaskCount()
:获取已经执行的任务数;int getLargestPoolSize()
:获取线程池曾经创建过的大线程数,根据这个参数,可以知道线程池是否满过;int getPoolSize()
:获取线程池线程数;int getActiveCount()
:获取活跃线程数(正在执行任务的线程数)。
- ThreadPoolExecutor自定义处理:
protected void beforeExecute(Thread t, Runnable r)
:任务执行前被调用;protected void afterExecute(Runnable r, Throwable t)
:任务执行后被调用;protected void terminated()
:线程池结束后被调用。
关闭线程池:1、shutdown()
会将线程池状态置为SHUTDOWN
,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束;2、shutdownNow()
会将线程池状态置为SHUTDOWN
,对所有线程执行interrupt()
操作,清空队列,并将队列中的任务返回回来。关闭线程池涉及到两个返回boolean的方法,isShutdown()
和isTerminated
,分别表示是否关闭和是否终止。
注意
- 尽量使用手动的方式创建线程池,避免使用
Executors
工厂类; - 根据场景,合理设置线程池的各个参数,包括线程池数量、队列、线程工厂和拒绝策略;
- 在调线程池
submit()
方法的时候,一定要尽量避免任务执行异常被吞掉的问题。