반응형
velog 마이그레이션
원글: [Airflow] 6-1. SubDagOperator -> TaskGroup 변환하기 (ImportError : SubDagOperator ) (2022년 9월 11일)
Task를 그룹화하기 위해 SubDagOperator를 사용하면 아래와 같은 오류가 발생한다.
Broken DAG: [/opt/airflow/dags/group_dag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/opt/airflow/dags/group_dag.py", line 3, in <module>
from airflow.operators.bash import SubDagOperator
ImportError: cannot import name 'SubDagOperator' from 'airflow.operators.bash' (/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/bash.py)
SubDagOperator를 사용할 수 없다는 오류인데, Airflow Docs를 찾아보면 SubDagOperator 대신 TaskGroup을 사용하라고 나온다.
from airflow import DAG
from airflow.operators.bash import BashOperator
#from airflow.operators.bash import SubDagOperator
from subdags.subdag_sleeps import subdag_sleeps
from datetime import datetime
with DAG('group_dag', start_date=datetime(2022, 9, 1),
schedule_interval='@daily', catchup=False) as dag:
args = {'start_date': dag.start_date, 'schedule_interval': dag.schedule_interval, 'catchup': dag.catchup}
'''
sleeps = SubDagOperator(
task_id='sleeps',
subdag=subdag_sleeps(dag.dag_id, 'sleeps', args)
)
'''
sleeps = sleep_tasks()
check_files = BashOperator(
task_id='check_files',
bash_command='sleep 10'
)
sleeps >> check_files
from airflow import DAG
from airflow.operators.bash import BashOperator
/# 추가
from airflow.utils.task_group import TaskGroup
#def subdag_sleeps(parent_dag_id, child_dag_id, args):
def sleep_tasks():
# 그룹 이름 정의
with TaskGroup("sleeps", tooltip="Sleep tasks") as group:
#with DAG(f"{parent_dag_id}.{child_dag_id}", start_date=args['start_date'], schedule_interval=args['schedule_interval'], catchup=args['catchup']) as dag:
sleep_a = BashOperator(
task_id='sleep_a',
bash_command='sleep 10'
)
sleep_b = BashOperator(
task_id='sleep_b',
bash_command='sleep 10'
)
sleep_c = BashOperator(
task_id='sleep_c',
bash_command='sleep 10'
)
return dag
728x90
반응형
'Data > Data Engineering' 카테고리의 다른 글
[Airflow] Airflow 설치하기(pip) (0) | 2024.03.06 |
---|---|
[Airflow] 8. Airflow plugin 생성하기 (0) | 2024.03.06 |
[Airflow] 6-1. TaskGroup으로 Task 그룹화하기 (0) | 2024.03.06 |
[Airflow] 5-3. 데이터 파이프라인 구성하기 : Executor (2) | 2024.03.06 |
[Airflow] 3. Airflow 설치하기 (docker) (0) | 2024.03.05 |