Продвинутые паттерны конкурентности в Go
Философия: конкурентность — не библиотека, а часть языка
В большинстве языков для реализации конкурентности нужно использовать сторонние библиотеки с громоздким API. В Go goroutines и channels встроены в синтаксис, что делает параллельный код почти таким же читаемым, как обычный.
Официальная мантра Go-команды:
Don't communicate by sharing memory; share memory by communicating.
На практике это значит: вместо того чтобы давать нескольким goroutines доступ к одной переменной и защищать её мьютексом, нужно передавать значения по каналам. Владение данными в каждый момент времени — у одной goroutine.
Аналогия. Представьте кассу в магазине: вместо того чтобы все покупатели одновременно лезли в ящик с деньгами (shared state + locks), каждый передаёт деньги через кассира (channel). Кассир — единственный, кто работает с ящиком.
Базовые примитивы: goroutines и channels
Запуск функции в новой goroutine занимает одно ключевое слово:
// Запустить функцию параллельно
go fetch(url)
// Channel — типизированный канал передачи данных
ch := make(chan string)
// Отправка — блокирует, пока получатель не готов
ch <- "ping"
// Получение — блокирует, пока отправитель не отправит
msg := <-ch
Блокирующая семантика — не недостаток, а особенность. Она даёт бесплатную синхронизацию: если получатель ещё не готов, отправитель подождёт. Именно на этом строятся все паттерны ниже.
Проблема утечек: goroutine leak
В короткоживущих программах висящие goroutines — мелкая неприятность. В production-сервисах, которые работают неделями, это катастрофа: утечка памяти, рост числа goroutines, в конечном счёте — OOM и падение.
goroutine ожидает от channel данных, которые никогда не придут. Она не завершится, не освободит память, но и не сообщит об ошибке — просто будет существовать вечно
В Go есть встроенный инструмент для обнаружения утечек:
# Детектор data race — запускать при каждом тесте
go run -race main.go
go test -race ./...
# Если все горутины зависли — runtime выведет deadlock:
# "all goroutines are asleep - deadlock!"
# Stack trace показывает, где именно зависла каждая горутина
# Доступен через pprof или SIGQUIT (Ctrl+\)
Мотивирующий кейс: RSS-читалка
Рассмотрим пример RSS-агрегатора. Задача: периодически забирать обновления из нескольких источников и выдавать их в единый поток. Желаемый интерфейс прост:
type Subscription interface {
Updates() <-chan Item // поток новых элементов
Close() error // корректное завершение
}
// Создаём подписку на один источник
sub1 := Subscribe(fetcher1)
// Объединяем несколько источников в один поток
merged := Merge(sub1, sub2, sub3)
for item := range merged.Updates() {
fmt.Println(item.Title)
}
// Когда всё готово — чистое завершение без утечек
merged.Close()
Замените RSS на Kafka-топики, WebSocket-соединения, результаты HTTP-опроса или события из БД — архитектура останется той же. Subscription — это любой долгоживущий источник событий с возможностью отписки
Подписка
func Subscribe(fetcher Fetcher) Subscription {
s := &sub{
fetcher: fetcher,
updates: make(chan Item), // for Updates
}
go s.loop()
return s
}
// sub реализует Subscription интерфейс.
type sub struct {
fetcher Fetcher // fetches items
updates chan Item // delivers items to the user
}
// loop извлекает items используя s.fetcher и отправляет их
// в s.updates канал
func (s *sub) loop() {...}
Для реализации подписки можно использовать
func (s *sub) Updates() <-chan Item {
return s.updates
}
func (s *sub) Close() error {
// TODO: выход из цикла
// TODO: обработка ошибок
return err
}
Теперь рассмотрим сам цикл при наивной реализации
for {
if s.closed {
close(s.updates)
return
}
items, next, err := s.fetcher.Fetch()
if err != nil {
s.err = err
time.Sleep(10 * time.Second)
continue
}
for _, item := range items {
s.updates <- item
}
if now := time.Now(); next.After(now) {
time.Sleep(next.Sub(now))
}
}
// и функция закрытия
func (s *naiveSub) Close() error {
s.closed = true
return s.err
}
Три критичных бага наивной реализации
Первая версия с обычным циклом и time.Sleep выглядит рабочей. Но содержит три системных проблемы:
Баг #1 — Data Race
Поле closed читается из loop-goroutine и пишется из Close() — без синхронизации. go run -race поймает немедленно.
Баг #2 — Низкая отзывчивость
time.Sleep(nextFetch) блокирует goroutine целиком. Сигнал Close() будет обработан только после пробуждения — задержка до нескольких минут.
Баг #3 — Вечная блокировка
updates <- item блокируется, если потребитель перестал читать. Goroutine зависает навсегда — классическая утечка.
Каждый из этих багов по отдельности — неприятность. Вместе в production-сервисе — это рецепт необъяснимых зависаний, утечек памяти и ночных инцидентов.
Главный паттерн: for-select loop
Исправление всех трёх багов — один архитектурный сдвиг: превратить loop в event-driven центр управления через for { select { ... } }.
func (s *sub) loop() {
// Локальное состояние, принадлежащее только этой goroutine
var (
pending []Item
next = time.Now()
err error
)
for {
// Выдача в updates только если есть что отдавать
var first Item
var updates chan Item // nil = case не активен
if len(pending) > 0 && err == nil {
first = pending[0]
updates = s.updates // "включить" ветку выдачи
}
select {
case <-s.closing:
// Запрос на закрытие — отвечаем и выходим
s.closing <- err
close(s.updates)
return
case <-time.After(next.Sub(time.Now())):
// Время следующего fetch
var items []Item
items, next, err = s.fetcher.Fetch()
if err == nil {
pending = append(pending, items...)
}
case updates <- first:
// Успешно выдали элемент потребителю
pending = pending[1:]
}
}
}
Нет shared state → нет race. Close() обрабатывается немедленно как отдельный case. Send в updates никогда не заблокирует loop, потому что он лишь один из вариантов в select
Техника nil channel
Это один из самых нетривиальных и мощных приёмов. Суть: операция на nil-канале блокируется навсегда — а значит, select её никогда не выберет.
select {
case updates <- item:
// а вдруг pending пуст?
// нужен if снаружи...
// код размазывается
}
var out chan Item // nil по умолчанию
if len(queue) > 0 {
out = s.updates // "включить" ветку
}
select {
case out <- queue[0]:
// сработает только когда queue непуст
}
nil-канал — это выключенный свет. Никаких вложенных if/else не требуется, только состояние переменной
Корректное завершение: Close() как request-response
Наивный Close() выглядит так: установить флаг closed = true и надеяться. Проблема — data race: loop читает флаг, Close() пишет, оба делают это без синхронизации.
Правильный подход — сделать закрытие коммуникацией:
type sub struct {
closing chan error // двунаправленный: запрос + ответ
updates chan Item
}
// Внешний вызывающий:
func (s *sub) Close() error {
s.closing <- nil // отправляем запрос (блокируем)
return <-s.closing // ждём ответ с ошибкой
}
// Внутри loop:
case <-s.closing:
s.closing <- err // отвечаем с текущей ошибкой
close(s.updates)
return
Теперь Close() и loop общаются через канал — никакого shared state, никакой гонки. Caller блокируется ровно до тех пор, пока loop не подтвердит завершение.
Оптимизации: deduplication, backpressure, async fetch
После того как базовая архитектура стала корректной, можно добавить три практически важных улучшения:
1. Дедупликация. RSS-источники часто отдают одни и те же элементы повторно. Решение: хранить в loop-state seen map[string]bool и фильтровать до добавления в pending.
2. Ограниченная очередь (backpressure). Если потребитель медленный, pending растёт бесконечно. Вводим лимит maxPending: когда очередь заполнена — не планируем новый fetch. Политику переполнения (drop oldest / drop newest / pause) выбираем явно.
3. Асинхронный fetch. Долгий HTTP-запрос, запущенный прямо в select, заблокирует весь loop. Решение: вынести fetch в отдельную goroutine, результат возвращать через канал:
type fetchResult struct {
items []Item
next time.Time
err error
}
fetchDone := make(chan fetchResult, 1)
fetching := false
select {
case <-time.After(delay):
if !fetching {
fetching = true
go func() {
items, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{items, next, err}
}()
}
case res := <-fetchDone:
fetching = false
pending = append(pending, res.items...)
}
Чеклист перед PR
- Нет shared-полей, которые читаются/пишутся из разных goroutines без синхронизации
-
Close()обрабатывается быстро — не ждёт завершения долгого I/O или sleep - Нет send/receive, которые могут заблокироваться навсегда при любых условиях
- Очередь
pendingограничена, политика backpressure определена явно - Повторы из источника фильтруются (если источник «шумный»)
- Долгий I/O вынесен из управляющего loop в отдельную goroutine
-
go test -race ./...проходит без предупреждений
Главные выводы
Channels — это не только данные. Они несут сигналы времени, отмены, готовности. Пустой struct{} в канале часто важнее, чем конкретное значение.
Одна goroutine — один владелец состояния. Не нужны мьютексы там, где данные живут только внутри одной goroutine и передаются наружу через канал.
for-select loop — фундаментальный строительный блок. Любая долгоживущая конкурентная компонента должна иметь такую форму.
Go гибче actor-модели. На channels можно строить actor-подобные решения, но язык не ограничивает вас одной парадигмой.
На основе доклада Advanced Go Concurrency Patterns · Sameer Ajmani · Google I/O 2013
Оригинальное видео: https://www.youtube.com/watch?v=QDDwwePbDtw