now is better than never
[airflow] Macro 본문
매크로 변수
- Jinja template 에서 날짜 연산을 가능하게 하는 기능
- Template 변수 기반 다양한 날짜 연산이 가능하도록 연산 모듈을 제공
- ex)
-
sql = f''' SELECT NAME, ADDRESS FROM TBL_REG WHERE REG_DATE BETWEEN ?? AND ?? '''
- 배치일이 1월 31일이면 12월 31일부터 1월 30일 까지
배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN 이 설정 - from = {{ data_interval_start }}
to = {{ data_interval_end }} - 1일
-
- datetime, dateutil 라이브러리를 많이 씀
Bash Operator + Macro
- 매월 말일 수행되는 DAG
변수 START_DATE: 전월 말일
변수 END_DATE: 어제 - 매월 둘째주 토요일에 수행되는 DAG
변수 START_DATE: 2주 전 월요일
변수 END_DATE: 2주 전 토요일
- YYYY-MM-DD 형식
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_macro_eg1",
schedule="10 0 L * *", # 매월 말일 실행
start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
bash_task_1 = BashOperator(
task_id='bash_task_1',
env={'START_DATE':'{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}', # 전월 말일
'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}'}, # 1일 전
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_macro_eg2",
schedule="10 0 * * 6#2", # 격주 토요일마다
start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
bash_task_2 = BashOperator(
task_id='bash_task_2',
env={'START_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds }}', # 2주전 월요일
'END_DATE':'{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(weeks=2)) | ds}}'}, # 2주전 토요일
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
* task1 : 5월 31일이 아니라 5월 30일로 나옴
Python Operator + Macro
- start_date: 전월 1일
- end_date: 전월 마지막일
- | ds: YYYY-MM-DD 형식
= strftime('%Y-%m-%d)
# marcro
@task(task_id='task_using_macro',
templates_dict={'start_date':
'{{(data_interval_end.in_timezone("Asia/Seoul") + macro.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds}}',
'end_date':
'{{(data_interval_den.in_timezone("Asia/Seoul").replace(day=1) + macro.dateutil.relativedelta.relativedelta(days=-1)) | ds}}'})
def get_datetime_macro(**kwargs):
templates_dict = kwargs.get('templates_dict') or {}
if templates_dict:
start_date = templates_dict.get('start_date') or 'start_date 없음'
end_date = templates_dict.get('end_date') or 'end_date 없음'
print(start_date)
print(end_date)
- 함수 안에서 import ?
→ 스케쥴러 부하 경감 - 스케쥴러는 as dag: 전까지 주기적으로 코드 오류 검사
→ 그 전까지 코드가 많으면 스케쥴러 부하 커짐
→ 오퍼레이터 안에서만 쓰는 라이브러리는 안에 선언!
# 직접 연산
@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
from dateutil.relativedelta import relativedelta
data_interval_end = kwargs['data_interval_end']
prev_month_day_first = data_interval_end.in_timezone('Asia/Seoul') + relativedelta(months=-1, day=1)
prev_month_day_last = data_interval_end.in_timezone('Asia/Seoul').replace(day=1) + relativedelta(days=-1)
print(prev_month_day_first.strftime('%Y-%m-%d'))
print(prev_month_day_last.strftime('%Y-%m-%d'))
get_datetime_macro() >> get_datetime_calc()
'데이터 엔지니어링 > airflow' 카테고리의 다른 글
[airflow] 전역변수 Variable (0) | 2023.07.08 |
---|---|
[airflow] XCom (1) | 2023.07.06 |
[airflow] Jinja template (0) | 2023.06.23 |
[airflow] op_args (0) | 2023.06.22 |
[airflow] Python Operator (2) | 2023.06.22 |