now is better than never
[airflow] XCom 본문
XCom (Cross Communication)
- Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술
ex) Task1의 리턴 값을 Task2에서 사용하고 싶은 경우 - 주로 작은 규모의 데이터 공유를 위해 사용
- XCom 내용은 메타 DB의 xcom 테이블에 값이 저장됨
- 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요 (AWS S3-object storage, GCP GCS, HDFS 등)
Python Operator + XCom
- **kwargs 의 ti (task_instance) 객체 활용
- template 변수에서 task_instance 객체를 얻을 수 있음
- ti 로 XCom 데이터를 올리거나(push) 받아올 수 있음(pull)
- key 만 쓸 경우 같은 이름 key 가 여러 개 있다면 제일 마지막 데이터를 갖고 옴
→ task_id 로 특정 task 명시!
- return 값 활용
- airflow 는 return 값을 자동으로 Xcom 에 저장
- task decorator 사용시 함수 입력/출력 관계만으로 task flow 정의됨
= '>>' 쓰지 않아도 task 실행 순서가 정해짐- xcom_push_by_return
- xcom_pull_by_return
-
return한 Task가 여러개 있을 때는 task_ids 를 지정
# ti
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='result1', value='value_1')
ti.xcom_push(key='result2', value=[1, 2, 3])
@task(task_id='python_xcom_push_task2')
def xcom_push2(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='result1', value='value_2')
ti.xcom_push(key='result2', value=[1, 2, 3, 4])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(key='result1')
value2 = ti.xcom_pull(key='result2', task_ids='python_xcom_push_task1')
print(value1)
print(value2)
xcom_push1() >> xcom_push2() >> xcom_pull()
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
return 'Success'
# return 1
@task(task_id='python_xcom_pull_1')
def xcom_pull_1(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
print(f'xcom_pull method: {value1}')
# return 2
@task(task_id='python_xcom_pull_2')
def xcom_pull_2(status, **kwargs):
print(f'function input: {status}')
python_xcom_push_by_return = xcom_push_result() # airflow task 객체
xcom_pull_2(python_xcom_push_by_return)
python_xcom_push_by_return >> xcom_pull_1()

처음에 pull 할 때 task_id 로 써서 에러남..

Bash Operator + XCom
- bash operator 에서 template 문법을 쓸 수 있는 파라미터 사용해야 함
- env, bash_command 파라미터로 template 문법 쓸 수 있음
→ push / pull - python operator 는 template 사용 X, kwargs 에서 ti 객체 활용
- bash 는 마지막 출력 문장(echo)를 return_value 키 값에 저장됨
- env: key-value 형태
- task_ids 만 지정하면 key='return_value' 를 의미
bash_push = BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ti.xcom_push(key='bash_pushed', value='first_bash_message')}} && "
"echo COMPLETE"
)
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':'{{ti.xcom_pull(key="bash_pushed")}}',
'RETURN_VALUE':'{{ti.xcom_pull(task_ids="bash_push")}}'},
bash_command='echo $PUSHED_VALUE && echo $RETURN_VALUE ',
do_xcom_push=False
)
bash_push >> bash_pull
Python Operator → Bash Operator
@task(task_id='python_push')
def python_push_xcom():
result_dict = {'status':'GOOD',
'data':[1, 2, 3],
'option_cnt':100}
return result_dict
bash_pull = BashOperator(
task_id='bash_pull',
env={'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
'OPTION_CNT':'{{ti.xcom_pull(task_ids="python_push")["option_cnt"]}}'
},
bash_command='echo $STATUS && echo $DATA && echo $OPTION_CNT'
)
python_push_xcom() >> bash_pull
Bash Operator → Python Operator
bash_push = BashOperator(
task_id='bash_push',
bash_command='echo PUSH_START '
'{{ti.xcom_push(key="bash_pushed", value=200)}} && '
'echo PUSH_COMPLETE'
)
@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
ti = kwargs['ti']
status_value = ti.xcom_pull(key='bash_pushed')
return_value = ti.xcom_pull(task_ids='bash_push')
print(f'status_value: {status_value}')
print(f'return_value: {return_value}')
bash_push >> python_pull_xcom()
Python + XCom + Email
- Email Operator 에서 template 쓸 수 있는 파라미터
- to
- subject
- html_content
- files
ccbccmime_subtypemime_charsetcustom_headers
@task(task_id='something_task')
def logic(**kwargs):
from random import choice
return choice(['Success', 'Fail'])
send_email = EmailOperator(
task_id='send_email',
to='',
subject='{{data_interval_end.in_timezone("Asia/Seoul") | ds}} logic 처리 결과',
html_content='{{data_interval_end.in_timezone("Asia/Seoul") | ds}} 처리 결과는 <br> \
{{ti.xcom_pull(task_ids="something_task")}} 했습니다. <br>'
)
logic() >> send_email
'데이터 엔지니어링 > airflow' 카테고리의 다른 글
[airflow] TASK 분기 (1) - Branch Python Operator (0) | 2023.09.05 |
---|---|
[airflow] 전역변수 Variable (0) | 2023.07.08 |
[airflow] Macro (0) | 2023.06.23 |
[airflow] Jinja template (0) | 2023.06.23 |
[airflow] op_args (0) | 2023.06.22 |