데이터 엔지니어링/airflow

[airflow] DAG 생성 (bash)

김초송 2023. 6. 16. 22:26

Bash Operator

  • 오퍼레이터
    • 특정 행위를 할 수 있는 기능을 모아 놓은 클래스, 설계도
  • TASK
    • 오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트
  • Bash Operator
    • 쉘 스크립트 명령을 수행하는 오퍼레이터
  • 오퍼레이터로 객체를 생성 -> TASK

TASK 수행 주체

  • 스케줄러
    1. DAG Parsing : task 관계 파악 등
    2. DB에 정보 저장 : task/주기 등등 저장
    3. DAG 시작시간 결정
  • 워커
    1. 실제 작업 수행
    • 스케줄러와 워커 사이에 큐가 있을 수도 있음
    • 스케줄러가 시킨 DAG 파일을 찾아서 처리
    • 처리가 되기 전후 메타DB에 업데이트

from __future__ import annotations

import datetime

import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="dags_bash_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
    catchup=False,
    # dagrun_timeout=datetime.timedelta(minutes=60),
    # tags=["example", "example2"],
    # params={"example_key": "example_value"},
) as dag:
    # run_this_last = EmptyOperator(
    #     task_id="run_this_last",
    # )

    # [START howto_operator_bash]
    bash_t1 = BashOperator(
        task_id="bash_t1",
        bash_command="echo whoami",
    )

    bash_t2 = BashOperator(
        task_id="bash_t2",
        bash_command="echo $HOSTNAME",
    )
    # [END howto_operator_bash]

    bash_t1 >> bash_t2
  • pendulum : datetime을 좀 더 쉽게 사용하게 하는 라이브러리
  • with DAG : DAG 정의 - 모든 DAG 마다 필요
  • schedule="0 0 * * *"
    • 분 시 일 월 요일
    • 0시 0분 마다 DAG 이 돎 = 매일
  • catchup=False : start_date 날짜와 실제 시작 날짜 사이 누락된 구간을 돌리지 않음
  • dagrun_timeout=datetime.timedelta(minutes=60) : DAG이 60분 이상 돌면 실패
  • parmas : TASK 에 공통적으로 넘겨줄 파라미터
  • echo : print 와 비슷
  • >> : TASK 수행 순서/관계

DAG - Airflow 연결

  1. docker-compose.yaml 편집

  • volumes : 로컬과 연결해주는 컨테이너 디렉토리 맵핑 ( = 마운트)
    • 노란색 : 로컬 볼륨
    • 파란색 : 도커 컨테이너 볼륨
  • AIRFLOW_PROJ_DIR:-.
    : AIRFLOW_PROJ_DIR 출력, 없으면 . 출력
  • /dags 주소를 git clone한 위치로 변경
  • plugins : 커스터마이징한 py, sh 파일을 마운트
  1. 도커 올리기
# docker compose down
docker compose up

 

  • 노란색 박스 : 결과
  • t2 의 결과
    : $HOSTNAME = worker container ID
    -> TASK 를 실제로 처리하는 건 worker 이기 때문에!

 

문서 : https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html