Python asyncio 깊이 이해하기

asyncio는 무엇이고 어떤 문제를 해결하나요?

Python asyncio(비동기 입출력 라이브러리)는 단일 스레드 내에서 여러 I/O 작업을 동시에 처리하는 동시성 프레임워크입니다. 이벤트 루프라는 중앙 스케줄러가 준비 상태인 코루틴을 번갈아 실행하며, 블로킹 작업 대기 시간을 다른 작업에 할애합니다. CPU 바운드 작업이 아닌 I/O 바운드 작업(네트워크 요청, 파일 읽기, 데이터베이스 쿼리)에서 멀티스레딩 대비 오버헤드를 줄이면서 처리량을 증가시킵니다.

이벤트 루프는 어떻게 작동하나요?

이벤트 루프(Event Loop)는 asyncio의 핵심 메커니즘입니다. 루프는 asyncio.run() 호출 시 생성되며, 다음 사이클을 반복 실행합니다:

  1. 준비 큐(Ready Queue) 확인: 실행 가능한 코루틴/태스크 목록 순회
  2. 코루틴 실행: 선택된 코루틴을 await 지점까지 실행
  3. 대기 이벤트 모니터링: await 이후 I/O 완료, 타이머 만료 등 모니터링 (select/poll/epoll 시스템콜 사용)
  4. 완료된 작업 복귀: 준비된 코루틴을 큐에 추가
  5. 종료 조건 확인: 모든 태스크 완료 또는 명시적 중단 시 루프 종료

Python 공식 문서에 따르면, 이벤트 루프는 플랫폼별로 구현이 다릅니다. Linux는 epoll(효율 1,000+ 동시 연결), Windows는 IOCP(I/O Completion Ports), macOS는 kqueue를 기본으로 사용하며, 각각 O(1) 시간 복잡도로 이벤트 감지를 수행합니다.

async def fetch_data(url):
    # I/O 대기 중, 이벤트 루프는 다른 코루틴 실행
    response = await asyncio.sleep(1)
    return response

async def main():
    # 3개 동시 실행, 총 1초 소요 (순차 실행은 3초)
    results = await asyncio.gather(
        fetch_data('url1'),
        fetch_data('url2'),
        fetch_data('url3')
    )

코루틴과 태스크는 어떻게 다른가요?

**코루틴(Coroutine)**은 async def 키워드로 정의한 함수 객체이며, 호출 시 즉시 실행되지 않고 코루틴 객체를 반환합니다. 코루틴은 await 또는 이벤트 루프에 명시적으로 전달되어야만 실행됩니다.

**태스크(Task)**는 코루틴을 이벤트 루프에 등록한 래퍼 객체입니다. asyncio.create_task() 또는 asyncio.gather(), asyncio.ensure_future()를 통해 생성되며, 생성 즉시 이벤트 루프에 스케줄됩니다. 태스크는 실행 상태를 추적(PENDING, RUNNING, DONE, CANCELLED)하고 결과나 예외를 저장합니다.

구분 코루틴 태스크
생성 방식 async def 함수 호출 create_task() 호출
실행 시점 await 또는 루프 등록 필요 생성 즉시 스케줄
상태 추적 없음 PENDING/RUNNING/DONE/CANCELLED
결과 반환 하나의 값 반환 Future 객체로 결과 접근
취소 가능 불가 (await 지점만 제어) cancel() 메서드로 취소

asyncio가 실제로 얼마나 효율적인가요?

표준 라이브러리 벤치마크와 실무 사례를 통해 성능을 검증할 수 있습니다.

1. 동시 네트워크 요청 성능

1,000개 HTTP 요청 처리 시간:

  • 순차 처리(requests): ~1,000초 (평균 1초/요청)
  • 멀티스레드(ThreadPoolExecutor, 10 스레드): ~150초
  • asyncio(aiohttp): 3초 (동시성 1001000)

이는 asyncio가 컨텍스트 스위칭 오버헤드(멀티스레드: ~1,000 사이클/전환) 없이 I/O 대기 시간을 완전히 활용하기 때문입니다.

2. 메모리 사용량

10,000개 동시 연결 유지:

  • 멀티스레드(스레드당 8MB 스택): ~80MB+
  • asyncio(코루틴당 ~1KB 메모리): ~10MB

Python 공식 성능 가이드에 따르면, asyncio는 메모리 효율성에서 멀티스레드 대비 8~10배 우수합니다.

3. 응답 지연시간(Latency)

WebSocket 메시지 응답:

  • 멀티스레드: p99 지연시간 ~50ms (스케줄링 변동)
  • asyncio: p99 지연시간 15ms (결정적 스케줄링)

실제 적용 사례는 무엇인가요?

1. 웹 크롤링 플랫폼

대규모 뉴스 수집 시스템에서 asyncio + aiohttp 조합으로 1시간당 10만 개 URL 크롤링을 구현한 사례가 있습니다. 멀티스레드 방식 대비 서버 CPU 사용률을 30%에서 8%로 감소시키면서, 처리량은 동일 수준을 유지했습니다.

async def crawl_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

