now is better than never
[airflow] Task Branching (3) - Base Branch Operator
BaseBranchOperator
- Create a class that inherits from BaseBranchOperator
- A subclass of BaseBranchOperator must implement the choose_branch method
- parameter: context
  (similar to the kwargs received by a PythonOperator callable)
- BranchPythonOperator and the task.branch decorator are used more often
- All three approaches produce the same result
- airflow.operators.branch
- A base class for creating operators with branching functionality, similar to BranchPythonOperator.
- Users should create a subclass from this operator and implement the function choose_branch(self, context). This should run whatever business logic is needed to determine the branch, and return either the task_id for a single task (as a str) or a list of task_ids.
- The operator will continue with the returned task_id(s), and all other tasks directly downstream of this operator will be skipped.
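To make the skip rule above concrete, here is a minimal pure-Python sketch. It is not Airflow's implementation: the helper name `resolve_branch` is made up for illustration, and it only models the documented behavior, where the returned task_id(s) are followed and every other directly downstream task is skipped.

```python
from typing import Iterable, List, Tuple, Union


def resolve_branch(
    returned: Union[str, Iterable[str]],
    downstream_ids: Iterable[str],
) -> Tuple[List[str], List[str]]:
    """Model which direct downstream tasks are followed and which are skipped.

    Hypothetical helper for illustration only, not part of Airflow.
    """
    # choose_branch may return a single task_id (str) or a list of task_ids.
    chosen = {returned} if isinstance(returned, str) else set(returned)
    downstream = list(downstream_ids)

    followed = [task_id for task_id in downstream if task_id in chosen]
    skipped = [task_id for task_id in downstream if task_id not in chosen]
    return followed, skipped


downstream = ["task_a", "task_b", "task_c"]
print(resolve_branch("task_a", downstream))
# (['task_a'], ['task_b', 'task_c'])
print(resolve_branch(["task_b", "task_c"], downstream))
# (['task_b', 'task_c'], ['task_a'])
```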
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.operators.branch import BaseBranchOperator

with DAG(
    dag_id="dags_base_branch_operator",
    schedule=None,
    start_date=pendulum.datetime(2023, 9, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    class CustomBranchOperator(BaseBranchOperator):
        # Override choose_branch to decide which downstream task(s) to run
        def choose_branch(self, context):
            import random

            item_list = ['A', 'B', 'C']
            selected_item = random.choice(item_list)
            if selected_item == 'A':
                # A single task_id is returned as a str
                return 'task_a'
            elif selected_item in ['B', 'C']:
                # Several task_ids are returned as a list
                return ['task_b', 'task_c']

    custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')

    def common_func(**kwargs):
        # op_kwargs are passed to the callable as keyword arguments
        print(kwargs['selected'])

    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected': 'A'}
    )
    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected': 'B'}
    )
    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected': 'C'}
    )

    # Set dependencies with the task objects, not their task_id strings
    custom_branch_operator >> [task_a, task_b, task_c]
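Because choose_branch picks at random, the DAG above is hard to check by hand. One possible approach, sketched here with hypothetical helper names (`branch_for`, `choose_randomly`), is to keep the decision in a plain function that can be tested without Airflow. Raising on an unexpected item is an addition of this sketch; the original code would simply return None in that case.

```python
import random
from typing import List, Optional, Union


def branch_for(selected_item: str) -> Union[str, List[str]]:
    """Map the selected item to the task_id(s) to follow (same rule as the DAG above)."""
    if selected_item == "A":
        return "task_a"
    if selected_item in ("B", "C"):
        return ["task_b", "task_c"]
    # Assumption of this sketch: fail loudly instead of returning None.
    raise ValueError(f"unexpected item: {selected_item!r}")


def choose_randomly(rng: Optional[random.Random] = None) -> Union[str, List[str]]:
    """Pick an item at random; pass a seeded random.Random for repeatable runs."""
    rng = rng or random.Random()
    return branch_for(rng.choice(["A", "B", "C"]))


print(branch_for("A"))  # task_a
print(branch_for("B"))  # ['task_b', 'task_c']
```

Inside the operator, choose_branch(self, context) could then just return choose_randomly(), so the Airflow-specific part stays a one-liner.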