Go 語言的併發模型基於 CSP(Communicating Sequential Processes)理論,透過 Goroutine 和 Channel 提供簡潔而強大的併發機制。
Goroutine 基礎#
什麼是 Goroutine#
Goroutine 是 Go 語言中的輕量級執行緒,由 Go 執行時期系統(runtime)管理,而非作業系統。
func sayHello(name string) {
fmt.Println("Hello,", name)
}
func main() {
// 啟動 goroutine
go sayHello("World")
// 使用匿名函式
go func() {
fmt.Println("匿名 goroutine")
}()
// 等待 goroutine 完成(實際應用中使用 sync.WaitGroup)
time.Sleep(time.Second)
}Goroutine 與執行緒的差異:
- Goroutine 初始堆疊只有 2KB(執行緒通常 1-8MB)
- Goroutine 由 Go runtime 排程,切換成本極低
- 可以輕鬆建立數十萬個 goroutine
Go 排程器(G-P-M 模型)#
Go 的排程器使用 G-P-M 模型:
- G(Goroutine):要執行的 goroutine
- P(Processor):邏輯處理器,預設數量等於 CPU 核心數
- M(Machine):作業系統執行緒
flowchart LR
subgraph Goroutines
G1[G1]
G2[G2]
G3[G3]
G4[G4]
G5[G5]
G6[G6]
end
subgraph Processors
P1[P1]
P2[P2]
end
subgraph OS Threads
M1[M1]
M2[M2]
end
G1 --> P1
G2 --> P1
G3 --> P1
G4 --> P2
G5 --> P2
G6 --> P2
P1 --> M1
P2 --> M2設定並行數量
import "runtime"
func main() {
// 查詢 CPU 核心數
numCPU := runtime.NumCPU()
// 設定可用的邏輯處理器數量
runtime.GOMAXPROCS(numCPU)
// Go 1.5+ 預設使用所有 CPU 核心
}Goroutine 生命週期#
func main() {
go worker() // 啟動 goroutine
// 主 goroutine 結束時,程式終止
// 不會等待其他 goroutine 完成!
}
func worker() {
for i := 0; i < 10; i++ {
fmt.Println(i)
time.Sleep(time.Millisecond * 100)
}
}當
main函式回傳時,整個程式會立即終止,不會等待其他 goroutine 完成。必須使用適當的同步機制(如 WaitGroup、Channel)來協調 goroutine。
Channel 通訊#
Channel 基礎#
Channel 是 goroutine 之間通訊的管道:
// 建立 channel
ch := make(chan int) // 無緩衝 channel
chBuf := make(chan int, 10) // 有緩衝 channel,容量 10
// 傳送資料
ch <- 42
// 接收資料
value := <-ch
// 關閉 channel
close(ch)無緩衝 vs 有緩衝 Channel#
// 無緩衝 channel:同步通訊
// 傳送者會阻塞直到接收者準備好
ch := make(chan int)
go func() {
ch <- 1 // 阻塞,直到有人接收
}()
value := <-ch // 接收後,傳送者才會繼續
// 有緩衝 channel:非同步通訊
// 只有緩衝區滿時傳送才會阻塞
chBuf := make(chan int, 2)
chBuf <- 1 // 不阻塞
chBuf <- 2 // 不阻塞
// chBuf <- 3 // 會阻塞,緩衝區已滿選擇 channel 類型的建議:
- 無緩衝:需要嚴格同步、確保接收時使用
- 有緩衝:允許傳送者先行、減少等待時間
Channel 操作規則#
| 操作 | nil channel | 已關閉 channel | 正常 channel |
|---|---|---|---|
| 傳送 | 永久阻塞 | panic | 阻塞/成功 |
| 接收 | 永久阻塞 | 回傳零值 | 阻塞/成功 |
| 關閉 | panic | panic | 成功 |
ch := make(chan int, 1)
ch <- 10
close(ch)
// 從已關閉的 channel 接收
v1, ok := <-ch // v1=10, ok=true
v2, ok := <-ch // v2=0, ok=false(channel 已空且已關閉)
// 關閉後繼續傳送會 panic
// ch <- 20 // panic: send on closed channel單向 Channel#
// 只能傳送的 channel
func producer(out chan<- int) {
for i := 0; i < 10; i++ {
out <- i
}
close(out)
}
// 只能接收的 channel
func consumer(in <-chan int) {
for value := range in {
fmt.Println(value)
}
}
func main() {
ch := make(chan int)
go producer(ch) // 雙向 channel 可以轉為單向
consumer(ch)
}Select 語句#
select 用於同時監聽多個 channel 操作:
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
time.Sleep(time.Second)
ch1 <- 1
}()
go func() {
time.Sleep(time.Second * 2)
ch2 <- 2
}()
// 等待第一個可用的 channel
select {
case v := <-ch1:
fmt.Println("from ch1:", v)
case v := <-ch2:
fmt.Println("from ch2:", v)
case <-time.After(time.Second * 3):
fmt.Println("timeout")
}
}select 的進階用法
// 非阻塞操作
select {
case v := <-ch:
fmt.Println("received:", v)
default:
fmt.Println("no value available")
}
// 帶 timeout 的傳送
select {
case ch <- value:
fmt.Println("sent")
case <-time.After(time.Second):
fmt.Println("send timeout")
}
// 永久等待
select {} // 會永久阻塞sync 套件#
Mutex(互斥鎖)#
import "sync"
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}Mutex 使用注意事項:
- 不要重複鎖定同一個 mutex(會死鎖)
- 不要解鎖未鎖定的 mutex(會 panic)
- 建議使用
defer確保解鎖
RWMutex(讀寫鎖)#
適用於讀多寫少的場景:
type Cache struct {
mu sync.RWMutex
data map[string]string
}
// 讀操作使用讀鎖
func (c *Cache) Get(key string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.data[key]
}
// 寫操作使用寫鎖
func (c *Cache) Set(key, value string) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[key] = value
}RWMutex 允許多個讀取者同時持有讀鎖,但寫入者需要獨占鎖。如果讀操作遠多於寫操作,RWMutex 能顯著提升效能。
WaitGroup#
用於等待一組 goroutine 完成:
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1) // 在啟動 goroutine 前增加計數
go func(id int) {
defer wg.Done() // 完成時減少計數
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}(i)
}
wg.Wait() // 阻塞直到計數歸零
fmt.Println("All workers completed")
}WaitGroup 使用規則:
Add()必須在 goroutine 啟動之前呼叫Done()應該在 goroutine 結束時呼叫(建議使用defer)Wait()會阻塞直到計數器歸零- 計數器不能變成負數(會 panic)
Once#
確保某段程式碼只執行一次:
var (
instance *Config
once sync.Once
)
func GetConfig() *Config {
once.Do(func() {
// 只會執行一次,即使多個 goroutine 同時呼叫
instance = loadConfig()
})
return instance
}Once 的實作原理
sync.Once 內部使用 atomic 操作和 mutex 實作:
type Once struct {
done uint32
m Mutex
}
func (o *Once) Do(f func()) {
// 快速路徑:已執行過
if atomic.LoadUint32(&o.done) == 1 {
return
}
// 慢速路徑:需要競爭鎖
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}原子操作#
sync/atomic 套件#
原子操作是不可分割的操作,提供更高效能的同步方式:
import "sync/atomic"
var counter int64
func increment() {
atomic.AddInt64(&counter, 1)
}
func getCount() int64 {
return atomic.LoadInt64(&counter)
}常用原子操作#
var value int64 = 10
// 增減操作
atomic.AddInt64(&value, 5) // value += 5
atomic.AddInt64(&value, -3) // value -= 3
// 讀取操作
v := atomic.LoadInt64(&value)
// 儲存操作
atomic.StoreInt64(&value, 100)
// 比較並交換(CAS)
old := atomic.SwapInt64(&value, 200) // 回傳舊值
// 條件交換
swapped := atomic.CompareAndSwapInt64(&value, 200, 300)
// 若 value == 200,則設為 300,回傳 true
// 否則不改變,回傳 false原子操作支援的型別:
int32、int64、uint32、uint64、uintptr、unsafe.Pointer
atomic.Value#
用於原子地儲存和載入任意型別的值:
var config atomic.Value
// 儲存組態
config.Store(&Config{Debug: true})
// 載入組態
cfg := config.Load().(*Config)使用 atomic.Value 實作無鎖快取
type Cache struct {
data atomic.Value // 儲存 map[string]string
}
func NewCache() *Cache {
c := &Cache{}
c.data.Store(make(map[string]string))
return c
}
func (c *Cache) Get(key string) string {
m := c.data.Load().(map[string]string)
return m[key]
}
func (c *Cache) Set(key, value string) {
for {
oldMap := c.data.Load().(map[string]string)
newMap := make(map[string]string, len(oldMap)+1)
for k, v := range oldMap {
newMap[k] = v
}
newMap[key] = value
if c.data.CompareAndSwap(oldMap, newMap) {
return
}
// CAS 失敗,重試
}
}sync.Map#
併發安全字典#
sync.Map 是內建的併發安全字典實作:
var m sync.Map
// 儲存
m.Store("key", "value")
// 載入
value, ok := m.Load("key")
if ok {
fmt.Println(value.(string))
}
// 載入或儲存(若不存在則儲存)
actual, loaded := m.LoadOrStore("key2", "default")
// 刪除
m.Delete("key")
// 遍歷
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true // 繼續遍歷
})鍵型別限制#
sync.Map的鍵型別同樣不能是:
- 函式型別
- 字典型別
- 切片型別
違反此規則會在執行時期引發 panic。
型別安全封裝#
由於 sync.Map 使用 interface{} 作為鍵值型別,建議進行封裝:
type IntStringMap struct {
m sync.Map
}
func (m *IntStringMap) Store(key int, value string) {
m.m.Store(key, value)
}
func (m *IntStringMap) Load(key int) (string, bool) {
v, ok := m.m.Load(key)
if !ok {
return "", false
}
return v.(string), true
}
func (m *IntStringMap) Delete(key int) {
m.m.Delete(key)
}sync.Map vs map + Mutex#
| 特性 | sync.Map | map + Mutex |
|---|---|---|
| 適用場景 | 讀多寫少、鍵穩定 | 通用場景 |
| 效能 | 讀取最佳化、減少鎖競爭 | 穩定但有鎖開銷 |
| 型別安全 | 需要型別斷言 | 編譯時期檢查 |
| 記憶體使用 | 較高(維護兩份資料) | 較低 |
何時使用 sync.Map:
- 鍵值對只增不減或很少刪除
- 讀操作遠多於寫操作
- 不同 goroutine 操作不同的鍵
併發模式#
Worker Pool 模式#
flowchart LR
J[Jobs Channel] --> W1[Worker 1]
J --> W2[Worker 2]
J --> W3[Worker 3]
W1 --> R[Results Channel]
W2 --> R
W3 --> Rfunc workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for job := range jobs {
results <- process(job)
}
}(i)
}
wg.Wait()
close(results)
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 啟動 worker pool
go workerPool(5, jobs, results)
// 傳送工作
for i := 0; i < 50; i++ {
jobs <- i
}
close(jobs)
// 收集結果
for result := range results {
fmt.Println(result)
}
}Pipeline 模式#
flowchart LR
A[generator<br>產生資料] -->|chan int| B[square<br>平方運算]
B -->|chan int| C[consumer<br>消費結果]func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 建立 pipeline
nums := generator(1, 2, 3, 4, 5)
squared := square(nums)
// 消費結果
for result := range squared {
fmt.Println(result)
}
}Fan-out/Fan-in 模式
flowchart TB
subgraph Fan-out
IN[Input Channel] --> W1[Worker 1]
IN --> W2[Worker 2]
IN --> W3[Worker 3]
end
subgraph Fan-in
W1 --> OUT[Output Channel]
W2 --> OUT
W3 --> OUT
end// Fan-out: 多個 goroutine 讀取同一 channel
func fanOut(in <-chan int, n int) []<-chan int {
outs := make([]<-chan int, n)
for i := 0; i < n; i++ {
outs[i] = worker(in)
}
return outs
}
// Fan-in: 合併多個 channel 到單一 channel
func fanIn(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}