[Celery] python 비동기 백그라운드 작업 스케줄링 / 분산 테스크 큐 (기본 개념 / 입문 / 실습 Redis+Celery)
새로 들어간 프로젝트에서 데이터 수집 & 정제 워크플로우를 구축해야 했다.
한 사이클마다 여러개의 작업이 진행되어야 했고, 작업마다 연관 관계가 있는 것도 있어 이전의 수집 작업보다는 더 복잡한 구조를 가지고 있었다.
또 속도와 실패시 재시도처리도 정말 중요했다.
'분산 처리 & 데이터 파이프라인 구축관리'라는 키워드만 보고 kafka를 생각했는데 수집서버는 한대에 여러 작업 간의 의존성 처리가 관건이라 Celery를 도입하기로 했다.
사실 파이썬에도 스케줄링, 백그라운드 작업 처리를 할 수 있는 모듈들이 있지만 Celery의 특장점이 해야하는 업무에 잘 맞기에 선택했다.
Celery는 분산 처리가 가능하다. 또 멀티 프로세스 기반으로 작업되어 병렬 비동기 작업에 적합하다.
작업 실패시 자동 재시작이 기본 기능으로 제공되며, 작업마다 연관 관계가 있을 경우 chain, chord 등 제어할 수 있다.
Celery 개념
Celery는 분산 작업 큐 (Distributed Task Queue) 이다.
본인들은 그렇게 정의내렸지만 정확히 말하자면 분산 작업 처리 관리 프레임워크로 봐야 한다.
실제로 작업을 전달하는 건 Redis나 RabbitMQ와 같은 메시지 브로커를 사용해야 하기 때문이다.
기본 작동 방식은 아주 간단하다.
워커가 있고 브로커가 있다.
아래 영상을 보면 이해하기 쉽다.
https://youtu.be/TzVkED3y3Ig?si=Fdvj418wmu03tNdr
Celery의 역할은 작업(Task)를 정의하고 워커(Worker)를 사용해 병렬로 여러 프로세스에서 작업을 처리하는 것이다.
여기서 브로커는 이 작업을 보내고 받아서 처리할 수 있게 하는 중앙 허브 역할을 해준다.
Worker
작동 방식의 핵심인 워커란 작업을 실행시키는 프로세스를 말한다. 워커는 명령어를 통해 실행할 수 있다.
실행 명령어 옵션 중 concurrency 설정을 줘서 여러 개의 작업을 동시에 병렬로 처리할 수도 있다.
celery -A project_name worker --concurrency=4 --loglevel=info
예를 들면 이 명령으로 하나의 워커 프로세스가 동시 4개의 작업을 처리하도록 설정한 것이다.
celery -A project_name worker --concurrency=2 --loglevel=info
celery -A project_name worker --concurrency=2 --loglevel=info
아니면 하나의 celery 애플리케이션에서 여러개의 워커를 실행할 수도 있다.
위의 명령으로 2개의 작업이 가능한 워커 총 2개를 한번에 처리할 수 있는 구성이 된다.
자원과 작업에 따라 유연하게 설정하면 된다.
그럼 Celery의 작동방식을 확인할 수 있는 간단한 실습을 해보겠다.
Mac OS에서 개발 환경 구성
M1 기준이고, 브로커로는 Redis를 선택했다. Redis는 도커로 띄워 사용했다.
# 가상 환경 설정
pyenv virtualenv 3.10.6 celerytest
# 가상 환경 실행
pyenv activate celerytest
pip install celery redis
동작 방식 확인 용 더하기&곱하기 실습
tasks.py
간단한 작동방식만 보기 위해 add / multiply 라는 작업만 정의해줬다.
Celery 를 띄우기 전에 애플리케이션 설정을 또 해줘야 한다.
작업을 전달해줄 브로커이자 작업 결과를 저장할 Redis의 설정이 필요하다.
@app.task를 정의해줌으로써 Celery가 큐에 쌓인 후 워커에 의해 실행될 것이다.
이제 워커를 실행해주면 된다. 기본 명령어로 단일 워커를 실행해준다.
celery -A tasks worker --loglevel=info
워커 실행이 잘 되었다.
실제 작업을 실행하는 코드도 작성해 실행시켜주겠다.
run_celery.py
작업으로 정의된 add와 multiply를 불러와 실행해준다.
작업을 큐에 추가하고 실행하는 delay() 메서드를 사용했다.
터미널을 새로 열어 실행해줬더니 작업별로 프로세스가 생성되었고 ID가 찍혔다. 또 결과도 잘 나왔다.
워커가 실행되는 터미널도 열어 확인해봤더니 내가 코드에 작성한대로 print도 찍혔고 워커의 로그도 잘 찍혔다.
또 Redis-cli 로 확인해보면 결과도 잘 저장되는 걸 볼 수 있다.
이번에는 조금 더 Celery의 작업을 보다 잘 관리하기 위한 기능인 Chain / Group / Chord 를 비교해보겠다.
Celery 작업 흐름 제어 관리
사실 세 기능의 개념이 비슷하게 느껴져 조금은 헷갈렸다.
개념부터 설명하자면 이렇다.
- Chain : 작업을 순차적으로 실행하게 제어하는 기능
- Group : 여러 작업을 동시 병렬로 독립적으로 실행한 후 결과를 모아 반환하도록 제어하는 기능
- Chord : 여러 작업을 동시 병렬로 독립적으로 실행한 후 추가 후속 작업을 실행하도록 제어하는 기능
실습을 통해 이해해보자
chord를 쓰기 위해 task를 추가해줬다.
⭐️ task를 변경했으니 워커도 다시 실행해줬다. ⭐️
run_celery.py 에 해당 코드를 추가해 확인해봤다.
def test_workflow():
workflow_chain = chain(multiply.s(2, 2) | add.s(10))
result_chain = workflow_chain()
print(result_chain.get()) # 결과: 14
workflow_group = group(multiply.s(2, 2), multiply.s(3, 2))
result_group = workflow_group()
print(result_group.get()) # 결과: [4, 6]
workflow_chord = chord([multiply.s(2, 2), multiply.s(3, 2)], add_all.s())
result = workflow_chord()
print(result.get()) # 결과: 10
test_workflow()
chain의 workflow는 2*2 ➡️ +10
이렇게 된다. chain은 순차적으로 실행이 되고 이전의 결과가 다음 작업으로 넘어간다.
결국 4 + 10 으로 14가 출력된다.
group의 workflow는 2*2 & 3*2 로 각각 독립적으로 실행이 되고 서로 영향도 없다. 하지만 최종결과는 한번에 반환되어 리스트인 [4, 6] 이 반환되었다.
chord의 workflow는 2*2 & 3*2 ➡️ add_all 으로 header인 곱하기 작업은 독립적으로 실행되고 완료되면 call_back 후속작업이 실행된다.
😲 ChatGPT가 비유를 기가 막히게 들어줬는데 공유해보자면
Chain은 요리할 때, 밀가루 반죽을 만들고 → 그 반죽을 구워서 빵을 만든 후 → 빵에 크림을 발라서 완성하는 순서
Group은 여러 사람이 각각 다른 요리를 동시에 만드는 상황, 결과가 각기 다르지만 동시에 완료
Chord는 여러 사람이 동시에 다른 재료를 준비하고, 모든 재료가 준비된 후에 요리사가 그것을 합쳐서 하나의 요리를 완성하는 것
이라고 한다. 이해가 잘 되었다.
요즘 업무에서 하고싶은 데이터 파이프라인 구축을 하고 있다. 꽤나 재미있다. 커리어 전환 화이팅 가보자〰️ 👍
참고
https://docs.celeryq.dev/en/stable/index.html
https://github.com/celery/celery?tab=readme-ov-file
https://youtu.be/MtCa7A3_JGs?si=XMzVxkaUhi4Kzb4u