Java 並行編程是進階開發的核心技能。本章涵蓋線程基礎、線程池、鎖機制、原子操作以及並行工具類的正確使用方式。

線程生命週期#

Java 線程有 6 種狀態,定義在 Thread.State 枚舉中:

stateDiagram-v2
    [*] --> NEW: new Thread()
    NEW --> RUNNABLE: start()

    RUNNABLE --> BLOCKED: 等待 synchronized 鎖
    RUNNABLE --> WAITING: wait() / join() / park()
    RUNNABLE --> TIMED_WAITING: sleep() / wait(timeout)

    BLOCKED --> RUNNABLE: 獲得鎖
    WAITING --> RUNNABLE: notify() / unpark()
    TIMED_WAITING --> RUNNABLE: 逾時或被喚醒

    RUNNABLE --> TERMINATED: 執行完畢或例外
    TERMINATED --> [*]

一個線程只能啟動一次。對已啟動的線程再次呼叫 start() 會拋出 IllegalThreadStateException

狀態轉換說明#

狀態說明觸發條件
NEW已創建未啟動new Thread()
RUNNABLE可執行start()
BLOCKED阻塞等待鎖等待 synchronized
WAITING無限等待wait(), join(), park()
TIMED_WAITING有時限等待sleep(), wait(timeout)
TERMINATED終止執行完畢或例外

線程池#

為什麼需要線程池?#

  1. 降低資源消耗:複用線程,避免頻繁創建銷毀
  2. 提高回應速度:任務到達時不需等待線程創建
  3. 統一管理:便於監控和調校

ThreadPoolExecutor 核心參數#

public ThreadPoolExecutor(
    int corePoolSize,      // 核心線程數
    int maximumPoolSize,   // 最大線程數
    long keepAliveTime,    // 非核心線程存活時間
    TimeUnit unit,         // 時間單位
    BlockingQueue<Runnable> workQueue,  // 工作佇列
    ThreadFactory threadFactory,         // 線程工廠
    RejectedExecutionHandler handler     // 拒絕策略
)

線程池工作流程#

flowchart TD
    A[提交任務] --> B{當前線程數 < corePoolSize?}
    B -->|是| C[創建核心線程執行]
    B -->|否| D{工作佇列未滿?}
    D -->|是| E[加入佇列等待]
    D -->|否| F{當前線程數 < maximumPoolSize?}
    F -->|是| G[創建非核心線程執行]
    F -->|否| H[執行拒絕策略]

    C --> I[任務執行]
    E --> I
    G --> I

不要使用 Executors 快捷方法!

// 危險!使用無界佇列,可能導致 OOM
Executors.newFixedThreadPool(10);    // LinkedBlockingQueue 無界
Executors.newSingleThreadExecutor(); // LinkedBlockingQueue 無界
Executors.newCachedThreadPool();     // 最大線程數為 Integer.MAX_VALUE

正確創建線程池#

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10,                      // 核心線程數
    20,                      // 最大線程數
    60, TimeUnit.SECONDS,    // 空閒存活時間
    new ArrayBlockingQueue<>(100),  // 有界佇列
    new ThreadFactoryBuilder()
        .setNameFormat("worker-%d")
        .build(),
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒絕策略
);
四種內建拒絕策略
策略行為
AbortPolicy拋出 RejectedExecutionException(預設)
CallerRunsPolicy由提交任務的線程執行
DiscardPolicy靜默丟棄任務
DiscardOldestPolicy丟棄佇列中最老的任務

推薦使用 CallerRunsPolicy:提供了一種優雅的降級機制,不會丟失任務。


鎖機制#

synchronized vs ReentrantLock#

特性synchronizedReentrantLock
實現方式JVM 內建API 層面
鎖的獲取隱式顯式 lock/unlock
可中斷是(lockInterruptibly)
逾時獲取是(tryLock)
公平鎖可選
條件變數單一(wait/notify)多個 Condition
// synchronized
synchronized (lock) {
    // 臨界區
}

// ReentrantLock
Lock lock = new ReentrantLock();
lock.lock();
try {
    // 臨界區
} finally {
    lock.unlock();  // 必須在 finally 中釋放
}

