ConcurrentLinkedQueue
在考虑并发的时候可以先考虑单线程的情况,然后再将并发的情况考虑进来。
比如ConcurrentLinkedQueue:- 先考虑单线的offer
- 再考虑多线程时候的offer:
- 多个线程offer
- 部分线程offer,部分线程poll
- offer比poll快
- poll比offer快
offer
public boolean offer(E e) { checkNotNull(e); // 新建一个node final NodenewNode = new Node (e); // 不断重试(for只有初始化条件,没有判断条件),直到将node加入队列 // 初始化p、t都是指向tail // 循环过程中一直让p指向最后一个节点。让t指向tail for (Node t = tail, p = t;;) { // q一直指向p的下一个 Node q = p.next; if (q == null) { // p is last node // 如果q为null表示p是最后一个元素,尝试加入队列 // 如果失败,表示其他线程已经修改了p指向的节点 if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". // node加入队列之后,tail距离最后一个节点已经相差大于一个了,需要更新tail if (p != t) // hop two nodes at a time // 这儿允许设置tail为最新节点的时候失败,因为添加node的时候是根据p.next是不是为null判断的, casTail(t, newNode); // Failure is OK. return true; } // Lost CAS race to another thread; re-read next } else if (p == q) // 虽然q是p.next,但是因为是多线程,在offer的同时也在poll,如offer的时候正好p被poll了,那么在poll方法中的updateHead方法会将head指向当前的q,而把p.next指向自己,即:p.next == p // 这个时候就会造成tail在head的前面,需要重新设置p // 如果tail已经改变,将p指向tail,但这个时候tail依然可能在head前面 // 如果tail没有改变,直接将p指向head // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops. // tail已经不是最后一个节点,将p指向最后一个节点 p = (p != t && t != (t = tail)) ? t : q; }}
poll
public E poll() { // 如果出现p被删除的情况需要从head重新开始 restartFromHead: for (;;) { for (Nodeh = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // 队列为空 updateHead(h, p); return null; } else if (p == q) // 当一个线程在poll的时候,另一个线程已经把当前的p从队列中删除——将p.next = p,p已经被移除不能继续,需要重新开始 continue restartFromHead; else p = q; } }}final void updateHead(Node h, Node p) { if (h != p && casHead(h, p)) h.lazySetNext(h);}
为什么会出现p == q
假设下面这种情况:
在第一种情况下,线程A执行offer操作:
第一次循环的时候- tail = node0, p = t = node0
- 执行
Node<E> q = p.next;
之后q = p.next,也就是node0.next - 执行
if (q == null)
,不满足,继续往下 - 到了
else if (p == q)
这一步的时候线程A暂停
线程A现在的结果是:
head = node0tail = node0t = node0p = node0q = node0.next
因为程序时多线程的,我们假设线程A暂定在了第4步,接下来看线程B,线程B执行poll操作:
第一次循环:- head = node0, p = h = node0;
- 执行
E item = p.item;
,item = null if (item != null && p.casItem(item, null))
不满足- 执行
else if ((q = p.next) == null)
,q = node1,条件不满足 - 执行
else if (p == q)
,条件不满足 - 执行
p = q;
,p = node1
第二次循环:
- head = node0, h = node0, p = node1;
- 执行
E item = p.item;
,item = node2 if (item != null && p.casItem(item, null))
,item != null
满足,如果CAS操作成功p != h
成立,调用updateHead- 执行
updateHead(h, ((q = p.next) != null) ? q : p);
之后,q = node2 - 在updateHead里面
h != p
成立,如果CAS操作成功(将head设置为node2)- 执行
h.lazySetNext(h);
,这个时候h = node0
,这个方法执行完之后,node0.next = node0
- 将item返回
这个时候队列就是图中第二种情况,线程A结果为:
head = node2tail = node0t = node0p = node0q = node0.next = node0
看到结果了吧,这个时候 p = q = node0
!其实主要原因是在 updateHead方法的 h.lazySetNext(h)
操作里面,将旧的head.next设置成为了自己即 head.next = head。但是要注意:是旧的head
从上面分析的过程要注意:
- 多线程执行环境,单线程下一定不会出现这种情况
- 注意引用赋值比如
Node<E> q = p.next
,q指向的是p.next,虽然目前p.next = node1
,但是当p.next指向变了之后,q也就跟着变了 - 再就是阅读源码的时候一定要弄清楚调用的每个方法的作用,这样才能对整个方法有一个准确的理解,比如这里的
h.lazySetNext(h);