반응형
velog 마이그레이션
원글 : [Airflow] 6. TaskGroup으로 Task 그룹화하기(2022년 9월 11일)
group_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG('group_dag', start_date=datetime(2022, 9, 1),
schedule_interval='@daily', catchup=False) as dag:
sleep_a = BashOperator(
task_id='sleep_a',
bash_command='sleep 10'
)
sleep_b = BashOperator(
task_id='sleep_b',
bash_command='sleep 10'
)
sleep_c = BashOperator(
task_id='sleep_c',
bash_command='sleep 10'
)
check_files = BashOperator(
task_id='check_files',
bash_command='sleep 10'
)
[sleep_a, sleep_b, sleep_c] >> check_files
클릭하면
group_dag.py
from airflow import DAG
from airflow.operators.bash import BashOperator
#groups 디렉토리에 group_sleeps.py에서 sleep_tasks 를 import 한다
from groups.group_sleeps import sleep_tasks
from datetime import datetime
with DAG('group_dag', start_date=datetime(2022, 9, 1),
schedule_interval='@daily', catchup=False) as dag:
args = {'start_date': dag.start_date, 'schedule_interval': dag.schedule_interval, 'catchup': dag.catchup}
#task 정의가 되어있는 sleep_tasks() 를 호출한다.
sleeps = sleep_tasks()
check_files = BashOperator(
task_id='check_files',
bash_command='sleep 10'
)
sleeps >> check_files
groups/group_sleeps.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
def sleep_tasks():
with TaskGroup("sleeps", tooltip="Sleep tasks") as group:
sleep_a = BashOperator(
task_id='sleep_a',
bash_command='sleep 10'
)
sleep_b = BashOperator(
task_id='sleep_b',
bash_command='sleep 10'
)
sleep_c = BashOperator(
task_id='sleep_c',
bash_command='sleep 10'
)
return group
728x90
반응형
'Data > Data Engineering' 카테고리의 다른 글
[Airflow] 8. Airflow plugin 생성하기 (0) | 2024.03.06 |
---|---|
[Airflow] 6-2. SubDagOperator -> TaskGroup 변환하기 (ImportError : SubDagOperator ) (0) | 2024.03.06 |
[Airflow] 5-3. 데이터 파이프라인 구성하기 : Executor (2) | 2024.03.06 |
[Airflow] 3. Airflow 설치하기 (docker) (0) | 2024.03.05 |
[Airflow] 2. Airflow 구성 자세히 살펴보기 (0) | 2024.03.05 |