일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- ebs
- asgi
- asyncio
- YAML
- K8S
- Service
- dockerfile
- POD
- elasticsearch
- Deployment
- 쿠버네티스
- github
- IAC
- leetcode
- docker
- Kubernetes
- IAM
- Python
- 자바스크립트
- WSGI
- terraform
- Django
- EKS
- intervals
- kernel
- ansible
- AWS
- EC2
- event loop
- FastAPI
- Today
- Total
궁금한게 많은 개발자 노트
[ python ] event loop (1) 본문
asyncio에서는 coroutine과 event loop를 사용하여 비동기 프로그래밍을 지원합니다. 다른 언어에서와 마찬가지로 evnet loop는 task들을 loop를 돌면서 하나씩 실행시키는 역할을 합니다. 만약 실행된 task가 특정한 데이터를 요청하고 응답을 대기해야 한다면 이 task는 제어권을 다시 event loop에 넘겨줍니다.
제어권을 받은 event loop는 다음 task를 실행하게 되고, 응답을 받은 순서대로 task queue에 들어가고 재개되는 task들은 멈췄던 부분부터 다시 제어권을 가지고 작업을 마무리합니다.
# 여기서 coroutine이 응답을 대기하는 상태에서 제어권을 event loop로 주는 용도로 await를 사용합니다.
coroutine으로 task를 만들고, asyncio.get_event_loop()를 통해 event loop객체를 얻어 오고, run_until_complete() method를 통해 실행시킬 수 있습니다. run_until_completed()는 parameter로 Future 객체를 받는데, coroutine을 받으면 task로 실행되도록 내부적으로 설계되어 있습니다.
task를 추가하는 방법으로는 event loop객체의 create_task() method를 이용할 수도 있습니다.
coroutine을 실행시키는 방법으로는 다음과 같다.
- await 키워드 (coroutine내에서만 사용할 수 있으므로, 처음 coroutine을 실행하는 용도로는 사용X)
- asyncio.run(), asyncio.create_task() : 이들이 coroutine chain으로 들어가는 일종의 entry point
ayncio.run() method는 현재 thread에서 새 event loop를 설정하고, 해당 event loop에서 인자로 넘어오는 coroutine 객체에 해당하는 task를 예약하여 실행시킨 뒤, 실행이 완료되면 event loop를 닫는 역할을 수행 (python3.7이상)
이전 버전에서는 아래와 같이 사용
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())
loop.close()
asyncio.run 함수는 전달된 코루틴을 실행하고, asyncio 이벤트 루프와 비동기 제너레이터의 파이널리제이션을 관리합니다. 다른 asyncio 이벤트 루프가 같은 스레드에서 실행 중일 때, 이 함수를 호출할 수 없습니다.
이 함수는 항상 새 이벤트 루프를 만들고 끝에 이벤트 루프를 닫습니다.
asyncio 프로그램의 메인 진입 지점으로 사용해야 하고, 이상적으로는 한 번만 호출해야 합니다.
[ asyncio.get_event_loop ]
현재 thread에 설정된 event loop를 가져오는 함수입니다. 만약 thread에 설정된 event loop가 없다면 새로 생성하여 이를 현재 thread에 설정한 뒤 event loop를 반환합니다. 이 method는 coroutine실행을 위해 event loop를 준비하는 과정
event loop를 다시 설명하면, 무한 루프를 돌며 매 루프마다 task를 하나씩 실행시키는 로직입니다.
task는 하나의 coroutine에서 출발하는 하나의 실행 흐름으로 coroutine chain과 연관되어 있습니다.
[ loop.run_until_completed ]
0. Task의 생성
- 고수준 asyncio.create_task()를 사용하거나 저수준 loop.create_task(), loop.ensure_future() method를 사용
https://docs.python.org/ko/3.8/library/asyncio-task.html#asyncio.Task
1. Task의 실행 (coroutine chain형성)
인자로 넘어오는 coroutine 객체를 이용하여 task객체를 생성하고, event loop에 의해 즉시 예약됩니다. 예약된 task가 없다면 event loop는 해당 task를 즉시 실행할 것입니다. task의 실행이란 해당 task 객체의 __step__() method를 실행시키는 것을 의미합니다. 이 method는 coroutine객체(_coro필드에 저장)의 send() method를 호출함으로써 해당 coroutine을 실행하는 역할을 수행합니다. 그러면 이 coroutine을 시작으로 await키워드를 마주칠 때 마다 연쇄적으로 coroutine을 호출하며 coroutine chain을 형성합니다.
2. coroutine chain의 종착점
await를 통해 coroutine을 실행하다 보면, sleep혹은 I/O 관련 coroutine을 await하는 코드를 마주치게 됩니다. 이러한 종류의 coroutine들은 Future객체를 await하도록 구현되어 있습니다. (coroutine chain끝에서 return으로 await를 안 마주칠 수도 있습니다. 이러한 경우 __step() method에서 StopIteration 예외 발생하면서 task종료)
# I/O 또는 sleep coroutine 처리 방식
예를 들어 I/O 관련 코루틴이라고 해보자. 그러면 이 코루틴은 특정 소켓에 대해 데이터를 읽거나 쓰기 위해 해당 소켓의 상태를 검사한다. 만약 당장 읽거나 쓸 수 있는 데이터가 있다면, 단순히 yield 키워드만을 사용하여 태스크 객체의 __step() 메소드로까지 제어를 넘긴다. 그러면 태스크 객체는 바로 다시 자신의 실행을 이벤트 루프에게 예약하고 지금의 실행은 중단한 뒤 이벤트 루프에게 제어를 넘긴다. 이때 태스크의 실행을 예약한다 함은 곧 해당 태스크 객체의 __step() 메소드를 이벤트 루프의 콜백 큐에 등록하는 것을 의미한다는 것을 기억하자.
그러나 보통은 당장 읽거나 쓸 수 있는 데이터가 있지 않다. 따라서 보통의 경우에는 select() 함수를 이용하여 해당 소켓을 등록해두고, 해당 소켓에 바인딩된 퓨처 객체를 새로 생성하여 await 한다. 퓨처 객체의 __await__() 메소드는 자기 자신(퓨처 객체)을 yield 하도록 구현되어 있기 때문에, 이로 인해 해당 퓨처 객체는 코루틴 체인을 따라 태스크 객체의 __step() 메소드로까지 전달될 것이다. 우선 여기까지 설명을 하고, 이번에는 Sleep 관련 코루틴도 알아보자. 그 이후의 절차는 아래 섹션에서 설명한다.
※ select() 함수 : Unix의 select() 함수를 래핑 한 Python 함수로, 특정 소켓들에 대해 데이터를 읽거나 쓸 준비가 될 때까지 (원하는 시간만큼) 기다릴 수 있게 하는 Blocking 함수이다. 이는 (원하는 시간만큼) 기다린 후 데이터를 읽거나 쓸 준비가 된 소켓들을 반환한다.
Sleep 관련 코루틴의 경우, 이벤트 루프 자체의 타이머를 이용한다. 만약 asyncio.sleep(1)이라면, 이 코루틴은 퓨처 객체를 하나 생성한 뒤 이벤트 루프에게는 1초 뒤에 해당 퓨처 객체의 결과 값을 업데이트하도록 요청한다. 그리고 그 퓨처 객체를 await 한다. 그러면 마찬가지로 해당 퓨처 객체가 코루틴 체인을 따라 태스크 객체의 __step() 메소드로까지 전달될 것이다. 그렇다면 이제 그렇게 전달된 퓨처 객체를 태스크 객체가 어떻게 처리하는지 알아보자.
3. Task 객체의 Future 객체 처리
태스크 객체는 yield 된 퓨처 객체를 받으면 우선 이것을 자신의 __fut_waiter 필드에 저장한다(바인딩한다). 그리고 퓨처 객체의 add_done_callback() 메소드를 호출하여, 해당 퓨처 객체가 완료 상태가 될 때 이벤트 루프에게 실행을 예약할 콜백 함수를 등록한다. 이때 등록하는 함수는 곧 자기 자신의 __step() 메소드라고 생각해도 된다. 이러한 콜백 함수의 실행을 이벤트 루프에게 예약한다는 것은 곧 해당 태스크의 실행을 예약한다는 것과 같은 말이다.
그러고 나면 이제 태스크 객체는 자신의 실행을 중단하고 제어를 이벤트 루프에게 넘긴다. 그러면 지금과 같이 퓨처 객체에 바인딩되어 있는 태스크 객체는 더 이상 이벤트 루프에 의해 실행되지 못할 것이다. __fut_waiter 필드의 이름이 나타내듯이, 어떠한 퓨처 객체를 기다리고 있을 때는 실행되면 안 되기 때문이다. 아무튼 그렇게 제어가 넘어가고 나면, 이벤트 루프는 다시 자신에게 실행을 예약해둔 태스크(정확히는 콜백 함수)들 중 우선순위가 높은 것을 적절히 선택하여 이를 실행시킨다. 이벤트 루프는 이러한 과정을 반복하며 여러 태스크들을 동시적으로(Concurrent, not Parallel) 실행하는 역할을 맡는다.
4. Event loop의 Polling (I/O 소켓 검사)
그런데 만약 더 이상 자신에게 실행을 예약해둔 태스크가 없게 되면, 이벤트 루프는 그 시간을 낭비하지 않고 select() 함수를 이용하여 데이터를 읽거나 쓸 준비가 된 소켓을 계속 찾는다. 만약 데이터를 읽거나 쓸 준비가 된 소켓을 찾게 되면, 그 소켓에 바인딩되어 있는 퓨처 객체의 결과 값을 업데이트해주고, 이로 인해 이 순간 아까 등록해두었던 콜백 함수의 실행이 이벤트 루프에서 예약될 것이다. 다시 강조하지만, 콜백 함수의 실행을 예약한다는 건 곧 해당 태스크의 실행을 예약한다는 말이다.
5. Task 객체의 실행 재개 (__step() 메소드 재실행)
그러면 이벤트 루프가 실행이 예약된 태스크를 실제로 실행시키는 과정을 한 번 살펴보자. 태스크의 실행이란 곧 해당 태스크 객체의 __step() 메소드가 호출되는 것을 의미한다. 이 메소드는 먼저 자기 자신(태스크 객체)과 퓨처 객체의 바인딩을 해제함으로써 더 이상 기다리는 퓨처 객체가 없음을 나타내도록 하고, 다시 자신의 코루틴 객체에 대해 send() 메소드를 호출함으로써 해당 코루틴의 실행을 재개하게 된다. 그러면 다시 해당 퓨처 객체의 __await()__ 메소드에서 실행이 중단되었던 부분(자기 자신을 yield 하는 부분)까지 가게 된다.
__await()__ 메소드로까지 돌아왔을 때, 만약 I/O 관련 코루틴 때문에 기다리고 있었던 거라면 이제는 해당 소켓에 대해 데이터를 읽거나 쓸 준비가 되었다는 것이므로 해당 소켓(자기 자신에 바인딩되어 있음)에 대해 데이터를 읽거나 쓴 다음 그 값을 return 할 것이다. 반면에 Sleep 관련 코루틴 때문이었다면 바로 return 할 것이다.
6. 최초 coroutine의 return (태스크 실행의 종료)
이러한 과정을 반복하다 보면 언젠가 태스크가 실행한 최초의 코루틴이 return 해야 하는 시점에 도달할 것이고, 이로 인해 해당 태스크 객체의 __step() 메소드에선 StopIteration 예외가 발생할 것이다. 그러면 태스크 객체는 그 예외 객체의 value 필드 값으로 자기 자신의 결과 값을 업데이트하고, 자신의 실행을 종료한다. 그러면 이 태스크는 더 이상 이벤트 루프에 의해 실행이 예약되지 않고 버려진다. loop.run_until_complete() 함수의 실행이 끝나는 시점이 이때이다. 자신이 실행한 태스크가 종료되었기 때문이다. 그리고 그 태스크 객체의 결과 값이 곧 loop.run_until_complete() 함수의 반환 값이다.
[ loop.close() ]
loop.run_until_complete() 함수의 실행이 끝났다는 것은 이제 해당 이벤트 루프가 실행되지 않는다는 것이다. 따라서 이벤트 루프를 닫아줘야 하는데, 이 역할을 수행하는 것이 loop.close() 함수이다. 이는 이벤트 루프에 남아 있는 모든 데이터(EX. 아직 실행이 종료되지 않은 태스크)들을 제거한다. 그래서 만약 loop.run_until_complete() 함수의 실행이 끝나고 loop.close()에 의해 이벤트 루프까지 닫히는 시점에 여전히 실행이 완료되지 않은 태스크가 남아 있다면, "Task was destroyed but it is pending!"라는 워닝 메시지가 출력될 것이다.
'Language' 카테고리의 다른 글
[ python ] python답게 코딩하기 (0) | 2023.01.23 |
---|---|
[ python ] event loop (2) (0) | 2022.05.24 |
[ python ] asyncio (2) (0) | 2022.05.24 |
[python] asyncio (1) (0) | 2022.05.23 |
Node.js convention (0) | 2022.04.07 |