Java 线程池

线程池概念

线程池,其实就是一个容纳多个线程的容器,其中的线程可以反复使用,省去了频繁创建线程对象的操作,无需反复创建线程而消耗过多资源。
在实际使用中,创建和销毁线程花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源要多的多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个 jvm 里创建太多的线程,可能会使系统由于过度消耗内存或“切换过度”而导致系统资源不足。为了防止资源不足,需要采取一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务。线程池主要用来解决线程生命周期开销问题和资源不足问题。通过对多个任务重复使用线程,线程创建的开销就被分摊到了多个任务上了,而且由于在请求到达时线程已经存在,所以消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使用应用程序响应更快。另外,通过适当的调整线程中的线程数目可以防止出现资源不足的情况。

Java 线程池的设计简介

Java 中的线程池核心实现类是 ThreadPoolExecutor;ThreadPoolExecutor 的 UML 类图如下:





ThreadPoolExecutor 的 UML 类图


1. ThreadPoolExecutor 实现的顶层接口是 Executor,顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。
2. ExecutorService 接口增加了一些能力:
* 扩充执行任务的能力,补充可以为一个或一批异步任务生成 Future 的方法;
* 提供了管控线程池的方法,比如停止线程池的运行。
3. AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。
4. 最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
线程池的运行主要分成两部分:任务管理、线程管理。
* 任务管理部分充当生产者的角色,当任务提交后,线程池会判断该任务后续的流转:
1. 直接申请线程执行该任务;
2. 缓冲到队列中等待线程执行;
3. 拒绝该任务。
* 线程管理部分是消费者,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

线程池的使用

创建线程池

通过 ThreadPoolExecutor 来创建一个线程池:

new  ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

参数详解

  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的 prestartAllCoreThreads 方法,线程池会提前创建并启动所有基本线程。
  • runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
    | 名称 | 描述 |
    | --------------------- | ------------------------------------------------------------ |
    | ArrayBlockingQueue | 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁。 |
    | LinkedBlockingQueue | 一个由链表结构组成的有界队列,此队列按照先进先出(FIFO)的原则对元素进行排序。此队列的默认长度为 Integer.MAX_VALUE,所以默认创建的该队列有容量危险。 |
    | PriorityBlockingQueue | 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现 compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序。 |
    | DelayQueue | 一个实现 PriorityBlockingQueue 实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。 |
    | SynchronousQueue | 一个不存储元素的阻塞队列,每一个 put 操作必须等待 take 操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue 的一个使用场景是在线程池里:Executors.newCachedThreadPool()就使用了 SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。 |
    | LinkedTransferQueue | 一个由链表结构组成的无界阻塞队列,相当于其它队列,LinkedTransferQueue 队列多了 transfer 和 tryTransfer 方法。 |
    | LinkedBlockingDeque | 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半。 |
  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。其他可选策略有:(当然也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略,如记录日志或持久化不能处理的任务。)
    | 名称 | 描述 |
    | -------------------------------------- | ------------------------------------------------------------ |
    | ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出 RejectedExecutionException 异常。这是线程池默认的拒绝策略.在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此拒绝策略,这样子在系统不能承载更大的并发量的时候,能够及时的通过异常发现。 |
    | ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常。使用此策略,可能会使我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略。 |
    | ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新提交被拒绝的任务。是否要采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量。 |
    | ThreadPoolExecutor.CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务。这种情况是需要让所有任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕。 |
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒 (MILLISECONDS),微秒 (MICROSECONDS, 千分之一毫秒) 和毫微秒 (NANOSECONDS, 千分之一微秒)。

向线程池提交任务

1.使用 execute 提交的任务,但是 execute 方法没有返回值,所以无法判断任务是否被线程池执行成功。execute 方法输入的任务是一个 Runnable 类的实例。

threadsPool.execute(new Runnable() {
           @Override
           public void run() {
               // TODO Auto-generated method stub
           }
       });

2.使用 submit 方法来提交任务,它会返回一个 future, 可以通过这个 future 来判断任务是否执行成功,通过 future 的 get 方法来获取返回值,get 方法会阻塞住直到任务完成,而使用 get(long timeout, TimeUnit unit) 方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。submit 方法输入的任务是一个 Runnable 类或 Callable 类的实例。虽然两类实例都返回了 Future,但是 runnable 的情况下 Future 将不包含任何值。

