БлогУслугиКарьера
Обсудить проект
БлогУслугиКарьераОбсудить проект
Python

Асинхронный Python (asyncio) простыми словами: от теории к практике

Понятное объяснение asyncio в Python: event loop, корутины, tasks. Практические примеры для веб-разработки и автоматизации.

Редакция Feature
Редакция Feature·
30 мар
·
13 мин
·
Асинхронный Python (asyncio) простыми словами: от теории к практике

Зачем нужна асинхронность

Представьте ресторан с одним официантом. Синхронный подход: официант принимает заказ у первого столика, идёт на кухню, ждёт, пока блюдо приготовят, приносит его — и только потом подходит ко второму столику. Асинхронный подход: официант принимает заказ у первого столика, передаёт его на кухню, пока блюдо готовится — принимает заказ у второго, третьего столика. Когда блюдо первого столика готово — он его несёт.

В программировании ситуация аналогичная. Ваше приложение делает HTTP-запрос к внешнему API и ждёт ответ 500 миллисекунд. В синхронном коде процесс просто стоит и ничего не делает эти полсекунды. В асинхронном — за это время он успевает обработать десятки других запросов.

Это особенно критично для веб-серверов. Если ваш API обслуживает 1000 одновременных пользователей, каждый запрос обращается к базе данных (50 мс) и внешнему сервису (200 мс) — синхронный сервер будет узким горлышком. Асинхронный обработает все запросы значительно быстрее, используя ресурсы сервера эффективнее.

Конкурентность vs параллелизм

Прежде чем погружаться в asyncio, разберёмся с терминологией. Эти два понятия часто путают, но между ними есть принципиальная разница.

Конкурентность (concurrency)

Одно ядро процессора переключается между задачами. Задачи выполняются «как бы одновременно», но в каждый момент времени работает только одна. Это именно то, что делает asyncio.

Аналогия: один повар готовит три блюда, переключаясь между ними — пока одно варится, он нарезает овощи для другого.

Параллелизм (parallelism)

Несколько ядер процессора выполняют задачи одновременно и по-настоящему. В Python для этого используется модуль multiprocessing.

Аналогия: три повара готовят три блюда одновременно.

Что даёт asyncio

Asyncio обеспечивает конкурентность. Это идеально подходит для I/O-bound задач — тех, где программа большую часть времени ждёт ответа от сети, базы данных, файловой системы. Для CPU-bound задач (тяжёлые вычисления, обработка данных) asyncio не поможет — нужен multiprocessing.

Event loop: сердце asyncio

Event loop (цикл событий) — это центральный механизм asyncio. Он управляет выполнением корутин: запускает их, приостанавливает, когда они ждут I/O, и возобновляет, когда результат готов.

Как это работает:

  1. Event loop берёт задачу из очереди
  2. Выполняет её до тех пор, пока она не встретит await (ожидание I/O)
  3. Приостанавливает задачу и переключается на следующую из очереди
  4. Когда I/O-операция завершается — задача возвращается в очередь
  5. Цикл повторяется

В коде event loop запускается так:

import asyncio

async def main():
    print("Привет, asyncio!")

asyncio.run(main())

Функция asyncio.run() создаёт event loop, запускает корутину main() и закрывает loop после завершения. В 99% случаев это единственный способ запуска, который вам нужен.

Корутины: async и await

Корутина — это функция, объявленная с ключевым словом async def. Она может приостанавливать своё выполнение с помощью await и возобновлять его, когда результат готов.

import asyncio
import time

async def fetch_data(name, delay):
    print(f"[{name}] Начинаю загрузку...")
    await asyncio.sleep(delay)  # Имитация I/O-операции
    print(f"[{name}] Загрузка завершена за {delay} сек")
    return f"Данные от {name}"

async def main():
    start = time.time()

    # Последовательное выполнение
    result1 = await fetch_data("API-1", 2)
    result2 = await fetch_data("API-2", 3)

    elapsed = time.time() - start
    print(f"Последовательно: {elapsed:.1f} сек")  # ~5 секунд

asyncio.run(main())

В примере выше две корутины выполняются последовательно — общее время 5 секунд. Но мы можем запустить их конкурентно.

Tasks и gather: параллельный запуск корутин

Чтобы запустить несколько корутин конкурентно, используйте asyncio.gather() или asyncio.create_task().

asyncio.gather()

