데이터 엔지니어링/airflow
[airflow] Python Operator
김초송
2023. 6. 22. 14:30
파이썬 오퍼레이터
- 정의된 파이썬 함수를 실행시키는 오퍼레이터
from airflow.operators.python import PythonOperator
PythonOperator | 어떤 파이썬 함수를 실행시키기 위한 오퍼레이터 |
BranchPythonOperator | 파이썬 함수 실행 결과에 따라 task를 선택적으로 실행시킬 때 사용되는 오퍼레이터 TASK 분기- t1 >> [t2, t3] 일 때 t1 의 결과에 따라 t2 / t3 중 하나만 실행 |
ShortCircuitOperator | 파이썬 함수 실행 결과에 따라 후행 Task를 실행하지 않고 종료시킬 수 있는 오퍼레이터 |
PythonVirtualenvOperator | 파이썬 가상환경 생성 후 Job 수행하고 마무리되면 가상환경을 삭제해주는 오퍼레이터 |
ExternalPythonOperator | 기존에 존재하는 파이썬 가상환경에서 Job 수행하게 하는 오퍼레이터 |
import datetime
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
import random
with DAG(
dag_id="dags_python_operator",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
def select_fruit():
fruits = ['APPLE', 'BANANA', 'ORANGE', 'AVOCADO']
rand_int = random.randint(0, 3)
print(fruits[rand_int])
py_t1 = PythonOperator(
task_id='py_t1',
python_callable=select_fruit
)
py_t1
파이썬 외부 함수 실행
- DAG에 함수를 import 하려면 sys.path 에 경로를 추가해야 함
- 명시적으로 추가
ex) sys.path.append(‘/home/...’) - OS 환경변수 PYTHONPATH 에 값을 추가
- 명시적으로 추가
- Airflow 는 위의 과정 필요없이 자동으로 dags 폴더와 plugins 폴더를 sys.path에 추가함
- /opt/airflow/plugins 까지는 기본 path 인식
- 공통 함수 재사용 (재활용성 높음)
- 코드 간결
Decorator
- 원래의 함수를 감싸서(Wrapping) 바깥에 추가 기능을 덧붙이는 방법
- ex) A 함수 실행 전 후로 로그를 찍어줄 때 A 함수 내용을 고치지 않고 B 함수에서 호출

TASK Decorator
- 파이썬 함수 정의만으로 쉽게 TASK 생성
- 파이썬 오퍼레이터를 직접 쓰는 것보다 태스크 데코레이터를 권장
from airflow import DAG
from airflow.decorators import task
import pendulum
with DAG(
dag_id="dags_python_task_decorator",
schedule="0 2 * * 1",
start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id="python_task_1")
def print_context(some_input):
print(some_input)
python_task_1 = print_context("task decorator 실행")