궁금한게 많은 개발자 노트

[ Python ] 병렬 처리 concurrent future 본문

Back End

[ Python ] 병렬 처리 concurrent future

궁금한게 많은 개발자 2022. 5. 23. 15:26

파이썬 제약 : GIL
Python은 두 개 이상의 스레드가 동시에 실행될 때 두 개 이상의 스레드가 하나의 자원을 동시에 액세스할 때 발생할 수 있는 문제점을 방지하기 위해 GIL(Global interpreter lock)이라는 것을 도입했습니다. 즉, 스레드가 실행될 때, 프로그램 내의 리소스 전체에 락이 걸립니다. 결국 Python 구현에서는 동시에 몇 개의 스레드가 실행이 되던 간에 GIL에 의해서 한 번에 하나의 스레드만 실행됩니다. 멀티 스레드의 경우 문맥교환(Context Switch)에 필요한 리소스까지 고려하면 단일 스레드보다 성능이 떨어지게 되는 것을 확인할 수 있습니다.

 

기존에는 python에서 동시성 처리를 위해 threading과 multiprocessing을 사용했지만 3.2version부터 비동기 실행을 위한 API를 고수준으로 만들고, 사용하기 쉽도록 개선한 concurrent.future가 도입되었습니다.

thread와 multiprocessing을 사용하는 API를 통일하고, 특히 비동기 coroutine과 거의 유사한 형태의 API를 제공하여, 현대적인 자바스크립트의 비동기 Task 프로토콜인 Promise와 유사한 Future라는 클래스를 도입하여 보다 깔끔하게 병렬처리 코드를 작성할 수 있게 해줍니다.

 

 

 

[ concurrent.furue 모듈 ]

 concurrent.futures 모듈은 별도 규격의 스레드 객체를 작성하지 않고 함수 호출을 객체화하여 다른 스레드나 다른 프로세스에서 이를 실행할 수 있게 해줍니다. 이때 중심 역할을 하는 것이 Executor 클래스입니다. Executor 클래스는 다시 ThreadPoolExecutor와 ProcessPoolExecutor로 나뉘는데 두 클래스의 차이는 동시성 작업을 멀티 스레드로 처리하느냐, 멀티 프로세스로 처리하느냐만 있지 거의 동일한 기능을 제공합니다.

 

[ furture ]

executor를 이용한 동시성 처리는 호출해야 할 함수와 그에 전달될 인자들을 executor에 넘겨주는 것으로 시작되는데, executor의 해당 메소드는 다른 스레드의 리턴을 기다릴 필요가 없으므로 바로 리턴하게 됩니다. 이 때 리턴되는 객체가 Future 객체이며, 이 객체의 상태를 조사하여 완료 여부를 확인하거나, 해당 객체 내 작업이 완료되기를 기다리거나 혹은 미리 콜백을 넘겨놓아둘 수도 있습니다.

이 객체는 asyncio의 Future 클래스와 유사한 API를 가지고 있다. (둘이 호환되는 객체는 아니다.)

ex)

  1. 실행 중인 병렬 작업을 취소
  2. 실행 중 여부, 완료 여부의 체크
  3. 특정 타임아웃 시간 후의 결과값 확인
  4. 완료 콜백을 추가
  5. 동기화 코드를 매우 쉽게 작성 가능

단일 스레드 비동기 코루틴을 사용하는 방식과 concurrent.futures를 이용한 병렬처리 방식은 매우 비슷한 형태로 사용 가능하다.

 

 Future 클래스는 자바스크립트의 Promise API와 매우 비슷하다.

아직 완료되지 않은 (혹은 완료되었는지 당장은 모르는) 작업을 외부에서 객체로 다룰 수 있게 된다.

다음의 메소드들이 지원된다.

특히 하나의 작업에 대해서 하나 이상의 완료 콜백을 추가할 수 있다는 점이 흥미롭다.

  • cancel() : 작업 취소를 시도한다. 만약 현재 실행중이고 취소가 불가능할 경우 False를 리턴한다. 작업이 취소되었다면 True가 리턴된다.
  • canceled() : 취소가 완료된 작업이면 True를 리턴한다.
  • running(): 실행 중인 경우 True를 리턴한다.
  • done(): 작업이 완료되어고 정상적으로 종료되었다면 True를 리턴한다.
  • result(): 해당 호출의 결과를 리턴한다. 만약 작업이 아직 완료되지 않았다면 최대 타임아웃시간까지 기다린다음, None을 리턴한다.
  • exception(): 해당 호출이 던진 예외를 반환한다. 역시 작업이 완료되지 않았다면 타임아웃 시간까지 기다린다.
  • add_done_callback(): 콜백함수를 Future 객체에 추가한다. 이 함수는 future 객체하나를 인자로 받는 함수이다. 콜백은 취소되거나 종료된 경우에 모두 호출된다.

모듈 함수 (wait, as_completed)

