[airflow] TASK Branching (1) - Branch Python Operator

김초송 2023. 9. 5. 17:53

Task branching

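  • Branching is used when the next task to run should be chosen based on the result of an upstream task. Airflow offers three ways to do it:
  1. BranchPythonOperator
  2. @task.branch decorator
  3. Subclassing BaseBranchOperator and implementing the branch logic yourself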

Branch Python Operator

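  • python_branch_task decides which downstream tasks run: either task_a alone, or task_b and task_c together
  • python_callable: the select_random function
  • The callable returns the task_id(s) of the task(s) to run next; downstream tasks that are not returned are skipped
    • a string if one downstream task should run
    • a list of task_ids if several should run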
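The return-value rule above can be modelled in plain Python. The sketch below is not Airflow code: `resolve_branch` is a hypothetical helper, assuming that a string means one task_id, a list means several, and every other direct downstream task is skipped. Raising `ValueError` for an unknown task_id is also an assumption of this sketch.

```python
from typing import Iterable, List, Tuple, Union


def resolve_branch(
    result: Union[str, List[str]], downstream_ids: Iterable[str]
) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: split downstream task_ids into (run, skipped)."""
    # A single string means one task_id; a list means several.
    chosen = [result] if isinstance(result, str) else list(result)
    downstream = list(downstream_ids)
    unknown = [t for t in chosen if t not in downstream]
    if unknown:
        # Assumption: an id that is not a direct downstream task is an error.
        raise ValueError(f"not a downstream task: {unknown}")
    run = [t for t in downstream if t in chosen]
    skipped = [t for t in downstream if t not in chosen]
    return run, skipped


downstream = ["task_a", "task_b", "task_c"]
print(resolve_branch("task_a", downstream))
# (['task_a'], ['task_b', 'task_c'])
print(resolve_branch(["task_b", "task_c"], downstream))
# (['task_b', 'task_c'], ['task_a'])
```

Normalizing the string case into a one-element list keeps the rest of the logic identical for both return shapes.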
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator

with DAG(
    dag_id="dags_branch_python_operator",
    schedule="0 8 1 * *",
    start_date=pendulum.datetime(2023, 9, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
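    def select_random():
        import random

        # Pick one item at random and return the task_id(s) to follow
        item_list = ['A', 'B', 'C']
        selected_item = random.choice(item_list)
        if selected_item == 'A':
            return 'task_a'                  # one downstream task: return a string
        else:                                # 'B' or 'C'
            return ['task_b', 'task_c']      # several downstream tasks: return a list
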
    python_branch_task = BranchPythonOperator(
        task_id='python_branch_task',
        python_callable=select_random
    )

    # Downstream tasks
    def common_func(**kwargs):
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected':'A'}
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected':'B'}
    )

    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected':'C'}
    )

    python_branch_task >> [task_a, task_b, task_c]
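Because the branch callable is ordinary Python, its return values can be checked without running a scheduler. The sketch below is a variant of select_random that takes the random source as a parameter; the `rng` argument and the `FixedChoice` stub are hypothetical additions for testability, not part of the original DAG.

```python
import random


def select_random(rng=random):
    """Same logic as the DAG's select_random, with an injectable random source (hypothetical)."""
    item_list = ['A', 'B', 'C']
    selected_item = rng.choice(item_list)
    if selected_item == 'A':
        return 'task_a'                 # one downstream task -> str
    return ['task_b', 'task_c']         # several downstream tasks -> list


class FixedChoice:
    """Hypothetical stub random source that always 'picks' the given item."""

    def __init__(self, item):
        self.item = item

    def choice(self, seq):
        assert self.item in seq
        return self.item


for item in ['A', 'B', 'C']:
    print(item, select_random(FixedChoice(item)))
# A task_a
# B ['task_b', 'task_c']
# C ['task_b', 'task_c']
```

Passing `random.Random(seed)` instead of the stub also works, since it provides the same `choice` method.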

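  • In this run, task_a was executed
  • task_b and task_c were skipped (the pick is random, so another run may instead execute task_b and task_c and skip task_a)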
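The executed/skipped split shown for this run is only one possible outcome. The hypothetical sketch below enumerates the downstream task states for every possible pick, assuming that tasks not returned by the branch callable are skipped and that the chosen tasks succeed; `branch_for` and `task_states` are illustrative names, not Airflow APIs.

```python
from typing import Dict, List, Union

DOWNSTREAM = ["task_a", "task_b", "task_c"]


def branch_for(item: str) -> Union[str, List[str]]:
    """Mirror of select_random's mapping from picked item to task_id(s)."""
    return "task_a" if item == "A" else ["task_b", "task_c"]


def task_states(item: str) -> Dict[str, str]:
    """Hypothetical helper: state of each downstream task for one pick."""
    chosen = branch_for(item)
    chosen = [chosen] if isinstance(chosen, str) else chosen
    # Assumption: chosen tasks succeed; everything else is skipped.
    return {t: ("success" if t in chosen else "skipped") for t in DOWNSTREAM}


for item in ["A", "B", "C"]:
    print(item, task_states(item))
# A {'task_a': 'success', 'task_b': 'skipped', 'task_c': 'skipped'}
# B {'task_a': 'skipped', 'task_b': 'success', 'task_c': 'success'}
# C {'task_a': 'skipped', 'task_b': 'success', 'task_c': 'success'}
```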