async def main():
    start = time.time()

    # Конкурентное выполнение
    results = await asyncio.gather(
        fetch_data("API-1", 2),
        fetch_data("API-2", 3),
        fetch_data("API-3", 1),
    )

    elapsed = time.time() - start
    print(f"Конкурентно: {elapsed:.1f} сек")  # ~3 секунды (по самой долгой)
    print(f"Результаты: {results}")

asyncio.run(main())

Теперь все три запроса выполняются одновременно. Общее время — 3 секунды (по самой долгой операции), а не 6 секунд ( 2+3+1).

asyncio.create_task()

Для более тонкого контроля используйте create_task():

async def main():
    task1 = asyncio.create_task(fetch_data("API-1", 2))
    task2 = asyncio.create_task(fetch_data("API-2", 3))

    # Можно делать что-то другое, пока задачи выполняются
    print("Задачи запущены, делаю другую работу...")

    # Получаем результаты, когда они нужны
    result1 = await task1
    result2 = await task2

Разница: gather() — запуск группы и ожидание всех, create_task() — запуск каждой задачи отдельно с возможностью работать между ними.

aiohttp: асинхронные HTTP-запросы

Стандартная библиотека requests — синхронная. Для асинхронных HTTP-запросов используйте aiohttp или httpx.

import aiohttp
import asyncio

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        "https://api.github.com/repos/python/cpython",
        "https://api.github.com/repos/django/django",
        "https://api.github.com/repos/tiangolo/fastapi",
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(f"{result['full_name']}: {result['stargazers_count']} звёзд")

asyncio.run(main())

Ключевой момент: aiohttp.ClientSession() создаётся один раз и переиспользуется для всех запросов. Это значительно эффективнее, чем создавать новое соединение для каждого запроса.

Если вы занимаетесь автоматизацией на Python, переход на асинхронные запросы может ускорить ваши скрипты сбора данных в десятки раз.

Нужен высоконагруженный сервис?

Разработаем асинхронный backend на Python

Заказать разработку

asyncpg: асинхронная работа с PostgreSQL

Для работы с PostgreSQL в асинхронном коде используйте asyncpg — это самая быстрая библиотека для PostgreSQL на Python.

import asyncpg
import asyncio

async def main():
    # Создаём пул соединений
    pool = await asyncpg.create_pool(
        host="localhost",
        database="mydb",
        user="postgres",
        password="password",
        min_size=5,
        max_size=20,
    )

    # Выполняем запросы через пул
    async with pool.acquire() as conn:
        # Одиночный запрос
        user = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", 42
        )
        print(f"Пользователь: {user['name']}")

        # Множественные запросы конкурентно
        users_task = conn.fetch("SELECT * FROM users LIMIT 100")
        orders_task = conn.fetch("SELECT * FROM orders LIMIT 100")

        users, orders = await asyncio.gather(users_task, orders_task)

    await pool.close()

asyncio.run(main())

Connection pooling

Пул соединений — критически важный паттерн. Создание нового соединения к базе данных занимает 50–100 мс. Если каждый запрос создаёт новое соединение — при 1000 запросах в секунду это катастрофа. Пул поддерживает готовые соединения и переиспользует их.

Паттерн Producer-Consumer

Один из самых полезных паттернов в асинхронном программировании. Producer создаёт задачи и кладёт их в очередь. Consumer забирает задачи из очереди и обрабатывает.

import asyncio
import aiohttp

async def producer(queue, urls):
    """Кладёт URL-ы в очередь"""
    for url in urls:
        await queue.put(url)
    # Сигнал о завершении для каждого consumer
    for _ in range(3):  # количество consumers
        await queue.put(None)

async def consumer(queue, session, name):
    """Забирает URL из очереди и обрабатывает"""
    while True:
        url = await queue.get()
        if url is None:
            break
        try:
            async with session.get(url) as response:
                data = await response.text()
                print(f"[{name}] {url}: {len(data)} символов")
        except Exception as e:
            print(f"[{name}] Ошибка {url}: {e}")
        queue.task_done()

async def main():
    urls = [f"https://example.com/page/{i}" for i in range(20)]
    queue = asyncio.Queue(maxsize=10)

    async with aiohttp.ClientSession() as session:
        # Запускаем producer и 3 consumers
        await asyncio.gather(
            producer(queue, urls),
            consumer(queue, session, "Worker-1"),
            consumer(queue, session, "Worker-2"),
            consumer(queue, session, "Worker-3"),
        )

asyncio.run(main())