Future<Object> future = executor.submit(harReturnValuetask);
try {
    Object s = future.get();
} catch (InterruptedException e) {
   // 处理中断异常
} catch (ExecutionException e) {
   // 处理无法执行任务异常
} finally {
   // 关闭线程池
   executor.shutdown();
}

线程池的关闭

  • 可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池,它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应的中断任务可能永远无法终止。
  • shutdown 或 shutdownNow 的区别在于,shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
  • 只要调用了这两个关闭方法的其中一个,isShutdown 方法就会返回 true。
  • 当所有的任务都已关闭后, 才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。

线程池的实践

简化线程池配置

线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize,workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。

实际应用的并发性的场景主要是两种:

  • 并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。

  • 并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。

所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求。

核心线程数参数设置

1.分析任务特性,可以从以下几个角度来进行分析:

  • 任务的性质:CPU 密集型任务,IO 密集型任务和混合型任务。

| 名称 | 概念 | 常用核心线程数设置 | 说明 |
| -------------- | ------------------------------------------------------------ | ------------------- | ------------------------------------------------------------ |
| CPU 密集型任务 | 任务消耗的主要是 CPU 资源 | N(CPU 核心数)+1 | 比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。 |
| I/O 密集型任务 | 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理 | 2N | |
| 混合型任务 | | N_cpu * (1 + WT/ST) | 如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解 |

对于混合型任务

理论上的核心线程数计算方法为:
N_cpu = number of CPUs
U_cpu = target CPU utilization, 0 ≤ U ≤ 1
W/C = ratio of wait time to compute time
N_threads = N_cpu * U_cpu * (l + W/C)
在实际开发过程中可使用的计算方法:
N_threads = N_cpu * (1 + WT/ST)
WT:线程等待时间
ST:线程时间运行时间
可以通过 JDK 自带的工具 VisualVM 来查看 WT/ST 比例

  • 任务的优先级:高,中和低。

    • 优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

  • 任务的执行时间:长,中和短。

    • 执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

    • 依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,如果等待的时间越长 CPU 空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用 CPU。

获得当前设备的 CPU 个数:

Runtime.getRuntime().availableProcessors()

2.建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。

动态化线程池

参数动态化

  • JDK 允许线程池使用方通过 ThreadPoolExecutor 的实例来动态设置线程池的核心策略,以 setCorePoolSize 为方法例,在运行期线程池使用方调用此方法设置 corePoolSize 之后,线程池会直接覆盖原来的 corePoolSize 值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的 worker 线程,此时会向当前 idle 的 worker 线程发起中断请求以实现回收,多余的 worker 在下次 idel 的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的 worker 线程来执行队列任务。

实战设计
公司近期正在开发一个统一管理中心——UMC,针对公司已有的安全产品进行统一的管理、监控及信息统计整合,以解决大型企业集团中,各分公司和子公司在数据共享等问题;其中涉及到各个集控点信息的定时同步模块,就使用了动态线程池的设计。





动态化同步模块整体设计

线程池的监控

  • 通过线程池提供的参数进行监控。基于JDK原生线程池ThreadPoolExecutor提供的几个public的getter方法,可以读取到当前线程池的运行状态以及参数:
    1. getActiveCount():int:获取活动的线程数。
    2. getCompletedTaskCount():long:线程池在运行过程中已完成的任务数量,小于或等于 taskCount。
    3. getLargestPoolSize():int:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
    4. getTaskCount():long:线程池需要执行的任务数量。
    5. getPoolSize():int:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
      ……
  • 通过继承线程池并重写线程池的 beforeExecute,afterExecute 和 terminated 方法,我们可以在任务执行前,执行后和线程池关闭前加入一段自定义的逻辑。如监控任务的平均执行时间,最大执行时间和最小执行时间等。

    protected void beforeExecute(Thread t, Runnable r) { }
    
  • 线程池性能监控的几个重要维度:

    1. 线程池活跃度 = activeCount / maximumPoolSize;代表当活跃线程数趋向于maximumPoolSize的时候,代表线程负载趋高。
    2. 是否发生了Reject异常,队列中是否有等待任务;可以作为线程池的过载判定条件。