now is better than never

[airflow] Jinja template

김초송 2023. 6. 23. 17:08

Jinja

from jinja2 import Template

template = Template('My name is {{name}}')
s = template.render(name='Hong Gildong')
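print(s)  # My name is Hong Gildong

The same {{ }} placeholder can sit inside a SQL string; the expression is left empty here:

SELECT * FROM tables WHERE base_dt = {{}}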
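As a minimal sketch of how the empty placeholder in the SQL line above could be filled, the snippet below renders the query with jinja2. The variable name `base_dt` and the date value are assumptions made up for this example; they are not part of the original query.

```python
from jinja2 import Template

# `base_dt` is a hypothetical variable name chosen for this illustration.
sql_template = Template("SELECT * FROM tables WHERE base_dt = '{{ base_dt }}'")

# render() substitutes the keyword argument into the placeholder.
sql = sql_template.render(base_dt="2023-06-23")
print(sql)
```

The point is only that the query text stays fixed while the date is supplied at render time.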

Bash Operator + Jinja Template

Parameters that accept Jinja templates:
  • bash_command (str)
  • env (dict[str, str] | None)

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_template",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:
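    # Template expressions in bash_command are rendered before the command runs
    bash_t1 = BashOperator(
        task_id='bash_t1',
        bash_command='echo "data_interval_end: {{ data_interval_end }}"'
    )

    # env values are templated too; the ds filter formats a datetime as YYYY-MM-DD
    bash_t2 = BashOperator(
        task_id='bash_t2',
        env={'START_DATE': '{{ data_interval_start | ds }}',
             'END_DATE': '{{ data_interval_end | ds }}'},
        bash_command='echo $START_DATE && echo $END_DATE'
    )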

    bash_t1 >> bash_t2
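To get a feel for what bash_t2 does without running Airflow, here is a rough simulation: render the env templates with jinja2, then run the same command through bash. This is only a sketch, not Airflow's actual implementation. The `ds` filter below is a hand-written stand-in, the context values are made up, and it assumes `bash` is available on the machine.

```python
import os
import subprocess
from datetime import datetime, timezone

from jinja2 import Environment

# Stand-in for Airflow's `ds` filter: format a datetime as YYYY-MM-DD.
jinja_env = Environment()
jinja_env.filters["ds"] = lambda dt: dt.strftime("%Y-%m-%d")

# Hypothetical context for one daily run (values invented for this example).
context = {
    "data_interval_start": datetime(2023, 6, 1, 15, 0, tzinfo=timezone.utc),
    "data_interval_end": datetime(2023, 6, 2, 15, 0, tzinfo=timezone.utc),
}

env_templates = {
    "START_DATE": "{{ data_interval_start | ds }}",
    "END_DATE": "{{ data_interval_end | ds }}",
}

# Step 1: render every templated value to a plain string.
rendered_env = {
    key: jinja_env.from_string(value).render(**context)
    for key, value in env_templates.items()
}

# Step 2: run the command with the rendered values as environment variables.
result = subprocess.run(
    ["bash", "-c", "echo $START_DATE && echo $END_DATE"],
    env={**os.environ, **rendered_env},
    capture_output=True,
    text=True,
    check=True,
)
print(result.stdout)
```

The shell never sees any Jinja syntax: by the time the command runs, START_DATE and END_DATE are ordinary strings.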

  • Times: rendered in UTC (tz='UTC')
    Adding 9 hours gives the schedule time I configured (Asia/Seoul)
    • To print in Korean time:
      data_interval_start.in_timezone('Asia/Seoul')
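The 9-hour offset mentioned above can be checked with the standard library alone. This sketch uses a fixed UTC+9 offset as a stand-in for pendulum's in_timezone('Asia/Seoul'), which is safe here because Korea has no daylight saving time. The timestamp is a made-up example value.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+9 offset used as a stand-in for the Asia/Seoul timezone.
KST = timezone(timedelta(hours=9), name="KST")

# Hypothetical interval start as it would be rendered in UTC.
start_utc = datetime(2023, 6, 1, 15, 0, tzinfo=timezone.utc)

# Same instant, expressed in Korean time.
start_kst = start_utc.astimezone(KST)

print(start_utc.isoformat())  # 2023-06-01T15:00:00+00:00
print(start_kst.isoformat())  # 2023-06-02T00:00:00+09:00
```

Note that the calendar date differs between the two: formatting the UTC value as a date gives 2023-06-01, while the Korean-time value falls on 2023-06-02.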

Python Operator + Jinja Template

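Parameters:
  • python_callable (Callable)
  • op_args (Collection[Any] | None): templated
  • op_kwargs (Mapping[str, Any] | None): templated
  • templates_dict (dict[str, Any] | None): templated
  • templates_exts (Sequence[str] | None)
  • show_return_value_in_logs (bool)

Two ways to use template values in a Python callable:
  1. Pass Jinja template strings through op_kwargs
  2. Read the values directly from **kwargs (the task context)

import pendulum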

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id="dags_python_template",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023, 6, 10, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    def python_function1(start_date, end_date, **kwargs):
        # start_date and end_date arrive as already-rendered strings
        print(start_date, end_date)

    python_t1 = PythonOperator(
        task_id='python_t1',
        python_callable=python_function1,
        # Method 1: Jinja templates must be quoted strings
        op_kwargs={'start_date': '{{ data_interval_start | ds }}',
                   'end_date': '{{ data_interval_end | ds }}'}
    )

    @task(task_id='python_t2')
    def python_function2(**kwargs):
        # Method 2: the task context is passed in as keyword arguments
        print(kwargs)  # print(**kwargs) would raise a TypeError
        print('ds: ' + kwargs['ds'])
        print('ts: ' + kwargs['ts'])
        print('data_interval_start: ' + str(kwargs['data_interval_start']))
        print('data_interval_end: ' + str(kwargs['data_interval_end']))
        print('task_instance: ' + str(kwargs['ti']))

    python_t1 >> python_function2()
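The second approach can be tried outside Airflow with a hand-built context. In the sketch below, `fake_context` is a hypothetical stand-in with only three keys and invented values. A real task context has many more entries and uses pendulum DateTime objects rather than plain datetime.

```python
from datetime import datetime, timezone


def describe_context(**kwargs):
    """Build the same kind of lines python_t2 prints, as a list of strings."""
    return [
        "ds: " + kwargs["ds"],  # already a str, so plain concatenation works
        "ts: " + kwargs["ts"],  # already a str
        # A datetime object must be converted with str() before concatenation.
        "data_interval_start: " + str(kwargs["data_interval_start"]),
    ]


# Hypothetical run time: 09:30 Korean time is 00:30 UTC.
logical_date = datetime(2023, 6, 10, 0, 30, tzinfo=timezone.utc)

fake_context = {
    "ds": logical_date.strftime("%Y-%m-%d"),  # date stamp, YYYY-MM-DD
    "ts": logical_date.isoformat(),           # full ISO 8601 timestamp
    "data_interval_start": logical_date,
}

lines = describe_context(**fake_context)
for line in lines:
    print(line)
```

This also shows why the original code wraps only some values in str(): ds and ts are strings already, while the interval boundaries are datetime objects.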

 
