Как передать логи задач Celery в роутер FastAPI?

У меня есть Celery-воркер, через который я запускаю необходимые задачи. Я хотел бы выводить логи работы этого воркера или его задач в роут FastAPI. Однако я не могу сохранять логи в отдельный файл. Как можно реализовать эту функциональность?
  • 26 сентября 2024 г. 5:43
Ответы на вопрос 1
Чтобы передавать логи задач Celery в роутер FastAPI без использования файлов, вы можете использовать систему обмена сообщениями, такую как Redis или RabbitMQ, для передачи логов, а затем обрабатывать их в FastAPI. Вот шаги, которые вы можете выполнить для реализации этой функциональности:

1. **Настройка Celery**: Теперь вы можете настраивать ваш Celery-воркер для отправки логов через Redis или RabbitMQ.

2. **Добавление логирования в задачи Celery**: В каждой задаче Celery вы можете использовать стандартный модуль `logging` и отправлять логи в выбранный вами брокер сообщений.

3. **Создание FastAPI эндпоинта для получения логов**: В вашем приложении FastAPI создайте эндпоинт, который будет слушать логи и сохранять их в памяти (например, в списке).

### Пример кода

#### 1. Настройка Celery

```python
# tasks.py
from celery import Celery
import logging

app = Celery('tasks', broker='redis://localhost:6379/0')

# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@app.task
def my_task():
    logger.info("Task started")
    # Здесь ваша логика задачи
    logger.info("Task finished")
```

#### 2. FastAPI эндпоинт

```python
# main.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import logging
import aioredis

app = FastAPI()

# Список для хранения логов
logs = []

# Настройка Redis
redis = aioredis.from_url("redis://localhost")

async def log_listener():
    pubsub = redis.pubsub()
    await pubsub.subscribe("task_logs")
    
    async for message in pubsub.listen():
        if message['type'] == 'message':
            logs.append(message['data'].decode())

@app.on_event("startup")
async def startup_event():
    asyncio.create_task(log_listener())

@app.get("/logs")
async def get_logs():
    return {"logs": logs}
```

#### 3. Отправка логов из Celery

Направьте логи из Celery в Redis с помощью `publish` метода.

```python
# tasks.py
import redis

# Ваш основной код...

r = redis.Redis(host='localhost', port=6379, db=0)

@app.task
def my_task():
    logger.info("Task started")
    r.publish("task_logs", "Task started")
    # Здесь ваша логика задачи
    r.publish("task_logs", "Task finished")
    logger.info("Task finished")
```

### Запуск

1. Убедитесь, что у вас запущен Redis сервер.
2. Запустите Celery воркер:
   ```bash
   celery -A tasks worker --loglevel=info
   ```
3. Запустите FastAPI:
   ```bash
   uvicorn main:app --reload
   ```
4. Перейдите по адресу `http://localhost:8000/logs`, чтобы увидеть логи.

### Заметки

- В примере используется `aioredis`, который позволяет работе с Redis асинхронно.
- Логи сохраняются в памяти, так что они не будут храниться после перезапуска приложения. Если нужно более надежное хранение, следует рассмотреть другие варианты (например, использование базы данных).
- Убедитесь, что ваши задачи и роутеры настроены правильно, чтобы избежать проблем с производительностью при большом объеме логов.
Похожие вопросы