← Назад к статьям

Продвинутые паттерны конкурентности в Go

M
miholeus
5 апреля 2026 г. · 15 мин чтения

Философия: конкурентность — не библиотека, а часть языка

В большинстве языков для реализации конкурентности нужно использовать сторонние библиотеки с громоздким API. В Go goroutines и channels встроены в синтаксис, что делает параллельный код почти таким же читаемым, как обычный.

Официальная мантра Go-команды:

Don't communicate by sharing memory; share memory by communicating.

На практике это значит: вместо того чтобы давать нескольким goroutines доступ к одной переменной и защищать её мьютексом, нужно передавать значения по каналам. Владение данными в каждый момент времени — у одной goroutine.

Аналогия. Представьте кассу в магазине: вместо того чтобы все покупатели одновременно лезли в ящик с деньгами (shared state + locks), каждый передаёт деньги через кассира (channel). Кассир — единственный, кто работает с ящиком.


Базовые примитивы: goroutines и channels

Запуск функции в новой goroutine занимает одно ключевое слово:

// Запустить функцию параллельно
go fetch(url)

// Channel — типизированный канал передачи данных
ch := make(chan string)

// Отправка — блокирует, пока получатель не готов
ch <- "ping"

// Получение — блокирует, пока отправитель не отправит
msg := <-ch

Блокирующая семантика — не недостаток, а особенность. Она даёт бесплатную синхронизацию: если получатель ещё не готов, отправитель подождёт. Именно на этом строятся все паттерны ниже.


Проблема утечек: goroutine leak

В короткоживущих программах висящие goroutines — мелкая неприятность. В production-сервисах, которые работают неделями, это катастрофа: утечка памяти, рост числа goroutines, в конечном счёте — OOM и падение.

Классическая ловушка

goroutine ожидает от channel данных, которые никогда не придут. Она не завершится, не освободит память, но и не сообщит об ошибке — просто будет существовать вечно

В Go есть встроенный инструмент для обнаружения утечек:

# Детектор data race — запускать при каждом тесте
go run -race main.go
go test -race ./...

# Если все горутины зависли — runtime выведет deadlock:
# "all goroutines are asleep - deadlock!"

# Stack trace показывает, где именно зависла каждая горутина
# Доступен через pprof или SIGQUIT (Ctrl+\)

Мотивирующий кейс: RSS-читалка

Рассмотрим пример RSS-агрегатора. Задача: периодически забирать обновления из нескольких источников и выдавать их в единый поток. Желаемый интерфейс прост:

type Subscription interface {
    Updates() <-chan Item  // поток новых элементов
    Close() error          // корректное завершение
}

// Создаём подписку на один источник
sub1 := Subscribe(fetcher1)

// Объединяем несколько источников в один поток
merged := Merge(sub1, sub2, sub3)

for item := range merged.Updates() {
    fmt.Println(item.Title)
}

// Когда всё готово — чистое завершение без утечек
merged.Close()
Почему этот пример универсален

Замените RSS на Kafka-топики, WebSocket-соединения, результаты HTTP-опроса или события из БД — архитектура останется той же. Subscription — это любой долгоживущий источник событий с возможностью отписки

Подписка

func Subscribe(fetcher Fetcher) Subscription {
    s := &sub{
        fetcher: fetcher,
        updates: make(chan Item), // for Updates
    }
    go s.loop()
    return s
}

// sub реализует Subscription интерфейс.
type sub struct {
    fetcher Fetcher   // fetches items
    updates chan Item // delivers items to the user
}

// loop извлекает items используя s.fetcher и отправляет их
// в s.updates канал
func (s *sub) loop() {...}

Для реализации подписки можно использовать

func (s *sub) Updates() <-chan Item {
    return s.updates
}

func (s *sub) Close() error {
    // TODO: выход из цикла
    // TODO: обработка ошибок
    return err
}

Теперь рассмотрим сам цикл при наивной реализации

for {
    if s.closed {
        close(s.updates)
        return
    }
    items, next, err := s.fetcher.Fetch()
    if err != nil {
        s.err = err                 
        time.Sleep(10 * time.Second)
        continue
    }
    for _, item := range items {
        s.updates <- item
    }
    if now := time.Now(); next.After(now) {
        time.Sleep(next.Sub(now))
    }
}

// и функция закрытия
func (s *naiveSub) Close() error {
    s.closed = true
    return s.err   
}

Три критичных бага наивной реализации

Первая версия с обычным циклом и time.Sleep выглядит рабочей. Но содержит три системных проблемы:

Баг #1 — Data Race

Поле closed читается из loop-goroutine и пишется из Close() — без синхронизации. go run -race поймает немедленно.

Баг #2 — Низкая отзывчивость

time.Sleep(nextFetch) блокирует goroutine целиком. Сигнал Close() будет обработан только после пробуждения — задержка до нескольких минут.

Баг #3 — Вечная блокировка

updates <- item блокируется, если потребитель перестал читать. Goroutine зависает навсегда — классическая утечка.

Каждый из этих багов по отдельности — неприятность. Вместе в production-сервисе — это рецепт необъяснимых зависаний, утечек памяти и ночных инцидентов.


Главный паттерн: for-select loop

