Какие инструменты или технологии лучше использовать для реализации паттерна Outbox?

Есть монолитное приложение на FastAPI, и нужно обеспечить event consistency при операциях создания/удаления. Например, при удалении сообщения требуется атомарно выполнить три шага:
  • 11 марта 2026 г. 23:01
Ответы на вопрос 2
Коротко — Outbox проще и надёжнее всего делать через «outbox table» в вашей БД + отдельный диспетчер (poller/worker) который читает строку(и) и публикует их в брокер. Ниже — набор практических вариантов, рекомендации по инструментам и примерная реализация для FastAPI/SQLAlchemy.

1) Основная идея (проверенная и простая)
- В той же транзакции, где вы вносите доменные изменения (удаление сообщения и т. п.), вставляете строку в таблицу outbox (event_id, type, payload, status, attempts, created_at...).
- После коммита отдельный процесс/поток/worker читает неотправленные события, публикует в брокер и помечает как отправленные (или увеличивает attempts при ошибке).
- Важные детали: уникальный event_id (UUID), idempotency у потребителей, безопасное конкурирующее чтение (SELECT ... FOR UPDATE SKIP LOCKED или логические блокировки), ретраи и dead-letter.

2) Какие инструменты/технологии выбрать
- База данных:
  - PostgreSQL — лучший выбор для большинства: поддержка транзакций, LISTEN/NOTIFY, SELECT ... FOR UPDATE SKIP LOCKED, logical decoding для CDC.
  - MySQL — тоже подходит, но для CDC сложнее; polling + SKIP LOCKED возможен в новых версиях.
- Брокер сообщений:
  - Kafka — если нужна высокая пропускная способность, ordering в партициях, долговременное хранение. Часто используют связку Outbox + Debezium/JDBC Source Connector для автоматической публикации в Kafka.
  - RabbitMQ — простая опция для очередей/доставки, хорошо сочетается с outbox + фоновый воркер.
  - Redis Streams — лёгкий вариант с хорошей задержкой/производительностью.
  - Cloud (AWS SNS/SQS, GCP Pub/Sub) — если инфраструктура в облаке.
- Для CDC (альтернативный подход):
  - Debezium (Postgres/MySQL) + Kafka Connect: читает binlog/Write-Ahead-Log и публикует изменения (обычно применяется, если не хочется писать poller).
  - wal2json/pgoutput + коннектор — если нужен собственный pipeline.
- Python-стек:
  - FastAPI + async SQLAlchemy / asyncpg.
  - Для отправки в Kafka: aiokafka или confluent-kafka.
  - Для RabbitMQ: aio-pika.
  - Для Redis Streams: aioredis.
  - Для фоновых задач: отдельный asyncio-процесс, или Celery (если уже используется), или RQ. Но Celery не обязателен — лучше отдельный лёгкий worker, т.к. логика простая.

3) Конкретные рекомендации по реализации (Postgres + RabbitMQ/Kafka)
- Schema outbox (пример полей):
  - id UUID PRIMARY KEY
  - aggregate_type, aggregate_id
  - event_type
  - payload JSONB
  - status ENUM('pending','processing','sent','failed')
  - attempts integer
  - last_error text
  - created_at, processed_at
- Вставка в транзакции:
  - BEGIN
    - delete message from messages table
    - INSERT INTO outbox (...) VALUES (...)
  - COMMIT
- Worker (псевдокод):
  - loop:
    - SELECT id,payload ... FROM outbox WHERE status='pending' ORDER BY created_at LIMIT N FOR UPDATE SKIP LOCKED
    - UPDATE outbox SET status='processing' WHERE id IN (...)
    - for each row: try publish to broker
      - on success: UPDATE outbox SET status='sent', processed_at=now() WHERE id=...
      - on failure: UPDATE outbox SET attempts = attempts+1, last_error = ..., status = 'pending' (or 'failed' после N tries)
  - Использовать транзакции при выборке/claim'е. SKIP LOCKED позволяет параллельным воркерам не мешать друг другу.
