now is better than never

[airflow] Macro 본문

데이터 엔지니어링/airflow

[airflow] Macro

김초송 2023. 6. 23. 19:29

매크로 변수

  • Jinja template 에서 날짜 연산을 가능하게 하는 기능
  • Template 변수 기반 다양한 날짜 연산이 가능하도록 연산 모듈을 제공
  • ex)
    • sql = f''' SELECT NAME, ADDRESS FROM TBL_REG WHERE REG_DATE BETWEEN ?? AND ?? '''
    • 배치일이 1월 31일이면 12월 31일부터 1월 30일 까지
      배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN 이 설정
    • from = {{ data_interval_start }}
      to = {{ data_interval_end }} - 1일
  • datetime, dateutil 라이브러리를 많이 씀

 

Bash Operator + Macro

  1. 매월 말일 수행되는 DAG
    변수
    START_DATE: 전월 말일
    변수 END_DATE: 어제
  2. 매월 둘째주 토요일에 수행되는 DAG
    변수 
    START_DATE: 2주 전 월요일
    변수 END_DATE: 2주 전 토요일
  • YYYY-MM-DD 형식
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_macro_eg1",
    schedule="10 0 L * *", # 매월 말일 실행
    start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    bash_task_1 = BashOperator(
        task_id='bash_task_1',
        env={'START_DATE':'{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}', # 전월 말일
             'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}'}, # 1일 전
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_macro_eg2",
    schedule="10 0 * * 6#2", # 격주 토요일마다
    start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    bash_task_2 = BashOperator(
        task_id='bash_task_2',
        env={'START_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds }}', # 2주전 월요일
             'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(weeks=2)) | ds}}'}, # 2주전 토요일
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )

* task1 : 5월 31일이 아니라 5월 30일로 나옴 

 

Python Operator + Macro

  • start_date: 전월 1일
  • end_date: 전월 마지막일
  • | ds: YYYY-MM-DD 형식
    = strftime('%Y-%m-%d)
# marcro
@task(task_id='task_using_macro',
        templates_dict={'start_date':
                        '{{(data_interval_end.in_timezone("Asia/Seoul") + macro.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds}}',
                        'end_date':
                        '{{(data_interval_den.in_timezone("Asia/Seoul").replace(day=1) + macro.dateutil.relativedelta.relativedelta(days=-1)) | ds}}'})
def get_datetime_macro(**kwargs):
    templates_dict = kwargs.get('templates_dict') or {}
    if templates_dict:
        start_date = templates_dict.get('start_date') or 'start_date 없음'
        end_date = templates_dict.get('end_date') or 'end_date 없음'
        print(start_date)
        print(end_date)
  • 함수 안에서 import ? 
    스케쥴러 부하 경감
  • 스케쥴러는 as dag: 전까지 주기적으로 코드 오류 검사
    → 그 전까지 코드가 많으면 스케쥴러 부하 커짐
    오퍼레이터 안에서만 쓰는 라이브러리는 안에 선언!
# 직접 연산
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
    from dateutil.relativedelta import relativedelta

    data_interval_end = kwargs['data_interval_end']
    prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
    prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) +  relativedelta(days=-1)
    print(prev_month_day_first.strftime('%Y-%m-%d'))
    print(prev_month_day_last.strftime('%Y-%m-%d'))

get_datetime_macro() >> get_datetime_calc()

 

 

'데이터 엔지니어링 > airflow' 카테고리의 다른 글

[airflow] 전역변수 Variable  (0) 2023.07.08
[airflow] XCom  (1) 2023.07.06
[airflow] Jinja template  (0) 2023.06.23
[airflow] op_args  (0) 2023.06.22
[airflow] Python Operator  (2) 2023.06.22