Skip to content

data-pipeline-idempotent

TL;DR

  • Idempotence is the property of certain operations in mathematics and computer science whereby they can be applied multiple times without changing the result beyond the initial application
  • Idempotence는 반복 실행 시 결과가 변하지 않는 특성을 말함. 데이터 통합에서 이는 중복 기록을 방지하고, 데이터 파이프라인을 오류에서 안전하게 함.
  • Idempotent한 데이터 파이프라인을 만드는 방법은 delete-write 패턴을 사용하는 것임.
  • Idempotent 가 보장된 파이프라인은 백필링, 실패한 실행 또는 기타 오류로 인해 데이터 파이프라인을 다시 실행해야할 때, 많은 문제가 사라짐.

Idempotence 의미

  • Idempotence는 수학적 함수에서 f(f(x)) = f(x) 같은 특성을 가짐.
  • 데이터 통합에서 Idempotence는 동일한 데이터를 여러 번 처리해도 결과가 변하지 않음을 의미함.

Example

다음과 같은 테이블에 기록된 데이터 웨어하우스가 있다고 가정해보자:

id 이름 역할
1 RM 리더
2 보컬

데이터 소스가 다음과 같은 기록을 추가하고 이를 데이터 웨어하우스와 동기화하려고 할 때:

id 이름 역할
3 슈가 래퍼

원하는 출력 결과는 다음과 같다:

id 이름 역할
1 RM 리더
2 보컬
3 슈가 래퍼

구체적으로, 우리는 id 3의 기록을 추가하고자 한다.

  • 기존 기록: id 1, 2
  • 추가 기록: id 3
  • 실패한 동기화 후 다시 시도할 때 idempotent하면 중복 기록이 발생하지 않음.

Idempotent하지 않은 경우:

id 이름 역할
1 RM 리더
2 보컬
3 슈가 래퍼
3 슈가 래퍼

Idempotence가 없는 경우 발생 문제

  • 데이터 파이프라인이 동일한 작업을 반복할 때, 중복된 데이터가 기록됨.
  • 중복 기록 발생으로 인해 잘못된 통계 및 분석 결과 도출.
  • 데이터 처리 및 저장 공간 낭비.
  • 비즈니스 의사 결정에 영향을 미칠 수 있음.

데이터 파이프라인을 Idempotent하게 만드는 방법

  • 데이터 파이프라인을 Idempotent하게 만드는 일반적인 방법은 delete-write 패턴을 사용하는 것임.
  • 이름에서 알 수 있듯이, 파이프라인은 새 데이터를 쓰기 전에 기존 데이터를 삭제함.
  • 이 방법은 데이터 파이프라인이 다시 생성할 데이터를 신중하게 삭제하는 것임

    import os
    import shutil
    import pandas as pd
    
    def process_data(input_file: str, output_dir: str, execution_date: str) -> None:
        output_path = os.path.join(output_dir, execution_date)
        if os.path.exists(output_path):
            shutil.rmtree(output_path) # removes entire folder
    
        df = pd.read_csv(input_file)
        # Add your transformations here
        df.to_parquet(os.path.join(output_path, "data.parquet"), partition_cols=["Date"])
    
  • 여러 작업(다른 날짜)을 동시에 실행할 때 테이블 이름 충돌을 방지하기 위해 run-specific 임시 테이블 TEMP_YYYY_MM_DD를 사용하는 것에 유의.

    CREATE TEMP TABLE TEMP_YYYY_MM_DD AS
    SELECT c1, c2, SOME_TRANSFORMATION_FUNCTION(c3) as c3
    FROM stage_table
    WHERE day = 'yyyy-mm-dd';
    
    -- note the delete-write pattern
    DELETE FROM final_table WHERE day = 'yyyy-mm-dd';
    
    INSERT INTO final_table(c1, c2, c3)
    SELECT c1, c2, c3
    FROM TEMP_YYYY_MM_DD;
    
    DROP TEMP TABLE TEMP_YYYY_MM_DD;
    

    or

    WITH TEMP_YYYY_MM_DD AS (
        SELECT c1, c2, SOME_TRANSFORMATION_FUNCTION(c3) as c3
        FROM stage_table
        WHERE day = 'yyyy-mm-dd'
    )
    -- note the delete-write pattern
    DELETE FROM final_table WHERE day = 'yyyy-mm-dd';
    
    INSERT INTO final_table(c1, c2, c3)
    SELECT c1, c2, c3
    FROM TEMP_YYYY_MM_DD;
    
  • 대부분의 라이브러리와 프레임워크는 덮어쓰기 옵션을 제공함(e.g. Spark overwrite, Snowflake overwrite) 이는 삭제 후 쓰기보다 안전함.

    • 데이터 무결성 문제가 발생하거나
    • 데이터 손실이 발생할 수 있음