now is better than never
[airflow] Jinja template 본문
Jinja
- 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진
= 템플릿 엔진 - Jinja 템플릿은 파이썬 언어에서 사용하는 템플릿 엔진
- 파이썬 기반 웹 프레임워크인 Flask, Django 에서 많이 쓰임
→ HTML 템플릿 저장 후 백엔드 처리 결과에 따라 실제 값으로 변환해서 화면에 출력 - SQL 에서도 사용 가능
- 오퍼레이터 파라미터 입력 시 {{ }} 안에 변수
- Airflow 에서 기본적으로 제공하는 변수 : 수행날짜, DAG_ID 등등
참고 : https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html - 모든 오퍼레이터, 파라미터에 템플릿 적용 X
참고 : https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html
→ 오퍼레이터 문서에서 template_fields 변수만 가능
- Airflow 에서 기본적으로 제공하는 변수 : 수행날짜, DAG_ID 등등
from jinja2 import Template
template = Template('My name is {{name}}')
s = template.render(name='Hong Gildong')
print(s)
SELECT * FROM tables WHERE base_dt = {{}}
Bash Operator + Jinja Template
- bash_command (str)
- env (dict[str, str] | None)
import datetime
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dags_bash_with_template",
schedule="0 0 * * *",
start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
bash_t1 = BashOperator(
task_id='bash_t1',
bash_command='echo "data_interval_end: {{ data_interval_end }}"'
)
bash_t2 = BashOperator(
task_id='bash_t2',
env={'START_DATE':'{{ data_interval_start | ds }}',
'END_DATE':'{{ data_interval_end | ds }}'},
bash_command='echo $START_DATE && echo $END_DATE'
)
bash_t1 >> bash_t2
- 시간 : tz='UTC' 기준
+ 9시간 하면 내가 설정한 스케쥴임- 한국시간대로 출력하는 법 :
data_interval_start.in_timezone('Asia/Seoul')
- 한국시간대로 출력하는 법 :
Python Operator + Jinja Template
python_callable (Callable)- op_args (Mapping[str, Any] | None)
- op_kwargs (Collection[Any] | None)
- templates_dict (dict[str, Any] | None)
templates_exts (Sequence[str] | None)show_return_value_in_logs (bool)
- jinja template
- kwargs
import datetime
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
with DAG(
dag_id="dags_python_template",
schedule="30 9 * * *",
start_date=pendulum.datetime(2023, 6, 10, tz="Asia/Seoul"),
catchup=False,
) as dag:
def python_function1(start_date, end_date, **kwargs):
print(start_date, end_date)
python_t1 = PythonOperator(
task_id='python_t1',
python_callable=python_function1,
op_kwargs={'start_date':{{ data_interval_start | ds }}, # jinja template
'end_date':{{ data_interval_end | ds }}}
)
@task(task_id='python_t2')
def python_function2(**kwargs):
print(**kwargs)
print('ds: '+kwargs['ds'])
print('ts: '+kwargs['ts'])
print('data_interval_start: '+str(kwargs['data_interval_start']))
print('data_interval_end: '+str(kwargs['data_interval_end']))
print('task_instance: '+str(kwargs['ti']))
python_t1 >> python_function2()
'데이터 엔지니어링 > airflow' 카테고리의 다른 글
[airflow] XCom (1) | 2023.07.06 |
---|---|
[airflow] Macro (0) | 2023.06.23 |
[airflow] op_args (0) | 2023.06.22 |
[airflow] Python Operator (2) | 2023.06.22 |
[airflow] Email Operator (1) | 2023.06.19 |