Render Tutorials
ETL on Workflows, Part 1: Build a sharded pipeline

Write the shard worker (`process_shard`)

⏱ 15 min

In this step you’ll write the worker task that does the per-shard merge. Its only input is an integer, `shard_id`. The task loads the four CSVs itself, filters them to its shard, merges the rows per customer, and returns the enriched profiles. The orchestrator (step 6) never touches a CSV.

This design, where subtasks own their I/O, is what keeps the pipeline inside the 4 MB task-argument limit. Each subtask receives a single integer instead of the rows it needs, so the argument stays the same size no matter how large the CSVs grow. Step 6 covers this in more detail.
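To make the limit concrete, here is a rough sketch that compares the JSON size of the argument each design would pass. It assumes the limit is measured on the JSON-encoded argument and treats 4 MB as 4 × 1024 × 1024 bytes. The row shape and the `payload_size` helper are hypothetical and are not part of the Render SDK.

```python
import json

# Assumption: the 4 MB limit applies to the JSON-encoded argument,
# with 1 MB = 1024 * 1024 bytes. Check Render's docs for the exact rule.
TASK_ARG_LIMIT_BYTES = 4 * 1024 * 1024


def payload_size(value) -> int:
    """Size in bytes of a value once encoded as UTF-8 JSON."""
    return len(json.dumps(value).encode("utf-8"))


# Hypothetical customer rows, roughly the shape of one CSV record.
rows = [
    {"customer_id": f"cust_{i:06d}", "plan": "pro", "mrr": 499.0, "seats": 25}
    for i in range(100_000)
]

# Alternative design: the orchestrator loads the CSV and passes rows to subtasks.
rows_arg = payload_size(rows)
# This tutorial's design: the subtask receives only its shard number.
shard_arg = payload_size(3)

print(f"rows argument:  {rows_arg:,} bytes, fits: {rows_arg <= TASK_ARG_LIMIT_BYTES}")
print(f"shard argument: {shard_arg:,} bytes, fits: {shard_arg <= TASK_ARG_LIMIT_BYTES}")
```

The row-passing argument grows linearly with the data. The shard-number argument stays at a few bytes, which is why the workers read the CSVs themselves.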

Add the dependencies

The reference implementation loads CSVs with pandas in Python and with papaparse in TypeScript. Add the dependency for your language before writing the task:

Python: add pandas to `requirements.txt`:

render-sdk>=0.6.0
pandas>=2.2.0

Terminal
$ pip install -r requirements.txt
Successfully installed pandas ...

TypeScript: add papaparse and its type definitions to `package.json`:

{
  "dependencies": {
    "@renderinc/sdk": "^0.6.0",
    "papaparse": "^5.4.1"
  },
  "devDependencies": {
    "@types/papaparse": "^5.3.14"
  }
}

Terminal
$ npm install
added 2 packages

Copy the enrichment module

Health score, churn risk, and expansion potential are computed per customer. The math is product-specific (it would be different in your real pipeline), so you’ll copy the module from the reference repo. The interesting part is the task structure, not the formulas.

Python:

Terminal
$ curl -fsSL https://raw.githubusercontent.com/render-examples/data-processor-workflow/main/python/workflows/enrichment.py -o workflows/enrichment.py

TypeScript:

Terminal
$ curl -fsSL https://raw.githubusercontent.com/render-examples/data-processor-workflow/main/typescript/workflows/src/enrichment.ts -o workflows/src/enrichment.ts

The module exposes one function you’ll call: `enrich_profile(profile)` in Python or `enrichProfile(profile)` in TypeScript. It adds three calculated fields (`health_score`, `churn_risk`, `expansion_potential`) to a merged profile and returns it.
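If you want to run the Python task before fetching the reference module, a placeholder with the same contract is enough: it takes a merged profile dict and returns it with the three fields added. The formulas below are invented stand-ins and are not the reference repo's math. The input columns (`open_tickets`, `days_since_active`, `seats_used`, `seats_purchased`) are hypothetical names. Use this only as a temporary `enrichment.py`.

```python
from typing import Any


def enrich_profile(profile: dict[str, Any]) -> dict[str, Any]:
    """Placeholder enrichment that adds health_score, churn_risk, expansion_potential.

    The formulas are illustrative stand-ins, NOT the reference implementation.
    The input field names read here are hypothetical.
    """
    # `or 0` also covers None, which sanitize() produces for empty CSV cells.
    open_tickets = profile.get("open_tickets") or 0
    days_inactive = profile.get("days_since_active") or 0
    seats_used = profile.get("seats_used") or 0
    seats_purchased = profile.get("seats_purchased") or 0

    # Start at 100, subtract penalties, and clamp to the 0-100 range.
    health = 100 - 5 * open_tickets - 2 * days_inactive
    health_score = max(0, min(100, health))

    # Bucket churn risk from the health score.
    if health_score < 40:
        churn_risk = "high"
    elif health_score < 70:
        churn_risk = "medium"
    else:
        churn_risk = "low"

    # Treat near-full seat utilization as an expansion signal.
    utilization = seats_used / seats_purchased if seats_purchased else 0.0
    expansion_potential = utilization >= 0.9

    profile.update(
        health_score=health_score,
        churn_risk=churn_risk,
        expansion_potential=expansion_potential,
    )
    return profile
```

Because `process_shard` only depends on this call signature, you can swap the real module in later without touching the task code.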

Write the task

workflows/main.py
import asyncio
import os
from pathlib import Path
from typing import Any

import pandas as pd
from render_sdk import Workflows

from sharding import NUM_SHARDS, get_shard_id
from enrichment import enrich_profile

app = Workflows()

# Directory holding crm.csv, billing.csv, product.csv, and support.csv.
# A relative path resolves against the directory you start the worker from.
DATA_DIR = Path(os.getenv("DATA_DIR", "../sample_data"))


def sanitize(obj: Any) -> Any:
    """Convert pandas/numpy values into JSON-safe Python values (NaN -> None)."""
    # Handle containers first: pd.isna() on a list returns an array, not a bool.
    if isinstance(obj, dict):
        return {k: sanitize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [sanitize(v) for v in obj]
    if pd.isna(obj):
        return None
    # numpy scalars (np.int64, np.float64, ...) expose .item() -> native type
    if hasattr(obj, "item"):
        return obj.item()
    return obj


def load_csv(filename: str) -> list[dict]:
    """Read one CSV from DATA_DIR into a list of JSON-safe row dicts."""
    df = pd.read_csv(DATA_DIR / filename)
    return [sanitize(r) for r in df.to_dict("records")]


@app.task
def process_shard(shard_id: int) -> dict:
    # Each worker loads the sources itself; only shard_id crosses the task boundary.
    sources = {
        name: load_csv(f"{name}.csv")
        for name in ("crm", "billing", "product", "support")
    }

    # Filter each source down to this shard's customers
    indexed: dict[str, dict[str, dict]] = {}
    for source_name, rows in sources.items():
        for row in rows:
            cid = str(row["customer_id"])
            if get_shard_id(cid) != shard_id:
                continue
            indexed.setdefault(cid, {})[source_name] = row

    # Merge and enrich each customer; sources missing for a customer are skipped
    profiles = []
    for customer_id, by_source in indexed.items():
        merged = {"customer_id": customer_id}
        for source_name in ("crm", "billing", "product", "support"):
            if source_name in by_source:
                merged.update(by_source[source_name])
        profiles.append(enrich_profile(merged))

    return {
        "shard_id": shard_id,
        "profiles": profiles,
        "count": len(profiles),
    }


if __name__ == "__main__":
    app.start()
workflows/src/main.ts
import { readFileSync } from "node:fs";
import { join } from "node:path";
import Papa from "papaparse";
import { task } from "@renderinc/sdk/workflows";
import { NUM_SHARDS, getShardId } from "./sharding.js";
import { enrichProfile, type EnrichedProfile } from "./enrichment.js";

const DATA_DIR = process.env.DATA_DIR ?? "../sample_data";

type Row = Record<string, unknown>;

function loadCsv(filename: string): Row[] {
  const content = readFileSync(join(DATA_DIR, filename), "utf-8");
  const result = Papa.parse<Row>(content, {
    header: true,
    skipEmptyLines: true,
    dynamicTyping: true,
  });
  return result.data;
}

export const processShard = task(
  { name: "process_shard" },
  function processShard(shardId: number) {
    const sources = {
      crm: loadCsv("crm.csv"),
      billing: loadCsv("billing.csv"),
      product: loadCsv("product.csv"),
      support: loadCsv("support.csv"),
    };

    // Filter each source down to this shard's customers
    const indexed = new Map<string, Record<string, Row>>();
    for (const [sourceName, rows] of Object.entries(sources)) {
      for (const row of rows) {
        const cid = String(row.customer_id);
        if (getShardId(cid) !== shardId) continue;
        if (!indexed.has(cid)) indexed.set(cid, {});
        indexed.get(cid)![sourceName] = row;
      }
    }

    // Merge and enrich each customer
    const profiles: EnrichedProfile[] = [];
    for (const [customerId, bySource] of indexed) {
      const merged: Record<string, unknown> = { customer_id: customerId };
      for (const sourceName of ["crm", "billing", "product", "support"]) {
        if (bySource[sourceName]) Object.assign(merged, bySource[sourceName]);
      }
      profiles.push(enrichProfile(merged));
    }

    return {
      shard_id: shardId,
      profiles,
      count: profiles.length,
    };
  },
);
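The filter-and-merge loop in `process_shard` does not depend on where the rows come from, so you can check it with in-memory data. This sketch moves the loop into a pure function, `merge_shard`, which is a hypothetical name. It also uses a stand-in `get_shard_id` built on a CRC32 hash modulo `NUM_SHARDS`. Your `sharding` module from the earlier step may compute shards differently. The sketch confirms two properties the task relies on: every customer lands in exactly one shard, and a customer missing from some sources still gets a profile.

```python
import zlib
from typing import Any

NUM_SHARDS = 4  # Assumption: stand-in value; use the one from your sharding module.
SOURCE_ORDER = ("crm", "billing", "product", "support")


def get_shard_id(customer_id: str) -> int:
    """Hypothetical shard function: stable CRC32 hash modulo NUM_SHARDS."""
    return zlib.crc32(customer_id.encode("utf-8")) % NUM_SHARDS


def merge_shard(
    sources: dict[str, list[dict[str, Any]]], shard_id: int
) -> list[dict[str, Any]]:
    """Same filter-and-merge steps as process_shard, minus CSV loading and enrichment."""
    indexed: dict[str, dict[str, dict[str, Any]]] = {}
    for source_name, rows in sources.items():
        for row in rows:
            cid = str(row["customer_id"])
            if get_shard_id(cid) != shard_id:
                continue
            indexed.setdefault(cid, {})[source_name] = row

    profiles = []
    for customer_id, by_source in indexed.items():
        merged: dict[str, Any] = {"customer_id": customer_id}
        # Later sources overwrite earlier ones on overlapping keys.
        for source_name in SOURCE_ORDER:
            if source_name in by_source:
                merged.update(by_source[source_name])
        profiles.append(merged)
    return profiles


sources = {
    "crm": [{"customer_id": f"cust_{i}", "name": f"Customer {i}"} for i in range(20)],
    "billing": [{"customer_id": f"cust_{i}", "mrr": 100 * i} for i in range(0, 20, 2)],
    "product": [],
    "support": [{"customer_id": "cust_3", "open_tickets": 2}],
}

# Run every shard and collect the results, as the orchestrator eventually will.
all_profiles = [p for shard in range(NUM_SHARDS) for p in merge_shard(sources, shard)]
ids = [p["customer_id"] for p in all_profiles]

# Every customer appears exactly once across all shards.
assert sorted(ids) == sorted(f"cust_{i}" for i in range(20))
# cust_1 has no billing, product, or support rows but still gets a profile.
cust_1 = next(p for p in all_profiles if p["customer_id"] == "cust_1")
assert "mrr" not in cust_1 and cust_1["name"] == "Customer 1"
```

Because every worker applies the same deterministic shard function, no customer is processed twice and none is dropped, whatever the shard count.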

Confirm the task registers

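Start the local dev server from the `workflows/` directory. The default `DATA_DIR` (`../sample_data`) is a relative path, so it resolves against your working directory.

Python:

Terminal
$ render workflows dev -- python main.py
Local workflow server listening on :8120

TypeScript:

Terminal
$ render workflows dev -- npx tsx src/main.ts
Local workflow server listening on :8120

In a second terminal, list the registered tasks:

Terminal
$ render workflows tasks list --local
process_shard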

Quick check: you enrich profiles with a `datetime` for `last_active` instead of an ISO string. Local dev runs fine. What happens when you deploy?

What you learned

  • `process_shard` owns its own I/O. It loads all four CSVs and filters to its shard
  • Inputs and outputs must be JSON-serializable. `sanitize` (Python) converts numpy scalars from pandas to native Python types and NaN to `None`
  • Enrichment math lives in its own module so the task code stays focused on structure
  • Missing sources for a customer are normal. The merge tolerates them by checking that a source has a row for the customer before merging it
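The JSON rule is easy to check locally: run `json.dumps` over a task's return value before you deploy. This sketch shows two values that break it, a numpy scalar from pandas and a `datetime` such as `last_active`, and shows that converting them first fixes both. It mirrors the `sanitize` helper from `main.py`, checking containers before calling `pd.isna`, and adds one hypothetical step that turns `datetime` values into ISO 8601 strings. Whether Render's serializer behaves exactly like `json.dumps` is an assumption.

```python
import json
from datetime import datetime, timezone
from typing import Any

import numpy as np
import pandas as pd


def sanitize(obj: Any) -> Any:
    """Recursively convert pandas/numpy values into JSON-safe Python values."""
    if isinstance(obj, dict):
        return {k: sanitize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [sanitize(v) for v in obj]
    if pd.isna(obj):  # NaN, None, and pd.NaT all become None
        return None
    # Hypothetical addition: serialize datetimes as ISO 8601 strings.
    if isinstance(obj, datetime):
        return obj.isoformat()
    if hasattr(obj, "item"):  # numpy scalars -> native int/float/bool
        return obj.item()
    return obj


def is_json_safe(value: Any) -> bool:
    """Return True if json.dumps can encode value without raising."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


profile = {
    "customer_id": "cust_1",
    "seats": np.int64(25),
    "mrr": np.float64("nan"),
    "last_active": datetime(2024, 5, 1, tzinfo=timezone.utc),
}

print(is_json_safe(profile))            # False: np.int64 and datetime are not JSON types
print(is_json_safe(sanitize(profile)))  # True
print(sanitize(profile))
```

The `pd.isna` check runs before the `datetime` branch so that a missing timestamp (`pd.NaT`) becomes `None` rather than the string `"NaT"`.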