Этот паттерн идеален для веб-скрейпинга, обработки очереди задач, загрузки файлов — везде, где нужно обрабатывать много однотипных задач с ограничением конкурентности.

Rate limiting: ограничение частоты запросов

При работе с внешними API важно не превышать лимиты. Asyncio-семафор — простой способ ограничить количество одновременных запросов.

import asyncio
import aiohttp

async def fetch_with_limit(semaphore, session, url):
    async with semaphore:
        async with session.get(url) as response:
            return await response.json()

async def main():
    # Максимум 5 одновременных запросов
    semaphore = asyncio.Semaphore(5)
    urls = [f"https://api.example.com/data/{i}" for i in range(100)]

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(semaphore, session, url)
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    print(f"Успешно: {len(successful)}, Ошибки: {len(failed)}")

asyncio.run(main())

asyncio.Semaphore(5) гарантирует, что одновременно выполняется не более 5 запросов. Остальные ждут в очереди. Это защищает и вас (от бана по IP), и сервер (от перегрузки).

Реальный пример: асинхронный веб-сервер на FastAPI

Всё вышеописанное объединяется в реальном проекте. Вот пример асинхронного эндпоинта на FastAPI:

from fastapi import FastAPI, Depends
import asyncpg
import aiohttp

app = FastAPI()

async def get_db_pool():
    """Создаём пул соединений при старте"""
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/db",
        min_size=5, max_size=20
    )
    return pool

@app.get("/user/{user_id}/dashboard")
async def get_dashboard(user_id: int):
    pool = app.state.db_pool

    async with pool.acquire() as conn:
        # Параллельно загружаем данные из БД и внешнего API
        user_task = conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
        orders_task = conn.fetch(
            "SELECT * FROM orders WHERE user_id = $1 ORDER BY created_at DESC LIMIT 10",
            user_id
        )

        async with aiohttp.ClientSession() as session:
            weather_task = session.get(
                f"https://api.weather.com/current?city=Moscow"
            )

            user, orders, weather_resp = await asyncio.gather(
                user_task, orders_task, weather_task
            )
            weather = await weather_resp.json()

    return {
        "user": dict(user),
        "recent_orders": [dict(o) for o in orders],
        "weather": weather["current"],
    }

Один эндпоинт делает три асинхронных операции параллельно: два запроса к базе данных и один к внешнему API. Вместо 150 мс последовательно — 60 мс параллельно.

Обсудим ваш проект?

Оставьте контакты — перезвоним и обсудим задачу

Частые ошибки при работе с asyncio

Ошибка 1: Вызов синхронного кода в корутине

# ПЛОХО: requests блокирует event loop
async def bad_example():
    import requests
    response = requests.get("https://api.example.com")  # Блокировка!
    return response.json()

# ХОРОШО: используем aiohttp
async def good_example():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com") as response:
            return await response.json()

Синхронный вызов requests.get() внутри корутины блокирует весь event loop. Пока один запрос ждёт ответа — все остальные корутины стоят. Это убивает весь смысл асинхронности.

Если вынуждены использовать синхронную библиотеку, оберните вызов в asyncio.to_thread():

async def workaround():
    import requests
    response = await asyncio.to_thread(requests.get, "https://api.example.com")
    return response.json()

Ошибка 2: Забыть await

# ПЛОХО: корутина не выполнится
async def main():
    fetch_data()  # Возвращает объект корутины, но не выполняет её!

# ХОРОШО
async def main():
    await fetch_data()  # Теперь выполнится

Python выдаст предупреждение RuntimeWarning: coroutine was never awaited, но не ошибку. Код просто молча не выполнит то, что должен.

Ошибка 3: Создание слишком много задач одновременно

# ПЛОХО: 10 000 одновременных соединений
async def main():
    tasks = [fetch(url) for url in ten_thousand_urls]
    await asyncio.gather(*tasks)  # Можно упереться в лимит соединений ОС

# ХОРОШО: ограничиваем через семафор
async def main():
    semaphore = asyncio.Semaphore(100)
    tasks = [fetch_with_semaphore(semaphore, url) for url in ten_thousand_urls]
    await asyncio.gather(*tasks)

Ошибка 4: Игнорирование исключений в задачах

# ПЛОХО: исключение потеряется
async def main():
    task = asyncio.create_task(risky_operation())
    # Если risky_operation упадёт — вы не узнаете

# ХОРОШО: обрабатываем исключения
async def main():
    task = asyncio.create_task(risky_operation())
    try:
        result = await task
    except Exception as e:
        print(f"Задача упала: {e}")

