now is better than never

[airflow] Task Branching (2) - Task.branch

김초송 2023. 9. 6. 18:57

Task.branch

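  • Previously, the branch task object was created by instantiating BranchPythonOperator.
  • With @task.branch, the task object is obtained by calling the decorated function instead.
    The function is called where the task is defined in the DAG; the call only creates the task, and the function body runs when the task executes.
  • Write the function you want to wrap directly beneath the decorator.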
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_branch_decorator",
    schedule=None,
    start_date=pendulum.datetime(2023, 9, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
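    # @task.branch turns the function below into a branching task.
    # Its return value is the task_id (or list of task_ids) to run next.
    @task.branch(task_id='python_branch_task')
    def select_random():
        import random

        item_list = ['A', 'B', 'C']
        selected_item = random.choice(item_list)
        if selected_item == 'A':
            return 'task_a'
        # 'B' or 'C': run both task_b and task_c
        elif selected_item in ['B', 'C']:
            return ['task_b', 'task_c']
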
    def common_func(**kwargs):
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected':'A'}
    )

    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected':'B'}
    )

    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected':'C'}
    )

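    # Calling select_random() creates the branch task. Only the task_id(s) it
    # returns are run; the other downstream tasks are skipped.
    select_random() >> [task_a, task_b, task_c]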
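
To make the two ideas in the DAG above concrete (calling the decorated function only builds a task, and the returned task_id(s) decide which downstream tasks run), here is a minimal plain-Python sketch. It does not use Airflow at all: ToyBranchTask and toy_branch are hypothetical names invented for illustration, and the skip logic is an assumption that only mimics the observable behavior, not Airflow's actual implementation.

```python
import random
from typing import Callable, List, Union

BranchResult = Union[str, List[str]]


class ToyBranchTask:
    """Hypothetical stand-in for a branch task; not an Airflow class."""

    def __init__(self, task_id: str, fn: Callable[[], BranchResult]):
        self.task_id = task_id
        self.fn = fn

    def execute(self, downstream_ids: List[str]) -> dict:
        # The wrapped function body runs only here, at "execution time".
        result = self.fn()
        # A single task_id string is treated like a one-element list.
        chosen = [result] if isinstance(result, str) else list(result)
        unknown = [t for t in chosen if t not in downstream_ids]
        if unknown:
            raise ValueError(f"not directly downstream: {unknown}")
        skipped = [t for t in downstream_ids if t not in chosen]
        return {"run": chosen, "skipped": skipped}


def toy_branch(task_id: str):
    """Hypothetical decorator that mimics the shape of @task.branch."""
    def decorator(fn: Callable[[], BranchResult]):
        def make_task() -> ToyBranchTask:
            # Calling the decorated name builds a task object; fn is not run yet.
            return ToyBranchTask(task_id, fn)
        return make_task
    return decorator


@toy_branch(task_id="python_branch_task")
def select_random():
    selected_item = random.choice(["A", "B", "C"])
    if selected_item == "A":
        return "task_a"
    return ["task_b", "task_c"]


branch = select_random()  # builds the task; the body has not run
outcome = branch.execute(["task_a", "task_b", "task_c"])
print(branch.task_id, outcome)
```

Each run prints either task_a alone under "run" or task_b and task_c together, with the remaining task_id(s) listed under "skipped", which mirrors what the Airflow UI shows for the real DAG.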
