In this step you’ll write the worker task that does the per-shard merge. The input is a single integer: shard_id. The task loads the four CSVs itself, filters them to its shard, merges per customer, and returns enriched profiles. The orchestrator (step 6) never touches a CSV.
This design (subtasks own their I/O) is what keeps the pipeline inside the 4 MB task-argument limit. You'll see why in step 6.
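To get a feel for the difference before step 6, here is a rough sketch comparing the serialized size of the two possible task arguments. The row shape and the row count are made up for illustration (they are not the tutorial's sample data), JSON is assumed as the argument encoding, and 4 MB is taken here as 4 × 1024 × 1024 bytes.

```python
import json

# Hypothetical rows standing in for one CSV's contents. The fields and the
# row count are illustrative only, not the tutorial's sample data.
rows = [
    {"customer_id": f"cust_{i:06d}", "plan": "pro", "mrr": 499.0, "seats": 25}
    for i in range(100_000)
]

# Assumption: the 4 MB task-argument limit interpreted as 4 MiB.
LIMIT_BYTES = 4 * 1024 * 1024


def payload_size(value) -> int:
    """Size in bytes of a value once JSON-encoded."""
    return len(json.dumps(value).encode("utf-8"))


rows_size = payload_size(rows)
shard_id_size = payload_size(7)

print(f"rows as argument:     {rows_size:,} bytes (over limit: {rows_size > LIMIT_BYTES})")
print(f"shard_id as argument: {shard_id_size} bytes")
```

Passing only `shard_id` keeps the argument at a few bytes no matter how large the CSVs grow, because the rows never cross the task boundary as input.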
Add the dependencies
The Python reference implementation uses pandas for CSV loading; the TypeScript one uses papaparse. Add the dependency for your language to your project before writing the task:
- render-sdk>=0.6.0
+ render-sdk>=0.6.0
+ pandas>=2.2.0

$ pip install -r requirements.txt
Successfully installed pandas ...
{
  "dependencies": {
-   "@renderinc/sdk": "^0.6.0"
  }
}

{
  "dependencies": {
+   "@renderinc/sdk": "^0.6.0",
+   "papaparse": "^5.4.1"
+ },
+ "devDependencies": {
+   "@types/papaparse": "^5.3.14"
  }
}

$ npm install
added 2 packages
Copy the enrichment module
Health score, churn risk, and expansion potential are computed per customer. The math is product-specific (it would be different in your real pipeline), so you’ll copy the module from the reference repo. The interesting part is the task structure, not the formulas.
$ curl -fsSL https://raw.githubusercontent.com/render-examples/data-processor-workflow/main/python/workflows/enrichment.py -o workflows/enrichment.py
The module exposes one function you’ll call: enrich_profile(profile). It adds three calculated fields (health_score, churn_risk, expansion_potential) to a merged profile and returns it.
$ curl -fsSL https://raw.githubusercontent.com/render-examples/data-processor-workflow/main/typescript/workflows/src/enrichment.ts -o workflows/src/enrichment.ts
The module exposes one function you’ll call: enrichProfile(profile). It adds three calculated fields (health_score, churn_risk, expansion_potential) to a merged profile and returns it.
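If you want to exercise the task structure before copying the module, a stand-in with the same call shape is enough. The sketch below is not the reference implementation: the formulas and the input fields (`logins_30d`, `open_tickets`, `seats`, `seats_used`) are placeholders invented for illustration. Only the function name and the three output keys come from the description above.

```python
from typing import Any


def enrich_profile(profile: dict[str, Any]) -> dict[str, Any]:
    """Placeholder with the same contract as the real module: take a merged
    profile, add three calculated fields, and return it. The math is invented."""
    # All four input fields are hypothetical; missing or None values count as 0.
    logins = profile.get("logins_30d") or 0
    tickets = profile.get("open_tickets") or 0
    seats = profile.get("seats") or 0
    seats_used = profile.get("seats_used") or 0

    # Made-up scoring: clamp a simple linear score to the range 0..100.
    health = max(0, min(100, 50 + 2 * logins - 10 * tickets))
    profile["health_score"] = health
    profile["churn_risk"] = "high" if health < 40 else "low"
    profile["expansion_potential"] = bool(seats) and seats_used / seats >= 0.9
    return profile


merged = {
    "customer_id": "cust_001",
    "logins_30d": 12,
    "open_tickets": 1,
    "seats": 10,
    "seats_used": 9,
}
enriched = enrich_profile(merged)
print(enriched["health_score"], enriched["churn_risk"], enriched["expansion_potential"])
```

Swapping this stand-in for the real module later should not require changes to the task, as long as the real function keeps the same name and the same dict-in, dict-out contract.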
Write the task
import asyncio
import os
from pathlib import Path
from typing import Any

import pandas as pd
from render_sdk import Workflows

from sharding import NUM_SHARDS, get_shard_id
from enrichment import enrich_profile

app = Workflows()

DATA_DIR = Path(os.getenv("DATA_DIR", "../sample_data"))


def sanitize(obj: Any) -> Any:
    # Recurse into containers first: pd.isna() on a list returns an array,
    # which cannot be used directly as an `if` condition.
    if isinstance(obj, dict):
        return {k: sanitize(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [sanitize(v) for v in obj]
    # NaN and other missing values become None (JSON null)
    if pd.isna(obj):
        return None
    # numpy scalars become plain Python values
    if hasattr(obj, "item"):
        return obj.item()
    return obj


def load_csv(filename: str) -> list[dict]:
    df = pd.read_csv(DATA_DIR / filename)
    return [sanitize(r) for r in df.to_dict("records")]


@app.task
def process_shard(shard_id: int) -> dict:
    sources = {
        name: load_csv(f"{name}.csv")
        for name in ("crm", "billing", "product", "support")
    }

    # Filter each source down to this shard's customers
    indexed: dict[str, dict[str, dict]] = {}
    for source_name, rows in sources.items():
        for row in rows:
            cid = str(row["customer_id"])
            if get_shard_id(cid) != shard_id:
                continue
            indexed.setdefault(cid, {})[source_name] = row

    # Merge and enrich each customer
    profiles = []
    for customer_id, by_source in indexed.items():
        merged = {"customer_id": customer_id}
        for source_name in ("crm", "billing", "product", "support"):
            if source_name in by_source:
                merged.update(by_source[source_name])
        profiles.append(enrich_profile(merged))

    return {
        "shard_id": shard_id,
        "profiles": profiles,
        "count": len(profiles),
    }


if __name__ == "__main__":
    app.start()

import { readFileSync } from "node:fs";
import { join } from "node:path";
import Papa from "papaparse";
import { task } from "@renderinc/sdk/workflows";
import { NUM_SHARDS, getShardId } from "./sharding.js";
import { enrichProfile, type EnrichedProfile } from "./enrichment.js";
const DATA_DIR = process.env.DATA_DIR ?? "../sample_data";

type Row = Record<string, unknown>;

function loadCsv(filename: string): Row[] {
  const content = readFileSync(join(DATA_DIR, filename), "utf-8");
  const result = Papa.parse<Row>(content, {
    header: true,
    skipEmptyLines: true,
    dynamicTyping: true,
  });
  return result.data;
}

export const processShard = task(
  { name: "process_shard" },
  function processShard(shardId: number) {
    const sources = {
      crm: loadCsv("crm.csv"),
      billing: loadCsv("billing.csv"),
      product: loadCsv("product.csv"),
      support: loadCsv("support.csv"),
    };

    // Filter each source down to this shard's customers
    const indexed = new Map<string, Record<string, Row>>();
    for (const [sourceName, rows] of Object.entries(sources)) {
      for (const row of rows) {
        const cid = String(row.customer_id);
        if (getShardId(cid) !== shardId) continue;
        if (!indexed.has(cid)) indexed.set(cid, {});
        indexed.get(cid)![sourceName] = row;
      }
    }

    // Merge and enrich each customer
    const profiles: EnrichedProfile[] = [];
    for (const [customerId, bySource] of indexed) {
      const merged: Record<string, unknown> = { customer_id: customerId };
      for (const sourceName of ["crm", "billing", "product", "support"]) {
        if (bySource[sourceName]) Object.assign(merged, bySource[sourceName]);
      }
      profiles.push(enrichProfile(merged));
    }

    return {
      shard_id: shardId,
      profiles,
      count: profiles.length,
    };
  },
);

Confirm the task registers
$ render workflows dev -- python main.py
Local workflow server listening on :8120

$ render workflows dev -- npx tsx src/main.ts
Local workflow server listening on :8120
In a second terminal:
$ render workflows tasks list --local
process_shard
What you learned
- `process_shard` owns its own I/O. It loads all four CSVs and filters them to its shard.
- Inputs and outputs must be JSON-serializable. `sanitize` (Python) converts the numpy types and missing values that pandas produces into plain Python values.
- Enrichment math lives in its own module so the task code stays focused on structure.
- Missing sources for a customer are normal. The merge tolerates them by checking presence before `update` (Python) or `Object.assign` (TypeScript).
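The last point can be seen in isolation with a small sketch. The customer rows below are made up, and the enrichment call is left out so the snippet runs on its own; the merge loop mirrors the one in the Python task.

```python
# Toy per-source rows for one shard, keyed the same way as `indexed` in the
# task: customer_id -> source name -> row. All values are made up.
indexed = {
    "cust_001": {
        "crm": {"customer_id": "cust_001", "name": "Acme"},
        "billing": {"customer_id": "cust_001", "mrr": 499.0},
    },
    "cust_002": {
        # No billing, product, or support rows exist for this customer.
        "crm": {"customer_id": "cust_002", "name": "Globex"},
    },
}

SOURCES = ("crm", "billing", "product", "support")

profiles = []
for customer_id, by_source in indexed.items():
    merged = {"customer_id": customer_id}
    for source_name in SOURCES:
        # Presence check: an absent source is skipped instead of raising KeyError.
        if source_name in by_source:
            merged.update(by_source[source_name])
    profiles.append(merged)

for profile in profiles:
    print(profile)
```

A customer with only a CRM row still produces a profile; it simply lacks the fields the other sources would have contributed, so whatever consumes the profile has to cope with absent keys.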