Когда НЕ использовать asyncio

Асинхронность — не серебряная пуля. Вот случаи, когда она не поможет или навредит:

CPU-bound задачи

Если ваш код выполняет тяжёлые вычисления (обработка изображений, ML-инференс, сложная математика) — asyncio не ускорит его. Используйте multiprocessing или concurrent.futures.ProcessPoolExecutor.

Простые скрипты

Если скрипт делает один запрос к API, обрабатывает результат и завершается — асинхронность добавляет сложность без выгоды. Обычный requests.get() будет проще и понятнее.

Когда все библиотеки синхронные

Если ваш стек целиком синхронный (Django ORM, requests, psycopg2) — переход на asyncio потребует замены всех библиотек. Это может быть оправдано, но только если вы действительно упираетесь в производительность I/O.

Когда команда не готова

Асинхронный код сложнее в отладке и понимании. Если команда не знакома с концепциями конкурентности — баги будут множиться. Лучше начать с обучения.

Полезные библиотеки для асинхронного Python

Библиотека Назначение
aiohttp HTTP-клиент и сервер
httpx HTTP-клиент (поддерживает sync и async)
asyncpg PostgreSQL
aiosqlite SQLite
motor MongoDB
aioredis Redis
aiokafka Apache Kafka
aiofiles Работа с файлами
uvicorn ASGI-сервер для FastAPI

Заключение

Asyncio — мощный инструмент для построения эффективных приложений на Python. Ключевые идеи просты: event loop управляет задачами, async/await позволяет приостанавливать и возобновлять выполнение, а gather() и create_task() запускают задачи конкурентно.

Начните с малого: замените синхронный сбор данных из нескольких API на асинхронный и измерьте разницу. Потом переходите к более сложным паттернам: producer-consumer, connection pooling, rate limiting.

Если вы только начинаете изучать Python, сначала пройдите базовый курс — asyncio требует уверенного владения основами языка. А когда дойдёте до выбора фреймворка для веб-разработки, обратите внимание на наше сравнение FastAPI, Django и Flask — все три поддерживают асинхронную работу в той или иной степени.

Если вашему бизнесу нужен высоконагруженный асинхронный сервис на Python — наша команда разработает решение, которое выдержит любую нагрузку. Мы специализируемся на построении масштабируемых backend-систем и автоматизации бизнес-процессов.

Обсудим ваш проект?

Оставьте контакты — перезвоним и обсудим задачу

Содержание
  • Зачем нужна асинхронность
  • Конкурентность vs параллелизм
  • Event loop: сердце asyncio
  • Корутины: async и await
  • Tasks и gather: параллельный запуск корутин
  • aiohttp: асинхронные HTTP-запросы
  • asyncpg: асинхронная работа с PostgreSQL
  • Паттерн Producer-Consumer
  • Rate limiting: ограничение частоты запросов
  • Реальный пример: асинхронный веб-сервер на FastAPI
  • Частые ошибки при работе с asyncio
  • Когда НЕ использовать asyncio
  • Полезные библиотеки для асинхронного Python
  • Заключение
Поделиться:

Похожие статьи

FastAPI vs Django vs Flask: что учить в 2026 году
Python

FastAPI vs Django vs Flask: что учить в 2026 году

12 мин
Python для автоматизации: 15 скриптов, экономящих 10 часов в неделю
Python

Python для автоматизации: 15 скриптов, экономящих 10 часов в неделю

14 мин
Как я получил первую работу Python-разработчиком за 4 месяца
Python

Как я получил первую работу Python-разработчиком за 4 месяца

13 мин
Feature IT

Feature IT — платформа по обучению программированию и разработке цифровых продуктов. Мы создаём современные веб-решения для бизнеса и обучаем этому других!

Политика конфиденциальностиПользовательское соглашение

О компании

  • Блог
  • Карьера

Услуги разработки

  • Разработка сайтов под ключ
  • Веб-приложения на React/Next.js
  • Telegram-боты для бизнеса
  • Mini Apps (Telegram, VK)
  • SEO-оптимизированные сайты
  • Автоматизация бизнес-процессов
  • Поддержка и развитие IT-продуктов

Обучение

  • Курс Python с нуля
  • Алгоритмы и структуры данных
  • Паттерны проектирования
  • Подготовка к собеседованиям в IT
  • Практика на реальных проектах

Инструменты

  • Генератор UTM-меток
  • Счётчик символов