並行編程是 Java 開發中最容易出錯的領域之一。本章涵蓋 ThreadLocal、並行容器、鎖機制、線程池與連線池等核心議題的常見陷阱與最佳實踐。
ThreadLocal 線程重用陷阱#
問題現象#
在 Web 應用程式中使用 ThreadLocal 快取用戶資訊,卻發現有時獲取到的是其他用戶的資料。
原因分析#
Tomcat 等 Web 服務器使用線程池處理請求,線程會被重用。如果不在請求結束時清理 ThreadLocal,下一個請求可能會獲取到前一個請求遺留的資料。
// 錯誤示範:未清理 ThreadLocal
private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);
@GetMapping("wrong")
public Map wrong(@RequestParam("userId") Integer userId) {
// 設置用戶資訊之前查詢,可能獲取到上一個請求的資料
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
}正確做法#
// 正確示範:在 finally 中清理 ThreadLocal
@GetMapping("right")
public Map right(@RequestParam("userId") Integer userId) {
String before = Thread.currentThread().getName() + ":" + currentUser.get();
currentUser.set(userId);
try {
String after = Thread.currentThread().getName() + ":" + currentUser.get();
Map result = new HashMap();
result.put("before", before);
result.put("after", after);
return result;
} finally {
// 務必在 finally 中清理 ThreadLocal
currentUser.remove();
}
}核心原則:使用 ThreadLocal 時,務必在業務邏輯結束前顯式呼叫
remove()方法清理資料。
ConcurrentHashMap 原子性陷阱#
問題現象#
使用 ConcurrentHashMap 進行「先查詢後修改」的複合操作,結果與預期不符。
原因分析#
ConcurrentHashMap 只能保證單個操作的原子性,無法保證複合操作的原子性。size()、isEmpty() 等方法在並行情況下回傳的是近似值。
// 錯誤示範:複合操作非原子性
@GetMapping("wrong")
public String wrong() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(900);
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
// size() 和 putAll() 不是原子操作
int gap = 1000 - concurrentHashMap.size();
log.info("gap size:{}", gap);
concurrentHashMap.putAll(getData(gap));
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size()); // 可能遠超 1000
return "OK";
}正確做法#
方案一:對複合操作加鎖
@GetMapping("right")
public String right() throws InterruptedException {
ConcurrentHashMap<String, Long> concurrentHashMap = getData(900);
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
// 對複合操作加鎖
synchronized (concurrentHashMap) {
int gap = 1000 - concurrentHashMap.size();
log.info("gap size:{}", gap);
concurrentHashMap.putAll(getData(gap));
}
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
log.info("finish size:{}", concurrentHashMap.size()); // 正確為 1000
return "OK";
}方案二:使用原子性 API(推薦)
// 使用 computeIfAbsent 實現原子性的「查詢-插入」操作
private Map<String, Long> gooduse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(10);
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10000000).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(10);
// computeIfAbsent 是原子操作,配合 LongAdder 實現線程安全計數
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().longValue()));
}使用
computeIfAbsent配合LongAdder的方式,效能比加鎖方式提升約 10 倍。
CopyOnWriteArrayList 使用場景陷阱#
問題現象#
使用 CopyOnWriteArrayList 作為頻繁修改的快取,發現效能極差。
原因分析#
CopyOnWriteArrayList 的每次寫入操作都會複製整個陣列。它適用於「讀多寫少」的場景,不適合頻繁修改的場景。
// CopyOnWriteArrayList 的 add 方法源碼
public boolean add(E e) {
synchronized (lock) {
Object[] elements = getArray();
int len = elements.length;
// 每次 add 都會創建新陣列
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
}
}效能比較#
| 操作類型 | CopyOnWriteArrayList | synchronizedList |
|---|---|---|
| 10 萬次寫入 | ~10 秒 | ~0.1 秒 |
| 100 萬次讀取 | ~0.02 秒 | ~0.1 秒 |
正確做法#
- 讀多寫少:使用 CopyOnWriteArrayList
- 讀寫均衡或寫多:使用 Collections.synchronizedList 或其他併發容器
鎖的粒度與層級陷阱#
問題一:鎖的層級錯誤#
// 錯誤示範:靜態變數用實體鎖保護
class Data {
private static int counter = 0;
// 實體方法的 synchronized 只能保護實體變數
public synchronized void wrong() {
counter++; // 靜態變數無法被正確保護
}
}
// 正確示範:靜態變數用類級別鎖保護
class Data {
private static int counter = 0;
private static Object locker = new Object();
public void right() {
synchronized (locker) {
counter++;
}
}
}靜態字段屬於類,需要類級別的鎖;非靜態字段屬於實體,實體級別的鎖即可保護。
問題二:鎖的粒度過粗#
// 錯誤示範:鎖粒度過粗
@GetMapping("wrong")
public int wrong() {
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
synchronized (this) {
slow(); // 耗時操作不涉及共享資源
data.add(i); // 只有這裡需要加鎖
}
});
return data.size();
}
// 正確示範:只對需要保護的資源加鎖
@GetMapping("right")
public int right() {
IntStream.rangeClosed(1, 1000).parallel().forEach(i -> {
slow(); // 耗時操作不加鎖
synchronized (data) {
data.add(i); // 只對共享資源加鎖
}
});
return data.size();
}正確加鎖的版本耗時 1.4 秒,而粗粒度加鎖的版本耗時 11 秒。
死鎖陷阱#
問題現象#
多商品下單時,不同線程以不同順序獲取商品鎖,導致死鎖。
原因分析#
// 錯誤示範:購物車商品順序隨機,可能導致死鎖
private List<Item> createCart() {
return IntStream.rangeClosed(1, 3)
.mapToObj(i -> "item" + ThreadLocalRandom.current().nextInt(items.size()))
.map(name -> items.get(name))
.collect(Collectors.toList());
}死鎖場景:
- 線程 A 獲取 item1 的鎖,等待 item2 的鎖
- 線程 B 獲取 item2 的鎖,等待 item1 的鎖
- 雙方互相等待,形成死鎖
正確做法#
// 正確示範:對商品排序後再獲取鎖,避免循環等待
@GetMapping("right")
public long right() {
long success = IntStream.rangeClosed(1, 100).parallel()
.mapToObj(i -> {
// 按商品名排序,確保所有線程以相同順序獲取鎖
List<Item> cart = createCart().stream()
.sorted(Comparator.comparing(Item::getName))
.collect(Collectors.toList());
return createOrder(cart);
})
.filter(result -> result)
.count();
return success;
}避免死鎖的核心策略:確保所有線程以相同的順序獲取鎖,打破循環等待條件。
線程池 OOM 陷阱#
問題現象#
使用 Executors.newFixedThreadPool() 或 Executors.newCachedThreadPool() 創建線程池,導致 OOM。
原因分析#
newFixedThreadPool 的 OOM 原因
// newFixedThreadPool 使用無界佇列
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()); // 默認容量 Integer.MAX_VALUE
}當任務執行較慢時,無界佇列會不斷堆積任務,最終撐爆內存。
newCachedThreadPool 的 OOM 原因
// newCachedThreadPool 最大線程數無上限
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 最大線程數無上限
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>()); // 不快取任務
}當任務執行較慢時,會不斷創建新線程,最終因線程數過多導致 OOM。
正確做法#
// 手動創建線程池,控制核心參數
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
10, // 核心線程數
50, // 最大線程數
2, TimeUnit.SECONDS, // 線程空閒回收時間
new ArrayBlockingQueue<>(1000), // 有界佇列
new ThreadFactoryBuilder()
.setNameFormat("demo-threadpool-%d") // 有意義的線程名稱
.get(),
new ThreadPoolExecutor.AbortPolicy() // 拒絕策略
);線程池最佳實踐:
- 禁止使用 Executors 快捷方法,手動創建 ThreadPoolExecutor
- 設置有界的工作佇列和合理的最大線程數
- 為線程指定有意義的名稱,便於排查問題
- 根據任務類型選擇合適的拒絕策略
線程池工作行為#
flowchart TD
A[任務提交] --> B{核心線程<br/>是否已滿?}
B -->|否| C[創建核心線程處理]
B -->|是| D{工作佇列<br/>是否已滿?}
D -->|否| E[加入工作佇列]
D -->|是| F{是否達到<br/>最大線程數?}
F -->|否| G[創建非核心線程處理]
F -->|是| H[觸發拒絕策略]
I[非核心線程空閒] --> J{空閒時間 ><br/>keepAliveTime?}
J -->|是| K[回收線程]
J -->|否| I
style C fill:#c8e6c9
style E fill:#fff9c4
style G fill:#bbdefb
style H fill:#ffcdd2
style K fill:#e1bee7線程池復用陷阱#
問題現象#
每次業務操作都創建新的線程池,導致線程數暴漲。
// 錯誤示範:每次呼叫都創建新線程池
class ThreadPoolHelper {
public static ThreadPoolExecutor getThreadPool() {
return (ThreadPoolExecutor) Executors.newCachedThreadPool();
}
}正確做法#
// 正確示範:使用靜態字段保存線程池引用,確保復用
class ThreadPoolHelper {
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
10, 50,
2, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()
);
public static ThreadPoolExecutor getRightThreadPool() {
return threadPoolExecutor;
}
}線程池混用陷阱#
問題現象#
IO 密集型任務和 CPU 密集型任務共用同一個線程池,導致相互干擾。
正確做法#
根據任務特性使用不同的線程池:
- IO 密集型任務:可以組態較多線程(如 CPU 核數 * 2 或更多),較短的佇列
- CPU 密集型任務:線程數等於 CPU 核數或核數 + 1,較長的佇列做緩衝
// IO 密集型任務線程池
private static ThreadPoolExecutor ioThreadPool = new ThreadPoolExecutor(
200, 200,
1, TimeUnit.HOURS,
new ArrayBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("io-threadpool-%d").get()
);
// CPU 密集型任務線程池
private static ThreadPoolExecutor cpuThreadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1, TimeUnit.HOURS,
new ArrayBlockingQueue<>(10000),
new ThreadFactoryBuilder().setNameFormat("cpu-threadpool-%d").get()
);Java 8 的 parallel stream 使用共享的 ForkJoinPool,如果涉及 IO 操作,建議使用自定義線程池。
總結:並行編程檢查清單#
| 檢查項目 | 正確做法 |
|---|---|
| ThreadLocal | 業務結束前呼叫 remove() 清理 |
| ConcurrentHashMap 複合操作 | 加鎖或使用原子性 API |
| CopyOnWriteArrayList | 僅用於讀多寫少場景 |
| 鎖的層級 | 靜態變數用類鎖,實體變數用實體鎖 |
| 鎖的粒度 | 僅對必要的程式碼塊加鎖 |
| 多把鎖 | 固定獲取鎖的順序,避免死鎖 |
| 線程池創建 | 手動創建,禁用 Executors 快捷方法 |
| 線程池復用 | 使用靜態字段保存,確保復用 |
| 線程池隔離 | 根據任務類型使用不同線程池 |