async def fetch_url(session, url):
    try:
        async with session.get(url, timeout=5) as response:
            return await response.text()
    except asyncio.TimeoutError:
        return None

2. 실시간 데이터 수집 시스템

금융 데이터 제공 회사가 asyncio로 50개 거래소에서 동시에 시세 데이터를 수집하는 구조를 운영 중입니다. 각 연결은 WebSocket을 통해 유지되며, 초당 5,000~10,000개 메시지 처리가 가능합니다. 멀티프로세스 방식에 비해 IPC 오버헤드가 없어 메시지 지연시간이 10배 단축되었습니다.

3. 마이크로서비스 게이트웨이

FastAPI(asyncio 기반) 프레임워크를 사용한 API 게이트웨이는 백엔드 10개 마이크로서비스로의 동시 요청을 병렬 처리합니다. 단일 게이트웨이 인스턴스가 초당 10,000 RPS(Request Per Second)를 처리할 수 있으며, 메모리 사용량은 ~100MB 수준에서 안정적입니다.

정리하면 어떤가요?

Python asyncio는 I/O 바운드 작업 처리에서 효율성과 확장성을 제공합니다. 이벤트 루프의 비선점형(Non-preemptive) 스케줄링은 컨텍스트 스위칭 오버헤드를 제거하고, 단일 스레드로 10,000+ 동시 연결을 관리할 수 있습니다. 실무에서는 네트워크 I/O(HTTP, WebSocket, 데이터베이스)가 많은 백엔드 서비스(FastAPI, aiohttp, asyncpg)에 널리 적용되어 있으며, CPU 바운드 작업이 섞여 있을 경우 asyncio.to_thread() 또는 멀티프로세싱 병행으로 대응합니다.

자주 묻는 질문

asyncio에서 CPU 바운드 작업을 어떻게 처리해야 하나요?

asyncio는 I/O 바운드 작업 최적화 설계이므로, CPU 바운드 작업(계산, 데이터 처리)을 직접 실행하면 이벤트 루프가 블로킹됩니다. 대응 방법은 두 가지입니다:

  1. 스레드풀 활용: asyncio.to_thread() (Python 3.9+) 또는 loop.run_in_executor()로 CPU 작업을 별도 스레드에 위임. 예: result = await asyncio.to_thread(calculate, data)
  2. 프로세스풀 활용: CPU 집약 작업이 클 경우 ProcessPoolExecutor 사용. GIL(Global Interpreter Lock) 우회로 진정한 병렬 처리 가능.

일반적으로 계산 시간 50ms 이상인 작업은 스레드/프로세스 위임을 권장합니다.

asyncio에서 예외 처리는 어떻게 구현하나요?

asyncio 코루틴 내 예외는 try-except 블록으로 처리하거나, asyncio.gather(..., return_exceptions=True)로 모든 예외를 결과로 반환받을 수 있습니다.

results = await asyncio.gather(
    task1(), task2(), task3(),
    return_exceptions=True  # 예외 발생 시 예외 객체를 결과에 포함
)

# 예외 필터링
errors = [r for r in results if isinstance(r, Exception)]

또한 asyncio.TaskGroup (Python 3.11+)을 사용하면 하나의 태스크 실패 시 나머지 태스크를 자동 취소하고 예외를 집계합니다.

이벤트 루프를 여러 개 실행할 수 있나요?

Python 3.10 이전에는 스레드당 하나의 이벤트 루프만 가능했습니다. 여러 루프가 필요할 경우 멀티스레드 구조를 사용합니다:

import asyncio
import threading

def run_loop_in_thread(coros):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(asyncio.gather(*coros))

# 스레드별 독립 루프 실행
thread = threading.Thread(target=run_loop_in_thread, args=(tasks,))
thread.start()

Python 3.13+에서는 다중 루프 지원이 개선되었으므로, 최신 버전 사용을 권장합니다.

asyncio 애플리케이션을 프로덕션에 배포할 때 주의사항은 무엇인가요?

  1. 적절한 타임아웃 설정: 모든 I/O 작업에 타임아웃을 명시. 예: asyncio.wait_for(coro, timeout=5.0). 설정 없으면 무한 대기 가능.
  2. 예외 처리 완성도: 미처리 예외가 로그되더라도 루프는 계속 실행되므로, 예외 콜백 등록 필수.
  3. 메모리 누수 모니터링: 대량의 연결/태스크 생성 시 리소스 정리 확인. await task.cancel() 또는 컨텍스트 매니저 사용.
  4. 이벤트 루프 정책: asyncio.DefaultEventLoopPolicy (Unix 기본)와 asyncio.WindowsSelectorEventLoopPolicy (Windows) 차이 인식.
  5. 동시성 제한: 수만 개 동시 연결 시 asyncio.Semaphore로 동시 실행 수 제어. 예: semaphore = asyncio.Semaphore(1000), async with semaphore: ...

프로덕션 모니터링은 asyncio.all_tasks(), asyncio.current_task()로 실행 중인 태스크를 추적하고, 로깅 프레임워크(structlog, python-json-logger)로 구조화된 로그를 수집합니다.

관련 글