Как можно улучшить скрипт на Python для работы с веб-сокетом на сайте, который обновляет финансовую информацию несколько раз в секунду? Я пытаюсь взаимодействовать с веб-сокетом, однако столкнулся с несколькими вопросами.
Первый вопрос: я нашел видео на YouTube, которое не дало мне четких ответов: <a href="https://youtu.be/tgtb9iucOts" rel="nofollow">https://youtu.be/tgtb9iucOts</a>. Я несколько раз проверял код и ошибок не обнаружил.
Вот код, который я использую:
<pre><code>import websockets
import asyncio
async def listen():
url = "ws://simple-websocket-server-echo. glitch.me/"
async with websockets.connect(url) as ws:
while True:
msg = await ws.recv()
print(msg)
asyncio.get_event_loop().run_until_complete
(listen())</code></pre>
Второй вопрос: почему в строке `url = "ws://simple-websocket-server-echo.glitch.me/"` используется именно этот URL? В видео взаимодействуют с "sync-chasing-ball.glitch.me".
Третий вопрос: почему функция `listen()` должна вызываться с новой строки? Если я не сделаю этого, появляется ошибка:
<pre><code>partially initialized module 'websockets' from 'program.py' has no attribute 'connect' (most likely due to a circular import)</code></pre>
Как я могу разрешить эти вопросы и успешно получить данные с веб-сокета?
Давайте пошагово разберем ваши вопросы и предложим исправления.
### 1. Ошибка "coroutine 'listen' was never awaited"
Ваше текущее использование функции `listen()` действительно вызывает эту ошибку, потому что вы не вызываете её правильно. Ваша строка:
```python
asyncio.get_event_loop().run_until_complete(listen())
```
вызвана неправильно. Она должна выглядеть так, чтобы вызывалась функция. Исправьте её следующим образом:
```python
asyncio.run(listen())
```
или, если вы хотите сохранить использование `get_event_loop`, напишите:
```python
loop = asyncio.get_event_loop()
loop.run_until_complete(listen())
```
### 2. URL веб-сокета
URL `ws://simple-websocket-server-echo.glitch.me/` используется в вашем коде как пример веб-сокета. Вы можете использовать его для тестирования и отладки, однако если в видео упоминается `sync-chasing-ball.glitch.me`, и этот адрес важен для ваших целей, вы можете использовать именно его. Важно, чтобы URL соответствует вашему проекту. Убедитесь, что сервер доступен и поддерживает нужный вам протокол.
### 3. Вызов функции listen()
Ваша ошибка с `partially initialized module 'websockets'` может возникнуть из-за неправильного именования вашего скрипта. Убедитесь, что ваш файл не называется `websockets.py`, так как это может создавать конфликт с библиотекой.
### Полный исправленный пример кода
Учитывая всё вышесказанное, ваш обновлённый код может выглядеть так:
```python
import websockets
import asyncio
async def listen():
url = "ws://simple-websocket-server-echo.glitch.me/" # Замените на нужный URL.
async with websockets.connect(url) as ws:
while True:
msg = await ws.recv()
print(msg)
if __name__ == "__main__":
asyncio.run(listen())
```
### Резюме
1. Убедитесь, что вы правильно вызываете корутину `listen()` через `asyncio.run`.
2. Используйте нужный вам URL.
3. Убедитесь, что ваш скрипт не конфликтует с установленными модулями (избегайте названий, совпадающих с библиотеками).
Теперь, когда вы проделали эти шаги, ваш код должен работать без ошибок! Если у вас возникнут дополнительные вопросы, не стесняйтесь спрашивать.
<pre><code class="python">import websockets
import asyncio
async def listen():
url = "ws://simple-websocket-server-echo.glitch.me/"
async with websockets.connect(url) as ws:
while True:
msg = await ws.recv()
print(msg)
async def main():
# Создание задачи для прослушивания
listener_task = asyncio.create_task(listen())
# Здесь вы можете добавить другие задачи параллельно с прослушиванием сообщений сервера, если необходимо
await listener_task # Ожидаем завершения задачи что гарантирует, что программа не завершится, пока она прослушивает входящие сообщения.
# Запуск основной функции
asyncio.run(main())</code></pre>