@task.branch
- 기존에는 BranchPythonOperator 를 이용해서 task 객체를 얻었던 것을
- → @task.branch 로 감싼 함수를 실행(호출)시켜 task 객체를 얻음
- task 를 정의할 때 함수를 실행시킴 (예: select_random())
- decorator 밑에 wrapping하고 싶은 함수를 작성
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.decorators import task
with DAG(
    dag_id="dags_python_with_branch_decorator",
    schedule=None,
    start_date=pendulum.datetime(2023, 9, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    @task.branch(task_id='python_branch_task')
    def select_random():
        """Pick 'A', 'B' or 'C' at random and return the task_id(s) to run.

        Returns:
            'task_a' when 'A' is picked; the list ['task_b', 'task_c'] when
            'B' or 'C' is picked.
        """
        # Imported inside the callable, so it is only loaded when the
        # function body actually executes.
        import random

        picked = random.choice(['A', 'B', 'C'])
        downstream_by_pick = {
            'A': 'task_a',
            'B': ['task_b', 'task_c'],
            'C': ['task_b', 'task_c'],
        }
        # .get yields None for an unmapped pick; with the three labels
        # above every pick is mapped, so that case cannot occur.
        return downstream_by_pick.get(picked)

    def common_func(**kwargs):
        """Print the value passed under the 'selected' keyword."""
        selected = kwargs['selected']
        print(selected)

    # One PythonOperator per label, created in A, B, C order; each passes its
    # own label to common_func through op_kwargs.
    task_a, task_b, task_c = (
        PythonOperator(
            task_id=task_id,
            python_callable=common_func,
            op_kwargs={'selected': label},
        )
        for task_id, label in (('task_a', 'A'), ('task_b', 'B'), ('task_c', 'C'))
    )

    # Calling the decorated function creates the branch task; all three
    # operators are set downstream of it.
    select_random() >> [task_a, task_b, task_c]