wait : 특정한 타임아웃 시간 동안 대기한 다음, 그 시간동안 완료된 작업과 그렇지 않은 작업을 구분하는 두 개의 세트로 된 튜플을 리턴한다. result.done, result.not_doen

 

as_completed: future 의 집합을 받아서 기다리면서 하나씩 완료되는 것 순서대로 순회하면서 반복하는 반복자를 생성하는 함수이다. executor의 map과 차이점으로는 완료되는 순서대로 순회가 가능합니다.​

 

 

 

 

[ excutor ]

Executor 객체는 풀 기반으로 작업을 관리합니다. 초기화 시에 몇 개의 worker가 사용될 것인지를 정해주면 전달되는 작업들을 큐에 넣고 worker pool에서 사용 가능한 worker로 하여금 작업을 처리하게 합니다.

병렬 작업을 dispatch하며 thread나 process를 관리

Executor 객체는 컨텍스트 매니저 프로토콜을 지원하기 때문에 with 구문 내에서 사용할 수 있다.

 

submit(fn, *args, **kwargs)

함수 fn에 대해 주어진 인자들을 전달하여 실행할 수 있는 Future 객체를 리턴합니다. 해당 함수는 호출 즉시 스케줄링됩니다.

with ThreadPoolExcutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
 

map(func, *iterables, timeout=None)

일반함수 map과 동일하나, 각 호출은 병렬적으로 일어납니다. 만약 타임아웃 값이 지정된 경우, 맵핑 작업이 완료되지 않은 호출이 있으면 TimeoutError가 일어납니다. 타임 아웃 값을 별도로 주지 않으면 디스패치된 모든 작업들이 종료될 때까지 기다린 후 리턴한다. 타임아웃 값이 주어진 경우에는 해당 타임아웃 내에 완료되지 못한 작업이 있을 때, 예외를 일으키게 된다.

 

입력 데이터와 동작 함수를 짝지어서 바로 스케줄링하도록 합니다. map()함수는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 됩니다.

(결과값은 특이하게 [Futures] 타입이 아닌 결과에 대한 제너레이터이다.)

호출은 비동기적으로 발생하며, 결과값이 생성되는 순서가 반드시 호출이 시작된 순서와 동일하지는 않다.

결과 list는 iterable이 전달된 순서대로 저장 되는 것이 as_completed()와의 차이점

 

shutdown(wait=True)

executor에게 종료 시그널을 보냅니다. 시그널을 받은 executor는 실행 중 및 대기 중인 모든 future에 대해 리소스를 정리합니다. shutdown 후에 submit이나 map을 호출하면 런타임에러가 발생합니다.만약 wait 값이 True로 정해지면 진행 및 대기중이던 작업이 종료된 후에 shutdown이 일어나고, 그 때까지 해당 함수는 리턴을 보류하게 됩니다.

(만약 강제 shutdown을 피하고 싶다면 with 구문 내에서 사용합니다.)

 

 

# Executor의 구분 – 스레드 vs 프로세스
Executor는 멀티스레드를 쓸 것이냐, 멀티 프로세스를 쓸 것이냐에 따라 ThreadPoolExecutor와 ProcessPoolExecutor로 나뉩니다. 둘의 사용방법은 거의 동일하나 다음과 같은 차이가 있습니다.

IO기반의 작업에 대해서 대기 시간을 줄이고 리소스 사용 효율을 늘리고 싶다면 ThreadPoolExecutor를 사용합니다. 예를 들어 HTTP 통신을 이용하여 여러 곳을 순차적으로 접근하는 것보다, 멀티 스레드를 이용하면 전반적인 성능향상을 볼 수 있습니다..
CPU 부하가 많은 작업을 분산처리 하는 목적이라면 ProcessPoolExecutor를 사용합니다.. CPU 로드가 크게 걸리는 작업인 경우에는 파이썬 내에서는 GIL(Grand Interal Lock)이라는 제약이 존재하기 때문에,  멀티 스레드로는 CPU 분산 처리의 효과를 누릴 수 없습니다. 멀티 프로세스는 서브 프로세스와 유사하게 __main__ 스스로를 반입하는 별도의 프로세스를 가지므로, 이를 호출하는 코드는 반드시 __main__ 모듈 내에서 호출되어야 합니다.

 

 

아래는 참고하기에 좋은 예시 코드입니다.

from concurrent import futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

def main():
    with futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = dict(
            (executor.submit(load_url, url, 60), url)
             for url in URLS)

        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print('%r page is %d bytes' % (
                          url, len(future.result())))
            except Exception as e:
                print('%r generated an exception: %s' % (
                          url, e))

if __name__ == '__main__':
    main()

'Back End' 카테고리의 다른 글

SQLalchemy  (0) 2022.05.26
[ Python ] pydantic  (0) 2022.05.24
Fast API  (0) 2022.05.23
django model 간 관계  (0) 2022.05.19
django 핵심 기능 - Model  (0) 2022.05.19
Comments