選擇建議

  • 簡單同步場景優先使用 synchronized(JVM 最佳化後效能差異不大)
  • 需要高級特性(中斷、逾時、多條件)時使用 ReentrantLock

讀寫鎖 ReentrantReadWriteLock#

適用於讀多寫少的場景:

ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
Lock readLock = rwLock.readLock();
Lock writeLock = rwLock.writeLock();

// 讀操作(可並行)
readLock.lock();
try {
    return data;
} finally {
    readLock.unlock();
}

// 寫操作(獨佔)
writeLock.lock();
try {
    data = newValue;
} finally {
    writeLock.unlock();
}

CAS 與原子類#

CAS 原理#

CAS(Compare-And-Swap)是一種無鎖演算法:

CAS(V, Expected, New)
    if (V == Expected)
        V = New
        return true
    else
        return false
// AtomicInteger 的 incrementAndGet 實現
public final int incrementAndGet() {
    return U.getAndAddInt(this, VALUE, 1) + 1;
}

// Unsafe 中的 CAS 操作
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

CAS 的 ABA 問題

線程 1 讀取值 A,線程 2 將 A 改為 B 再改回 A,線程 1 的 CAS 仍會成功。

解決方案:使用 AtomicStampedReference 加入版本號。

常用原子類#

// 基本類型原子類
AtomicInteger count = new AtomicInteger(0);
count.incrementAndGet();
count.compareAndSet(expected, newValue);

// 引用類型原子類
AtomicReference<User> ref = new AtomicReference<>(user);
ref.compareAndSet(oldUser, newUser);

// 陣列原子類
AtomicIntegerArray array = new AtomicIntegerArray(10);
array.getAndIncrement(0);

// 字段更新器(減少物件開銷)
AtomicIntegerFieldUpdater<Counter> updater =
    AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");

並行工具類#

CountDownLatch(倒數門閂)#

等待多個操作完成:

CountDownLatch latch = new CountDownLatch(3);

// 工作線程
for (int i = 0; i < 3; i++) {
    executor.execute(() -> {
        try {
            doWork();
        } finally {
            latch.countDown();  // 計數減 1
        }
    });
}

latch.await();  // 等待計數歸零
System.out.println("所有任務完成");

CountDownLatch 是一次性的,計數歸零後不能重置。

CyclicBarrier(循環屏障)#

讓一組線程互相等待到達屏障點:

CyclicBarrier barrier = new CyclicBarrier(3, () -> {
    System.out.println("所有線程到達屏障");
});

for (int i = 0; i < 3; i++) {
    executor.execute(() -> {
        doPhase1();
        barrier.await();  // 等待其他線程
        doPhase2();
        barrier.await();  // 可重複使用
    });
}
CountDownLatch vs CyclicBarrier
特性CountDownLatchCyclicBarrier
重用性一次性可重複使用
計數方式遞減到 0遞增到 N
典型場景主線程等待多個子線程多線程分階段協作
回呼支援屏障動作

Semaphore(信號量)#

控制並行訪問數量:

// 限制最多 3 個線程同時訪問
Semaphore semaphore = new Semaphore(3);

void accessResource() throws InterruptedException {
    semaphore.acquire();  // 獲取許可
    try {
        useResource();
    } finally {
        semaphore.release();  // 釋放許可
    }
}

Semaphore 常用於實現限流器或連線池。


ThreadLocal#

ThreadLocal 提供線程局部變數:

private static final ThreadLocal<SimpleDateFormat> dateFormat =
    ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

public String format(Date date) {
    return dateFormat.get().format(date);
}

ThreadLocal 內存洩漏風險

在使用線程池時,線程不會銷毀,ThreadLocal 中的值會一直存在。必須在使用完畢後清理:

try {
    threadLocal.set(value);
    // 使用
} finally {
    threadLocal.remove();  // 必須清理!
}

實踐建議#

  1. 手動創建線程池:避免使用 Executors 快捷方法
  2. 給線程池線程命名:便於排查問題
  3. 合理設置佇列大小:使用有界佇列防止 OOM
  4. 優先使用並行集合:ConcurrentHashMap 優於 synchronized Map
  5. 使用原子類替代鎖:簡單計數場景更高效
  6. ThreadLocal 用完即清:特別是在線程池環境
  7. 理解 volatile 的局限:只保證可見性,不保證原子性