Зачем нужна асинхронность
Представьте ресторан с одним официантом. Синхронный подход: официант принимает заказ у первого столика, идёт на кухню,
ждёт, пока блюдо приготовят, приносит его — и только потом подходит ко второму столику. Асинхронный подход: официант
принимает заказ у первого столика, передаёт его на кухню, пока блюдо готовится — принимает заказ у второго, третьего
столика. Когда блюдо первого столика готово — он его несёт.
В программировании ситуация аналогичная. Ваше приложение делает HTTP-запрос к внешнему API и ждёт ответ 500 миллисекунд.
В синхронном коде процесс просто стоит и ничего не делает эти полсекунды. В асинхронном — за это время он успевает
обработать десятки других запросов.
Это особенно критично для веб-серверов. Если ваш API обслуживает 1000 одновременных пользователей, каждый запрос
обращается к базе данных (50 мс) и внешнему сервису (200 мс) — синхронный сервер будет узким горлышком. Асинхронный
обработает все запросы значительно быстрее, используя ресурсы сервера эффективнее.
Конкурентность vs параллелизм
Прежде чем погружаться в asyncio, разберёмся с терминологией. Эти два понятия часто путают, но между ними есть
принципиальная разница.
Конкурентность (concurrency)
Одно ядро процессора переключается между задачами. Задачи выполняются «как бы одновременно», но в каждый момент времени
работает только одна. Это именно то, что делает asyncio.
Аналогия: один повар готовит три блюда, переключаясь между ними — пока одно варится, он нарезает овощи для другого.
Параллелизм (parallelism)
Несколько ядер процессора выполняют задачи одновременно и по-настоящему. В Python для этого используется модуль
multiprocessing.
Аналогия: три повара готовят три блюда одновременно.
Что даёт asyncio
Asyncio обеспечивает конкурентность. Это идеально подходит для I/O-bound задач — тех, где программа большую часть
времени ждёт ответа от сети, базы данных, файловой системы. Для CPU-bound задач (тяжёлые вычисления, обработка данных)
asyncio не поможет — нужен multiprocessing.
Event loop: сердце asyncio
Event loop (цикл событий) — это центральный механизм asyncio. Он управляет выполнением корутин: запускает их,
приостанавливает, когда они ждут I/O, и возобновляет, когда результат готов.
Как это работает:
- Event loop берёт задачу из очереди
- Выполняет её до тех пор, пока она не встретит
await (ожидание I/O)
- Приостанавливает задачу и переключается на следующую из очереди
- Когда I/O-операция завершается — задача возвращается в очередь
- Цикл повторяется
В коде event loop запускается так:
import asyncio
async def main():
print("Привет, asyncio!")
asyncio.run(main())
Функция asyncio.run() создаёт event loop, запускает корутину main() и закрывает loop после завершения. В 99% случаев
это единственный способ запуска, который вам нужен.
Корутины: async и await
Корутина — это функция, объявленная с ключевым словом async def. Она может приостанавливать своё выполнение с помощью
await и возобновлять его, когда результат готов.
import asyncio
import time
async def fetch_data(name, delay):
print(f"[{name}] Начинаю загрузку...")
await asyncio.sleep(delay) # Имитация I/O-операции
print(f"[{name}] Загрузка завершена за {delay} сек")
return f"Данные от {name}"
async def main():
start = time.time()
# Последовательное выполнение
result1 = await fetch_data("API-1", 2)
result2 = await fetch_data("API-2", 3)
elapsed = time.time() - start
print(f"Последовательно: {elapsed:.1f} сек") # ~5 секунд
asyncio.run(main())
В примере выше две корутины выполняются последовательно — общее время 5 секунд. Но мы можем запустить их конкурентно.
Tasks и gather: параллельный запуск корутин
Чтобы запустить несколько корутин конкурентно, используйте asyncio.gather() или asyncio.create_task().
asyncio.gather()
async def main():
start = time.time()
# Конкурентное выполнение
results = await asyncio.gather(
fetch_data("API-1", 2),
fetch_data("API-2", 3),
fetch_data("API-3", 1),
)
elapsed = time.time() - start
print(f"Конкурентно: {elapsed:.1f} сек") # ~3 секунды (по самой долгой)
print(f"Результаты: {results}")
asyncio.run(main())
Теперь все три запроса выполняются одновременно. Общее время — 3 секунды (по самой долгой операции), а не 6 секунд (
2+3+1).
asyncio.create_task()
Для более тонкого контроля используйте create_task():
async def main():
task1 = asyncio.create_task(fetch_data("API-1", 2))
task2 = asyncio.create_task(fetch_data("API-2", 3))
# Можно делать что-то другое, пока задачи выполняются
print("Задачи запущены, делаю другую работу...")
# Получаем результаты, когда они нужны
result1 = await task1
result2 = await task2
Разница: gather() — запуск группы и ожидание всех, create_task() — запуск каждой задачи отдельно с возможностью
работать между ними.
aiohttp: асинхронные HTTP-запросы
Стандартная библиотека requests — синхронная. Для асинхронных HTTP-запросов используйте aiohttp или httpx.
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
urls = [
"https://api.github.com/repos/python/cpython",
"https://api.github.com/repos/django/django",
"https://api.github.com/repos/tiangolo/fastapi",
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(f"{result['full_name']}: {result['stargazers_count']} звёзд")
asyncio.run(main())
Ключевой момент: aiohttp.ClientSession() создаётся один раз и переиспользуется для всех запросов. Это значительно
эффективнее, чем создавать новое соединение для каждого запроса.
Если вы занимаетесь автоматизацией на Python, переход на асинхронные запросы
может ускорить ваши скрипты сбора данных в десятки раз.
asyncpg: асинхронная работа с PostgreSQL
Для работы с PostgreSQL в асинхронном коде используйте asyncpg — это самая быстрая библиотека для PostgreSQL на
Python.
import asyncpg
import asyncio
async def main():
# Создаём пул соединений
pool = await asyncpg.create_pool(
host="localhost",
database="mydb",
user="postgres",
password="password",
min_size=5,
max_size=20,
)
# Выполняем запросы через пул
async with pool.acquire() as conn:
# Одиночный запрос
user = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", 42
)
print(f"Пользователь: {user['name']}")
# Множественные запросы конкурентно
users_task = conn.fetch("SELECT * FROM users LIMIT 100")
orders_task = conn.fetch("SELECT * FROM orders LIMIT 100")
users, orders = await asyncio.gather(users_task, orders_task)
await pool.close()
asyncio.run(main())
Connection pooling
Пул соединений — критически важный паттерн. Создание нового соединения к базе данных занимает 50–100 мс. Если каждый
запрос создаёт новое соединение — при 1000 запросах в секунду это катастрофа. Пул поддерживает готовые соединения и
переиспользует их.
Паттерн Producer-Consumer
Один из самых полезных паттернов в асинхронном программировании. Producer создаёт задачи и кладёт их в очередь. Consumer
забирает задачи из очереди и обрабатывает.
import asyncio
import aiohttp
async def producer(queue, urls):
"""Кладёт URL-ы в очередь"""
for url in urls:
await queue.put(url)
# Сигнал о завершении для каждого consumer
for _ in range(3): # количество consumers
await queue.put(None)
async def consumer(queue, session, name):
"""Забирает URL из очереди и обрабатывает"""
while True:
url = await queue.get()
if url is None:
break
try:
async with session.get(url) as response:
data = await response.text()
print(f"[{name}] {url}: {len(data)} символов")
except Exception as e:
print(f"[{name}] Ошибка {url}: {e}")
queue.task_done()
async def main():
urls = [f"https://example.com/page/{i}" for i in range(20)]
queue = asyncio.Queue(maxsize=10)
async with aiohttp.ClientSession() as session:
# Запускаем producer и 3 consumers
await asyncio.gather(
producer(queue, urls),
consumer(queue, session, "Worker-1"),
consumer(queue, session, "Worker-2"),
consumer(queue, session, "Worker-3"),
)
asyncio.run(main())
Этот паттерн идеален для веб-скрейпинга, обработки очереди задач, загрузки файлов — везде, где нужно обрабатывать много
однотипных задач с ограничением конкурентности.
Rate limiting: ограничение частоты запросов
При работе с внешними API важно не превышать лимиты. Asyncio-семафор — простой способ ограничить количество
одновременных запросов.
import asyncio
import aiohttp
async def fetch_with_limit(semaphore, session, url):
async with semaphore:
async with session.get(url) as response:
return await response.json()
async def main():
# Максимум 5 одновременных запросов
semaphore = asyncio.Semaphore(5)
urls = [f"https://api.example.com/data/{i}" for i in range(100)]
async with aiohttp.ClientSession() as session:
tasks = [
fetch_with_limit(semaphore, session, url)
for url in urls
]
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Успешно: {len(successful)}, Ошибки: {len(failed)}")
asyncio.run(main())
asyncio.Semaphore(5) гарантирует, что одновременно выполняется не более 5 запросов. Остальные ждут в очереди. Это
защищает и вас (от бана по IP), и сервер (от перегрузки).
Реальный пример: асинхронный веб-сервер на FastAPI
Всё вышеописанное объединяется в реальном проекте. Вот пример асинхронного эндпоинта
на FastAPI:
from fastapi import FastAPI, Depends
import asyncpg
import aiohttp
app = FastAPI()
async def get_db_pool():
"""Создаём пул соединений при старте"""
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=5, max_size=20
)
return pool
@app.get("/user/{user_id}/dashboard")
async def get_dashboard(user_id: int):
pool = app.state.db_pool
async with pool.acquire() as conn:
# Параллельно загружаем данные из БД и внешнего API
user_task = conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
orders_task = conn.fetch(
"SELECT * FROM orders WHERE user_id = $1 ORDER BY created_at DESC LIMIT 10",
user_id
)
async with aiohttp.ClientSession() as session:
weather_task = session.get(
f"https://api.weather.com/current?city=Moscow"
)
user, orders, weather_resp = await asyncio.gather(
user_task, orders_task, weather_task
)
weather = await weather_resp.json()
return {
"user": dict(user),
"recent_orders": [dict(o) for o in orders],
"weather": weather["current"],
}
Один эндпоинт делает три асинхронных операции параллельно: два запроса к базе данных и один к внешнему API. Вместо 150
мс последовательно — 60 мс параллельно.
Частые ошибки при работе с asyncio
Ошибка 1: Вызов синхронного кода в корутине
# ПЛОХО: requests блокирует event loop
async def bad_example():
import requests
response = requests.get("https://api.example.com") # Блокировка!
return response.json()
# ХОРОШО: используем aiohttp
async def good_example():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com") as response:
return await response.json()
Синхронный вызов requests.get() внутри корутины блокирует весь event loop. Пока один запрос ждёт ответа — все
остальные корутины стоят. Это убивает весь смысл асинхронности.
Если вынуждены использовать синхронную библиотеку, оберните вызов в asyncio.to_thread():
async def workaround():
import requests
response = await asyncio.to_thread(requests.get, "https://api.example.com")
return response.json()
Ошибка 2: Забыть await
# ПЛОХО: корутина не выполнится
async def main():
fetch_data() # Возвращает объект корутины, но не выполняет её!
# ХОРОШО
async def main():
await fetch_data() # Теперь выполнится
Python выдаст предупреждение RuntimeWarning: coroutine was never awaited, но не ошибку. Код просто молча не выполнит
то, что должен.
Ошибка 3: Создание слишком много задач одновременно
# ПЛОХО: 10 000 одновременных соединений
async def main():
tasks = [fetch(url) for url in ten_thousand_urls]
await asyncio.gather(*tasks) # Можно упереться в лимит соединений ОС
# ХОРОШО: ограничиваем через семафор
async def main():
semaphore = asyncio.Semaphore(100)
tasks = [fetch_with_semaphore(semaphore, url) for url in ten_thousand_urls]
await asyncio.gather(*tasks)
Ошибка 4: Игнорирование исключений в задачах
# ПЛОХО: исключение потеряется
async def main():
task = asyncio.create_task(risky_operation())
# Если risky_operation упадёт — вы не узнаете
# ХОРОШО: обрабатываем исключения
async def main():
task = asyncio.create_task(risky_operation())
try:
result = await task
except Exception as e:
print(f"Задача упала: {e}")
Когда НЕ использовать asyncio
Асинхронность — не серебряная пуля. Вот случаи, когда она не поможет или навредит:
CPU-bound задачи
Если ваш код выполняет тяжёлые вычисления (обработка изображений, ML-инференс, сложная математика) — asyncio не ускорит
его. Используйте multiprocessing или concurrent.futures.ProcessPoolExecutor.
Простые скрипты
Если скрипт делает один запрос к API, обрабатывает результат и завершается — асинхронность добавляет сложность без
выгоды. Обычный requests.get() будет проще и понятнее.
Когда все библиотеки синхронные
Если ваш стек целиком синхронный (Django ORM, requests, psycopg2) — переход на asyncio потребует замены всех библиотек.
Это может быть оправдано, но только если вы действительно упираетесь в производительность I/O.
Когда команда не готова
Асинхронный код сложнее в отладке и понимании. Если команда не знакома с концепциями конкурентности — баги будут
множиться. Лучше начать с обучения.
Полезные библиотеки для асинхронного Python
| Библиотека |
Назначение |
| aiohttp |
HTTP-клиент и сервер |
| httpx |
HTTP-клиент (поддерживает sync и async) |
| asyncpg |
PostgreSQL |
| aiosqlite |
SQLite |
| motor |
MongoDB |
| aioredis |
Redis |
| aiokafka |
Apache Kafka |
| aiofiles |
Работа с файлами |
| uvicorn |
ASGI-сервер для FastAPI |
Заключение
Asyncio — мощный инструмент для построения эффективных приложений на Python. Ключевые идеи просты: event loop управляет
задачами, async/await позволяет приостанавливать и возобновлять выполнение, а gather() и create_task() запускают
задачи конкурентно.
Начните с малого: замените синхронный сбор данных из нескольких API на асинхронный и измерьте разницу. Потом переходите
к более сложным паттернам: producer-consumer, connection pooling, rate limiting.
Если вы только начинаете изучать Python, сначала пройдите базовый курс — asyncio
требует уверенного владения основами языка. А когда дойдёте до выбора фреймворка для веб-разработки, обратите внимание
на наше сравнение FastAPI, Django и Flask — все три поддерживают
асинхронную работу в той или иной степени.
Если вашему бизнесу нужен высоконагруженный асинхронный сервис на Python — наша команда разработает решение,
которое выдержит любую нагрузку. Мы специализируемся на построении масштабируемых backend-систем
и автоматизации бизнес-процессов.