Чтобы передавать логи задач Celery в роутер FastAPI без использования файлов, вы можете использовать систему обмена сообщениями, такую как Redis или RabbitMQ, для передачи логов, а затем обрабатывать их в FastAPI. Вот шаги, которые вы можете выполнить для реализации этой функциональности:
1. **Настройка Celery**: Теперь вы можете настраивать ваш Celery-воркер для отправки логов через Redis или RabbitMQ.
2. **Добавление логирования в задачи Celery**: В каждой задаче Celery вы можете использовать стандартный модуль `logging` и отправлять логи в выбранный вами брокер сообщений.
3. **Создание FastAPI эндпоинта для получения логов**: В вашем приложении FastAPI создайте эндпоинт, который будет слушать логи и сохранять их в памяти (например, в списке).
### Пример кода
#### 1. Настройка Celery
```python
# tasks.py
from celery import Celery
import logging
app = Celery('tasks', broker='redis://localhost:6379/0')
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@app.task
def my_task():
logger.info("Task started")
# Здесь ваша логика задачи
logger.info("Task finished")
```
#### 2. FastAPI эндпоинт
```python
# main.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import logging
import aioredis
app = FastAPI()
# Список для хранения логов
logs = []
# Настройка Redis
redis = aioredis.from_url("redis://localhost")
async def log_listener():
pubsub = redis.pubsub()
await pubsub.subscribe("task_logs")
async for message in pubsub.listen():
if message['type'] == 'message':
logs.append(message['data'].decode())
@app.on_event("startup")
async def startup_event():
asyncio.create_task(log_listener())
@app.get("/logs")
async def get_logs():
return {"logs": logs}
```
#### 3. Отправка логов из Celery
Направьте логи из Celery в Redis с помощью `publish` метода.
```python
# tasks.py
import redis
# Ваш основной код...
r = redis.Redis(host='localhost', port=6379, db=0)
@app.task
def my_task():
logger.info("Task started")
r.publish("task_logs", "Task started")
# Здесь ваша логика задачи
r.publish("task_logs", "Task finished")
logger.info("Task finished")
```
### Запуск
1. Убедитесь, что у вас запущен Redis сервер.
2. Запустите Celery воркер:
```bash
celery -A tasks worker --loglevel=info
```
3. Запустите FastAPI:
```bash
uvicorn main:app --reload
```
4. Перейдите по адресу `http://localhost:8000/logs`, чтобы увидеть логи.
### Заметки
- В примере используется `aioredis`, который позволяет работе с Redis асинхронно.
- Логи сохраняются в памяти, так что они не будут храниться после перезапуска приложения. Если нужно более надежное хранение, следует рассмотреть другие варианты (например, использование базы данных).
- Убедитесь, что ваши задачи и роутеры настроены правильно, чтобы избежать проблем с производительностью при большом объеме логов.