now is better than never

[airflow] XCom 본문

데이터 엔지니어링/airflow

[airflow] XCom

김초송 2023. 7. 6. 15:43

XCom (Cross Communication)

  • Airflow DAG Task 간 데이터 공유를 위해 사용되는 기술
    ex) Task1의 리턴 값을 Task2에서 사용하고 싶은 경우
  • 주로 작은 규모의 데이터 공유를 위해 사용
  • XCom 내용은 메타 DBxcom 테이블에 값이 저장됨
  • 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요 (AWS S3-object storage, GCP GCS, HDFS )

 

Python Operator + XCom

  1. **kwargs 의 ti (task_instance) 객체 활용
    • template 변수에서 task_instance 객체를 얻을 수 있음
    • ti 로 XCom 데이터를 올리거나(push) 받아올 수 있음(pull)
    • key 만 쓸 경우 같은 이름 key 가 여러 개 있다면 제일 마지막 데이터를 갖고 옴
      task_id 로 특정 task 명시! 
  2. return 값 활용
    • airflow 는 return 값을 자동으로 Xcom 에 저장
    • task decorator 사용시 함수 입력/출력 관계만으로 task flow 정의됨 
      = '>>' 쓰지 않아도 task 실행 순서가 정해짐
      1. xcom_push_by_return
      2. xcom_pull_by_return
    • return한 Task가 여러개 있을 때는 task_ids 를 지정

# ti
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='result1', value='value_1')
    ti.xcom_push(key='result2', value=[1, 2, 3])

@task(task_id='python_xcom_push_task2')
def xcom_push2(**kwargs):
    ti = kwargs['ti']
    ti.xcom_push(key='result1', value='value_2')
    ti.xcom_push(key='result2', value=[1, 2, 3, 4])

@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
    ti = kwargs['ti']
    value1 = ti.xcom_pull(key='result1')
    value2 = ti.xcom_pull(key='result2', task_ids='python_xcom_push_task1')
    print(value1)
    print(value2)

xcom_push1() >> xcom_push2() >> xcom_pull()
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
    return 'Success'

# return 1
@task(task_id='python_xcom_pull_1')
def xcom_pull_1(**kwargs):
    ti = kwargs['ti']
    value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
    print(f'xcom_pull method: {value1}')

# return 2
@task(task_id='python_xcom_pull_2')
def xcom_pull_2(status, **kwargs):
    print(f'function input: {status}')

python_xcom_push_by_return = xcom_push_result() # airflow task 객체
xcom_pull_2(python_xcom_push_by_return)
python_xcom_push_by_return >> xcom_pull_1()

처음에 pull 할 때 task_id 로 써서 에러남..

Xcom 확인

 

Bash Operator + XCom

  • bash operator 에서 template 문법을 쓸 수 있는 파라미터 사용해야 함
  • env, bash_command 파라미터로 template 문법 쓸 수 있음
     push / pull
  • python operator 는 template 사용 X, kwargs 에서 ti 객체  활용
  • bash 는 마지막 출력 문장(echo)를 return_value 키 값에 저장됨
  • env: key-value 형태
  • task_ids 만 지정하면 key='return_value' 를 의미
bash_push = BashOperator(
    task_id='bash_push',
    bash_command="echo START && "
                    "echo XCOM_PUSHED "
                    "{{ti.xcom_push(key='bash_pushed', value='first_bash_message')}} && "
                    "echo COMPLETE"
)

bash_pull = BashOperator(
    task_id='bash_pull',
    env={'PUSHED_VALUE':'{{ti.xcom_pull(key="bash_pushed")}}',
            'RETURN_VALUE':'{{ti.xcom_pull(task_ids="bash_push")}}'},
    bash_command='echo $PUSHED_VALUE && echo $RETURN_VALUE ',
    do_xcom_push=False
)

bash_push >> bash_pull

 

Python Operator → Bash Operator

@task(task_id='python_push')
def python_push_xcom():
    result_dict = {'status':'GOOD',
                    'data':[1, 2, 3],
                    'option_cnt':100}
    return result_dict

bash_pull = BashOperator(
    task_id='bash_pull',
    env={'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
            'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
            'OPTION_CNT':'{{ti.xcom_pull(task_ids="python_push")["option_cnt"]}}'
            },
    bash_command='echo $STATUS && echo $DATA && echo $OPTION_CNT'   
)

python_push_xcom() >> bash_pull

 

Bash Operator → Python Operator

bash_push = BashOperator(
    task_id='bash_push',
    bash_command='echo PUSH_START '
                    '{{ti.xcom_push(key="bash_pushed", value=200)}} && '
                    'echo PUSH_COMPLETE'
)

@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
    ti = kwargs['ti']
    status_value = ti.xcom_pull(key='bash_pushed')
    return_value = ti.xcom_pull(task_ids='bash_push')
    print(f'status_value: {status_value}')
    print(f'return_value: {return_value}')

bash_push >> python_pull_xcom()

 

Python + XCom + Email

  • Email Operator 에서 template 쓸 수 있는 파라미터
    • to
    • subject
    • html_content
    • files
    • cc
    • bcc
    • mime_subtype
    • mime_charset
    • custom_headers
    @task(task_id='something_task')
    def logic(**kwargs):
        from random import choice
        return choice(['Success', 'Fail'])
    
    send_email = EmailOperator(
        task_id='send_email',
        to='',
        subject='{{data_interval_end.in_timezone("Asia/Seoul") | ds}} logic 처리 결과',
        html_content='{{data_interval_end.in_timezone("Asia/Seoul") | ds}} 처리 결과는 <br> \
            {{ti.xcom_pull(task_ids="something_task")}} 했습니다. <br>'
    )
    
    logic() >> send_email

'데이터 엔지니어링 > airflow' 카테고리의 다른 글

[airflow] TASK 분기 (1) - Branch Python Operator  (0) 2023.09.05
[airflow] 전역변수 Variable  (0) 2023.07.08
[airflow] Macro  (0) 2023.06.23
[airflow] Jinja template  (0) 2023.06.23
[airflow] op_args  (0) 2023.06.22