- Триггер/уведомление:
  - Можно оповещать воркера о новых строках через Postgres LISTEN/NOTIFY (после insert отправлять NOTIFY) — чтобы уменьшить polling.
- Idempotency: добавьте уникальный event_id и делайте consumer idempotent (в базе потребителя проверять, применено ли событие ранее).

4) Альтернативы и когда их выбирать
- Debezium + Kafka Connect (Outbox via CDC): если у вас Kafka и вы не хотите писать poller. Debezium читает binlog/WAL и публикует insert'ы в Kafka. Очень надёжно, но требует Kafka/Connect/Debezium инфраструктуры.
- Transactional producer (XA/2PC) — обычно не рекомендуется (сложно и тяжело поддерживать).
- Использовать transactional outbox connector (JDBC Source Connector → Kafka) — хорош для JVM-стеков; для Python чаще Debezium.
- Полностью брокерский подход (отправлять событие синхронно в брокер до коммита) — риск рассинхронизации при ошибке, требует распределённых транзакций или точной обработки ошибок.

5) Инструменты и библиотеки (Python)
- DB access: asyncpg, SQLAlchemy (async), psycopg2
- Kafka: aiokafka, confluent-kafka-python
- RabbitMQ: aio-pika
- Redis Streams: aioredis
- Helpers: alembic для миграций
- Нет «стандартной» суперпопулярной outbox-библиотеки в Python, обычно пишут свой poller (это несложно).

6) Операционные моменты
- Мониторинг: количество «pending», частота ошибок, DLQ size.
- Ретраи и backoff, dead-letter queue после N попыток.
- Обслуживание: архивирование/удаление старых outbox-записей.
- Тестирование на корректность транзакций и потерю событий.

7) Примерный минимальный выбор для FastAPI монолита
- Маленький проект / простые требования: Postgres + outbox table + aio-pika (RabbitMQ) + простой asyncio worker. Используйте LISTEN/NOTIFY для пробуждения worker.
- Средний/высокий трафик и/или хотите Kafka: Postgres + outbox table + Debezium/Kafka Connect → Kafka (автоматическая публикация), либо outbox + ваш worker → aiokafka.
- Если в AWS: RDS(Postgres) + DMS/Debezium или RDS + Lambda trigger (но чаще — outbox+worker).

Если нужно, могу:
- показать конкретный пример кода для FastAPI + async SQLAlchemy вставки outbox + простой asyncio-worker публикации в RabbitMQ или Kafka;
- помочь спроектировать схему outbox и таблиц под ваши нужды;
- посоветовать архитектуру при известных ограничениях (Postgres vs MySQL, требования по latency/throughput, облачный провайдер).

Какая у вас БД и брокер сейчас? Какие требования по задержке/throughput и по сложности эксплуатации?
<blockquote>при удалении сообщения нужно атомарно:<br/>
<br/>
1. Удалить запись из БД<br/>
2. Удалить файл из S3<br/>
3. Разослать уведомление по WebSocket (in-memory хранятся)</blockquote> <br/> А зачем вообще все три составляющие пихать в транзакцию? <br/> Тут атомарность и транзакционность только для БД нужна, для остального нужна идемпотентность и асинхронность. Удаление из S3 это просто "подчистка кеша/артефактов" по сути, а WebSocket — как ненадёжный best-effort event notify <i>hint</i> . Последние два могут вообще прийти заметно позже, чем факт транзакции в БД, кто от этого пострадает то? Просто держишь БД за источник консистентной истины, а остальное доводишь уже в отдельном воркере асинхронно как логический коммит. <br/> <br/> Например, можно при транзакции БД помечать записать как soft-deleted, а уже когда отрабатывает полный коммит подчистки всех артефактов, можно сделать hard delete, если память не бесконечная. <br/> <br/> Поллинг — ну такое, лучше будет pub/sub. Поллинг можно оставить как fallback.
Похожие вопросы