velog 마이그레이션
원글 : [Airflow] 5-3. 데이터 파이프라인 구성하기 : Executor (2022년 9월 11일)
Executor 란?
@ airflow.cfg
executor = SequentialExecutor
@ docker-compose.yaml
environment:
AIRFLOW_CORE_EXECUTOR: CeleryExecutor
=> AIRFLOW_DORE_EXECUTOR 값이 executor 값을 오버라이드한다
SequentialExecutor
Airflow의 기본 Executor
1번에 1 task 실행. 동시에 여러개 실행 불가
t1 - t2 - t4 인 경우
ㅤ \ t3 /
t1 > t2 > t3 > t4 순서로 실행된다
LocalExecutor
task 병렬 실행 가능
로컬 실행기는 실행시키는 시스템(리소스)에 따라 다르기 때문에 잘 확장되지 않습니다.
executor=LocalExecutor
sqlal =
CeleryExecutor
여러 machine에서 task 실행 위해 salary cluster 사용함으로써 동시에 여러 task 실행 가능
여러 worker 노드에 task 인스턴스 실행을 분산할 수 있습니다.
Celery Executors를 사용하면 원하는 만큼 확장할 수 있고 작업을 다양한 기계에 배포할 수 있습니다.
airflow worker : task 실행 담당
celery queue : result backend + broker
result broker : 실행된 task 상태 저장
broker : 스케줄러가 실행할 task 보내고 worker가 가져간다
DAG 트리거 >
스케줄러가 task t1 브로커로 전송 > 워커중 1개가 task t1 가져감 > 완료되면 태스크 상태 result broker에 저장(DB아무거나 사용 가능) >
스케줄러가 task T2, T3 브로커로 전송 > 워커 2개가 task T2, T2 가져감 > 완료되면 태스크 상태 result broker에 저장 >
celery queue 설치해야함
Concurrency(동시성) : 동시에(병렬로) 실행할 수 있는 작업 및 DAG 실행 수
worker에게 각각 다른 리소스를 할당 할 수있고, 워커들에게 가는 큐 여러개 설정해서 맞는 워커에게 가도록
@Airflpw.cfg parallelism /
AIRFLOWCOREPARALELISM
: 스케줄러당 최대 task 인스턴스 수 정의. 리소스와 스케줄러 수에 따라 정의하기default 32 (ex. 스케줄러 2개면 최대 64개)
max_active_tasks_per_dag / AIRFLOWCOREMAX_ACTIVE_TASKS_PER_DAG
: 각 DAG에서 동시에 실행할 수 있는 최대 작업 인스턴스 수.
default 16
max_active_runs_per_dag / AIRFLOWCOREMAX_ACTIVE_RUNS_PER_DAG
: DAG당 최대 active DAG run 수.
default 16
SQLite는 한번에 readersms 제한 없으나 1 writer 제한있어서 local executor나 celery executer에 SQLite 사용 할 수 없음
각 task가 어느 큐로 갈지 설정하기
큐 생성하기
-q QueueName 옵션을 추가하면 된다
airflow-worker:
<<: *airflow-common
command: celery worker -q <<Queue_Name>>
새 woker 가 추가되었고 "Worker Name" 을 클릭해보면
Queue 를 보면 test_queue 이름으료 큐가 생성되었다
Task가 어느 Queue로 갈지 설정하기
task_a = BashOperator(
task_id = 'task_a',
queue = 'test_queue',
bash_command = 'sleep 10'
)
ip:555 Flower 에서 Worker, Queue, task 확인 가능
Task UUID를 클릭하면 상세 정보를 볼 수 있다.
'Data > Data Engineering' 카테고리의 다른 글
[Airflow] 6-2. SubDagOperator -> TaskGroup 변환하기 (ImportError : SubDagOperator ) (0) | 2024.03.06 |
---|---|
[Airflow] 6-1. TaskGroup으로 Task 그룹화하기 (0) | 2024.03.06 |
[Airflow] 3. Airflow 설치하기 (docker) (0) | 2024.03.05 |
[Airflow] 2. Airflow 구성 자세히 살펴보기 (0) | 2024.03.05 |
[Airflow] 1. Apache Airflow란? (0) | 2024.03.05 |