java多线程中的Queue

在Java多线程中,队列的使用率很高,很多生产者消费者模型的首选数据结构就是队列。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,今天也主要说一下这两个队列。
之后可能会说一下Atomic家族,先备忘。

BlockingQueue

BlockingQueue是一个接口,继承自Queue。

接口方法

先看一下它有哪些方法:
blockingqueue methods
再加上其父类Queue中方法,整理一下:

可能报异常 返回布尔值或null 可能阻塞 设定等待时间
入队 add(e) offer(e) put(e) offer(e, timeout, unit)
出队 remove() poll() take() poll(timeout, unit)
查看 element() peek()

说明:

  1. poll(), peek(), element()是Queue中方法。
  2. BlockingQueue中不接受null。当add, put, offer将null作为参数时,会抛出NullPointerException,这是因为当poll失败时会返回null,此时不知道queue中本身是null还是poll失败。

    A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.

  3. add(e), remove(), element()都不会阻塞线程,但是当不满足条件时,会抛出IllegalStateException, 否则返回true。比如,当队列已满时,再调用add(e)方法,会抛出异常;如果队列没满,此元素会入到队列中,并返回true。
  4. offer(e), poll(), peek()方法,既不会阻塞线程,也不会抛出异常,只是返回布尔值或null。
  5. put(e), take()方法当不满足条件时,会阻塞线程(waiting if necessary until an element becomes available)。
  6. offer(e, timeout, unit), poll(timeout, unit)可以设定等待时间,如果超时返回false或null。

实现

  • ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
  • LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO顺序排序的。
  • PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
  • SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。


ConcurrentLinkedQueue

ConcurrentLinkedQueue的size()方法需要遍历一遍,所以要尽量避免使用,可以考虑使用isEmpty()方法。
ConcurrentLinkedQueue充分使用了atomic包的实现打造了一个无锁并发线程安全的队列。
以offer(e)方法为例,看看在无锁情况下是怎么保证原子性的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) { //--------------------------- a
if (p.casNext(null, newNode)) { //--------------------------- b
if (p != t)
casTail(t, newNode); //--------------------------- c
return true;
}
}
else if (p == q)
p = (t != (t = tail)) ? t : head; //--------------------------- d
else
p = (p != t && t != (t = tail)) ? t : q; //--------------------------- e
}
}

此方法的循环内首先获得tail指针和其next指向的对象,由于tail和next均是volatile的,所以保证了获得的分别都是最新的值。

  1. 代码a 情况1,q==null。
    这意味着q是尾节点的下一个节点。此时,通过p.casNext(null, newNode)将“p的下一个节点设为newNode”,若设置成功的话,则比较“p和t”,若p不等于t,则设置newNode为新的尾节点,然后返回true。否则的话(意味着“其它线程对尾节点进行了修改”),什么也不做,继续进行for循环。

  2. 代码b p.casNext(null, newNode) 调用CAS对p进行操作。若“p的下一个节点等于null”,则设置“p的下一个节点等于newNode”;设置成功的话,返回true,失败的话返回false。

  3. 代码c 更新tail的指向,最有意思的协调在这儿又有了体现。从代码看casTail(t, newNode)不管是否成功都会接着返回true标志着更新的成功。首先如果成功则表明本线程完成了两步的更新,返回true是理所当然的;如果 casTail(t, newNode)不成功呢?要清楚的是完成代码b则代表着更新进入了中间态,代码c不成功则是tail的指向被其他线程改变。意味着对于其他的线程而言:它们得到的是中间态的更新,q!=null,进入代码e帮助本线程执行最后一步并且先于本线程成功。这样本线程虽然代码c失败了,但是是由于别的线程的协助先完成了,所以返回true也就理所当然了。

  4. 代码d 情况2,q==p。
    这种情况什么时候会发生呢?通过“情况3”,我们知道,经过“情况3”的处理后,p的值可能等于q。此时,若尾节点没有发生变化的话,那么,应该是头节点发生了变化,则设置p为头节点,然后重新遍历链表;否则(尾节点变化的话),则设置p为尾节点。

  5. 代码e 情况3。
    如果p和t相等,则设置p为q。否则的话,判断“尾节点是否发生变化”,没有变化的话,则设置p为q;否则,设置p为尾节点。

参考