非同步處理是提升系統吞吐量的重要手段。本章介紹 Java 中的非同步編程技術及其在實際場景中的應用。

非同步處理的價值#

同步 vs 非同步#

同步處理(串行)
┌─────┐   ┌─────┐   ┌─────┐
│任務A│ → │任務B│ → │任務C│  總耗時: 300ms
└─────┘   └─────┘   └─────┘
 100ms     100ms     100ms

非同步處理(並行)
┌─────┐
│任務A│ ─┐
└─────┘  │
┌─────┐  ├→ 總耗時: 100ms
│任務B│ ─┤
└─────┘  │
┌─────┐  │
│任務C│ ─┘
└─────┘

適用場景#

場景說明
I/O 密集型檔案讀寫、網路請求、資料庫操作
獨立任務多個任務之間無依賴關係
耗時操作發送郵件、生成報表、資料同步
削峰填谷高峰期將請求暫存,平緩處理

Future 與 Callable#

基本使用#

ExecutorService executor = Executors.newFixedThreadPool(10);

// 提交任務
Future<String> future = executor.submit(() -> {
    Thread.sleep(1000);
    return "任務完成";
});

// 獲取結果(阻塞)
String result = future.get();  // 等待完成
String result = future.get(5, TimeUnit.SECONDS);  // 逾時等待

// 檢查狀態
boolean isDone = future.isDone();
boolean isCancelled = future.isCancelled();

Future 的局限性#

Future 的 get() 方法會阻塞當前線程,無法實現真正的非同步回呼。

// 問題:需要阻塞等待結果
Future<String> future1 = executor.submit(task1);
Future<String> future2 = executor.submit(task2);

// 必須按順序等待
String result1 = future1.get();  // 阻塞
String result2 = future2.get();  // 阻塞

CompletableFuture#

創建 CompletableFuture#

// 1. 無回傳值的非同步任務
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("執行任務");
});

// 2. 有回傳值的非同步任務
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "結果";
});

// 3. 使用自定義線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "結果";
}, executor);

鏈式處理#

CompletableFuture.supplyAsync(() -> {
    return "Hello";
})
.thenApply(s -> {
    return s + " World";  // 轉換結果
})
.thenAccept(s -> {
    System.out.println(s);  // 消費結果
})
.thenRun(() -> {
    System.out.println("完成");  // 執行後續動作
});

組合多個 Future#

// 兩個任務都完成後處理
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");

future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2)
       .thenAccept(System.out::println);  // 輸出: Hello World

// 任一任務完成即可
future1.acceptEither(future2, System.out::println);

// 等待所有任務完成
CompletableFuture.allOf(future1, future2).join();

// 等待任一任務完成
CompletableFuture.anyOf(future1, future2).join();

例外處理#

CompletableFuture.supplyAsync(() -> {
    if (true) throw new RuntimeException("出錯了");
    return "結果";
})
.exceptionally(ex -> {
    // 處理例外,回傳默認值
    return "默認值";
})
.handle((result, ex) -> {
    // 同時處理結果和例外
    if (ex != null) {
        return "出錯: " + ex.getMessage();
    }
    return result;
});

實戰示例:並行呼叫多個服務#

public OrderDTO getOrderDetail(String orderId) {
    // 並行呼叫三個服務
    CompletableFuture<Order> orderFuture =
        CompletableFuture.supplyAsync(() -> orderService.getOrder(orderId));

    CompletableFuture<User> userFuture =
        CompletableFuture.supplyAsync(() -> userService.getUser(orderId));

    CompletableFuture<List<Product>> productsFuture =
        CompletableFuture.supplyAsync(() -> productService.getProducts(orderId));

    // 等待所有結果
    CompletableFuture.allOf(orderFuture, userFuture, productsFuture).join();

    // 組裝結果
    return new OrderDTO(
        orderFuture.join(),
        userFuture.join(),
        productsFuture.join()
    );
}

非同步 I/O#

NIO 非阻塞 I/O#

傳統 I/O 的問題:

傳統 BIO(阻塞 I/O)
┌────────┐    ┌────────┐
│ Thread │ →  │ Socket │ → 阻塞等待資料
└────────┘    └────────┘
   └→ 每個連線需要一個線程,線程數量有限

NIO 的解決方案:

NIO(非阻塞 I/O)
┌────────┐    ┌──────────┐    ┌────────┐
│ Thread │ →  │ Selector │ →  │ Channel│ × N
└────────┘    └──────────┘    └────────┘
   └→ 一個線程管理多個連線

NIO 核心組件#

組件作用
Buffer緩衝區,讀寫資料的中轉站
Channel通道,資料讀寫的管道
Selector多路復用器,監聽多個 Channel

DirectBuffer 減少記憶體複製#

