非同步處理是提升系統吞吐量的重要手段。本章介紹 Java 中的非同步編程技術及其在實際場景中的應用。
非同步處理的價值#
同步 vs 非同步#
同步處理(串行)
┌─────┐ ┌─────┐ ┌─────┐
│任務A│ → │任務B│ → │任務C│ 總耗時: 300ms
└─────┘ └─────┘ └─────┘
100ms 100ms 100ms
非同步處理(並行)
┌─────┐
│任務A│ ─┐
└─────┘ │
┌─────┐ ├→ 總耗時: 100ms
│任務B│ ─┤
└─────┘ │
┌─────┐ │
│任務C│ ─┘
└─────┘適用場景#
| 場景 | 說明 |
|---|---|
| I/O 密集型 | 檔案讀寫、網路請求、資料庫操作 |
| 獨立任務 | 多個任務之間無依賴關係 |
| 耗時操作 | 發送郵件、生成報表、資料同步 |
| 削峰填谷 | 高峰期將請求暫存,平緩處理 |
Future 與 Callable#
基本使用#
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交任務
Future<String> future = executor.submit(() -> {
Thread.sleep(1000);
return "任務完成";
});
// 獲取結果(阻塞)
String result = future.get(); // 等待完成
String result = future.get(5, TimeUnit.SECONDS); // 逾時等待
// 檢查狀態
boolean isDone = future.isDone();
boolean isCancelled = future.isCancelled();Future 的局限性#
Future 的
get()方法會阻塞當前線程,無法實現真正的非同步回呼。
// 問題:需要阻塞等待結果
Future<String> future1 = executor.submit(task1);
Future<String> future2 = executor.submit(task2);
// 必須按順序等待
String result1 = future1.get(); // 阻塞
String result2 = future2.get(); // 阻塞CompletableFuture#
創建 CompletableFuture#
// 1. 無回傳值的非同步任務
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("執行任務");
});
// 2. 有回傳值的非同步任務
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "結果";
});
// 3. 使用自定義線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "結果";
}, executor);鏈式處理#
CompletableFuture.supplyAsync(() -> {
return "Hello";
})
.thenApply(s -> {
return s + " World"; // 轉換結果
})
.thenAccept(s -> {
System.out.println(s); // 消費結果
})
.thenRun(() -> {
System.out.println("完成"); // 執行後續動作
});組合多個 Future#
// 兩個任務都完成後處理
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2)
.thenAccept(System.out::println); // 輸出: Hello World
// 任一任務完成即可
future1.acceptEither(future2, System.out::println);
// 等待所有任務完成
CompletableFuture.allOf(future1, future2).join();
// 等待任一任務完成
CompletableFuture.anyOf(future1, future2).join();例外處理#
CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("出錯了");
return "結果";
})
.exceptionally(ex -> {
// 處理例外,回傳默認值
return "默認值";
})
.handle((result, ex) -> {
// 同時處理結果和例外
if (ex != null) {
return "出錯: " + ex.getMessage();
}
return result;
});實戰示例:並行呼叫多個服務#
public OrderDTO getOrderDetail(String orderId) {
// 並行呼叫三個服務
CompletableFuture<Order> orderFuture =
CompletableFuture.supplyAsync(() -> orderService.getOrder(orderId));
CompletableFuture<User> userFuture =
CompletableFuture.supplyAsync(() -> userService.getUser(orderId));
CompletableFuture<List<Product>> productsFuture =
CompletableFuture.supplyAsync(() -> productService.getProducts(orderId));
// 等待所有結果
CompletableFuture.allOf(orderFuture, userFuture, productsFuture).join();
// 組裝結果
return new OrderDTO(
orderFuture.join(),
userFuture.join(),
productsFuture.join()
);
}非同步 I/O#
NIO 非阻塞 I/O#
傳統 I/O 的問題:
傳統 BIO(阻塞 I/O)
┌────────┐ ┌────────┐
│ Thread │ → │ Socket │ → 阻塞等待資料
└────────┘ └────────┘
│
└→ 每個連線需要一個線程,線程數量有限NIO 的解決方案:
NIO(非阻塞 I/O)
┌────────┐ ┌──────────┐ ┌────────┐
│ Thread │ → │ Selector │ → │ Channel│ × N
└────────┘ └──────────┘ └────────┘
│
└→ 一個線程管理多個連線NIO 核心組件#
| 組件 | 作用 |
|---|---|
| Buffer | 緩衝區,讀寫資料的中轉站 |
| Channel | 通道,資料讀寫的管道 |
| Selector | 多路復用器,監聽多個 Channel |
DirectBuffer 減少記憶體複製#
// 普通 Buffer:堆記憶體,需要複製到直接記憶體
ByteBuffer heapBuffer = ByteBuffer.allocate(1024);
// DirectBuffer:直接記憶體,減少一次複製
ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024);DirectBuffer 適合大量 I/O 操作的場景,但創建和銷毀成本高,適合長期使用。
反應式編程(Reactive)#
反應式的核心概念#
反應式編程是一種面向資料流和變化傳播的編程範式。
傳統同步
request → wait → response
反應式
request → callback(response) → 繼續處理Spring WebFlux#
@RestController
public class UserController {
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
return userRepository.findById(id); // 非阻塞
}
@GetMapping("/users")
public Flux<User> getAllUsers() {
return userRepository.findAll(); // 流式回傳
}
}Reactor 操作符#
Flux.just("A", "B", "C")
.map(String::toLowerCase)
.filter(s -> !s.equals("b"))
.flatMap(s -> Mono.just(s + "!"))
.subscribe(System.out::println); // 輸出: a!, c!反應式編程的學習曲線較陡,適合高並行、I/O 密集型的場景。對於一般業務邏輯,同步編程更易維護。
非同步 HTTP 用戶端#
HttpClient (Java 11+)#
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10))
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("https://api.example.com/users"))
.GET()
.build();
// 非同步請求
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(HttpResponse::body)
.thenAccept(System.out::println);WebClient (Spring WebFlux)#
WebClient client = WebClient.create("https://api.example.com");
Mono<User> user = client.get()
.uri("/users/{id}", 1)
.retrieve()
.bodyToMono(User.class);
// 訂閱結果
user.subscribe(
u -> System.out.println("用戶: " + u),
error -> System.err.println("錯誤: " + error),
() -> System.out.println("完成")
);訊息佇列#
為什麼需要訊息佇列#
同步呼叫
┌────────┐ ┌────────┐ ┌────────┐
│訂單服務│ → │庫存服務│ → │通知服務│
└────────┘ └────────┘ └────────┘
任一失敗,整體失敗
非同步解耦
┌────────┐ ┌─────────┐ ┌────────┐
│訂單服務│ → │訊息佇列 │ ←→ │庫存服務│
└────────┘ └─────────┘ └────────┘
↑↓
┌────────┐
│通知服務│
└────────┘
服務解耦,獨立擴展訊息佇列的應用場景#
| 場景 | 說明 |
|---|---|
| 非同步處理 | 發送郵件、推送通知 |
| 流量削峰 | 秒殺活動的訂單處理 |
| 服務解耦 | 訂單與庫存、積分等服務解耦 |
| 日誌處理 | 日誌收集與分析 |
常見訊息佇列對比#
| 特性 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| 吞吐量 | 萬級 | 百萬級 | 十萬級 |
| 延遲 | 微秒級 | 毫秒級 | 毫秒級 |
| 可靠性 | 高 | 高 | 高 |
| 適用場景 | 業務訊息 | 日誌/大資料 | 業務訊息 |
Spring 整合示例#
// 發送訊息
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrder(Order order) {
rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
}
// 接收訊息
@RabbitListener(queues = "order.created.queue")
public void handleOrder(Order order) {
// 處理訂單
stockService.deduct(order);
notificationService.send(order);
}非同步編程最佳實踐#
線程池組態#
// 推薦:根據任務類型組態線程池
// CPU 密集型:核心數 + 1
// I/O 密集型:核心數 * 2 或更多
ExecutorService executor = new ThreadPoolExecutor(
10, // 核心線程數
20, // 最大線程數
60, TimeUnit.SECONDS, // 空閒線程存活時間
new LinkedBlockingQueue<>(1000), // 任務佇列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);逾時控制#
// CompletableFuture 逾時
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 長時間任務
return "結果";
});
try {
String result = future.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true); // 取消任務
}
// Java 9+ 的逾時處理
future.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> "逾時默認值");錯誤處理#
CompletableFuture.supplyAsync(() -> {
return riskyOperation();
})
.exceptionally(ex -> {
logger.error("任務失敗", ex);
return fallbackValue;
})
.thenAccept(result -> {
// 處理結果
});總結#
| 技術 | 適用場景 | 複雜度 |
|---|---|---|
| Future | 簡單非同步任務 | 低 |
| CompletableFuture | 複雜非同步流程 | 中 |
| NIO | 高並行網路 I/O | 高 |
| 反應式 | 極高並行場景 | 高 |
| 訊息佇列 | 服務解耦、削峰 | 中 |
非同步編程檢查清單
- 是否需要非同步?(不要為了非同步而非同步)
- 線程池組態是否合理?
- 是否有逾時控制?
- 是否正確處理例外?
- 是否有降級方案?
- 是否考慮了資源釋放?