Go 語言的併發模型基於 CSP(Communicating Sequential Processes)理論,透過 Goroutine 和 Channel 提供簡潔而強大的併發機制。

Goroutine 基礎#

什麼是 Goroutine#

Goroutine 是 Go 語言中的輕量級執行緒,由 Go 執行時期系統(runtime)管理,而非作業系統。

func sayHello(name string) {
    fmt.Println("Hello,", name)
}

func main() {
    // 啟動 goroutine
    go sayHello("World")

    // 使用匿名函式
    go func() {
        fmt.Println("匿名 goroutine")
    }()

    // 等待 goroutine 完成(實際應用中使用 sync.WaitGroup)
    time.Sleep(time.Second)
}

Goroutine 與執行緒的差異

  • Goroutine 初始堆疊只有 2KB(執行緒通常 1-8MB)
  • Goroutine 由 Go runtime 排程,切換成本極低
  • 可以輕鬆建立數十萬個 goroutine

Go 排程器(G-P-M 模型)#

Go 的排程器使用 G-P-M 模型:

  • G(Goroutine):要執行的 goroutine
  • P(Processor):邏輯處理器,預設數量等於 CPU 核心數
  • M(Machine):作業系統執行緒
flowchart LR
    subgraph Goroutines
        G1[G1]
        G2[G2]
        G3[G3]
        G4[G4]
        G5[G5]
        G6[G6]
    end

    subgraph Processors
        P1[P1]
        P2[P2]
    end

    subgraph OS Threads
        M1[M1]
        M2[M2]
    end

    G1 --> P1
    G2 --> P1
    G3 --> P1
    G4 --> P2
    G5 --> P2
    G6 --> P2
    P1 --> M1
    P2 --> M2
設定並行數量
import "runtime"

func main() {
    // 查詢 CPU 核心數
    numCPU := runtime.NumCPU()

    // 設定可用的邏輯處理器數量
    runtime.GOMAXPROCS(numCPU)

    // Go 1.5+ 預設使用所有 CPU 核心
}

Goroutine 生命週期#

func main() {
    go worker()  // 啟動 goroutine

    // 主 goroutine 結束時,程式終止
    // 不會等待其他 goroutine 完成!
}

func worker() {
    for i := 0; i < 10; i++ {
        fmt.Println(i)
        time.Sleep(time.Millisecond * 100)
    }
}

main 函式回傳時,整個程式會立即終止,不會等待其他 goroutine 完成。必須使用適當的同步機制(如 WaitGroup、Channel)來協調 goroutine。

Channel 通訊#

Channel 基礎#

Channel 是 goroutine 之間通訊的管道:

// 建立 channel
ch := make(chan int)        // 無緩衝 channel
chBuf := make(chan int, 10) // 有緩衝 channel,容量 10

// 傳送資料
ch <- 42

// 接收資料
value := <-ch

// 關閉 channel
close(ch)

無緩衝 vs 有緩衝 Channel#

// 無緩衝 channel:同步通訊
// 傳送者會阻塞直到接收者準備好
ch := make(chan int)

go func() {
    ch <- 1  // 阻塞,直到有人接收
}()

value := <-ch  // 接收後,傳送者才會繼續

// 有緩衝 channel:非同步通訊
// 只有緩衝區滿時傳送才會阻塞
chBuf := make(chan int, 2)

chBuf <- 1  // 不阻塞
chBuf <- 2  // 不阻塞
// chBuf <- 3  // 會阻塞,緩衝區已滿

選擇 channel 類型的建議:

  • 無緩衝:需要嚴格同步、確保接收時使用
  • 有緩衝:允許傳送者先行、減少等待時間

Channel 操作規則#

操作nil channel已關閉 channel正常 channel
傳送永久阻塞panic阻塞/成功
接收永久阻塞回傳零值阻塞/成功
關閉panicpanic成功
ch := make(chan int, 1)
ch <- 10
close(ch)

// 從已關閉的 channel 接收
v1, ok := <-ch  // v1=10, ok=true
v2, ok := <-ch  // v2=0, ok=false(channel 已空且已關閉)

// 關閉後繼續傳送會 panic
// ch <- 20  // panic: send on closed channel

單向 Channel#

// 只能傳送的 channel
func producer(out chan<- int) {
    for i := 0; i < 10; i++ {
        out <- i
    }
    close(out)
}

