데이터 엔지니어링/airflow
[airflow] DAG 생성 (bash)
김초송
2023. 6. 16. 22:26
Bash Operator
- 오퍼레이터
- 특정 행위를 할 수 있는 기능을 모아 놓은 클래스, 설계도
- TASK
- 오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트
- Bash Operator
- 쉘 스크립트 명령을 수행하는 오퍼레이터
- 오퍼레이터로 객체를 생성 -> TASK
TASK 수행 주체
- 스케줄러
- DAG Parsing : task 관계 파악 등
- DB에 정보 저장 : task/주기 등등 저장
- DAG 시작시간 결정
- 워커
- 실제 작업 수행
- 스케줄러와 워커 사이에 큐가 있을 수도 있음
- 스케줄러가 시킨 DAG 파일을 찾아서 처리
- 처리가 되기 전후 메타DB에 업데이트
from __future__ import annotations
import datetime
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="dags_bash_operator",
schedule="0 0 * * *",
start_date=pendulum.datetime(2023, 6, 1, tz="Asia/Seoul"),
catchup=False,
# dagrun_timeout=datetime.timedelta(minutes=60),
# tags=["example", "example2"],
# params={"example_key": "example_value"},
) as dag:
# run_this_last = EmptyOperator(
# task_id="run_this_last",
# )
# [START howto_operator_bash]
bash_t1 = BashOperator(
task_id="bash_t1",
bash_command="echo whoami",
)
bash_t2 = BashOperator(
task_id="bash_t2",
bash_command="echo $HOSTNAME",
)
# [END howto_operator_bash]
bash_t1 >> bash_t2
- pendulum : datetime을 좀 더 쉽게 사용하게 하는 라이브러리
- with DAG : DAG 정의 - 모든 DAG 마다 필요
- schedule="0 0 * * *"
- 분 시 일 월 요일
- 0시 0분 마다 DAG 이 돎 = 매일
- catchup=False : start_date 날짜와 실제 시작 날짜 사이 누락된 구간을 돌리지 않음
- dagrun_timeout=datetime.timedelta(minutes=60) : DAG이 60분 이상 돌면 실패
- parmas : TASK 에 공통적으로 넘겨줄 파라미터
- echo : print 와 비슷
- >> : TASK 수행 순서/관계
DAG - Airflow 연결
- docker-compose.yaml 편집
- volumes : 로컬과 연결해주는 컨테이너 디렉토리 맵핑 ( = 마운트)
- 노란색 : 로컬 볼륨
- 파란색 : 도커 컨테이너 볼륨
- AIRFLOW_PROJ_DIR:-.
: AIRFLOW_PROJ_DIR 출력, 없으면 . 출력 - /dags 주소를 git clone한 위치로 변경
- plugins : 커스터마이징한 py, sh 파일을 마운트
- 도커 올리기
# docker compose down
docker compose up
- 노란색 박스 : 결과
- t2 의 결과
: $HOSTNAME = worker container ID
-> TASK 를 실제로 처리하는 건 worker 이기 때문에!
문서 : https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html