Java 並行編程是進階開發的核心技能。本章涵蓋線程基礎、線程池、鎖機制、原子操作以及並行工具類的正確使用方式。
線程生命週期#
Java 線程有 6 種狀態,定義在 Thread.State 枚舉中:
stateDiagram-v2
[*] --> NEW: new Thread()
NEW --> RUNNABLE: start()
RUNNABLE --> BLOCKED: 等待 synchronized 鎖
RUNNABLE --> WAITING: wait() / join() / park()
RUNNABLE --> TIMED_WAITING: sleep() / wait(timeout)
BLOCKED --> RUNNABLE: 獲得鎖
WAITING --> RUNNABLE: notify() / unpark()
TIMED_WAITING --> RUNNABLE: 逾時或被喚醒
RUNNABLE --> TERMINATED: 執行完畢或例外
TERMINATED --> [*]一個線程只能啟動一次。對已啟動的線程再次呼叫
start()會拋出IllegalThreadStateException。
狀態轉換說明#
| 狀態 | 說明 | 觸發條件 |
|---|---|---|
| NEW | 已創建未啟動 | new Thread() |
| RUNNABLE | 可執行 | start() |
| BLOCKED | 阻塞等待鎖 | 等待 synchronized |
| WAITING | 無限等待 | wait(), join(), park() |
| TIMED_WAITING | 有時限等待 | sleep(), wait(timeout) |
| TERMINATED | 終止 | 執行完畢或例外 |
線程池#
為什麼需要線程池?#
- 降低資源消耗:複用線程,避免頻繁創建銷毀
- 提高回應速度:任務到達時不需等待線程創建
- 統一管理:便於監控和調校
ThreadPoolExecutor 核心參數#
public ThreadPoolExecutor(
int corePoolSize, // 核心線程數
int maximumPoolSize, // 最大線程數
long keepAliveTime, // 非核心線程存活時間
TimeUnit unit, // 時間單位
BlockingQueue<Runnable> workQueue, // 工作佇列
ThreadFactory threadFactory, // 線程工廠
RejectedExecutionHandler handler // 拒絕策略
)線程池工作流程#
flowchart TD
A[提交任務] --> B{當前線程數 < corePoolSize?}
B -->|是| C[創建核心線程執行]
B -->|否| D{工作佇列未滿?}
D -->|是| E[加入佇列等待]
D -->|否| F{當前線程數 < maximumPoolSize?}
F -->|是| G[創建非核心線程執行]
F -->|否| H[執行拒絕策略]
C --> I[任務執行]
E --> I
G --> I不要使用 Executors 快捷方法!
// 危險!使用無界佇列,可能導致 OOM Executors.newFixedThreadPool(10); // LinkedBlockingQueue 無界 Executors.newSingleThreadExecutor(); // LinkedBlockingQueue 無界 Executors.newCachedThreadPool(); // 最大線程數為 Integer.MAX_VALUE
正確創建線程池#
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, // 核心線程數
20, // 最大線程數
60, TimeUnit.SECONDS, // 空閒存活時間
new ArrayBlockingQueue<>(100), // 有界佇列
new ThreadFactoryBuilder()
.setNameFormat("worker-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);四種內建拒絕策略
| 策略 | 行為 |
|---|---|
| AbortPolicy | 拋出 RejectedExecutionException(預設) |
| CallerRunsPolicy | 由提交任務的線程執行 |
| DiscardPolicy | 靜默丟棄任務 |
| DiscardOldestPolicy | 丟棄佇列中最老的任務 |
推薦使用 CallerRunsPolicy:提供了一種優雅的降級機制,不會丟失任務。
鎖機制#
synchronized vs ReentrantLock#
| 特性 | synchronized | ReentrantLock |
|---|---|---|
| 實現方式 | JVM 內建 | API 層面 |
| 鎖的獲取 | 隱式 | 顯式 lock/unlock |
| 可中斷 | 否 | 是(lockInterruptibly) |
| 逾時獲取 | 否 | 是(tryLock) |
| 公平鎖 | 否 | 可選 |
| 條件變數 | 單一(wait/notify) | 多個 Condition |
// synchronized
synchronized (lock) {
// 臨界區
}
// ReentrantLock
Lock lock = new ReentrantLock();
lock.lock();
try {
// 臨界區
} finally {
lock.unlock(); // 必須在 finally 中釋放
}選擇建議:
- 簡單同步場景優先使用
synchronized(JVM 最佳化後效能差異不大)- 需要高級特性(中斷、逾時、多條件)時使用
ReentrantLock
讀寫鎖 ReentrantReadWriteLock#
適用於讀多寫少的場景:
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();
// 讀操作(可並行)
readLock.lock();
try {
return data;
} finally {
readLock.unlock();
}
// 寫操作(獨佔)
writeLock.lock();
try {
data = newValue;
} finally {
writeLock.unlock();
}CAS 與原子類#
CAS 原理#
CAS(Compare-And-Swap)是一種無鎖演算法:
CAS(V, Expected, New)
if (V == Expected)
V = New
return true
else
return false// AtomicInteger 的 incrementAndGet 實現
public final int incrementAndGet() {
return U.getAndAddInt(this, VALUE, 1) + 1;
}
// Unsafe 中的 CAS 操作
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}CAS 的 ABA 問題
線程 1 讀取值 A,線程 2 將 A 改為 B 再改回 A,線程 1 的 CAS 仍會成功。
解決方案:使用
AtomicStampedReference加入版本號。
常用原子類#
// 基本類型原子類
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet();
count.compareAndSet(expected, newValue);
// 引用類型原子類
AtomicReference<User> ref = new AtomicReference<>(user);
ref.compareAndSet(oldUser, newUser);
// 陣列原子類
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.getAndIncrement(0);
// 字段更新器(減少物件開銷)
AtomicIntegerFieldUpdater<Counter> updater =
AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");並行工具類#
CountDownLatch(倒數門閂)#
等待多個操作完成:
CountDownLatch latch = new CountDownLatch(3);
// 工作線程
for (int i = 0; i < 3; i++) {
executor.execute(() -> {
try {
doWork();
} finally {
latch.countDown(); // 計數減 1
}
});
}
latch.await(); // 等待計數歸零
System.out.println("所有任務完成");CountDownLatch 是一次性的,計數歸零後不能重置。
CyclicBarrier(循環屏障)#
讓一組線程互相等待到達屏障點:
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("所有線程到達屏障");
});
for (int i = 0; i < 3; i++) {
executor.execute(() -> {
doPhase1();
barrier.await(); // 等待其他線程
doPhase2();
barrier.await(); // 可重複使用
});
}CountDownLatch vs CyclicBarrier
| 特性 | CountDownLatch | CyclicBarrier |
|---|---|---|
| 重用性 | 一次性 | 可重複使用 |
| 計數方式 | 遞減到 0 | 遞增到 N |
| 典型場景 | 主線程等待多個子線程 | 多線程分階段協作 |
| 回呼 | 無 | 支援屏障動作 |
Semaphore(信號量)#
控制並行訪問數量:
// 限制最多 3 個線程同時訪問
Semaphore semaphore = new Semaphore(3);
void accessResource() throws InterruptedException {
semaphore.acquire(); // 獲取許可
try {
useResource();
} finally {
semaphore.release(); // 釋放許可
}
}Semaphore 常用於實現限流器或連線池。
ThreadLocal#
ThreadLocal 提供線程局部變數:
private static final ThreadLocal<SimpleDateFormat> dateFormat =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
public String format(Date date) {
return dateFormat.get().format(date);
}ThreadLocal 內存洩漏風險
在使用線程池時,線程不會銷毀,ThreadLocal 中的值會一直存在。必須在使用完畢後清理:
try { threadLocal.set(value); // 使用 } finally { threadLocal.remove(); // 必須清理! }
實踐建議#
- 手動創建線程池:避免使用 Executors 快捷方法
- 給線程池線程命名:便於排查問題
- 合理設置佇列大小:使用有界佇列防止 OOM
- 優先使用並行集合:ConcurrentHashMap 優於 synchronized Map
- 使用原子類替代鎖:簡單計數場景更高效
- ThreadLocal 用完即清:特別是在線程池環境
- 理解 volatile 的局限:只保證可見性,不保證原子性