Fork me on GitHub

Java并发(2)-线程池

此处输入图片的描述
本文介绍Java中线程池的相关概念和部分源码解析。

一、为什么需要线程池?

因为服务器需要接受并处理请求,通常会给每一个请求分配一个线程来处理,假设每次请求都创建一个线程,如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。

线程池具有以下优点:

  • 线程复用,降低资源消耗:重用线程池中的线程,减少因对象创建,销毁所带来的性能开销;
  • 控制并发数量,提高响应速度:能有效的控制线程的最大并发数,提高系统资源利用率,同时避免过多的资源竞争,避免堵塞;
  • 提高线程的可管理性:能够对线程进行简单的管理并提供定时执行、间隔执行、单线程、并发数控制等功能,使线程的使用简单、高效。

二、线程池原理

线程池的特点

(1)线程复用:实现线程复用的原理应该就是要保持线程处于存活状态(就绪,运行或阻塞)

(2)控制并发数量:(核心线程和最大线程数控制)

(3)管理线程(设置线程的状态)

当往线程池中添加任务时

(1)线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务;

(2)线程数量达到了corePoolSize,则将任务移入队列等待空闲线程将其取出去执行(通过getTask()方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源,整个getTask操作在自旋下完成)

(4)队列已满,新建线程(非核心线程)执行任务

(5)队列已满,总线程数又达到了maximumPoolSize,就会执行任务拒绝策略。

线程池状态:一共五种

RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;

SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用shutdown()方法会使线程池进入到该状态。(finalize()方法在执行过程中也会调用shutdown()方法进入该状态);

STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;

TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。

TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
进入TERMINATED的条件如下:

  • 线程池不是RUNNING状态;
  • 线程池状态不是TIDYING状态或TERMINATED状态;
  • 如果线程池状态是SHUTDOWN并且workerQueue为空;
  • workerCount为0;
  • 设置TIDYING状态成功。

三、Executor框架接口

此处输入图片的描述

JUC(Java.util.concurrent)中一共有三个Executor接口:

Executor:一个运行新任务的简单接口;

ExecutorService:扩展了Executor接口。添加了一些用来管理执行器生命周期和任务生命周期的方法;

ScheduledExecutorService:扩展了ExecutorService。支持Future和定期执行任务。

Executor接口

1
2
3
public interface Executor {
void execute(Runnable command);
}

用execute方法替代创建或者启动线程的方法。

1
2
Thread t = new Thread();
executor.execute(t);

ExecutorService接口

ExecutorService接口继承自Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future的方法。增加了shutDown()shutDownNow()invokeAll()invokeAny()submit()等方法。如果需要支持即时关闭,也就是shutDownNow()方法,则任务需要正确处理中断。

ScheduledExecutorService接口

ScheduledExecutorService扩展ExecutorService接口并增加了schedule方法。调用schedule方法可以在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。

四、ThreadPoolExecutor类

ThreadPoolExecutor继承自AbstractExecutorService,实现了ExecutorService接口。

重要字段

1
2
3
4
5
6
7
8
9
10
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

ctl是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),使用了Integer类型来保存,高3位保存runState,低29位保存workerCountCOUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

ThreadPoolExecutor构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造方法的各个字段意义如下:

corePoolSize:核心线程数(最小存活的工作线程数量)

maxPoolSize:最大线程数

keepAliveTime:线程存活时间(在corePoreSize<maxPoolSize情况下有用,线程的空闲时间超过了keepAliveTime就会销毁)

timeUnit:存活时间的时间单位

workQueue:阻塞队列,用来保存等待被执行的任务(①synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务;②LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;③ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小)

threadFactory:线程工厂,主要用来创建线程;

handler:表示当拒绝处理任务时的策略(线程池提供了4种策略,都实现了RejectedExecutionHandler接口:
AbortPolicy:直接抛出异常,这是默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
DiscardPolicy:直接丢弃任务;)

重要的方法

execute()
Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
适合不关注返回值的场景。

1
2
3
4
5
6
7
8
public class ThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "执行");
});
}
}

submit()
在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,实际上它还是调用的execute()方法,只不过它利用了Future来获取任务执行结果。
适用于需要关注返回值的场景.

1
2
3
4
5
6
7
public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  ...
}

shutdown()
不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务。

shutdownNow()
立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。

判断线程池线程是否执行完

isTerminated()
调用ExecutorService.shutdown()方法的时候,线程池不再接收任何新任务,但此时线程池并不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。在调用shutdown方法后我们可以在一个死循环里面用isTerminated()方法判断是否线程池中的所有线程已经执行完毕,如果子线程都结束了,我们就可以做关闭流等后续操作了。

使用闭锁(CountDownLatch)
CountDownLatch是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,即表示需要等待的事情都已经发生。

使用信号量机制

五、常见的四种线程池的用法

Java通过Executors类提供四种线程池

1. CachedThreadPool()-可缓存线程池

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

1
2
3
4
5
6
7
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

这种线程池内部没有核心线程,线程的数量是有没限制的。

在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。没有工作的线程(闲置状态)在超过了60S还不做事,就会销毁。

适用:执行很多短期异步的小程序或者负载较轻的服务器。

2. FixedThreadPool()-定长线程池

newFixedThreadPool创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

1
2
3
4
5
6
7
8
public static ExecutorService newFixedThreadPool(int nThreads) {     	
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

该线程池的最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。

如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。

适用:执行长期的任务,性能好很多。

3. SingleThreadPool()-单线程池

newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

1
2
3
4
5
6
7
8
9
public static ExecutorService newSingleThreadExecutor() {       
return new FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

有且仅有一个工作线程执行任务

所有任务按照指定顺序执行,即遵循队列的入队出队规则。

适用:一个任务一个任务执行的场景。

4. ScheduledThreadPool()-调度线程池

newScheduledThreadPool创建一个定长线程池,支持定时及周期性任务执行。

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {     	       
return new ScheduledThreadPoolExecutor(corePoolSize);
}

其中ScheduledThreadPoolExecutor()的构造方法为:

1
2
3
4
5
6
7
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize,
Integer.MAX_VALUE,
0,
NANOSECONDS,
new DelayedWorkQueue());
}

同样是用到了父类ThreadPoolExecutor的构造方法。

1
2
3
4
5
6
7
8
9
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
Executors.defaultThreadFactory(),
defaultHandler);
}

DEFAULT_KEEPALIVE_MILLIS就是默认10L,这里就是10秒。这个线程池有点像是CachedThreadPoolFixedThreadPool 结合了一下。

不仅设置了核心线程数,最大线程数也是Integer.MAX_VALUE。

这个线程池是上述4个中唯一一个有延迟执行和周期执行任务的线程池。

适用:周期性执行任务的场景(定期的同步数据)


参考链接
深入理解 Java 线程池:ThreadPoolExecutor

-------------本文结束感谢阅读-------------