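[airflow] Task Branching (1) - Branch Python Operator
Task branching
- Used when the next Task should be chosen selectively, depending on the result of an upstream Task
- Three ways to implement it:
- BranchPythonOperator
- task.branch decorator
- A custom operator written by inheriting from BaseBranchOperator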
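Branch Python Operator
- python_branch_task → runs either task_a alone, or task_b and task_c together
- python_callable: the select_random function
- The callable returns the task_id of the task(s) to run next
  - a string when a single downstream task should run
  - a list when several downstream tasks should run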
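The return-value rule above can be sketched in plain Python, without Airflow. The helper names `normalize_branch_result` and `check_branch_result` are hypothetical, introduced only for illustration; they are not part of the Airflow API and do not claim to mirror its internals.

```python
from typing import List, Union

# A branch callable may return one task_id (str) or several (list of str).
BranchResult = Union[str, List[str]]


def normalize_branch_result(result: BranchResult) -> List[str]:
    """Turn a branch callable's return value into a list of task ids."""
    if isinstance(result, str):
        return [result]
    return list(result)


def check_branch_result(result: BranchResult, downstream_ids: List[str]) -> List[str]:
    """Hypothetical pre-check: every returned id must be a downstream task id."""
    selected = normalize_branch_result(result)
    unknown = [task_id for task_id in selected if task_id not in downstream_ids]
    if unknown:
        raise ValueError(f"not a downstream task id: {unknown}")
    return selected
```

A check like this can be handy in a unit test for the callable, since a misspelled task_id is easy to miss until the DAG actually runs.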
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id="dags_branch_python_operator",
    schedule="0 8 1 * *",
    start_date=pendulum.datetime(2023, 9, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    def select_random():
        import random

        item_list = ['A', 'B', 'C']
        selected_item = random.choice(item_list)
        # Return the task_id(s) of the downstream task(s) to run
        if selected_item == 'A':
            return 'task_a'
        elif selected_item in ['B', 'C']:
            return ['task_b', 'task_c']

    python_branch_task = BranchPythonOperator(
        task_id='python_branch_task',
        python_callable=select_random
    )

    # Downstream tasks
    def common_func(**kwargs):
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected': 'A'}
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected': 'B'}
    )

    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected': 'C'}
    )

    python_branch_task >> [task_a, task_b, task_c]
Result of a run in which 'A' was selected:
- task_a is executed
- task_b and task_c are skipped
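The outcome above can be modeled roughly in plain Python, assuming that every direct downstream task whose id is not returned by the callable ends up skipped. `select` and `resolve_branch` are hypothetical helpers, not Airflow functions; the item is passed in explicitly instead of being drawn at random so that the result is reproducible.

```python
from typing import Dict, List, Union

DOWNSTREAM = ['task_a', 'task_b', 'task_c']


def select(item: str) -> Union[str, List[str]]:
    """Same mapping as select_random, with the item given explicitly."""
    if item == 'A':
        return 'task_a'
    if item in ('B', 'C'):
        return ['task_b', 'task_c']
    raise ValueError(f"unexpected item: {item}")


def resolve_branch(item: str, downstream: List[str]) -> Dict[str, List[str]]:
    """Split the downstream tasks into those that run and those that are skipped."""
    result = select(item)
    chosen = [result] if isinstance(result, str) else list(result)
    return {
        'run': [t for t in downstream if t in chosen],
        'skipped': [t for t in downstream if t not in chosen],
    }


print(resolve_branch('A', DOWNSTREAM))
# {'run': ['task_a'], 'skipped': ['task_b', 'task_c']}
```

With 'B' or 'C' the split is reversed: task_b and task_c run, and task_a is skipped.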