Исправление всех трёх багов — один архитектурный сдвиг: превратить loop в event-driven центр управления через for { select { ... } }.

func (s *sub) loop() {
    // Локальное состояние, принадлежащее только этой goroutine
    var (
        pending []Item
        next    = time.Now()
        err     error
    )

    for {
        // Выдача в updates только если есть что отдавать
        var first Item
        var updates chan Item  // nil = case не активен

        if len(pending) > 0 && err == nil {
            first = pending[0]
            updates = s.updates  // "включить" ветку выдачи
        }

        select {
        case <-s.closing:
            // Запрос на закрытие — отвечаем и выходим
            s.closing <- err
            close(s.updates)
            return

        case <-time.After(next.Sub(time.Now())):
            // Время следующего fetch
            var items []Item
            items, next, err = s.fetcher.Fetch()
            if err == nil {
                pending = append(pending, items...)
            }

        case updates <- first:
            // Успешно выдали элемент потребителю
            pending = pending[1:]
        }
    }
}
Что исправлено

Нет shared state → нет race. Close() обрабатывается немедленно как отдельный case. Send в updates никогда не заблокирует loop, потому что он лишь один из вариантов в select


Техника nil channel

Это один из самых нетривиальных и мощных приёмов. Суть: операция на nil-канале блокируется навсегда — а значит, select её никогда не выберет.

Без nil channel (хаос)
select {
case updates <- item:
    // а вдруг pending пуст?
    // нужен if снаружи...
    // код размазывается
}
С nil channel — чисто
var out chan Item  // nil по умолчанию

if len(queue) > 0 {
    out = s.updates  // "включить" ветку
}

select {
case out <- queue[0]:
    // сработает только когда queue непуст
}
Аналогия

nil-канал — это выключенный свет. Никаких вложенных if/else не требуется, только состояние переменной


Корректное завершение: Close() как request-response

Наивный Close() выглядит так: установить флаг closed = true и надеяться. Проблема — data race: loop читает флаг, Close() пишет, оба делают это без синхронизации.

Правильный подход — сделать закрытие коммуникацией:

type sub struct {
    closing chan error  // двунаправленный: запрос + ответ
    updates chan Item
}

// Внешний вызывающий:
func (s *sub) Close() error {
    s.closing <- nil    // отправляем запрос (блокируем)
    return <-s.closing  // ждём ответ с ошибкой
}

// Внутри loop:
case <-s.closing:
    s.closing <- err  // отвечаем с текущей ошибкой
    close(s.updates)
    return

Теперь Close() и loop общаются через канал — никакого shared state, никакой гонки. Caller блокируется ровно до тех пор, пока loop не подтвердит завершение.


Оптимизации: deduplication, backpressure, async fetch

После того как базовая архитектура стала корректной, можно добавить три практически важных улучшения:

1. Дедупликация. RSS-источники часто отдают одни и те же элементы повторно. Решение: хранить в loop-state seen map[string]bool и фильтровать до добавления в pending.

2. Ограниченная очередь (backpressure). Если потребитель медленный, pending растёт бесконечно. Вводим лимит maxPending: когда очередь заполнена — не планируем новый fetch. Политику переполнения (drop oldest / drop newest / pause) выбираем явно.

3. Асинхронный fetch. Долгий HTTP-запрос, запущенный прямо в select, заблокирует весь loop. Решение: вынести fetch в отдельную goroutine, результат возвращать через канал:

type fetchResult struct {
    items []Item
    next  time.Time
    err   error
}

fetchDone := make(chan fetchResult, 1)
fetching := false

select {
case <-time.After(delay):
    if !fetching {
        fetching = true
        go func() {
            items, next, err := s.fetcher.Fetch()
            fetchDone <- fetchResult{items, next, err}
        }()
    }

case res := <-fetchDone:
    fetching = false
    pending = append(pending, res.items...)
}

Чеклист перед PR

  • Нет shared-полей, которые читаются/пишутся из разных goroutines без синхронизации
  • Close() обрабатывается быстро — не ждёт завершения долгого I/O или sleep
  • Нет send/receive, которые могут заблокироваться навсегда при любых условиях
  • Очередь pending ограничена, политика backpressure определена явно
  • Повторы из источника фильтруются (если источник «шумный»)
  • Долгий I/O вынесен из управляющего loop в отдельную goroutine
  • go test -race ./... проходит без предупреждений

Главные выводы

Channels — это не только данные. Они несут сигналы времени, отмены, готовности. Пустой struct{} в канале часто важнее, чем конкретное значение.

Одна goroutine — один владелец состояния. Не нужны мьютексы там, где данные живут только внутри одной goroutine и передаются наружу через канал.

for-select loop — фундаментальный строительный блок. Любая долгоживущая конкурентная компонента должна иметь такую форму.

Go гибче actor-модели. На channels можно строить actor-подобные решения, но язык не ограничивает вас одной парадигмой.


На основе доклада Advanced Go Concurrency Patterns · Sameer Ajmani · Google I/O 2013
Оригинальное видео: https://www.youtube.com/watch?v=QDDwwePbDtw

Комментарии 0

Комментарии проходят модерацию
Загрузка комментариев...