// 只能接收的 channel
func consumer(in <-chan int) {
    for value := range in {
        fmt.Println(value)
    }
}

func main() {
    ch := make(chan int)
    go producer(ch)  // 雙向 channel 可以轉為單向
    consumer(ch)
}

Select 語句#

select 用於同時監聽多個 channel 操作:

func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    go func() {
        time.Sleep(time.Second)
        ch1 <- 1
    }()

    go func() {
        time.Sleep(time.Second * 2)
        ch2 <- 2
    }()

    // 等待第一個可用的 channel
    select {
    case v := <-ch1:
        fmt.Println("from ch1:", v)
    case v := <-ch2:
        fmt.Println("from ch2:", v)
    case <-time.After(time.Second * 3):
        fmt.Println("timeout")
    }
}
select 的進階用法
// 非阻塞操作
select {
case v := <-ch:
    fmt.Println("received:", v)
default:
    fmt.Println("no value available")
}

// 帶 timeout 的傳送
select {
case ch <- value:
    fmt.Println("sent")
case <-time.After(time.Second):
    fmt.Println("send timeout")
}

// 永久等待
select {}  // 會永久阻塞

sync 套件#

Mutex(互斥鎖)#

import "sync"

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

Mutex 使用注意事項

  1. 不要重複鎖定同一個 mutex(會死鎖)
  2. 不要解鎖未鎖定的 mutex(會 panic)
  3. 建議使用 defer 確保解鎖

RWMutex(讀寫鎖)#

適用於讀多寫少的場景:

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

// 讀操作使用讀鎖
func (c *Cache) Get(key string) string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.data[key]
}

// 寫操作使用寫鎖
func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

RWMutex 允許多個讀取者同時持有讀鎖,但寫入者需要獨占鎖。如果讀操作遠多於寫操作,RWMutex 能顯著提升效能。

WaitGroup#

用於等待一組 goroutine 完成:

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)  // 在啟動 goroutine 前增加計數

        go func(id int) {
            defer wg.Done()  // 完成時減少計數
            fmt.Printf("Worker %d starting\n", id)
            time.Sleep(time.Second)
            fmt.Printf("Worker %d done\n", id)
        }(i)
    }

    wg.Wait()  // 阻塞直到計數歸零
    fmt.Println("All workers completed")
}

