1. 引言
在并發(fā)編程中我們有時候需要使用線程安全的隊列。如果我們要實現(xiàn)一個線程安全的隊列有兩種實現(xiàn)方式:一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖(入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實現(xiàn),而非阻塞的實現(xiàn)方式則可以使用循環(huán)CAS的方式來實現(xiàn),本文讓我們一起來研究下Doug Lea是如何使用非阻塞的方式來實現(xiàn)線程安全隊列ConcurrentLinkedQueue的,相信從大師身上我們能學(xué)到不少并發(fā)編程的技巧。
2. ConcurrentLinkedQueue的介紹
ConcurrentLinkedQueue是一個基于鏈接節(jié)點的無界線程安全隊列,它采用先進(jìn)先出的規(guī)則對節(jié)點進(jìn)行排序,當(dāng)我們添加一個元素的時候,它會添加到隊列的尾部,當(dāng)我們獲取一個元素時,它會返回隊列頭部的元素。它采用了“wait-free”算法來實現(xiàn),該算法在Michael & Scott算法上進(jìn)行了一些修改, Michael & Scott算法的詳細(xì)信息可以參見 參考資料一 。
3. ConcurrentLinkedQueue的結(jié)構(gòu)
我們通過ConcurrentLinkedQueue的類圖來分析一下它的結(jié)構(gòu)。
(圖1)
ConcurrentLinkedQueue由head節(jié)點和tair節(jié)點組成,每個節(jié)點(Node)由節(jié)點元素(item)和指向下一個節(jié)點的引用(next)組成,節(jié)點與節(jié)點之間就是通過這個next關(guān)聯(lián)起來,從而組成一張鏈表結(jié)構(gòu)的隊列。默認(rèn)情況下head節(jié)點存儲的元素為空,tair節(jié)點等于head節(jié)點。
private transient volatile Node<E> tail = head;
4. 入隊列
入隊列就是將入隊節(jié)點添加到隊列的尾部 。為了方便理解入隊時隊列的變化,以及head節(jié)點和tair節(jié)點的變化,每添加一個節(jié)點我就做了一個隊列的快照圖。
(圖二)
- 第一步添加元素1。隊列更新head節(jié)點的next節(jié)點為元素1節(jié)點。又因為tail節(jié)點默認(rèn)情況下等于head節(jié)點,所以它們的next節(jié)點都指向元素1節(jié)點。
- 第二步添加元素2。隊列首先設(shè)置元素1節(jié)點的next節(jié)點為元素2節(jié)點,然后更新tail節(jié)點指向元素2節(jié)點。
- 第三步添加元素3,設(shè)置tail節(jié)點的next節(jié)點為元素3節(jié)點。
- 第四步添加元素4,設(shè)置元素3的next節(jié)點為元素4節(jié)點,然后將tail節(jié)點指向元素4節(jié)點。
通過debug入隊過程并觀察head節(jié)點和tail節(jié)點的變化,發(fā)現(xiàn)入隊主要做兩件事情,第一是將入隊節(jié)點設(shè)置成當(dāng)前隊列尾節(jié)點的下一個節(jié)點。第二是更新tail節(jié)點,如果tail節(jié)點的next節(jié)點不為空,則將入隊節(jié)點設(shè)置成tail節(jié)點,如果tail節(jié)點的next節(jié)點為空,則將入隊節(jié)點設(shè)置成tail的next節(jié)點,所以tail節(jié)點不總是尾節(jié)點,理解這一點對于我們研究源碼會非常有幫助。
上面的分析讓我們從單線程入隊的角度來理解入隊過程,但是多個線程同時進(jìn)行入隊情況就變得更加復(fù)雜,因為可能會出現(xiàn)其他線程插隊的情況。如果有一個線程正在入隊,那么它必須先獲取尾節(jié)點,然后設(shè)置尾節(jié)點的下一個節(jié)點為入隊節(jié)點,但這時可能有另外一個線程插隊了,那么隊列的尾節(jié)點就會發(fā)生變化,這時當(dāng)前線程要暫停入隊操作,然后重新獲取尾節(jié)點。讓我們再通過源碼來詳細(xì)分析下它是如何使用CAS算法來入隊的。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); //入隊前,創(chuàng)建一個入隊節(jié)點 Node<E> n = new Node<E>(e); retry: //死循環(huán),入隊不成功反復(fù)入隊。 for (;;) { //創(chuàng)建一個指向tail節(jié)點的引用 Node<E> t = tail; //p用來表示隊列的尾節(jié)點,默認(rèn)情況下等于tail節(jié)點。 Node<E> p = t; for (int hops = 0; ; hops++) { //獲得p節(jié)點的下一個節(jié)點。 Node<E> next = succ(p); //next節(jié)點不為空,說明p不是尾節(jié)點,需要更新p后在將它指向next節(jié)點 if (next != null) { //循環(huán)了兩次及其以上,并且當(dāng)前節(jié)點還是不等于尾節(jié)點 if (hops > HOPS && t != tail) continue retry; p = next; } //如果p是尾節(jié)點,則設(shè)置p節(jié)點的next節(jié)點為入隊節(jié)點。 else if (p.casNext(null, n)) { //如果tail節(jié)點有大于等于1個next節(jié)點,則將入隊節(jié)點設(shè)置成tair節(jié)點,更新失敗了也 沒關(guān)系,因為失敗了表示有其他線程成功更新了tair節(jié)點。 if (hops >= HOPS) casTail(t, n); // 更新tail節(jié)點,允許失敗 return true; } // p有next節(jié)點,表示p的next節(jié)點是尾節(jié)點,則重新設(shè)置p節(jié)點 else { p = succ(p); } } } }
從源代碼角度來看整個入隊過程主要做二件事情 。第一是定位出尾節(jié)點,第二是使用CAS算法能將入隊節(jié)點設(shè)置成尾節(jié)點的next節(jié)點,如不成功則重試。
第一步定位尾節(jié)點 。tail節(jié)點并不總是尾節(jié)點,所以每次入隊都必須先通過tail節(jié)點來找到尾節(jié)點,尾節(jié)點可能就是tail節(jié)點,也可能是tail節(jié)點的next節(jié)點。代碼中循環(huán)體中的第一個if就是判斷tail是否有next節(jié)點,有則表示next節(jié)點可能是尾節(jié)點。獲取tail節(jié)點的next節(jié)點需要注意的是p節(jié)點等于p的next節(jié)點的情況,只有一種可能就是p節(jié)點和p的next節(jié)點都等于空,表示這個隊列剛初始化,正準(zhǔn)備添加第一次節(jié)點,所以需要返回head節(jié)點。獲取p節(jié)點的next節(jié)點代碼如下
final Node<E> succ(Node<E> p) { Node<E> next = p.getNext(); return (p == next) ? head : next; }
第二步設(shè)置入隊節(jié)點為尾節(jié)點 。p.casNext(null, n)方法用于將入隊節(jié)點設(shè)置為當(dāng)前隊列尾節(jié)點的next節(jié)點,p如果是null表示p是當(dāng)前隊列的尾節(jié)點,如果不為null表示有其他線程更新了尾節(jié)點,則需要重新獲取當(dāng)前隊列的尾節(jié)點。
hops的設(shè)計意圖 。上面分析過對于先進(jìn)先出的隊列入隊所要做的事情就是將入隊節(jié)點設(shè)置成尾節(jié)點,doug lea寫的代碼和邏輯還是稍微有點復(fù)雜。那么我用以下方式來實現(xiàn)行不行?
public boolean offer(E e) { if (e == null) throw new NullPointerException(); Node<E> n = new Node<E>(e); for (;;) { Node<E> t = tail; if (t.casNext(null, n) && casTail(t, n)) { return true; } } }
讓tail節(jié)點永遠(yuǎn)作為隊列的尾節(jié)點,這樣實現(xiàn)代碼量非常少,而且邏輯非常清楚和易懂。但是這么做有個缺點就是每次都需要使用循環(huán)CAS更新tail節(jié)點。如果能減少CAS更新tail節(jié)點的次數(shù),就能提高入隊的效率,所以doug lea使用hops變量來控制并減少tail節(jié)點的更新頻率,并不是每次節(jié)點入隊后都將 tail節(jié)點更新成尾節(jié)點,而是當(dāng) tail節(jié)點和尾節(jié)點的距離大于等于常量HOPS的值(默認(rèn)等于1)時才更新tail節(jié)點,tail和尾節(jié)點的距離越長使用CAS更新tail節(jié)點的次數(shù)就會越少,但是距離越長帶來的負(fù)面效果就是每次入隊時定位尾節(jié)點的時間就越長,因為循環(huán)體需要多循環(huán)一次來定位出尾節(jié)點,但是這樣仍然能提高入隊的效率,因為從本質(zhì)上來看它通過增加對volatile變量的讀操作來減少了對volatile變量的寫操作,而對volatile變量的寫操作開銷要遠(yuǎn)遠(yuǎn)大于讀操作,所以入隊效率會有所提升。
private static final int HOPS = 1;
還有一點需要注意的是入隊方法永遠(yuǎn)返回true,所以不要通過返回值判斷入隊是否成功。
5. 出隊列
出隊列的就是從隊列里返回一個節(jié)點元素,并清空該節(jié)點對元素的引用。讓我們通過每個節(jié)點出隊的快照來觀察下head節(jié)點的變化。
從上圖可知,并不是每次出隊時都更新head節(jié)點,當(dāng)head節(jié)點里有元素時,直接彈出head節(jié)點里的元素,而不會更新head節(jié)點。只有當(dāng)head節(jié)點里沒有元素時,出隊操作才會更新head節(jié)點。這種做法也是通過hops變量來減少使用CAS更新head節(jié)點的消耗,從而提高出隊效率。讓我們再通過源碼來深入分析下出隊過程。
public E poll() { Node<E> h = head; // p表示頭節(jié)點,需要出隊的節(jié)點 Node<E> p = h; for (int hops = 0;; hops++) { // 獲取p節(jié)點的元素 E item = p.getItem(); // 如果p節(jié)點的元素不為空,使用CAS設(shè)置p節(jié)點引用的元素為null,如果成功則返回p節(jié)點的元素。 if (item != null && p.casItem(item, null)) { if (hops >= HOPS) { //將p節(jié)點下一個節(jié)點設(shè)置成head節(jié)點 Node<E> q = p.getNext(); updateHead(h, (q != null) ? q : p); } return item; } // 如果頭節(jié)點的元素為空或頭節(jié)點發(fā)生了變化,這說明頭節(jié)點已經(jīng)被另外一個線程修改了。那么獲取p節(jié)點的下一個節(jié)點 Node<> next = succ(p); // 如果p的下一個節(jié)點也為空,說明這個隊列已經(jīng)空了 if (next == null) { // 更新頭節(jié)點。 updateHead(h, p); break; } // 如果下一個元素不為空,則將頭節(jié)點的下一個節(jié)點設(shè)置成頭節(jié)點 p = next; } return null; }
首先獲取頭節(jié)點的元素,然后判斷頭節(jié)點元素是否為空,如果為空,表示另外一個線程已經(jīng)進(jìn)行了一次出隊操作將該節(jié)點的元素取走,如果不為空,則使用CAS的方式將頭節(jié)點的引用設(shè)置成null,如果CAS成功,則直接返回頭節(jié)點的元素,如果不成功,表示另外一個線程已經(jīng)進(jìn)行了一次出隊操作更新了head節(jié)點,導(dǎo)致元素發(fā)生了變化,需要重新獲取頭節(jié)點。
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號聯(lián)系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元
