Data Pipeline

A classic Extract-Transform-Load pipeline. No LLMs, just pure data processing. It shows that hypergraph works for traditional batch workflows too.

When to Use

  • Data ingestion and processing

  • Feature engineering

  • Report generation

  • Any batch data workflow

The Pipeline

extract → validate → transform → enrich → load

Complete Implementation

import json
from datetime import datetime, timezone

from hypergraph import Graph, node, SyncRunner

# ═══════════════════════════════════════════════════════════════
# EXTRACT
# ═══════════════════════════════════════════════════════════════

@node(output_name="raw_data")
def extract(source_path: str) -> list[dict]:
    """
    Extract raw data from source.
    Supports JSON, CSV, or API endpoints.
    """
    if source_path.endswith(".json"):
        with open(source_path) as f:
            return json.load(f)

    elif source_path.endswith(".csv"):
        import csv
        with open(source_path) as f:
            reader = csv.DictReader(f)
            return list(reader)

    elif source_path.startswith("http"):
        import httpx
        response = httpx.get(source_path)
        return response.json()

    raise ValueError(f"Unknown source format: {source_path}")


# ═══════════════════════════════════════════════════════════════
# VALIDATE
# ═══════════════════════════════════════════════════════════════

@node(output_name=("valid_records", "invalid_records"))
def validate(raw_data: list[dict], required_fields: list[str]) -> tuple[list, list]:
    """
    Validate records, separating valid from invalid.
    """
    valid = []
    invalid = []

    for record in raw_data:
        missing = [f for f in required_fields if f not in record or record[f] is None]

        if missing:
            invalid.append({
                "record": record,
                "errors": [f"Missing field: {f}" for f in missing],
            })
        else:
            valid.append(record)

    return valid, invalid


# ═══════════════════════════════════════════════════════════════
# TRANSFORM
# ═══════════════════════════════════════════════════════════════

@node(output_name="transformed")
def transform(valid_records: list[dict], transformations: dict) -> list[dict]:
    """
    Apply transformations to records.

    transformations = {
        "email": str.lower,
        "price": lambda x: round(float(x), 2),
        "date": lambda x: datetime.fromisoformat(x).date().isoformat(),
    }
    """
    result = []

    for record in valid_records:
        transformed = record.copy()

        for field, transform_fn in transformations.items():
            if field in transformed:
                try:
                    transformed[field] = transform_fn(transformed[field])
                except Exception as e:
                    transformed[f"{field}_error"] = str(e)

        result.append(transformed)

    return result


# ═══════════════════════════════════════════════════════════════
# ENRICH
# ═══════════════════════════════════════════════════════════════

@node(output_name="enriched")
def enrich(transformed: list[dict], lookup_table: dict) -> list[dict]:
    """
    Enrich records with data from lookup tables.

    lookup_table = {
        "category_names": {"A": "Electronics", "B": "Clothing"},
        "region_codes": {"US": "United States", "UK": "United Kingdom"},
    }
    """
    result = []

    for record in transformed:
        enriched = record.copy()

        # Add category name
        if "category" in enriched and "category_names" in lookup_table:
            code = enriched["category"]
            enriched["category_name"] = lookup_table["category_names"].get(code, "Unknown")

        # Add region name
        if "region" in enriched and "region_codes" in lookup_table:
            code = enriched["region"]
            enriched["region_name"] = lookup_table["region_codes"].get(code, "Unknown")

        # Add processing timestamp
        enriched["processed_at"] = datetime.utcnow().isoformat()

        result.append(enriched)

    return result


# ═══════════════════════════════════════════════════════════════
# LOAD
# ═══════════════════════════════════════════════════════════════

@node(output_name="load_result")
def load(enriched: list[dict], destination: str) -> dict:
    """
    Load processed data to destination.
    """
    if destination.endswith(".json"):
        with open(destination, "w") as f:
            json.dump(enriched, f, indent=2)

    elif destination.startswith("postgres://"):
        # Insert to database
        import psycopg2
        conn = psycopg2.connect(destination)
        # ... insert logic
        conn.close()

    return {
        "records_loaded": len(enriched),
        "destination": destination,
        "timestamp": datetime.utcnow().isoformat(),
    }


# ═══════════════════════════════════════════════════════════════
# COMPOSE THE PIPELINE
# ═══════════════════════════════════════════════════════════════

# Wire the five stages into one graph. hypergraph derives the execution
# order (extract → validate → transform → enrich → load) from each node's
# declared input and output names — no explicit edges needed.
etl_pipeline = Graph(
    [extract, validate, transform, enrich, load],
    name="etl",
)


# ═══════════════════════════════════════════════════════════════
# RUN THE PIPELINE
# ═══════════════════════════════════════════════════════════════

def main():
    """Run the ETL pipeline over the sample orders file and report results."""
    runner = SyncRunner()

    # Every graph input is supplied up front; the runner routes each value
    # to the node parameter with the matching name.
    inputs = {
        "source_path": "data/raw_orders.json",
        "required_fields": ["id", "email", "amount"],
        "transformations": {
            "email": str.lower,
            "amount": lambda x: round(float(x), 2),
        },
        "lookup_table": {
            "category_names": {"A": "Electronics", "B": "Clothing"},
        },
        "destination": "data/processed_orders.json",
    }

    result = runner.run(etl_pipeline, inputs)

    loaded = result["load_result"]["records_loaded"]
    rejected = result["invalid_records"]

    print(f"Loaded {loaded} records")
    print(f"Invalid records: {len(rejected)}")

    # Log invalid records for review
    for entry in rejected:
        print(f"  - {entry['errors']}")

Batch Processing

Process multiple files:

With Error Reporting

Add a step to generate an error report:

Testing

What's Next?

Last updated