data-pipeline-design-pattern
TL DR⚑
- 소스와 싱크의 특성을 파악하고
- 추출, 행동, 구조 패턴을 고려하여 파이프라인을 설계하는 것이 중요함.
소스와 싱크⚑
- 소스: 데이터 파이프라인에 입력을 제공하는 시스템.
- 싱크: 데이터 파이프라인이 처리된 데이터를 저장하는 시스템.
Source Replayability⚑
- 아래 질문에 대답할 수 있으면 replayable.
What did the data look like n periods(min/hour/day/months/years) ago?
- 종류
- Replayable sources: 이벤트 스트림, 웹 서버 로그, 데이터베이스 WAL 덤프 등.
- Nonreplayable sources: 지속적으로 수정되는 애플리케이션 테이블, 현재 상태만 제공하는 API 등.
- Replayability 는 Backfill 에 중요.
- non-replayable 소스를 덤프떠서 replayable 소스로 만들 수 있음.
- 덤프는 특정 주기로 이뤄지기 때문에, 어느정도 제약은 생기지만, 큰 문제는 아님
Source Ordring⚑
- 소스 시스템에서 데이터를 특정 순서로 가져오는 것.
- 순서가 보장되지 않는 이벤트 스트림에서 중요.
- 특정 이벤트에 대한 순서를 맞추기 위해 지수 백오프, 워터마킹, 늦게 도착한 이벤트 처리 등의 기술 사용.
Sink Overwritability⚑
- 데이터의 특정 행을 업데이트할 수 있으면 오버라이트 가능성(Overwritability) 이 있음.
- Overwritability 는 파이프라인의 오류로 인한 중복 데이터 생성이나, 불완전 데이터 처리에 중요
- 덮어쓰기 가능한 싱크: 고유 키를 가진 데이터베이스 테이블, 고유 실행 ID로 네임스페이스화 ( e.g. yyyy-mm-dd 등 ) 된 클라우드 저장소 등.
- 덮어쓰기 불가능한 싱크: 로그 압축이 없는 Kafka 큐, "생성만" 가능한 싱크, 고유 키가 없는 데이터베이스 테이블 등.
데이터 파이프라인 패턴⚑
추출 패턴⚑
-
Time Range
- 특정 시간 프레임에 해당하는 데이터만 가져옴.
- 빠른 데이터 가져오기 가능, 파티션으로 병렬 처리 가능. ( e.g. backfill )
- 증분 로드가 어려울 수 있음. ( UPSERTs & MERGE INTOs )
-- value column 을 id 기준으로 업데이트 -- PostgreSQL INSERT INTO target_table (id, value) SELECT id, value FROM source_table ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value; -- MySQL INSERT INTO target_table (id, value) SELECT id, value FROM source_table ON DUPLICATE KEY UPDATE value = VALUES(value); -- Oracle (12c and later) MERGE INTO target_table t USING source_table s ON (t.id = s.id) WHEN MATCHED THEN UPDATE SET t.value = s.value WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value);
-
Full Snapshot
- 소스에서 전체 데이터를 가져옴.
- 구축 및 유지 보수가 쉬움.
- 속도가 느리고 저장 비용이 높을 수 있음.
-
LookBack
- 지난 n 기간 동안의 집계 메트릭을 계산함.
- 유지 보수가 쉬움.
- 늦게 도착한 이벤트가 많으면 혼란을 초래할 수 있음.
-
스트리밍
- 각 레코드가 데이터 파이프라인을 통해 흐름.
- 저지연성, 실시간 처리 필요 시 유용.
- 구축이 복잡하고 재생 가능성 보장이 어려울 수 있음.
행동 패턴⚑
-
Idempotent
- 동일한 입력으로 여러 번 실행해도 출력이 변하지 않음.
- 유지 보수 및 재실행이 쉬움.
- 멱등성을 유지하기 어려운 경우가 있음.
-
Self healing
- 실패 시 다음 실행에서 자동으로 복구.
- 예를 들면, 실행 시점 이전에 실패한 작업을 모두 자동으로 실행시키는 것.
- 유지 보수가 쉬움.
- Self Healing 믿다가, 버그가 즉시 발견되지 않을 수 있음.
구조 패턴⚑
-
멀티 홉 파이프라인
- 여러 단계로 데이터 변환을 분리.
- 데이터 품질 체크가 용이함.
- 저장 및 처리 비용이 증가할 수 있음.
-
조건부/동적 파이프라인
- 복잡한 흐름을 처리하기 위해 조건에 따라 다른 작업 수행.
- 개발이 어렵고 디버깅이 복잡할 수 있음.
-
비연결 데이터 파이프라인
- 다른 데이터 파이프라인의 싱크에 의존.
- 파이프라인의 명시적인 의존성이 없다보니, 쉽게 만들 수 있음.
- 디버깅이 어렵고 SLA 정의 및 모니터링이 어려울 수 있음.
Code Snippet
# 1. Multi-hop Pipeline (using dbt)
# dbt_project.yml
name: 'my_project'
version: '1.0.0'
config-version: 2
profile: 'default'
models:
my_project:
staging:
materialized: table
intermediate:
materialized: table
marts:
materialized: table
# models/staging/stg_orders.sql
SELECT
id AS order_id,
user_id,
status,
created_at
FROM raw_orders
# models/intermediate/int_order_status.sql
SELECT
order_id,
status,
COUNT(*) AS status_count
FROM {{ ref('stg_orders') }}
GROUP BY order_id, status
# models/marts/fct_order_summary.sql
SELECT
o.order_id,
o.user_id,
s.status,
s.status_count
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('int_order_status') }} s ON o.order_id = s.order_id
# 2. Conditional/Dynamic Pipeline (using Apache Airflow)
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime
def check_condition():
# Some logic to determine which task to run
condition = True # This could be based on a database query, API call, etc.
if condition:
return 'task_a'
else:
return 'task_b'
def task_a():
print("Executing task A")
def task_b():
print("Executing task B")
with DAG('conditional_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
condition_check = BranchPythonOperator(
task_id='condition_check',
python_callable=check_condition,
)
task_a = PythonOperator(
task_id='task_a',
python_callable=task_a,
)
task_b = PythonOperator(
task_id='task_b',
python_callable=task_b,
)
condition_check >> [task_a, task_b]
# 3. Disconnected Pipelines (using Apache Airflow)
# Pipeline 1: process_orders
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def process_orders():
print("Processing orders")
with DAG('process_orders', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
process_orders_task = PythonOperator(
task_id='process_orders',
python_callable=process_orders,
)
# Pipeline 2: generate_report
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def generate_report():
print("Generating report")
with DAG('generate_report', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:
generate_report_task = PythonOperator(
task_id='generate_report',
python_callable=generate_report,
)
# Note: These two pipelines are disconnected and run independently