
[airflow] Task Branching (3) - Base Branch Operator


김초송 2023. 9. 19. 17:30

BaseBranchOperator

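  • Create a class that inherits from BaseBranchOperator
  • A class that inherits from BaseBranchOperator must implement the choose_branch method
    • parameter : context
      Similar to the kwargs of a PythonOperator
  • BranchPythonOperator and the task.branch decorator are used more often
  • All three approaches produce the same result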
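Because all three approaches only need to return the task_id(s) to follow, the decision logic can live in one plain function and be reused by any of them. A minimal sketch, assuming a hypothetical helper named select_branch (not part of Airflow) that mirrors the A / B, C rule used in this post; the Airflow wiring is shown in comments only.

```python
import random


def select_branch(item_list=("A", "B", "C"), rng=random):
    """Hypothetical helper: return the task_id(s) to follow.

    Returns a single task_id (str) when 'A' is picked,
    otherwise a list of task_ids.
    """
    selected_item = rng.choice(list(item_list))
    if selected_item == "A":
        return "task_a"
    return ["task_b", "task_c"]


# How each approach could reuse the helper (illustrative, not executed here):
#
# 1) BranchPythonOperator
#    BranchPythonOperator(task_id="python_branch_task",
#                         python_callable=select_branch)
#
# 2) task.branch decorator
#    @task.branch(task_id="python_branch_task")
#    def branch():
#        return select_branch()
#
# 3) BaseBranchOperator subclass
#    class CustomBranchOperator(BaseBranchOperator):
#        def choose_branch(self, context):
#            return select_branch()

if __name__ == "__main__":
    print(select_branch())
```

Keeping the rule in a plain function also makes it testable without a running scheduler.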

First line : output of context

  • airflow.operators.branch
    • A base class for creating operators with branching functionality, similar to BranchPythonOperator.
    • Users should create a subclass from this operator and implement the function choose_branch(self, context). This should run whatever business logic is needed to determine the branch, and return either the task_id for a single task (as a str) or a list of task_ids.
    • The operator will continue with the returned task_id(s), and all other tasks directly downstream of this operator will be skipped.
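The follow/skip rule described above can be simulated in plain Python. This is only a sketch of the documented behavior, not Airflow's actual implementation; split_downstream is a hypothetical helper name.

```python
def split_downstream(returned, downstream_task_ids):
    """Partition directly downstream task_ids into (followed, skipped).

    `returned` is what choose_branch gave back: a single task_id (str)
    or a list of task_ids.
    """
    # Normalize the return value to a set of chosen task_ids
    chosen = {returned} if isinstance(returned, str) else set(returned)
    followed = [t for t in downstream_task_ids if t in chosen]
    skipped = [t for t in downstream_task_ids if t not in chosen]
    return followed, skipped


if __name__ == "__main__":
    downstream = ["task_a", "task_b", "task_c"]
    print(split_downstream("task_a", downstream))
    # (['task_a'], ['task_b', 'task_c'])
    print(split_downstream(["task_b", "task_c"], downstream))
    # (['task_b', 'task_c'], ['task_a'])
```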

 

from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.branch import BaseBranchOperator
with DAG(
    dag_id="dags_base_branch_operator",
    schedule=None,
    start_date=pendulum.datetime(2023, 9, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    class CustomBranchOperator(BaseBranchOperator):
        # Override choose_branch: return the task_id(s) to run next
        def choose_branch(self, context):
            import random

            item_list = ['A', 'B', 'C']
            selected_item = random.choice(item_list)
            if selected_item == 'A':
                return 'task_a'
            elif selected_item in ['B', 'C']:
                return ['task_b', 'task_c']
            
    custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')

    def common_func(**kwargs):
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id = 'task_a',
        python_callable=common_func,
        op_kwargs={'selected':'A'}
    )
    task_b = PythonOperator(
        task_id = 'task_b',
        python_callable=common_func,
        op_kwargs={'selected':'B'}
    )
    task_c = PythonOperator(
        task_id = 'task_c',
        python_callable=common_func,
        op_kwargs={'selected':'C'}
    )
    
    # Use the task objects, not task_id strings, when setting dependencies
    custom_branch_operator >> [task_a, task_b, task_c]
