Коротко — Outbox проще и надёжнее всего делать через «outbox table» в вашей БД + отдельный диспетчер (poller/worker) который читает строку(и) и публикует их в брокер. Ниже — набор практических вариантов, рекомендации по инструментам и примерная реализация для FastAPI/SQLAlchemy.
1) Основная идея (проверенная и простая)
- В той же транзакции, где вы вносите доменные изменения (удаление сообщения и т. п.), вставляете строку в таблицу outbox (event_id, type, payload, status, attempts, created_at...).
- После коммита отдельный процесс/поток/worker читает неотправленные события, публикует в брокер и помечает как отправленные (или увеличивает attempts при ошибке).
- Важные детали: уникальный event_id (UUID), idempotency у потребителей, безопасное конкурирующее чтение (SELECT ... FOR UPDATE SKIP LOCKED или логические блокировки), ретраи и dead-letter.
2) Какие инструменты/технологии выбрать
- База данных:
- PostgreSQL — лучший выбор для большинства: поддержка транзакций, LISTEN/NOTIFY, SELECT ... FOR UPDATE SKIP LOCKED, logical decoding для CDC.
- MySQL — тоже подходит, но для CDC сложнее; polling + SKIP LOCKED возможен в новых версиях.
- Брокер сообщений:
- Kafka — если нужна высокая пропускная способность, ordering в партициях, долговременное хранение. Часто используют связку Outbox + Debezium/JDBC Source Connector для автоматической публикации в Kafka.
- RabbitMQ — простая опция для очередей/доставки, хорошо сочетается с outbox + фоновый воркер.
- Redis Streams — лёгкий вариант с хорошей задержкой/производительностью.
- Cloud (AWS SNS/SQS, GCP Pub/Sub) — если инфраструктура в облаке.
- Для CDC (альтернативный подход):
- Debezium (Postgres/MySQL) + Kafka Connect: читает binlog/Write-Ahead-Log и публикует изменения (обычно применяется, если не хочется писать poller).
- wal2json/pgoutput + коннектор — если нужен собственный pipeline.
- Python-стек:
- FastAPI + async SQLAlchemy / asyncpg.
- Для отправки в Kafka: aiokafka или confluent-kafka.
- Для RabbitMQ: aio-pika.
- Для Redis Streams: aioredis.
- Для фоновых задач: отдельный asyncio-процесс, или Celery (если уже используется), или RQ. Но Celery не обязателен — лучше отдельный лёгкий worker, т.к. логика простая.
3) Конкретные рекомендации по реализации (Postgres + RabbitMQ/Kafka)
- Schema outbox (пример полей):
- id UUID PRIMARY KEY
- aggregate_type, aggregate_id
- event_type
- payload JSONB
- status ENUM('pending','processing','sent','failed')
- attempts integer
- last_error text
- created_at, processed_at
- Вставка в транзакции:
- BEGIN
- delete message from messages table
- INSERT INTO outbox (...) VALUES (...)
- COMMIT
- Worker (псевдокод):
- loop:
- SELECT id,payload ... FROM outbox WHERE status='pending' ORDER BY created_at LIMIT N FOR UPDATE SKIP LOCKED
- UPDATE outbox SET status='processing' WHERE id IN (...)
- for each row: try publish to broker
- on success: UPDATE outbox SET status='sent', processed_at=now() WHERE id=...
- on failure: UPDATE outbox SET attempts = attempts+1, last_error = ..., status = 'pending' (or 'failed' после N tries)
- Использовать транзакции при выборке/claim'е. SKIP LOCKED позволяет параллельным воркерам не мешать друг другу.
- Триггер/уведомление:
- Можно оповещать воркера о новых строках через Postgres LISTEN/NOTIFY (после insert отправлять NOTIFY) — чтобы уменьшить polling.
- Idempotency: добавьте уникальный event_id и делайте consumer idempotent (в базе потребителя проверять, применено ли событие ранее).
4) Альтернативы и когда их выбирать
- Debezium + Kafka Connect (Outbox via CDC): если у вас Kafka и вы не хотите писать poller. Debezium читает binlog/WAL и публикует insert'ы в Kafka. Очень надёжно, но требует Kafka/Connect/Debezium инфраструктуры.
- Transactional producer (XA/2PC) — обычно не рекомендуется (сложно и тяжело поддерживать).
- Использовать transactional outbox connector (JDBC Source Connector → Kafka) — хорош для JVM-стеков; для Python чаще Debezium.
- Полностью брокерский подход (отправлять событие синхронно в брокер до коммита) — риск рассинхронизации при ошибке, требует распределённых транзакций или точной обработки ошибок.
5) Инструменты и библиотеки (Python)
- DB access: asyncpg, SQLAlchemy (async), psycopg2
- Kafka: aiokafka, confluent-kafka-python
- RabbitMQ: aio-pika
- Redis Streams: aioredis
- Helpers: alembic для миграций
- Нет «стандартной» суперпопулярной outbox-библиотеки в Python, обычно пишут свой poller (это несложно).
6) Операционные моменты
- Мониторинг: количество «pending», частота ошибок, DLQ size.
- Ретраи и backoff, dead-letter queue после N попыток.
- Обслуживание: архивирование/удаление старых outbox-записей.
- Тестирование на корректность транзакций и потерю событий.
7) Примерный минимальный выбор для FastAPI монолита
- Маленький проект / простые требования: Postgres + outbox table + aio-pika (RabbitMQ) + простой asyncio worker. Используйте LISTEN/NOTIFY для пробуждения worker.
- Средний/высокий трафик и/или хотите Kafka: Postgres + outbox table + Debezium/Kafka Connect → Kafka (автоматическая публикация), либо outbox + ваш worker → aiokafka.
- Если в AWS: RDS(Postgres) + DMS/Debezium или RDS + Lambda trigger (но чаще — outbox+worker).
Если нужно, могу:
- показать конкретный пример кода для FastAPI + async SQLAlchemy вставки outbox + простой asyncio-worker публикации в RabbitMQ или Kafka;
- помочь спроектировать схему outbox и таблиц под ваши нужды;
- посоветовать архитектуру при известных ограничениях (Postgres vs MySQL, требования по latency/throughput, облачный провайдер).
Какая у вас БД и брокер сейчас? Какие требования по задержке/throughput и по сложности эксплуатации?