Java 中的并发工具类

并发流程控制的手段

  • CountDownLatch
  • CyclicBarrier
  • Semaphore

在线程间交换数据的一种手段

  • Exchanger 工具类

CountDownLatch

  • CountDownLatch 允许一个或多个线程等待其他线程完成操作
  • 计数器必须大于等于 0,当计数器为 0 时,调用 await 方法不会阻塞当前线程。
  • CountDownLatch 不可能重新初始化或者修改 CountDownLatch 对象的内部计数器的值
  • 一个线程调用 countDown 方法 happen-before,另外一个线程调用 await 方法

同步屏障 CyclicBarrier

  • CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。
  • 让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
  • 每个线程调用 await 方法告诉 CyclicBarrier 我已经到达了屏障,然后当前线程被阻塞
  • **CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction)**,用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。

应用场景

  • 多线程计算数据,最后合并计算结果的场景。

CyclicBarrier 和 CountDownLatch 的区别

  • CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 的计数器可以使用 reset() 方法重置
  • CyclicBarrier 还提供其他有用的方法:
    • getNumberWaiting 方法可以获得 CyclicBarrier 阻塞的线程数量
    • isBroken() 方法用来了解阻塞的线程是否被中断

Semaphore

  • Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

应用场景

  • Semaphore 可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接

使用方法

  • 使用 Semaphoreacquire() 方法获取一个许可证,使用完之后调用 release() 方法归还许可证。还可以用 tryAcquire() 方法尝试获取许可证。

其他方法

  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermits(int reduction):减少 reduction 个许可证(是个 protected 方法)。
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合(是个 protected 方法)。

Exchanger

  • Exchanger 用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。
  • 第一个线程先执行 exchange() 方法,它会一直等待第二个线程也执行 exchange 方法,当两个线程都到达同步点时,这两个线程就可以交换数据
  • 如果两个线程有一个没有执行 exchange() 方法,则会一直等待。如果担心有特殊情况发生,避免一直等待,可以使用 exchange(V x, long timeout, TimeUnit unit) 设置最大等待时长。