Skip to content

data-pipeline-design-pattern

TL DR

  • 소스와 싱크의 특성을 파악하고
  • 추출, 행동, 구조 패턴을 고려하여 파이프라인을 설계하는 것이 중요함.

소스와 싱크

  • 소스: 데이터 파이프라인에 입력을 제공하는 시스템.
  • 싱크: 데이터 파이프라인이 처리된 데이터를 저장하는 시스템.

Source Replayability

  • 아래 질문에 대답할 수 있으면 replayable.
    • What did the data look like n periods(min/hour/day/months/years) ago?
  • 종류
    • Replayable sources: 이벤트 스트림, 웹 서버 로그, 데이터베이스 WAL 덤프 등.
    • Nonreplayable sources: 지속적으로 수정되는 애플리케이션 테이블, 현재 상태만 제공하는 API 등.
  • Replayability 는 Backfill 에 중요.
  • non-replayable 소스를 덤프떠서 replayable 소스로 만들 수 있음.
  • 덤프는 특정 주기로 이뤄지기 때문에, 어느정도 제약은 생기지만, 큰 문제는 아님

Source Ordring

  • 소스 시스템에서 데이터를 특정 순서로 가져오는 것.
  • 순서가 보장되지 않는 이벤트 스트림에서 중요.
  • 특정 이벤트에 대한 순서를 맞추기 위해 지수 백오프, 워터마킹, 늦게 도착한 이벤트 처리 등의 기술 사용.

Sink Overwritability

  • 데이터의 특정 행을 업데이트할 수 있으면 오버라이트 가능성(Overwritability) 이 있음.
  • Overwritability 는 파이프라인의 오류로 인한 중복 데이터 생성이나, 불완전 데이터 처리에 중요
    • 덮어쓰기 가능한 싱크: 고유 키를 가진 데이터베이스 테이블, 고유 실행 ID로 네임스페이스화 ( e.g. yyyy-mm-dd 등 ) 된 클라우드 저장소 등.
    • 덮어쓰기 불가능한 싱크: 로그 압축이 없는 Kafka 큐, "생성만" 가능한 싱크, 고유 키가 없는 데이터베이스 테이블 등.

데이터 파이프라인 패턴

추출 패턴

  1. Time Range

    • 특정 시간 프레임에 해당하는 데이터만 가져옴.
    • 빠른 데이터 가져오기 가능, 파티션으로 병렬 처리 가능. ( e.g. backfill )
    • 증분 로드가 어려울 수 있음. ( UPSERTs & MERGE INTOs )
    -- value column 을 id 기준으로 업데이트
    -- PostgreSQL
    INSERT INTO target_table (id, value)
    SELECT id, value FROM source_table
    ON CONFLICT (id) DO UPDATE
    SET value = EXCLUDED.value;
    
    -- MySQL
    INSERT INTO target_table (id, value)
    SELECT id, value FROM source_table
    ON DUPLICATE KEY UPDATE
    value = VALUES(value);
    
    -- Oracle (12c and later)
    MERGE INTO target_table t
    USING source_table s
    ON (t.id = s.id)
    WHEN MATCHED THEN
    UPDATE SET t.value = s.value
    WHEN NOT MATCHED THEN
    INSERT (id, value) VALUES (s.id, s.value);
    
  2. Full Snapshot

    • 소스에서 전체 데이터를 가져옴.
    • 구축 및 유지 보수가 쉬움.
    • 속도가 느리고 저장 비용이 높을 수 있음.
  3. LookBack

    • 지난 n 기간 동안의 집계 메트릭을 계산함.
    • 유지 보수가 쉬움.
    • 늦게 도착한 이벤트가 많으면 혼란을 초래할 수 있음.
  4. 스트리밍

    • 각 레코드가 데이터 파이프라인을 통해 흐름.
    • 저지연성, 실시간 처리 필요 시 유용.
    • 구축이 복잡하고 재생 가능성 보장이 어려울 수 있음.

행동 패턴

  1. Idempotent

    • 동일한 입력으로 여러 번 실행해도 출력이 변하지 않음.
    • 유지 보수 및 재실행이 쉬움.
    • 멱등성을 유지하기 어려운 경우가 있음.
  2. Self healing

    • 실패 시 다음 실행에서 자동으로 복구.
    • 예를 들면, 실행 시점 이전에 실패한 작업을 모두 자동으로 실행시키는 것.
    • 유지 보수가 쉬움.
    • Self Healing 믿다가, 버그가 즉시 발견되지 않을 수 있음.

구조 패턴

  1. 멀티 홉 파이프라인

    • 여러 단계로 데이터 변환을 분리.
    • 데이터 품질 체크가 용이함.
    • 저장 및 처리 비용이 증가할 수 있음.
  2. 조건부/동적 파이프라인

    • 복잡한 흐름을 처리하기 위해 조건에 따라 다른 작업 수행.
    • 개발이 어렵고 디버깅이 복잡할 수 있음.
  3. 비연결 데이터 파이프라인

    • 다른 데이터 파이프라인의 싱크에 의존.
    • 파이프라인의 명시적인 의존성이 없다보니, 쉽게 만들 수 있음.
    • 디버깅이 어렵고 SLA 정의 및 모니터링이 어려울 수 있음.
Code Snippet
# 1. Multi-hop Pipeline (using dbt)

# dbt_project.yml
name: 'my_project'
version: '1.0.0'
config-version: 2
profile: 'default'

models:
my_project:
   staging:
      materialized: table
   intermediate:
      materialized: table
   marts:
      materialized: table

# models/staging/stg_orders.sql
SELECT
   id AS order_id,
   user_id,
   status,
   created_at
FROM raw_orders

# models/intermediate/int_order_status.sql
SELECT
   order_id,
   status,
   COUNT(*) AS status_count
FROM {{ ref('stg_orders') }}
GROUP BY order_id, status

# models/marts/fct_order_summary.sql
SELECT
   o.order_id,
   o.user_id,
   s.status,
   s.status_count
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('int_order_status') }} s ON o.order_id = s.order_id

# 2. Conditional/Dynamic Pipeline (using Apache Airflow)

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime

def check_condition():
   # Some logic to determine which task to run
   condition = True  # This could be based on a database query, API call, etc.
   if condition:
      return 'task_a'
   else:
      return 'task_b'

def task_a():
   print("Executing task A")

def task_b():
   print("Executing task B")

with DAG('conditional_pipeline', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:

   condition_check = BranchPythonOperator(
      task_id='condition_check',
      python_callable=check_condition,
   )

   task_a = PythonOperator(
      task_id='task_a',
      python_callable=task_a,
   )

   task_b = PythonOperator(
      task_id='task_b',
      python_callable=task_b,
   )

   condition_check >> [task_a, task_b]

# 3. Disconnected Pipelines (using Apache Airflow)

# Pipeline 1: process_orders
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_orders():
   print("Processing orders")

with DAG('process_orders', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:

   process_orders_task = PythonOperator(
      task_id='process_orders',
      python_callable=process_orders,
   )

# Pipeline 2: generate_report
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def generate_report():
   print("Generating report")

with DAG('generate_report', start_date=datetime(2023, 1, 1), schedule_interval='@daily') as dag:

   generate_report_task = PythonOperator(
      task_id='generate_report',
      python_callable=generate_report,
   )

# Note: These two pipelines are disconnected and run independently

Reference