Data Pipeline
When to Use
The Pipeline
extract → validate → transform → enrich → load

Complete Implementation
import json
from datetime import datetime, timezone

from hypergraph import Graph, node, SyncRunner
# ═══════════════════════════════════════════════════════════════
# EXTRACT
# ═══════════════════════════════════════════════════════════════
@node(output_name="raw_data")
def extract(source_path: str) -> list[dict]:
    """
    Extract raw data from a source.

    Supports local JSON files, local CSV files, and HTTP(S) endpoints
    that return JSON.

    Args:
        source_path: A path ending in ".json" or ".csv", or a URL
            starting with "http".

    Returns:
        A list of record dicts.

    Raises:
        ValueError: If the source format is not recognized.
    """
    if source_path.endswith(".json"):
        with open(source_path) as f:
            return json.load(f)
    elif source_path.endswith(".csv"):
        import csv
        # newline="" is required by the csv module so that newlines
        # embedded in quoted fields are parsed correctly.
        with open(source_path, newline="") as f:
            reader = csv.DictReader(f)
            return list(reader)
    elif source_path.startswith("http"):
        import httpx
        response = httpx.get(source_path)
        # Fail loudly on HTTP errors instead of trying to parse an
        # error page as JSON.
        response.raise_for_status()
        return response.json()
    raise ValueError(f"Unknown source format: {source_path}")
# ═══════════════════════════════════════════════════════════════
# VALIDATE
# ═══════════════════════════════════════════════════════════════
@node(output_name=("valid_records", "invalid_records"))
def validate(raw_data: list[dict], required_fields: list[str]) -> tuple[list, list]:
    """
    Split records into valid and invalid sets.

    A record is valid when every required field is present and not None;
    invalid records are wrapped with per-field error messages.
    """
    valid: list[dict] = []
    invalid: list[dict] = []
    for rec in raw_data:
        # .get() returns None both for absent keys and explicit nulls,
        # so a single check covers both failure modes.
        missing = [f for f in required_fields if rec.get(f) is None]
        if not missing:
            valid.append(rec)
            continue
        invalid.append({
            "record": rec,
            "errors": [f"Missing field: {f}" for f in missing],
        })
    return valid, invalid
# ═══════════════════════════════════════════════════════════════
# TRANSFORM
# ═══════════════════════════════════════════════════════════════
@node(output_name="transformed")
def transform(valid_records: list[dict], transformations: dict) -> list[dict]:
    """
    Apply per-field transformation callables to each record.

    transformations maps field name -> callable, e.g.:
        {
            "email": str.lower,
            "price": lambda x: round(float(x), 2),
            "date": lambda x: datetime.fromisoformat(x).date().isoformat(),
        }

    A failing transformation never drops the record; the error text is
    recorded under "<field>_error" instead. Input records are copied,
    not mutated.
    """
    out: list[dict] = []
    for original in valid_records:
        record = original.copy()
        for field, fn in transformations.items():
            if field not in record:
                continue
            try:
                record[field] = fn(record[field])
            except Exception as exc:
                record[f"{field}_error"] = str(exc)
        out.append(record)
    return out
# ═══════════════════════════════════════════════════════════════
# ENRICH
# ═══════════════════════════════════════════════════════════════
@node(output_name="enriched")
def enrich(transformed: list[dict], lookup_table: dict) -> list[dict]:
    """
    Enrich records with data from lookup tables.

    lookup_table example:
        {
            "category_names": {"A": "Electronics", "B": "Clothing"},
            "region_codes": {"US": "United States", "UK": "United Kingdom"},
        }

    Adds "category_name" / "region_name" when the corresponding code and
    lookup exist (falling back to "Unknown" for unmapped codes), plus a
    timezone-aware UTC "processed_at" timestamp. Input records are not
    mutated.
    """
    result = []
    for record in transformed:
        enriched = record.copy()
        # Add category name
        if "category" in enriched and "category_names" in lookup_table:
            code = enriched["category"]
            enriched["category_name"] = lookup_table["category_names"].get(code, "Unknown")
        # Add region name
        if "region" in enriched and "region_codes" in lookup_table:
            code = enriched["region"]
            enriched["region_name"] = lookup_table["region_codes"].get(code, "Unknown")
        # Add processing timestamp.
        # datetime.utcnow() is deprecated (3.12+) and returns a naive
        # datetime; emit an explicit timezone-aware UTC timestamp.
        enriched["processed_at"] = datetime.now(timezone.utc).isoformat()
        result.append(enriched)
    return result
# ═══════════════════════════════════════════════════════════════
# LOAD
# ═══════════════════════════════════════════════════════════════
@node(output_name="load_result")
def load(enriched: list[dict], destination: str) -> dict:
    """
    Load processed data to destination.

    Supports writing a JSON file (path ending in ".json") or inserting
    into PostgreSQL (destination starting with "postgres://").

    Returns:
        A summary dict with the record count, destination, and a
        timezone-aware UTC timestamp.

    Raises:
        ValueError: If the destination format is not recognized.
    """
    if destination.endswith(".json"):
        with open(destination, "w") as f:
            json.dump(enriched, f, indent=2)
    elif destination.startswith("postgres://"):
        # Insert to database
        import psycopg2
        conn = psycopg2.connect(destination)
        try:
            # ... insert logic
            pass
        finally:
            # Close the connection even if the insert fails.
            conn.close()
    else:
        # Previously an unknown destination fell through silently and
        # still reported records as loaded; fail loudly instead,
        # matching extract()'s behavior for unknown sources.
        raise ValueError(f"Unknown destination format: {destination}")
    return {
        "records_loaded": len(enriched),
        "destination": destination,
        # Timezone-aware UTC (utcnow() is deprecated and naive).
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
# ═══════════════════════════════════════════════════════════════
# COMPOSE THE PIPELINE
# ═══════════════════════════════════════════════════════════════
# Compose the five stages into a single graph. Each node's output_name
# matches the next node's parameter name (extract's "raw_data" feeds
# validate, validate's "valid_records" feeds transform, and so on), so
# the stages chain together by name.
etl_pipeline = Graph([
    extract,
    validate,
    transform,
    enrich,
    load,
], name="etl")
# ═══════════════════════════════════════════════════════════════
# RUN THE PIPELINE
# ═══════════════════════════════════════════════════════════════
def main():
    """Run the ETL pipeline end to end and report the outcome."""
    inputs = {
        "source_path": "data/raw_orders.json",
        "required_fields": ["id", "email", "amount"],
        "transformations": {
            "email": str.lower,
            "amount": lambda x: round(float(x), 2),
        },
        "lookup_table": {
            "category_names": {"A": "Electronics", "B": "Clothing"},
        },
        "destination": "data/processed_orders.json",
    }
    result = SyncRunner().run(etl_pipeline, inputs)
    print(f"Loaded {result['load_result']['records_loaded']} records")
    print(f"Invalid records: {len(result['invalid_records'])}")
    # Log invalid records for review
    for invalid in result["invalid_records"]:
        print(f" - {invalid['errors']}")
With Error Reporting
Testing
What's Next?
Last updated