Java 中的线程池

合理地使用线程池可以带来以下三个主要好处:

  1. 降低资源消耗:通过重复利用已创建的线程,减少线程创建和销毁所造成的开销。
  2. 提高响应速度:当任务到达时,无需等待线程创建即可立即执行任务。
  3. 提高线程的可管理性:线程是稀缺资源,若无限制地创建线程,不仅会消耗系统资源,还会降低系统的稳定性。通过线程池,可以对线程进行统一分配、调优和监控。

线程池的实现原理

线程池实现原理

处理流程

  1. 判断核心线程池的线程是否都在执行任务
    • 如果没有,则创建一个新的工作线程来执行任务。
    • 如果核心线程池的线程都在执行任务,则进入下一步。
  2. 判断工作队列是否已满
    • 如果工作队列未满,则将新提交的任务存储在工作队列中。
    • 如果工作队列已满,则进入下一步。
  3. 判断线程池的线程是否都处于工作状态
    • 如果没有,则创建一个新的工作线程来执行任务。
    • 如果线程池已满,则交给饱和策略来处理该任务。

ThreadPoolExecutor

ThreadPoolExecutor 工作原理

execute 方法的执行流程

ThreadPoolExecutor 在执行 execute 方法时,会根据以下四种情况进行处理:

  1. 如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(此步骤需要获取全局锁)。
  2. 如果运行的线程数等于或多于 corePoolSize,则将任务加入 BlockingQueue
  3. 如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(此步骤需要获取全局锁)。
  4. 如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。

工作线程

线程池在创建线程时,会将线程封装成工作线程 WorkerWorker 在执行完任务后,还会循环获取工作队列中的任务继续执行。

线程池中工作线程的执行

  • 线程池中的线程执行任务有两种情况:
    1. execute() 方法中创建一个线程时,会让该线程执行当前任务。
    2. 该线程执行完当前任务后,会反复从 BlockingQueue 获取任务执行。

线程池的使用

创建线程池

通过 ThreadPoolExecutor 来创建线程池,参数包括:

  • corePoolSize:线程池的基本大小。

  • runnableTaskQueue(任务队列):

    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • SynchronousQueue
    • PriorityBlockingQueue
  • maximumPoolSize:线程池允许创建的最大线程数(如果使用无界的任务队列,该参数几乎没有效果)。

  • ThreadFactory

    用于设置创建线程的工厂,可通过线程工厂给每个创建出来的线程设置更有意义的名字。

    • 使用开源框架 Guava 提供的

      ThreadFactoryBuilder

      可以快速为线程池中的线程设置有意义的名字:

      new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
  • RejectedExecutionHandler(饱和策略):

    当队列和线程池都满了,说明线程池处于饱和状态,需要采取一种策略处理提交的新任务。默认策略是 AbortPolicy

    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:使用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列中最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,直接丢弃任务。
    • 可以根据应用场景实现 RejectedExecutionHandler 接口,定制化策略(如记录日志或持久化不能处理的任务)。
  • keepAliveTime:线程池工作线程空闲后的保持存活时间(可调大时间以提高线程利用率)。

  • TimeUnit:线程活动保持时间的单位。

提交任务

  • execute() 方法用于提交不需要返回值的任务,因此无法判断任务是否被线程池执行成功。
  • submit() 方法用于提交需要返回值的任务。线程池会返回一个 future 对象,通过它可以判断任务是否执行成功,并且可以通过 future.get() 方法获取返回值:
    • get() 方法会阻塞当前线程直到任务完成。
    • get(long timeout, TimeUnit unit) 方法会阻塞当前线程一段时间后立即返回,此时任务可能尚未执行完毕。

关闭线程池

  • 使用 shutdown 或 shutdownNow 关闭线程池:
    • 原理:遍历线程池中的工作线程,逐个调用线程的 interrupt 方法中断线程,因此无法响应中断的任务可能永远无法终止。
    • 区别:
      • shutdownNow:首先将线程池状态设置为 STOP,尝试停止所有正在执行或暂停任务的线程,并返回等待执行任务的列表。
      • shutdown:仅将线程池状态设置为 SHUTDOWN,中断所有未执行任务的线程。
    • 调用任意一个关闭方法后,isShutdown 方法会返回 true
    • 当所有任务都已关闭后,isTerminated 方法会返回 true
    • 通常使用 shutdown 方法关闭线程池;若任务不必执行完,可调用 shutdownNow

合理配置线程池

合理配置线程池需要分析任务特性,可以从以下角度进行分析:

  1. 任务的性质
    • CPU 密集型任务:应配置尽可能少的线程,例如 Ncpu + 1 个线程的线程池。
    • IO 密集型任务:由于线程在执行过程中并非一直占用 CPU,建议配置较多的线程,例如 2 * Ncpu
    • 混合型任务:若可以拆分,将任务拆分为 CPU 密集型和 IO 密集型任务并行执行。若拆分后的执行时间相差不大,则分解后执行的吞吐量高于串行执行。
  2. 任务的优先级:优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 处理,让优先级高的任务先执行。
  3. 任务的执行时间:可交给不同规模的线程池处理,或使用优先级队列让执行时间短的任务先执行。
  4. 任务的依赖性:依赖数据库连接池的任务,由于线程提交 SQL 后需要等待数据库返回结果,建议配置更多的线程以更好地利用 CPU。
  5. 建议使用有界队列:有界队列增加系统稳定性和预警能力。队列大小可设大些,例如几千。

线程池的监控

通过以下属性进行线程池监控:

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池运行过程中已完成的任务数量,应小于或等于 taskCount
  • largestPoolSize:线程池曾经创建过的最大线程数量。若等于线程池的最大大小,表示线程池曾经满过。
  • getPoolSize:线程池的线程数量。线程池不销毁时,线程池中的线程不会自动销毁,此大小只增不减。
  • getActiveCount:获取活动的线程数。

可通过继承线程池来自定义线程池,重写 beforeExecuteafterExecuteterminated 方法,在任务执行前、执行后和线程池关闭前执行一些代码进行监控。

线程池运行流程图

image-20240811161704724