AirFlow에 대해서
최근 한 기업의 데이터 엔지니어링 과제 전형에 참여했습니다. 요구사항은 ‘특정 API에서 데이터를 수집하여 데이터베이스에 저장하는 파이프라인을 구축’하는 것이었습니다. 과제를 마치고 제출했지만, ‘더 Robust하게, 더 안정성 있게, 더 확장성 있는 파이프라인’을 만들지 못한 아쉬움이 남습니다. 이 포스팅에서는 과제를 진행하며 공부한 Airflow를 정리하고, 미진했던 부분을 공부하고자 합니다.
참고로, 과제 내용 유출이 금지되어 있어 과제에 대한 내용은 다루지 않습니다.
Airflow의 필요성
단순히 “매일 정해진 시간에 스크립트를 실행”하는것은 Python 스크립트나 crontab을 통해 가능합니다. 하지만 파이프라인의 요구사항이 복잡해질수록 한계가 있습니다.
이때 에어플로우는 복잡한 워크플로우를 파이썬 코드로 정의할 수 있게 해 줍니다. 스케줄링과 실패처리, 과거 데이터 처리, 모니터링까지 다 통합해서 제공하는 플랫폼이라고 보면 됩니다.
Airflow의 기본 구성 요소
Airflow를 이해하기 위해선 DAG, Operator, Task의 관계를 알아야 합니다.
Airflow 개념 | 설명 |
---|---|
DAG | Airflow는 워크플로우 전체의 ‘설계도’라고 생각하면 쉽습니다. 각 작업(Tassk)들이 어떤 순서로 실행되어야 하는지(방향성), 또 작업 흐름이 무한 루프 돌지 않는(비순환성) 특성을 가진 설계도입니다. |
Operator | 어떤 종류의 일을 할 건지 정의하는 일종의 템플릿이라고 생각하면 됩니다. 예를 들어, BashOperator는 Bash 명령어를 실행하는 템플릿, PythonOperator는 Python 함수를 실행하는 템플릿입니다. 많은 종류의 Operator가 제공되고, 필요하다면 직접 custom Operator를 만들 수도 있습니다. |
Task | Operator를 호출하여 만들어진 실제 작업 단위이자 Operator의 ‘인스턴스’입니다. 즉, DAG 안에서 Operator를 호출하여 생성된 개별 작업 단위를 Task라고 부릅니다. |
DockerOperator에 대해서
저 같은 경우 DockerOperator
를 사용했습니다.
PythonOperator
를 사용하면 DAG 파일 내에서 직접 파이썬 함수를 실행할 수 있어 간편합니다. 하지만 의존성 관리 측면에 있어서는 DockerOperator
를 사용하는게 더 좋습니다.
왜냐하면 DockerOperator
는 각 Task를 완전히 분리된 도커 컨테이너 안에서 실행시키기 때문에 Task마다 필요한 라이브러리 버전이나 파이썬 버전이 달라도 서로 전혀 영향을 안 줘서 충돌 걱정이 없고, Task 로직 자체를 에어플로우 환경과 분리하기 때문에 따로 개발하고 테스트하기도 훨씬 편해집니다. 나중에 어떤 무거운 라이브러리가 필요한 Task가 새로 생겨도 시스템 전체에 부담을 덜 주면서 확장하기에도 유리합니다.
Airflow 아키텍처
처음 Airflow라는 이름을 들었을 때는 단순히 하나의 애플리케이션일 것이라고 생각했지만, 들여다보니 여러 컴포넌트가 연결된 아키텍처더라고요. 이런 아키텍처를 보는것은 늘 재밌는것 같습니다.
API Server:
Airflow UI를 제공하고, 동시에 Airflow의 다양한 기능을 외부 시스템에서 프로그래밍 방식으로 제어할 수 있도록 REST API를 제공하는 서버입니다. DAG 제출, 태스크 상태 확인, 연결 관리 등 Airflow의 여러 기능에 대한 접근을 담당합니다. React와 FastAPI 기반으로 만들어졌네요.
DAG Processor:
설정된 DAG 디렉토리에서 DAG 파일을 스캔하고 파싱하여 메타데이터 데이터베이스에 로드하는 역할을 합니다.
Scheduler:
모든 태스크와 DAG를 모니터링하고 종속성이 충족되는 즉시 태스크 인스턴스를 실행하도록 예약하며, 구성된 Executor를 사용하여 워커에서 태스크를 실행합니다.
Worker:
스케줄러로부터 태스크를 받아 실제로 실행하는 프로세스입니다.
Triggerer:
비동기 태스크의 실행을 담당하여, 태스크가 특정 이벤트나 조건이 충족될 때까지 대기하도록 처리하고 해당 조건이 충족되면 태스크 실행을 재개합니다.
Metadata Database (Postgres):
Airflow의 모든 영구적인 상태 정보를 저장하는 데이터베이스입니다. DAG 정의, 태스크 상태, 연결 정보, 변수 등이 포함됩니다.
Executor (CeleryExecutor):
Scheduler 프로세스 안에 존재하는 요소입니다. 단순히 Task가 동작하는 환경과 메커니즘을 Executor라고 생각하면 됩니다. 여러 Executor가 있는데 docker compose yaml파일을 통해서 airflow를 실행한다면 CeleryExecutor로 되어 있을 것입니다.(가장 많이 쓰입니다.) CeleryExecutor는 파이썬의 Celery라는 라이브러리를 사용하는데 비동기 작업 큐를 사용하여 Task를 비동기 방식으로 처리합니다. 또한 아키텍처 수평구조 확장이 가능합니다. (pip install로 airflow를 설치했다면 Sequential Executor로 되어 있을 것입니다.)
Message Broker (Redis):
스케줄러와 워커 간에 태스크 메시지를 전달하는 데 사용되는 메시징 시스템입니다. (Redis
는 이러한 메시징을 위한 빠른 인메모리 데이터 저장소로 사용될 수 있습니다.)
스케줄링이란?
아래 코드와 같이 DAG 파일에서 schedule이라는 파라미터가 있습니다.
with DAG(
...
schedule=None
...
)
이 schedule 파라미터는 DAG을 얼마나 자주 실행할지 정의합니다. @daily, @hourly, 혹은 cron 표현식(‘0 9 * * *‘)으로 설정할 수 있습니다.
다만 이 스케줄링 동작 방식이 좀 특이한게 주기의 끝에서 실행됩니다.
예를 들어서 매일 실행되도록 @daily 스케줄로 설정된 DAG가 있다고 친다면 예를들어 6월 25일에 해당하는 데이터를 처리하는 태스크는 6월 25일 하루가 완전히 끝난 시점인 정확히 6월 26일 00시 00분에 시작됩니다. 즉, 하루가 지나고 나서 시작됩니다.
왜 이렇게 하냐면 해당 날짜의 데이터가 모두 생성되고 이제 더 이상 변경되지 않을 거라고 확정된 시점, 즉 그 간격이 끝난 후에 처리를 시작해야 데이터 누락 없이 안전하게 처리할 수 있다는 논리가 깔려있기 때문입니다. 즉, 데이터가 다 모일 때까지 기다렸다가 처리하는거라 생각하면 됩니다.
Catchup이란?
이번에도 아래 코드를 보면 DAG 파일에서 catchup 파라미터도 있습니다.
with DAG(
...
catchup=False, # True(기본값)
...
)
catchup은 스케줄러가 자동으로 해 주는 기능입니다. DAG 설정에 캐치업은 true
(기본값)으로 설정하면 DAG가 잠시 꺼져 있다가 다시 켜졌을 때 그 사이에 실행됐어야 할 과거 스케줄들을 스케줄러가 알아서 감지해서 자동으로 순서대로 쭉 실행시켜 줍니다.
예를 들어 catchup이 True일때 DAG의 start_date가 ‘2025년 6월 1일’인데, 제가 6월 5일에 이 DAG를 처음으로 활성화하면
Airflow 스케줄러가 “6월 1일부터 4일까지 4일 치 작업이 밀렸다”라고 판단하고, 1일, 2일, 3일, 4일 치 DAG Run을 순차적으로 생성하고 실행합니다.
하지만 catchup이 False일때는 스케줄러가 밀린 작업을 무시하고 앞으로 다가올 가장 가까운 스케줄부터 작업을 시작합니다.
Backfill이란?
Catchup이 스케줄러에 의해 자동으로 밀린 작업을 처리하는 기능이라면, Backfill은 수동으로 특정 과거 구간의 작업을 재실행시키는 기능입니다
예를 들어서 코드에 버그가 있어서 수정하고 특정 날짜들의 데이터만 다시 처리하고 싶을 때 혹은 새로운 파이프라인을 만들고 과거 데이터를 적재해야 하는 상황일때 사용자가 직접 CLI를 통해서 실행할 수 있습니다.
airflow dags backfill [DAG_ID] \
--start-date [START_DATE] \
--end-date [END_DATE]
만약 2025-06-20 부터 2025-06-25까지 실행을 원한다면 아래와 같이 작성하면 됩니다.
airflow dags backfill <dag_id> --start-date 2024-06-20 --end-date 2024-06-25
그리고 Airflow 3.0부터는 UI 화면에서도 Backfill을 실행시킬 수 있습니다.
멱등성에 관하여
개발을 하다보면 데이터가 서로 모순 없이 일치하는 상태인 정합성은 들어봤지만 멱등성은 첨 들어보네요.
멱등성은 정합성과 비슷한 느낌입니다.
멱등성은 “똑같은 작업을 여러 번 실행해도 그 결과가 항상 똑같이 유지되는 성질”을 의미합니다.
만약 이 멱등성이 보장되지 않으면 Task가 실패해서 자동으로 재시도될 때나, 방금 말한 Backfill처럼 사용자가 일부러 다시 실행시킬 때 데이터가 중복으로 쌓이거나 결과가 이상해질 수 있습니다.
제가 과제를 하면서 겪었던 상황을 예로 들어보겠습니다.
데이터를 수집,정제를 하는 로직을 다 작성했고, 마지막으로 DB에 저장하는 로직을 작성할 시점이었습니다.
이때 단순히 데이터를 DB에 INSERT하도록만 했는데 재실행할 때마다 데이터가 계속 추가되는 문제가 있었습니다.
저는 이 문제를 해결하기 위해 DB 저장 로직에 INSERT 대신 UPSERT를 적용했습니다.
upsert_sql = sql.SQL("""
INSERT INTO 테이블명 (...) VALUES %s
ON CONFLICT (id) DO UPDATE SET ...;
""")
이를 통해서 여러 번 실행해도 최종 결과는 항상 같게 유지되었고 멱등성을 확보할 수 있었습니다.
만약 DB가 아니라 S3에 데이터를 저장해야 한다면 동일한 데이터에 동일한 유니크 ID를 갖도록 하거나, 타임스탬프를 비교하여 재시도 시 기존 파일을 덮어쓰거나 무시하도록 로직을 짜도 될 것입니다.
원자성에 관하여
데이터 파이프라인 설계할 때 멱등성에 이어서 원자성도 보장해야 합니다.
원자성은 쉽게 말해 성공하려면 완벽하게 성공해야 하고, 실패하려면 완벽하게 실패해야 하는것을 말합니다.
즉, 하나의 트랜잭션에서 모든 연산이 전부 성공해야만 최종적으로 시스템에 반영되고, 단 하나라도 실패하면 모든 변경 사항을 이전 상태로 되돌리는 성질을 말합니다.
제가 과제를 하면서 겪었던 상황을 예를 또다시 들어보겠습니다.
저는 과제에서 데이터 저장할 때 여러 배치로 나누어 저장하도록 로직을 구현하였습니다. 만약 첫 번째 배치는 DB에 저장 성공했는데, 두 번째 배치 저장 중 네트워크 오류나 데이터 형식 오류로 실패한다면, 즉 원자성이 보장되지 않는다면 데이터베이스는 절반만 저장된 상태로 남게 됩니다.
이 문제를 해결하기 위해, 저는 데이터베이스 트랜잭션 관리를 통해 원자성을 보장했습니다.
def 저장(conn, data):
...
# 원자성(Atomicity) 보장을 위한 트랜잭션 시작
try:
with conn.cursor() as cur:
upsert_sql = sql.SQL("""
INSERT INTO 테이블명 (...) VALUES %s
ON CONFLICT (id) DO UPDATE SET ...;
""")
execute_values(cur, upsert_sql, data)
# 모든 작업이 성공했을 때만 최종 반영
conn.commit()
except psycopg2.Error as e:
# 하나라도 실패하면 모든 변경 사항을 되돌림
logging.error(f"Error while saving data: {e}")
conn.rollback()
이미지 출처:
· Astronomer
Leave a comment