데이터 엔지니어링/airflow

[airflow] Python Operator

김초송 2023. 6. 22. 14:30

파이썬 오퍼레이터

  • 정의된 파이썬 함수를 실행시키는 오퍼레이터
from airflow.operators.python import PythonOperator
PythonOperator 어떤 파이썬 함수를 실행시키기 위한 오퍼레이터
BranchPythonOperator 파이썬 함수 실행 결과에 따라 task를 선택적으로 실행시킬 때 사용되는 오퍼레이터
TASK 분기- t1 >> [t2, t3] 일 때 t1 의 결과에 따라 t2 / t3 중 하나만 실행
ShortCircuitOperator 파이썬 함수 실행 결과에 따라 후행 Task를 실행하지 않고 종료시킬 수 있는 오퍼레이터
PythonVirtualenvOperator 파이썬 가상환경 생성 후 Job 수행하고 마무리되면 가상환경을 삭제해주는 오퍼레이터
ExternalPythonOperator 기존에 존재하는 파이썬 가상환경에서 Job 수행하게 하는 오퍼레이터
import datetime
import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator
import random

with DAG(
    dag_id="dags_python_operator",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    def select_fruit():
        fruits = ['APPLE', 'BANANA', 'ORANGE', 'AVOCADO']
        rand_int = random.randint(0, 3)
        print(fruits[rand_int])

    py_t1 = PythonOperator(
        task_id='py_t1',
        python_callable=select_fruit

    )

    py_t1

 

파이썬 외부 함수 실행

  • DAG에 함수를 import 하려면 sys.path 에 경로를 추가해야 함
    1. 명시적으로 추가
      ex) sys.path.append(‘/home/...’)
    2. OS 환경변수 PYTHONPATH 에 값을 추가
  • Airflow 는 위의 과정 필요없이 자동으로 dags 폴더와 plugins 폴더를 sys.path에 추가함
    • /opt/airflow/plugins 까지는 기본 path 인식
    1. 공통 함수 재사용 (재활용성 높음)
    2. 코드 간결

 

Decorator 

  • 원래의 함수를 감싸서(Wrapping) 바깥에 추가 기능을 덧붙이는 방법
  • ex) A 함수 실행 전 후로 로그를 찍어줄 때 A 함수 내용을 고치지 않고 B 함수에서 호출

TASK Decorator

  • 파이썬 함수 정의만으로 쉽게 TASK 생성
  • 파이썬 오퍼레이터를 직접 쓰는 것보다 태스크 데코레이터를 권장

from airflow import DAG
from airflow.decorators import task

import pendulum

with DAG(
    dag_id="dags_python_task_decorator",
    schedule="0 2 * * 1",
    start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    @task(task_id="python_task_1")
    def print_context(some_input):
        print(some_input)

    python_task_1 = print_context("task decorator 실행")