ConcurrentLinkedQueue

ConcurrentLinkedQueue

如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。下面主要讨论使用非阻塞的方式来实现线程安全队列。

ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,采用先进先出的规则对节点进行排序,采用了CAS算法进行实现。

ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

作为一个链表,自然需要定义有关链表内的节点,在ConcurrentLinkedQueue中,定义的节点Node核心如下:

1
2
3
4
5
private static class Node<E>
{
volatile E item;
volatile Node<E> next;
}// item 表示目标元素,next表示当前node的下一个元素

对Node进行操作时,使用了CAS操作.

1
2
3
4
5
6
7
8
9
10
11
12
boolean casItem(E cmp,E val)
{// 设置当前Node的item值.
return UNSAFE.compareAndSwapObject(this,itemOffset,cmp,val);
}
void lazySetNext(Node<E> val)
{
UNSAFE.putOrderedObject(this,nextOffset,val);
}
boolean casNext(Node<E> cmp, Node<E> val)
{ //设置next字段
return UNSAFE.compareAndSwapObject(this,nextOffset,cmp,val);
}

ConcurrentLinkedQueue内部实现十分复杂,它允许在运行时链表处于多个不同的状态.以tail为例,一般来说,我们总是期望tail为链表的末尾,但是实际上,tail的更新并不是及时的,可能会产生拖延现象,

详见并发编程的艺术p163

不变模式下的CopyOnWriteArrayList

很多应用场景中,读操作远远大于写操作,由于读操作根本不会修改原有的数据,因此每次读取都加锁是一种很大的浪费,应该允许多个线程同时访问List的内部数据,.

为了将读取性能发挥到极致,JDK中提供了CopyOnWriteArrayList类,对它来说,读取完全不用加锁,写入操作也不会阻塞读取操作,只有写入和写入之间需要同步等待.

CopyOnWrite就是在写入操作时,进行一次自我复制,当这个List需要修改时,并不修改原有的内容,而对原有数据进行一次复制,将修改的内容写入副本中,写完之后,再将修改完的副本替换原来的数据.,这样就保证写操作不会影响读了.

阻塞队列

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列,下图展示了如何通过阻塞队列来合作:

阻塞队列常用作数据共享通道.BlockingQueue是一个接口,

主要的实现有ArrayBlockingQueue和LinkedBlockingQueue,

阻塞队列之所以适合作为数据共享的通道,关键在于blocking上.

当服务线程处理完成队列中所有的消息后,如何知道下一条消息何时到来.

实现原理

使用通知模式实现,JDK源码中ArrayBlockingQueue使用了*Condition 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private final Condition notFull;
private final Condition notEmpty;

public ArrayBlockingQueue(int capacity,boolean faie)
{
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}

public void put(E e) throws InterruptException
{
checkNotFull(e);
final ReentrantLock lock =this.lock;
lock.lockInterruptibly();// 优先考虑中断的lock.lock();
try
{
while(count==items.length)
{
notFull.await();
}
insert(e);
}finally
{
lock.unlock();
}
}
private void insert(E x)
{
item[putIndex]=x;
putIndex =inc(putindex);
++count;
notEmpty.singal();
}


public E take() throws InterruptException
{
final ReentrantLock lock = this.lock();
lock.lockInterruptibly();
try
{
while(count==0)
{
notEmpty.await();
}
return extract();
}finally
{
lock.unlock();
}
}

private E extract()
{
final Object[] items =this.items;
E x =this.<E>cast(items[takeIndex]);
items[takeIndex]=null;
takeIndex= inc(takeIndex);
--count;
notFull.singal();
return x;
}