Java 并发容器和框架

ConcurrentHashMap 的实现原理与使用

  • ConcurrentHashMap 是线程安全且高效的 HashMap。
  • 为什么要使用 ConcurrentHashMap
    • 在并发编程中使用 HashMap 可能导致程序死循环。
    • 使用线程安全的 HashTable 效率非常低下。

锁分段技术

  • 将数据分成一段一段地存储,然后给每一段数据配一把锁。当一个线程占用锁访问其中一个段数据时,其他段的数据也能被其他线程访问。

ConcurrentHashMap 的结构

  • ConcurrentHashMap 里包含一个 Segment 数组

Segment 数组结构

  • Segment 是一种可重入锁(ReentrantLock)。
  • 一个 Segment 里包含一个 HashEntry 数组。
  • 当对 HashEntry 数组的数据进行修改时,必须首先获得与它对应的 Segment 锁。

HashEntry 数组结构

  • 存储键值对数据。

ConcurrentHashMap 的初始化

  • ConcurrentHashMap 初始化方法是通过 initialCapacityloadFactorconcurrencyLevel(预估的并发数)等几个参数来初始化 Segment 数组、段偏移量 segmentShift、段掩码 segmentMask 和每个 Segment 里的 HashEntry 数组来实现的。

初始化 segments 数组

  • segments 数组的长度 ssize 是通过 concurrencyLevel 计算得出
  • 必须保证 segments 数组的长度是 2 的 N 次方
  • concurrencyLevel 的最大值是 65535,这意味着 segments 数组的长度最大为 65536。
  • sshift 等于 ssize 从 1 向左移位的次数。在默认情况下 concurrencyLevel 等于 16,1 需要向左移位移动 4 次,所以 sshift 等于 4。

初始化 segmentShift 和 segmentMask

  • 这两个全局变量需要在定位 segment 时的散列算法里使用。
  • segmentShift 用于定位参与散列运算的位数,segmentShift 等于 32 减 sshift
  • segmentMask 是散列运算的掩码,等于 ssize 减 1

初始化每个 segment

  • initialCapacity 是 ConcurrentHashMap 的初始化容量。
  • loadFactor 是每个 segment 的负载因子。
  • segment 的容量 threshold =(int)cap * loadFactor,默认情况下 initialCapacity 等于 16,loadFactor 等于 0.75。
  • cap 就是 segment 里 HashEntry 数组的长度

ConcurrentHashMap 的操作

get 操作

  • Segment 的 get 操作非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到 Segment,再通过散列算法定位到元素。
  • get 方法里将要使用的共享变量都定义成 volatile 类型,如用于统计当前 Segment 大小的 count 字段和用于存储值的 HashEntry 的 value。

put 操作(必须加锁)

  • 判断是否需要对 Segment 里的 HashEntry 数组进行扩容:
    • 是否需要扩容:
      • 先判断 Segment 里的 HashEntry 数组是否超过容量(threshold)
    • 如何扩容:
      • 创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap 不会对整个容器进行扩容,而只对某个 Segment 进行扩容
    • 定位添加元素的位置,然后将其放在 HashEntry 数组。

size 操作

  • 先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,如果统计的过程中,容器的 count 发生了变化,则再采用加锁的方式来统计所有 Segment 的大小。

ConcurrentLinkedQueue

  • 线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法
    • 阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
    • 非阻塞的实现方式则可以使用循环 CAS 的方式来实现

ConcurrentLinkedQueue 的实现

  • ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,它会返回队列头部的元素。它采用了“wait-free”算法(即 CAS 算法)来实现。

单线程入队

  • 将入队节点设置成当前队列尾节点的下一个节点
  • 更新 tail 节点。如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点;如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点。

多线程入队

  • 定位出尾节点。
  • 使用 CAS 算法将入队节点设置成尾节点的 next 节点
  • 入队方法永远返回 true,所以不要通过返回值判断是否成功。

出队列

  • 当 head 节点里有元素时,直接弹出 head 节点里的元素,而不会更新 head 节点。只有当 head 节点里没有元素时,出队操作才会更新 head 节点。

ConcurrentLinkedQueue 出队列

Java 中的阻塞队列

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列

  • JDK 7 提供了 7 个阻塞队列:

    1. ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

      • 默认情况下不保证线程公平的访问队列。
      • 访问者的公平性是使用可重入锁实现的。
    2. LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

      • 默认和最大长度为 Integer.MAX_VALUE。
      • 按照先进先出的原则对元素进行排序。
    3. PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

      • 元素采取自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。
      • 不能保证同优先级元素的顺序。
    4. DelayQueue:一个使用优先级队列实现的无界阻塞队列。

      • 队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
      • 应用场景:
        • 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
        • 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的。
      • 如何实现 Delayed 接口:
        1. 在对象创建时,初始化基本数据。
        2. 实现 getDelay 方法,该方法返回当前元素还需要延时多长时间,单位是纳秒。
        3. 实现 compareTo 方法来指定元素的顺序。例如,让延时时间最长的放在队列的末尾。
      • 如何实现延时阻塞队列:
        • 当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。
    5. SynchronousQueue:一个不存储元素的阻塞队列。

      • 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
      • 负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。
    6. LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

      • 相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfertransfer 方法。
        • transfer 方法:
          • 如果当前有消费者正在等待接收元素(消费者使用 take() 方法或带时间限制的 poll() 方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回。
        • tryTransfer 方法:
          • tryTransfer 方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回,而 transfer 方法是必须等到消费者消费了才返回
    7. LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

      • 双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。

阻塞队列的实现原理

  • 问题:如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?
    • 使用通知模式实现:
      • **当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过 LockSupport.park(this)**。
        • 只有以下 4 种情况中的一种发生时,该方法才会返回:
          1. 与 park 对应的 unpark 执行或已经执行时。“已经执行”是指 unpark 先执行,然后再执行 park 的情况
          2. 线程被中断时。
          3. 等待完 time 参数指定的毫秒数时。
          4. 异常现象发生时,这个异常现象没有任何原因。