ETL on Workflows, Part 1: Build a sharded pipeline

Write the orchestrator (`merge_customer_data`)


In this step you’ll write the orchestrator task. Since process_shard was written in step 5, this is a short job: spawn N subtasks, await them, and summarize the results. The orchestrator owns coordination and aggregation; it never reads a CSV.

What the orchestrator does

In order:

  1. Spawn NUM_SHARDS process_shard subtasks, one per shard_id (a single integer per call).
  2. Await all subtasks in parallel.
  3. Aggregate the per-shard returns into one summary dict (counts, statistics, one sample profile).
  4. Return the summary to the caller.

Nothing else. No CSV reads, no grouping, no per-customer logic. Those all live in the subtask.

Sequential vs fan-out

Before writing the orchestrator, compare what you’d write without Workflows to what you’ll write with them. Same merge, very different runtime shape:

Before

```python
# Sequential: one process does everything
sources = load_all_csvs()
grouped = group_by_customer(sources)

enriched = []
for customer_id, customer_sources in grouped.items():
    enriched.append(merge_and_enrich(customer_id, customer_sources))
return enriched
```

After

```python
# Fan-out: orchestrator coordinates; subtasks do the work
task_handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
shard_results = await asyncio.gather(*task_handles)
return aggregate_results(shard_results)
```

The sequential version runs in a single process on one instance. The fan-out version runs each shard on its own instance, NUM_SHARDS of them in parallel. The orchestrator’s body is only three lines.
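The fan-out version computes the same merge as the sequential one only if every customer lands in exactly one shard. The sketch below checks that property locally. Its get_shard_id is a hypothetical stand-in (CRC32 of the customer ID modulo NUM_SHARDS); the real one in sharding.py may work differently, and NUM_SHARDS = 4 is arbitrary here. It uses a stable hash because Python’s built-in hash() is randomized per process for strings, so separate instances could disagree about shard assignment.

```python
import zlib
from collections import defaultdict

NUM_SHARDS = 4  # hypothetical; the tutorial's value lives in sharding.py


def get_shard_id(customer_id: str) -> int:
    # Hypothetical stand-in: stable CRC32 hash modulo the shard count.
    return zlib.crc32(customer_id.encode("utf-8")) % NUM_SHARDS


def merge_sequential(rows: list[dict]) -> dict:
    # One process groups every row by customer.
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["customer_id"]].append(row)
    return dict(grouped)


def merge_shard(rows: list[dict], shard_id: int) -> dict:
    # One shard keeps only the customers that hash to it, then groups them.
    grouped = defaultdict(list)
    for row in rows:
        if get_shard_id(row["customer_id"]) == shard_id:
            grouped[row["customer_id"]].append(row)
    return dict(grouped)


# Hypothetical input: 10 customers, 3 source rows each.
rows = [
    {"customer_id": f"C{i % 10:03d}", "source": src}
    for i, src in enumerate(["crm", "billing", "support"] * 10)
]

sequential = merge_sequential(rows)
fan_out = {}
for shard_id in range(NUM_SHARDS):
    fan_out.update(merge_shard(rows, shard_id))

# Every customer lands in exactly one shard, so the union of shards matches.
print(fan_out == sequential)  # True
```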

Write the orchestrator

Add the orchestrator task and its aggregation helper (aggregate_results in Python, aggregateResults in TypeScript) to the same main.py (or src/main.ts) that holds process_shard from step 5:

workflows/main.py

```python
import asyncio
import os
from pathlib import Path
from typing import Any

import pandas as pd
from render_sdk import Workflows

from sharding import NUM_SHARDS, get_shard_id
from enrichment import enrich_profile

app = Workflows()

DATA_DIR = Path(os.getenv("DATA_DIR", "../sample_data"))

# sanitize, load_csv, and process_shard from step 5 stay where they are.


def aggregate_results(shard_results: list[dict]) -> dict:
    # Flatten the per-shard profile lists into one list.
    all_profiles = [p for r in shard_results for p in r["profiles"]]
    health_scores = [p["health_score"] for p in all_profiles]

    # Count profiles in each churn-risk bucket.
    churn_counts = {"LOW": 0, "MEDIUM": 0, "HIGH": 0}
    for p in all_profiles:
        churn_counts[p["churn_risk"]] += 1

    # Return a compact summary, not the raw profiles.
    return {
        "profiles_generated": len(all_profiles),
        "shards_processed": NUM_SHARDS,
        "sample_profile": all_profiles[0] if all_profiles else None,
        "statistics": {
            "avg_health_score": round(sum(health_scores) / len(health_scores), 1) if health_scores else 0,
            "churn_distribution": churn_counts,
        },
    }


@app.task
async def merge_customer_data() -> dict:
    # Spawn one process_shard subtask per shard; each call returns an awaitable handle.
    task_handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
    # Wait for every subtask to finish; they run in parallel.
    shard_results = await asyncio.gather(*task_handles)
    return aggregate_results(shard_results)


if __name__ == "__main__":
    app.start()
```
workflows/src/main.ts

```typescript
import { readFileSync } from "node:fs";
import { join } from "node:path";
import Papa from "papaparse";
import { task } from "@renderinc/sdk/workflows";
import { NUM_SHARDS, getShardId } from "./sharding.js";
import { enrichProfile, type EnrichedProfile } from "./enrichment.js";

// loadCsv and processShard from step 5 stay where they are.

interface ShardResult {
  shard_id: number;
  profiles: EnrichedProfile[];
  count: number;
}

function aggregateResults(shardResults: ShardResult[]) {
  // Flatten the per-shard profile arrays into one array.
  const allProfiles = shardResults.flatMap((r) => r.profiles);
  const healthScores = allProfiles.map((p) => p.health_score);

  // Count profiles in each churn-risk bucket.
  const churnCounts = { LOW: 0, MEDIUM: 0, HIGH: 0 };
  for (const p of allProfiles) churnCounts[p.churn_risk]++;

  // Return a compact summary, not the raw profiles.
  return {
    profiles_generated: allProfiles.length,
    shards_processed: NUM_SHARDS,
    sample_profile: allProfiles[0] ?? null,
    statistics: {
      avg_health_score:
        healthScores.length > 0
          ? Math.round((healthScores.reduce((a, b) => a + b, 0) / healthScores.length) * 10) / 10
          : 0,
      churn_distribution: churnCounts,
    },
  };
}

export const mergeCustomerData = task(
  { name: "merge_customer_data" },
  async function mergeCustomerData() {
    // Spawn one processShard subtask per shard, then wait for all of them.
    const shardResults = await Promise.all(
      Array.from({ length: NUM_SHARDS }, (_, shardId) => processShard(shardId)),
    );
    return aggregateResults(shardResults);
  },
);
```
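To see the shape of the summary, here is the Python aggregate_results above run on two hand-made shard results. The helper is copied so the snippet runs on its own; the profile dicts are hypothetical and carry only the fields aggregate_results reads (plus a customer_id for readability). Real profiles from enrich_profile will have more fields, and NUM_SHARDS = 2 is just for this example.

```python
NUM_SHARDS = 2  # hypothetical: the real value comes from sharding.py


def aggregate_results(shard_results: list[dict]) -> dict:
    # Same logic as the tutorial's helper, copied so this runs standalone.
    all_profiles = [p for r in shard_results for p in r["profiles"]]
    health_scores = [p["health_score"] for p in all_profiles]
    churn_counts = {"LOW": 0, "MEDIUM": 0, "HIGH": 0}
    for p in all_profiles:
        churn_counts[p["churn_risk"]] += 1
    return {
        "profiles_generated": len(all_profiles),
        "shards_processed": NUM_SHARDS,
        "sample_profile": all_profiles[0] if all_profiles else None,
        "statistics": {
            "avg_health_score": round(sum(health_scores) / len(health_scores), 1)
            if health_scores
            else 0,
            "churn_distribution": churn_counts,
        },
    }


# Hypothetical per-shard returns, shaped like ShardResult (shard_id, profiles, count).
shard_results = [
    {
        "shard_id": 0,
        "profiles": [
            {"customer_id": "C001", "health_score": 80, "churn_risk": "LOW"},
            {"customer_id": "C002", "health_score": 45, "churn_risk": "HIGH"},
        ],
        "count": 2,
    },
    {
        "shard_id": 1,
        "profiles": [{"customer_id": "C003", "health_score": 62, "churn_risk": "MEDIUM"}],
        "count": 1,
    },
]

summary = aggregate_results(shard_results)
print(summary["profiles_generated"])  # 3
print(summary["statistics"])
# {'avg_health_score': 62.3, 'churn_distribution': {'LOW': 1, 'MEDIUM': 1, 'HIGH': 1}}

# A run where every shard comes back empty still yields a valid summary.
empty = aggregate_results([{"shard_id": 0, "profiles": [], "count": 0}])
print(empty["statistics"]["avg_health_score"], empty["sample_profile"])  # 0 None
```

Only this summary goes back to the caller; the full profile list stays inside the orchestrator run.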

Spawning a subtask is just a call to the task function: process_shard(0) returns an awaitable handle. asyncio.gather (Python) or Promise.all (TS) is the join. There’s no separate start_subtask API.
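A local, single-process analogy may help make “spawn by calling” concrete. The task decorator below is hypothetical and is not how render_sdk implements tasks (there, each subtask runs on its own instance). It only mimics the calling convention: calling the decorated function schedules the work and immediately returns an awaitable handle, and gather is the join.

```python
import asyncio
import functools


def task(fn):
    # Hypothetical stand-in for a task decorator (NOT render_sdk's implementation):
    # calling the decorated function schedules the coroutine right away and
    # returns an awaitable handle instead of running the body inline.
    @functools.wraps(fn)
    def spawn(*args, **kwargs) -> asyncio.Task:
        return asyncio.create_task(fn(*args, **kwargs))

    return spawn


@task
async def process_shard(shard_id: int) -> dict:
    await asyncio.sleep(0.01)  # stands in for loading and enriching one shard
    return {"shard_id": shard_id, "count": 10 * shard_id}


async def merge(num_shards: int) -> list[dict]:
    # Spawn: just call the task function; each call returns a handle.
    handles = [process_shard(i) for i in range(num_shards)]
    # Join: gather waits for every handle and keeps results in spawn order.
    return list(await asyncio.gather(*handles))


results = asyncio.run(merge(3))
print([r["shard_id"] for r in results])  # [0, 1, 2]
```

Because gather preserves the order of its arguments, shard_results[i] always belongs to shard i, no matter which subtask finishes first.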

What this looks like at runtime
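As a rough local analogy (not an actual Render run), the sketch below times the two shapes from the comparison above, using asyncio.sleep in place of real shard work. The durations in SHARD_SECONDS are made up. Run sequentially, total time is roughly the sum of the shard durations (0.85 s here). Fanned out, it’s roughly the slowest shard (0.3 s here). In a real run each subtask is on its own instance, and this sketch ignores any scheduling overhead that adds.

```python
import asyncio
import time

# Hypothetical per-shard work durations, in seconds.
SHARD_SECONDS = [0.2, 0.3, 0.1, 0.25]


async def process_shard(shard_id: int) -> int:
    await asyncio.sleep(SHARD_SECONDS[shard_id])  # simulated shard work
    return shard_id


async def run_sequential() -> list[int]:
    # Each shard starts only after the previous one finishes.
    return [await process_shard(i) for i in range(len(SHARD_SECONDS))]


async def run_fan_out() -> list[int]:
    # All shards start together; gather returns once the slowest one is done.
    return list(await asyncio.gather(*(process_shard(i) for i in range(len(SHARD_SECONDS)))))


def timed(make_coro) -> tuple[list[int], float]:
    # Run one shape on a fresh event loop and measure wall-clock time.
    start = time.perf_counter()
    result = asyncio.run(make_coro())
    return result, time.perf_counter() - start


seq_result, seq_time = timed(run_sequential)
fan_result, fan_time = timed(run_fan_out)
print(f"sequential: {seq_time:.2f}s, fan-out: {fan_time:.2f}s")
```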

Why subtasks load their own data

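The orchestrator passes a single integer (shard_id) to each subtask, not a pre-built shard payload. That can look wasteful: each `process_shard` task loads all four CSVs and filters them down to its own shard. Why not load the CSVs once in the orchestrator and pass each shard's slice as an argument?

Because task arguments are limited to 4 MB. A shard's slice of customer data grows with the dataset and can exceed that limit, while a shard_id stays a few bytes no matter how large the CSVs get. Letting each subtask do its own I/O keeps every argument small.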
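A quick way to see the difference is to measure serialized argument sizes. This sketch assumes arguments are serialized as JSON and reads “4 MB” as 4 MiB; both are assumptions, not documented SDK behavior. The shard slice is hypothetical: 100,000 rows with a handful of made-up fields.

```python
import json

MAX_ARG_BYTES = 4 * 1024 * 1024  # assumption: the 4 MB limit, read as 4 MiB


def arg_size(value) -> int:
    # Assumption: task arguments are serialized as JSON before being sent.
    return len(json.dumps(value).encode("utf-8"))


def fits(value) -> bool:
    return arg_size(value) <= MAX_ARG_BYTES


shard_id_arg = 3

# Hypothetical pre-built slice for one shard: 100,000 customer rows.
shard_slice = [
    {"customer_id": f"C{i:06d}", "email": f"user{i}@example.com", "plan": "pro", "mrr": 99.0}
    for i in range(100_000)
]

print(arg_size(shard_id_arg))  # 1
print(fits(shard_id_arg), fits(shard_slice))  # True False
```

Each row here serializes to at least 84 bytes, so 100,000 rows already pass 8 MB, while the integer argument never grows.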

What you learned

  • The orchestrator spawns subtasks by calling the task function. The handle is awaitable
  • `asyncio.gather` (Python) or `Promise.all` (TS) is the join. Subtasks run in parallel on separate instances
  • Subtasks own I/O to stay inside the 4 MB task-argument limit. The orchestrator passes one integer per call
  • Aggregation runs in the orchestrator process. The return to the caller is a stats summary, not the raw profiles