// 普通 Buffer:堆記憶體,需要複製到直接記憶體
ByteBuffer heapBuffer = ByteBuffer.allocate(1024);

// DirectBuffer:直接記憶體,減少一次複製
ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024);

DirectBuffer 適合大量 I/O 操作的場景,但創建和銷毀成本高,適合長期使用。

反應式編程(Reactive)#

反應式的核心概念#

反應式編程是一種面向資料流和變化傳播的編程範式。

傳統同步
request → wait → response

反應式
request → callback(response) → 繼續處理

Spring WebFlux#

@RestController
public class UserController {

    @GetMapping("/users/{id}")
    public Mono<User> getUser(@PathVariable String id) {
        return userRepository.findById(id);  // 非阻塞
    }

    @GetMapping("/users")
    public Flux<User> getAllUsers() {
        return userRepository.findAll();  // 流式回傳
    }
}

Reactor 操作符#

Flux.just("A", "B", "C")
    .map(String::toLowerCase)
    .filter(s -> !s.equals("b"))
    .flatMap(s -> Mono.just(s + "!"))
    .subscribe(System.out::println);  // 輸出: a!, c!

反應式編程的學習曲線較陡,適合高並行、I/O 密集型的場景。對於一般業務邏輯,同步編程更易維護。

非同步 HTTP 用戶端#

HttpClient (Java 11+)#

HttpClient client = HttpClient.newBuilder()
    .connectTimeout(Duration.ofSeconds(10))
    .build();

HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("https://api.example.com/users"))
    .GET()
    .build();

// 非同步請求
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
    .thenApply(HttpResponse::body)
    .thenAccept(System.out::println);

WebClient (Spring WebFlux)#

WebClient client = WebClient.create("https://api.example.com");

Mono<User> user = client.get()
    .uri("/users/{id}", 1)
    .retrieve()
    .bodyToMono(User.class);

// 訂閱結果
user.subscribe(
    u -> System.out.println("用戶: " + u),
    error -> System.err.println("錯誤: " + error),
    () -> System.out.println("完成")
);

訊息佇列#

為什麼需要訊息佇列#

同步呼叫
┌────────┐    ┌────────┐    ┌────────┐
│訂單服務│ →  │庫存服務│ →  │通知服務│
└────────┘    └────────┘    └────────┘
              任一失敗,整體失敗

非同步解耦
┌────────┐    ┌─────────┐    ┌────────┐
│訂單服務│ →  │訊息佇列 │ ←→ │庫存服務│
└────────┘    └─────────┘    └────────┘
                  ↑↓
              ┌────────┐
              │通知服務│
              └────────┘
              服務解耦,獨立擴展

訊息佇列的應用場景#

場景說明
非同步處理發送郵件、推送通知
流量削峰秒殺活動的訂單處理
服務解耦訂單與庫存、積分等服務解耦
日誌處理日誌收集與分析

常見訊息佇列對比#

特性RabbitMQKafkaRocketMQ
吞吐量萬級百萬級十萬級
延遲微秒級毫秒級毫秒級
可靠性
適用場景業務訊息日誌/大資料業務訊息

Spring 整合示例#

// 發送訊息
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendOrder(Order order) {
    rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
}

// 接收訊息
@RabbitListener(queues = "order.created.queue")
public void handleOrder(Order order) {
    // 處理訂單
    stockService.deduct(order);
    notificationService.send(order);
}

非同步編程最佳實踐#

線程池組態#

// 推薦:根據任務類型組態線程池
// CPU 密集型:核心數 + 1
// I/O 密集型:核心數 * 2 或更多

ExecutorService executor = new ThreadPoolExecutor(
    10,                      // 核心線程數
    20,                      // 最大線程數
    60, TimeUnit.SECONDS,    // 空閒線程存活時間
    new LinkedBlockingQueue<>(1000),  // 任務佇列
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒絕策略
);

逾時控制#

// CompletableFuture 逾時
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 長時間任務
    return "結果";
});

try {
    String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true);  // 取消任務
}

// Java 9+ 的逾時處理
future.orTimeout(5, TimeUnit.SECONDS)
      .exceptionally(ex -> "逾時默認值");

錯誤處理#

CompletableFuture.supplyAsync(() -> {
    return riskyOperation();
})
.exceptionally(ex -> {
    logger.error("任務失敗", ex);
    return fallbackValue;
})
.thenAccept(result -> {
    // 處理結果
});

總結#

技術適用場景複雜度
Future簡單非同步任務
CompletableFuture複雜非同步流程
NIO高並行網路 I/O
反應式極高並行場景
訊息佇列服務解耦、削峰
非同步編程檢查清單
  • 是否需要非同步?(不要為了非同步而非同步)
  • 線程池組態是否合理?
  • 是否有逾時控制?
  • 是否正確處理例外?
  • 是否有降級方案?
  • 是否考慮了資源釋放?