목록데이터 엔지니어링 (16)
now is better than never

BaseBranchOperator BaseBranchOpertator 상속해서 class 만듦 BaseBranchOpertator 상속할 때 choose_branch 함수를 구현해야 함 parameter : context python operator의 kwargs랑 비슷 Branch Python Operator와 task.branch decorator 를 더 많이 사용함 세 방법 다 결과값은 같음 airflow.operators.branch A base class for creating operators with branching functionality, like to BranchPythonOperator. Users should create a subclass from this operator and..
Task.branch branch python operator 를 이용해서 객체를 얻었던 것을 → 함수를 실행시켜 객체를 얻음 task 정의할 때 함수를 실행시킴 decorator 밑에 rapping하고 싶은 함수를 작성 from airflow import DAG import pendulum from airflow.operators.python import PythonOperator from airflow.decorators import task with DAG( dag_id="dags_python_with_branch_decorator", schedule=None, start_date=pendulum.datetime(2023, 9, 1, tz="Asia/Seoul"), catchup=False ) as..

Task 분기 처리 선행 Task 의 수행 결과에 따라 선택적으로 다음 Task 를 수행할 경우 Branch Python Operator task.branch Decorator Base Branch Operator 상속하여 직접 개발 Branch Python Operator python_branch_task → task a, task b, task c 중 하나 실행 python callable : select_random 함수 실행 실행할 task 의 id 값으로 return 후행 task 가 1개라면 string 후행 task 가 여러 개라면 list from airflow import DAG import pendulum from airflow.operators.python import PythonOp..
Variable XCom: 특정 DAG / schedule 에서 수행되는 TASK 간에만 공유 (같은 DAG 이라도 어제와 오늘 TASK 간 데이터는 공유 X) Variable 모든 DAG 이 공유 airflow 사이트에서 등록 가능 Admin > variables Variable의 key, value 값은 메타 DB의 variable 테이블에 저장됨 전역변수 사용 Variable 라이브러리 + 파이썬 문법 *from airflow.models import Variable *→ 주기적인 DAG 파싱 시 Variable.get 개수만큼 DB(메타DB variable 테이블) 연결 → 불필요한 부하 발생 Jinja template + 오퍼레이터 내부 (권고) {{var.value.}} 협업 환경에서 표준화된..

XCom (Cross Communication) Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술ex) Task1의 리턴 값을 Task2에서 사용하고 싶은 경우 주로 작은 규모의 데이터 공유를 위해 사용 XCom 내용은 메타 DB의 xcom 테이블에 값이 저장됨 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 사용 필요 (AWS S3-object storage, GCP GCS, HDFS 등) Python Operator + XCom **kwargs 의 ti (task_instance) 객체 활용 template 변수에서 task_instance 객체를 얻을 수 있음 ti 로 XCom 데이터..

매크로 변수 Jinja template 에서 날짜 연산을 가능하게 하는 기능 Template 변수 기반 다양한 날짜 연산이 가능하도록 연산 모듈을 제공 ex) sql = f''' SELECT NAME, ADDRESS FROM TBL_REG WHERE REG_DATE BETWEEN ?? AND ?? ''' 배치일이 1월 31일이면 12월 31일부터 1월 30일 까지 배치일이 2월 28일이면 1월 31일부터 2월 27일까지 BETWEEN 이 설정 from = {{ data_interval_start }} to = {{ data_interval_end }} - 1일 datetime, dateutil 라이브러리를 많이 씀 Bash Operator + Macro 매월 말일 수행되는 DAG 변수 START_DATE..