Кратко — главное правило: не держать объект Session (и, тем более, открытую транзакцию) «навечно». Для каждого логического юнита работы (HTTP‑запрос, задача в фоне, обработка одного пачки элементов и т.п.) создавайте новую сессию, в конце — либо коммит, либо rollback, и всегда — close(). Ниже — практические шаблоны и объяснения, как это сделать правильно с DI, фоновыми задачами и без утечек соединений.
1) Основные принципы
- Session = краткоживущий объект; используйте его как «unit of work» (юнит работы). Создали — отработали — закрыли. Не храните Session в глобальном состоянии между операциями.
- Не держите открытые транзакции через длительные ожидания или внешние вызовы (sleep, сетевые запросы, ожидание сообщений). Это блокирует строки и соединения.
- Для фоновых задач (Celery, asyncio background, background thread) создавайте сессию внутри самой задачи.
- Используйте try/except/finally (или context manager) чтобы гарантированно rollback + close при ошибке.
- Не используйте scoped_session как «волшебный глобальный объект», если вы применяете DI — он скрывает жизненный цикл и часто становится причиной утечек. scoped_session подходит для простых случаев с thread-local, но с DI лучше явное управление.
- Если вам нужно сохранить данные из ORM после commit и использовать их в коде без повторного доступа к БД, либо используйте DTO (словарь/модель pydantic), либо session.expunge() / session.merge(), либо sessionmaker(expire_on_commit=False) — но последняя опция может скрыть проблемы с устаревшими данными.
2) Базовая конфигурация
Пример sync (SQLAlchemy 1.4+):
- engine = create_engine(url, pool_size=20, max_overflow=10, pool_pre_ping=True, pool_recycle=3600)
- SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)
pool_pre_ping полезен для долгих соединений, pool_recycle — для закрытия старых соединений, pool_size/max_overflow — чтобы не исчерпать пул при высоких параллельных задачах.
3) Простая фабрика сессий + контекстный менеджер
Рекомендуемая практика — передавать в DI (через dishka) не сам Session, а фабрику/выдающий провайдер, который внутри задачи создаёт сессию.
Пример (синхронный):
- определение фабрики
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
- провайдер (yield-провайдер или просто функция-фабрика)
def session_factory():
return SessionLocal()
Использование в коде/задаче:
with session_factory() as session:
try:
# работа с БД
session.add(obj)
session.commit()
except:
session.rollback()
raise
Замечание: Session поддерживает контекстный менеджер: with SessionLocal() as session: — при выходе закрывается, но commit не делается автоматически, так что явно делайте commit/rollback.
4) Пример провайдера для dishka (yield)
Если dishka позволяет yield-стиль provider (как FastAPI Depends), можно сделать так:
class ProviderDatabase(Provider):
def __init__(self, session_factory):
self.session_factory = session_factory
def __call__(self):
session = self.session_factory()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()
Этот провайдер можно использовать для одиночного запроса/операции. Для фоновых задач — не используйте request-scoped провайдер, а явно вызвать фабрику.
5) Фоновые задачи — шаблоны
- Каждая фон. задача = своя сессия.
- Для обработки множества элементов в одной задаче: дробите обработку на батчи, открывая/закрывая сессию и делая commit после каждого батча.
Пример:
def background_job(items, session_factory):
BATCH = 100
for batch in chunks(items, BATCH):
with session_factory() as session:
try:
for item in batch:
process_item(session, item)
session.commit()
except:
session.rollback()
raise
Не открывайте сессию перед всем долгим циклом, если цикл может работать часы — вместо этого делайте по батчам.
6) UnitOfWork + repository (паттерн для чистой архитектуры + DI)
Слой UoW инкапсулирует сессию и жизненный цикл транзакции, сервисы получают UoW через DI.
Пример:
class UnitOfWork:
def __init__(self, session_factory):
self.session_factory = session_factory
self.session = None
def __enter__(self):
self.session = self.session_factory()
self.transaction = self.session.begin()
return self
def __exit__(self, exc_type, exc, tb):
if exc:
self.transaction.rollback()
self.session.close()
else:
try:
self.transaction.commit()
finally:
self.session.close()
# вспомогательное:
def repository(self, repo_class):
return repo_class(self.session)
Использование:
with UnitOfWork(session_factory) as uow:
repo = uow.repository(UserRepository)
user = repo.get(...)
repo.save(...)
7) Async SQLAlchemy
Если у вас async приложение, используйте async_engine и async_sessionmaker и всегда async with:
async_engine = create_async_engine(url, ...)
AsyncSessionLocal = async_sessionmaker(bind=async_engine, expire_on_commit=False)
async def do_task():
async with AsyncSessionLocal() as session:
async with session.begin():
await session.execute(...)
8) Объекты ORM между сессиями
- Не передавайте ORM-инстансы между сессиями/пользователем/в очередь задач. При попытке ленивой загрузки они будут «Detached» и вызов запроса потерпит ошибку.
- Вместо этого передавайте id/плейн-DTO или создавайте отдельные модели сериализации (pydantic).
- Если нужно продолжить работу с тем же объектом в другой сессии — используйте session.merge(obj) или загрузите объект по id.
9) Предотвращение утечек соединений
- Всегда session.close() в finally (или через context manager).
- Не полагайтесь на GC для закрытия сессий.
- Мониторьте количество открытых соединений в пуле и логи ошибок «connection pool exhausted».
- Если вы используете threaded worker или многопроцесс, настройте pool_size/ max_overflow с учётом числа воркеров.
10) Контроль транзакций
- Используйте короткие транзакции. Не держите транзакции через сторонние I/O.
- Для сложных операций, которые состоят из нескольких шагов: либо разбивайте на независимые транзакции с компенсацией при ошибках, либо применяйте механизм saga/очередей.
- Не делайте commit автоматически при выходе из context manager, если логика требует отката при ошибке — организуйте commit явно в коде.
11) Пример итогового кода (sync) — провайдер фабрики + фон. задача
# инициализация
engine = create_engine(URL, pool_pre_ping=True, pool_size=20, max_overflow=10)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)
# фабрика передаётся через DI
def session_factory():
return SessionLocal()
# фоновая задача
def background_task(job_data, session_factory):
for chunk in chunks(job_data, 100):
with session_factory() as session:
try:
for item in chunk:
# явные операции сессии
session.add(process(item))
session.flush() # опционально
session.commit()
except Exception:
session.rollback()
raise
12) Контроль и метрики
- Логируйте время жизни задач и используйте монитор pool (например, pg_stat_activity в Postgres) при проблемах.
- Замеряйте latency/количество активных соединений, чтобы корректировать pool_size.
Коротко о том, как интегрировать с dishka
- Зарегистрируйте в контейнере session_factory (фабрику) или UnitOfWork.
- Для HTTP-обработчиков можно сделать request-scoped провайдер, который выдаёт session и гарантирует commit/rollback/close.
- Для background workers передавайте session_factory или uow_factory и внутри worker создавайте сессии явно — не используйте request-scoped провайдер.
Если хотите — могу:
- Показать конкретный пример ProviderDatabase для dishka (полный код),
- Или показать UnitOfWork + репозитории и пример использования в фоновом worker (Celery/asyncio),
- Или дать рекомендации по настройке pool для вашей нагрузки — расскажите, как вы запускаете фоновые задачи (threads/processes/async), сколько одновременных воркеров и пример нагрузки.