在此對java并發包做一個大致總結,如有錯誤,請指正。
juc包的總體結構大致如下
外層框架主要有Lock(ReentrantLock、ReadWriteLock等)、同步器(semaphores等)、阻塞隊列(BlockingQueue等)、Executor(線程池)、并發容器(ConcurrentHashMap等)、還有Fork/Join框架;
內層有AQS(AbstractQueuedSynchronizer類,鎖功能都由他實現)、非阻塞數據結構、原子變量類(AtomicInteger等無鎖線程安全類)三種。
底層就實現是volatile和CAS。整個并發包其實都是由這兩種思想構成的。
理解volatile特性的一個好方法是把對volatile變量的單個讀/寫,看成是使用同一個鎖對這
些單個讀/寫操作做了同步。一個volatile變量的單個讀/寫操作,與一個普通變量的讀/寫操作都
是使用同一個鎖來同步,它們之間的執行效果相同。
volatile具有的特性。
可見性。對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫
入。
原子性:對任意單個volatile變量的讀/寫具有原子性,但類似于volatile++這種復合操作不
具有原子性。
volatile讀的內存語義如下。
當讀一個volatile變量時,JMM會把該線程對應的本地內存置為無效。線程接下來將從主
內存中讀取共享變量。
volatile寫的內存語義如下。
當寫一個volatile變量時,JMM會把該線程對應的本地內存中的共享變量值刷新到主內
存。
為了實現volatile的內存語義,編譯器在生成字節碼時,會在指令序列中插入內存屏障來
禁止特定類型的處理器重排序。對于編譯器來說,發現一個最優布置來最小化插入屏障的總
數幾乎不可能。為此,JMM采取保守策略。下面是基于保守策略的JMM內存屏障插入策略。
·在每個volatile寫操作的前面插入一個StoreStore屏障。
·在每個volatile寫操作的后面插入一個StoreLoad屏障。
·在每個volatile讀操作的后面插入一個LoadLoad屏障。
·在每個volatile讀操作的后面插入一個LoadStore屏障。
上面內存屏障的簡單意思就是:StoreStore屏障,禁止上面的寫操作和下面的volatile寫重排序;StoreLoad屏障,禁止上面的寫操作和下面的volatile讀重排序;LoadLoad屏障,禁止上面的讀/寫操作和下面的volatile讀操作重排序;LoadStore屏障,禁止上面的讀操作和下面的volatile寫操作重排序。
由于Java的CAS同時具有volatile讀和volatile寫的內存語義,因此Java線程之間的通信現
在有了下面4種方式。
1)A線程寫volatile變量,隨后B線程讀這個volatile變量。
2)A線程寫volatile變量,隨后B線程用CAS更新這個volatile變量。
3)A線程用CAS更新一個volatile變量,隨后B線程用CAS更新這個volatile變量。
4)A線程用CAS更新一個volatile變量,隨后B線程讀這個volatile變量。
總結一下適合使用volatile變量的使用條件(必須滿足所有條件):
1、對變量的寫操作不依賴變量的當前值,或者你能確保只有單線程更新變量的值。(簡單來說就是單線程寫,多線程讀的場景)
2、該變量不會與其他狀態變量一起納入不變性條件中。
3、在訪問變量時不需要加鎖。(如果要加鎖的話用普通變量就行了,沒必要用volatile了)
鎖的釋放和獲取的內存語義
當線程釋放鎖時,JMM會把該線程對應的本地內存中的共享變量刷新到主內存中。
當線程獲取鎖時,JMM會把該線程對應的本地內存置為無效。從而使得被監視器保護的
臨界區代碼必須從主內存中讀取共享變量。
總結一下就是:
線程A釋放一個鎖,實質上是線程A向接下來將要獲取這個鎖的某個線程發出了(線程A
對共享變量所做修改的)消息。
·線程B獲取一個鎖,實質上是線程B接收了之前某個線程發出的(在釋放這個鎖之前對共
享變量所做修改的)消息。
·線程A釋放鎖,隨后線程B獲取這個鎖,這個過程實質上是線程A通過主內存向線程B發
送消息。
這里借助ReentrantLock的源代碼,來分析鎖內存語義的具體實現機制。
在ReentrantLock中,調用lock()方法獲取鎖;調用unlock()方法釋放鎖。
public void lock() { sync.lock(); } public void unlock() { sync.release(1); }
它的鎖由靜態內部類Sync實現,分為公平鎖和非公平鎖。
abstract static class Sync extends AbstractQueuedSynchronizer static final class NonfairSync extends Sync //非公平鎖 static final class FairSync extends Sync //公平鎖
鎖的實現都依賴于Java同步器框架AbstractQueuedSynchronizer(AQS),AQS使用一個整型的volatile變量(命名為state)來維護同步狀態,它是ReentrantLock內存語義實現的關鍵。
/** * The synchronization state. */ private volatile int state;
拿非公平鎖的實現舉例
final void lock() { //AQS內方法,采用CAS實現監視器鎖,如果state為0,則獲得鎖,并設為1 if (compareAndSetState(0, 1)) //獲取鎖之后設置擁有鎖的線程為當前線程 setExclusiveOwnerThread(Thread.currentThread()); else //如果鎖已被獲取,則加入等待隊列 acquire(1); } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
非公平鎖在釋放鎖的最后寫volatile變量state,在獲取鎖時首先讀這個volatile變量。根據
volatile的happens-before規則,釋放鎖的線程在寫volatile變量之前可見的共享變量,在獲取鎖
的線程讀取同一個volatile變量后將立即變得對獲取鎖的線程可見。
compareAndSetState(0, 1)采用的CAS操作。JDK文檔對該方法的說明如下:如果當前狀態值等于預期值,則以原子方式將同步狀態設置為給定的更新值。
可能會疑惑為什么這個方法可以完成原子操作,原因是此操作具有volatile讀和寫的內存語義。主要是由sun.misc.Unsafe類實現的,它是native方法,這里不做深入了。
分析一下同步器(AQS)的原理
AQS框架采用了模板模式,例如在獨占模式的獲取和釋放,再拿ReentrantLock的非公平鎖NonfairSync舉例
/** 獨占獲取 */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } //AQS中并未實現tryAcquire()方法,需在子類實現 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //NonfairSync中實現 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } /** 獨占釋放 */ public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //AQS中未實現 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //Sync類中實現 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
從源碼來看,同步狀態的維護、獲取、釋放動作是由子類實現的,而后續動作入線程的阻塞、喚醒機制等則由AQS框架實現。
AQS中使用LockSupport.park() 和 LockSupport.unpark() 的本地方法實現,實現線程的阻塞和喚醒。
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); //喚醒線程 }
AQS內部維護了一個隊列,AQS將阻塞線程封裝成內部類Node的對象,并維護到這個隊列中。
private transient volatile Node head; //頭結點 private transient volatile Node tail; //尾節點
這個隊列是非阻塞的 FIFO隊列,即插入移除節點的時候是非阻塞的,所以AQS內部采用CAS的方式保證節點插入和移除的原子性。
/** * CAS head field. Used only by enq. */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
Node類源碼如下
static final class Node { /** 標記是共享模式*/ static final Node SHARED = new Node(); /** 標記是獨占模式*/ static final Node EXCLUSIVE = null; /** 代表線程已經被取消*/ static final int CANCELLED = 1; /** 代表后續節點需要喚醒 */ static final int SIGNAL = -1; /** 代表線程在等待某一條件/ static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; /***連接到等待condition的下一個節點 */ Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
獨占式同步狀態獲取流程,也就是acquire(int arg)方法調用流程,如圖
由圖所知,前驅節點為頭節點且能夠獲取同步狀態的判斷條件和線程進入等待狀態是獲
取同步狀態的自旋過程。當同步狀態獲取成功之后,當前線程從acquire(int arg)方法返回,如果
對于鎖這種并發組件而言,代表著當前線程獲取了鎖。
當前線程獲取同步狀態并執行了相應邏輯之后,就需要釋放同步狀態,使得后續節點能
夠繼續獲取同步狀態。通過調用同步器的release(int arg)方法可以釋放同步狀態,該方法在釋
放了同步狀態之后,會喚醒其后繼節點(進而使后繼節點重新嘗試獲取同步狀態)。
public final boolean release(int arg) { //嘗試釋放鎖,將擁有鎖的線程設為null,把state設為0 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
分析了獨占式同步狀態獲取和釋放過程后,適當做個總結:在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中并在隊列中進行自旋;移出隊列(或停止自旋)的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,然后喚醒頭節點的后繼節點。
原子變量類指的是java.util.concurrent.atomic包下的類。
整個包下面的類實現原理都接近,就是利用volatile和CAS來實現。
拿AtomicInteger舉例:
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value;
AtomicInteger類擁有一個unsafe對象,這是實現CAS的關鍵,private volatile int value就是它當前的值,用volatile來保證內存可見性。
public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }
這個方法完成CAS,如果value==expect,則把值設為update。這是核心方法,其他方法實現基本都用到它
例如
//獲取舊值設置新值 public final int getAndSet(int newValue) { for (;;) { int current = get(); if (compareAndSet(current, newValue)) return current; } } //獲取舊值并加1 public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } }
等方法。