WaitGroup 使用規則

  • Add() 必須在 goroutine 啟動之前呼叫
  • Done() 應該在 goroutine 結束時呼叫(建議使用 defer
  • Wait() 會阻塞直到計數器歸零
  • 計數器不能變成負數(會 panic)

Once#

確保某段程式碼只執行一次:

var (
    instance *Config
    once     sync.Once
)

func GetConfig() *Config {
    once.Do(func() {
        // 只會執行一次,即使多個 goroutine 同時呼叫
        instance = loadConfig()
    })
    return instance
}
Once 的實作原理

sync.Once 內部使用 atomic 操作和 mutex 實作:

type Once struct {
    done uint32
    m    Mutex
}

func (o *Once) Do(f func()) {
    // 快速路徑:已執行過
    if atomic.LoadUint32(&o.done) == 1 {
        return
    }
    // 慢速路徑:需要競爭鎖
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

原子操作#

sync/atomic 套件#

原子操作是不可分割的操作,提供更高效能的同步方式:

import "sync/atomic"

var counter int64

func increment() {
    atomic.AddInt64(&counter, 1)
}

func getCount() int64 {
    return atomic.LoadInt64(&counter)
}

常用原子操作#

var value int64 = 10

// 增減操作
atomic.AddInt64(&value, 5)   // value += 5
atomic.AddInt64(&value, -3)  // value -= 3

// 讀取操作
v := atomic.LoadInt64(&value)

// 儲存操作
atomic.StoreInt64(&value, 100)

// 比較並交換(CAS)
old := atomic.SwapInt64(&value, 200)  // 回傳舊值

// 條件交換
swapped := atomic.CompareAndSwapInt64(&value, 200, 300)
// 若 value == 200,則設為 300,回傳 true
// 否則不改變,回傳 false

原子操作支援的型別:int32int64uint32uint64uintptrunsafe.Pointer

atomic.Value#

用於原子地儲存和載入任意型別的值:

var config atomic.Value

// 儲存組態
config.Store(&Config{Debug: true})

// 載入組態
cfg := config.Load().(*Config)
使用 atomic.Value 實作無鎖快取
type Cache struct {
    data atomic.Value  // 儲存 map[string]string
}

func NewCache() *Cache {
    c := &Cache{}
    c.data.Store(make(map[string]string))
    return c
}

func (c *Cache) Get(key string) string {
    m := c.data.Load().(map[string]string)
    return m[key]
}

func (c *Cache) Set(key, value string) {
    for {
        oldMap := c.data.Load().(map[string]string)
        newMap := make(map[string]string, len(oldMap)+1)
        for k, v := range oldMap {
            newMap[k] = v
        }
        newMap[key] = value
        if c.data.CompareAndSwap(oldMap, newMap) {
            return
        }
        // CAS 失敗,重試
    }
}

sync.Map#

併發安全字典#

sync.Map 是內建的併發安全字典實作:

var m sync.Map

// 儲存
m.Store("key", "value")

// 載入
value, ok := m.Load("key")
if ok {
    fmt.Println(value.(string))
}

// 載入或儲存(若不存在則儲存)
actual, loaded := m.LoadOrStore("key2", "default")

// 刪除
m.Delete("key")

// 遍歷
m.Range(func(key, value interface{}) bool {
    fmt.Printf("%v: %v\n", key, value)
    return true  // 繼續遍歷
})

鍵型別限制#

sync.Map 的鍵型別同樣不能是:

  • 函式型別
  • 字典型別
  • 切片型別

違反此規則會在執行時期引發 panic。

型別安全封裝#

由於 sync.Map 使用 interface{} 作為鍵值型別,建議進行封裝:

type IntStringMap struct {
    m sync.Map
}

func (m *IntStringMap) Store(key int, value string) {
    m.m.Store(key, value)
}

func (m *IntStringMap) Load(key int) (string, bool) {
    v, ok := m.m.Load(key)
    if !ok {
        return "", false
    }
    return v.(string), true
}

func (m *IntStringMap) Delete(key int) {
    m.m.Delete(key)
}

sync.Map vs map + Mutex#

特性sync.Mapmap + Mutex
適用場景讀多寫少、鍵穩定通用場景
效能讀取最佳化、減少鎖競爭穩定但有鎖開銷
型別安全需要型別斷言編譯時期檢查
記憶體使用較高(維護兩份資料)較低

何時使用 sync.Map

  1. 鍵值對只增不減或很少刪除
  2. 讀操作遠多於寫操作
  3. 不同 goroutine 操作不同的鍵

併發模式#

Worker Pool 模式#

flowchart LR
    J[Jobs Channel] --> W1[Worker 1]
    J --> W2[Worker 2]
    J --> W3[Worker 3]
    W1 --> R[Results Channel]
    W2 --> R
    W3 --> R
func workerPool(numWorkers int, jobs <-chan int, results chan<- int) {
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for job := range jobs {
                results <- process(job)
            }
        }(i)
    }

    wg.Wait()
    close(results)
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 啟動 worker pool
    go workerPool(5, jobs, results)

    // 傳送工作
    for i := 0; i < 50; i++ {
        jobs <- i
    }
    close(jobs)

    // 收集結果
    for result := range results {
        fmt.Println(result)
    }
}

Pipeline 模式#

flowchart LR
    A[generator<br>產生資料] -->|chan int| B[square<br>平方運算]
    B -->|chan int| C[consumer<br>消費結果]
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // 建立 pipeline
    nums := generator(1, 2, 3, 4, 5)
    squared := square(nums)

    // 消費結果
    for result := range squared {
        fmt.Println(result)
    }
}
Fan-out/Fan-in 模式
flowchart TB
    subgraph Fan-out
        IN[Input Channel] --> W1[Worker 1]
        IN --> W2[Worker 2]
        IN --> W3[Worker 3]
    end

    subgraph Fan-in
        W1 --> OUT[Output Channel]
        W2 --> OUT
        W3 --> OUT
    end
// Fan-out: 多個 goroutine 讀取同一 channel
func fanOut(in <-chan int, n int) []<-chan int {
    outs := make([]<-chan int, n)
    for i := 0; i < n; i++ {
        outs[i] = worker(in)
    }
    return outs
}

// Fan-in: 合併多個 channel 到單一 channel
func fanIn(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}