Как создать эффективную обработку запросов в FASTAPI с использованием многопоточности или асинхронности?

Как можно исправить мой сервер, чтобы он выполнял запросы от разных пользователей параллельно, вместо того чтобы ждать завершения выполнения каждого запроса перед обработкой следующего?
  • 24 января 2024 г. 10:29
Ответы на вопрос 2
FastAPI уже работает асинхронно по умолчанию, используя `asyncio` для обработки запросов. Это значит, что сервер может переключаться между обработкой различных запросов без блокирования на операциях ввода-вывода (I/O), таких как запросы к базам данных или вызовы к внешним сервисам. Однако для максимальной эффективности важно, чтобы ваши обработчики были написаны асинхронно.

Вот как вы можете улучшить обработку запросов в FastAPI:

1. **Используйте асинхронные функции**. Убедитесь, что ваши обработчики путей и любые вспомогательные функции, которые выполняют I/O, объявлены с ключевым словом `async` и используют асинхронные библиотеки.

```python
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def read_root():
    return {"Hello": "World"}
```

2. **Используйте асинхронные библиотеки**. Для операций, требующих обращения к базе данных, HTTP-запросов к внешним сервисам и других I/O операций, используйте асинхронно-совместимые библиотеки (например, `httpx` для HTTP-запросов, `databases` для взаимодействия с базами данных и т.д.).

3. **Используйте фоновые задачи для длительных операций**. Если вам нужно выполнить операцию, которая займет много времени, но не требуется отправлять ее результат клиенту сразу, используйте фоновые задачи (`BackgroundTasks`), чтобы не блокировать обработку приходящих запросов.

```python
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"notification sent to {email}")
    # Отправьте уведомление здесь асинхронно
    return {"message": "Notification sent in the background"}
```

4. **Избегайте блокирующих операций в асинхронных функциях**. Если вы используете блокирующие библиотеки или функции в асинхронных обработчиках, они будут блокировать весь поток и помешают асинхронной природе FastAPI.

5. **Используйте `ThreadPoolExecutor` для интеграции синхронного кода**. Если у вас есть синхронный код, который вы не можете сделать асинхронным, можно использовать `ThreadPoolExecutor` для его исполнения в отдельных потоках:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI

executor = ThreadPoolExecutor()
app = FastAPI()

def block_function():
    # Здесь что-то длительное, например, обращение к синхронной библиотеке
    pass

@app.get("/block")
async def run_in_threadpool():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, block_function)
    return {"message": "This was run in a threadpool with blocking code"}
```

6. **Тюнинг конфигурации сервера**. Убедитесь, что используемый вами асинхронный сервер (например, Uvicorn или Hypercorn) настроен на достаточное количество
Скорее всего у вас где-то вызывается синхронный код в асинхронном ендпоинте. И этот код блокирует event loop. 
Посмотрите пример

Если сделать запрос на /delay и тут же сделать запрос на /instantly , получится то, о чем я говорю, когда синхронная time.sleep() блокирует цикл событий. Ответ от второго запроса придет только после отработки первого. Обратите внимание, что delay объявлена через async def .

Исправить это можно двумя способами
- Использовать асинхронный await asyncio.sleep()
- Объявить функцию синхронной через def

Возможно именно в этом у вас проблема. Найдите синхронный код и либо перепишите на асинхронный вариант, если это возможно (если для обращения в другой сервис вы используете requests , замените на aiohttp или httpx ), либо саму функцию ендпоинта сделайте синхронной.
Похожие вопросы