In this step you’ll write the orchestrator task. After step 5 it’s a short job: spawn N subtasks, await them, summarize. The orchestrator owns coordination and aggregation. It never reads a CSV.
What the orchestrator does
In order:
- Spawn `NUM_SHARDS` `process_shard` subtasks, one per `shard_id` (a single integer per call).
- Await all subtasks in parallel.
- Aggregate the per-shard returns into one summary dict (counts, statistics, one sample profile).
- Return the summary to the caller.
Nothing else. No CSV reads, no grouping, no per-customer logic. Those all live in the subtask.
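To make the aggregation step concrete before writing the real task, here is a minimal sketch that runs the same kind of aggregation on two made-up shard results. The per-shard shape (`shard_id`, `profiles`, `count`) and the profile fields `health_score` and `churn_risk` follow the code later in this step. The `customer_id` field and all values are invented for illustration, and `len(shard_results)` stands in for `NUM_SHARDS` so the snippet runs on its own.

```python
def aggregate_results(shard_results: list[dict]) -> dict:
    """Fold per-shard returns into one summary dict."""
    all_profiles = [p for r in shard_results for p in r["profiles"]]
    health_scores = [p["health_score"] for p in all_profiles]
    churn_counts = {"LOW": 0, "MEDIUM": 0, "HIGH": 0}
    for p in all_profiles:
        churn_counts[p["churn_risk"]] += 1
    return {
        "profiles_generated": len(all_profiles),
        # Stand-in for NUM_SHARDS so this sketch has no external imports.
        "shards_processed": len(shard_results),
        "sample_profile": all_profiles[0] if all_profiles else None,
        "statistics": {
            "avg_health_score": round(sum(health_scores) / len(health_scores), 1)
            if health_scores
            else 0,
            "churn_distribution": churn_counts,
        },
    }


# Invented shard returns; real ones come from process_shard.
toy_shard_results = [
    {
        "shard_id": 0,
        "count": 2,
        "profiles": [
            {"customer_id": "c1", "health_score": 80, "churn_risk": "LOW"},
            {"customer_id": "c2", "health_score": 50, "churn_risk": "HIGH"},
        ],
    },
    {
        "shard_id": 1,
        "count": 1,
        "profiles": [
            {"customer_id": "c3", "health_score": 65, "churn_risk": "MEDIUM"},
        ],
    },
]

summary = aggregate_results(toy_shard_results)
print(summary["profiles_generated"], summary["statistics"])
```

The summary carries counts, statistics, and one sample profile, so what goes back to the caller stays small no matter how many profiles the shards produce.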
Sequential vs fan-out
Before writing the orchestrator, compare what you’d write without Workflows to what you’ll write with them. Same merge, very different runtime shape:
    - # Sequential: one process does everything
    - sources = load_all_csvs()
    - grouped = group_by_customer(sources)
    -
    - enriched = []
    - for customer_id, customer_sources in grouped.items():
    -     enriched.append(merge_and_enrich(customer_id, customer_sources))
    - return enriched
    + # Fan-out: orchestrator coordinates; subtasks do the work
    + task_handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
    + shard_results = await asyncio.gather(*task_handles)
    + return aggregate_results(shard_results)
The sequential version runs on one instance. The fan-out version runs on N instances. The orchestrator’s body is three lines.
Write the orchestrator
Add the orchestrator task and the aggregate_results helper to the same main.py (or src/main.ts) as process_shard from step 5:
Python (main.py):

    import asyncio
    import os
    from pathlib import Path
    from typing import Any

    import pandas as pd
    from render_sdk import Workflows

    from sharding import NUM_SHARDS, get_shard_id
    from enrichment import enrich_profile

    app = Workflows()

    DATA_DIR = Path(os.getenv("DATA_DIR", "../sample_data"))

    # sanitize, load_csv, and process_shard from step 5 stay where they are.

    def aggregate_results(shard_results: list[dict]) -> dict:
        all_profiles = [p for r in shard_results for p in r["profiles"]]
        health_scores = [p["health_score"] for p in all_profiles]
        churn_counts = {"LOW": 0, "MEDIUM": 0, "HIGH": 0}
        for p in all_profiles:
            churn_counts[p["churn_risk"]] += 1

        return {
            "profiles_generated": len(all_profiles),
            "shards_processed": NUM_SHARDS,
            "sample_profile": all_profiles[0] if all_profiles else None,
            "statistics": {
                "avg_health_score": round(sum(health_scores) / len(health_scores), 1) if health_scores else 0,
                "churn_distribution": churn_counts,
            },
        }

    @app.task
    async def merge_customer_data() -> dict:
        task_handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
        shard_results = await asyncio.gather(*task_handles)
        return aggregate_results(shard_results)

    if __name__ == "__main__":
        app.start()

TypeScript (src/main.ts):

    import { readFileSync } from "node:fs";
    import { join } from "node:path";
    import Papa from "papaparse";
    import { task } from "@renderinc/sdk/workflows";
    import { NUM_SHARDS, getShardId } from "./sharding.js";
    import { enrichProfile, type EnrichedProfile } from "./enrichment.js";
    // loadCsv and processShard from step 5 stay where they are.

    interface ShardResult {
      shard_id: number;
      profiles: EnrichedProfile[];
      count: number;
    }

    function aggregateResults(shardResults: ShardResult[]) {
      const allProfiles = shardResults.flatMap((r) => r.profiles);
      const healthScores = allProfiles.map((p) => p.health_score);
      const churnCounts = { LOW: 0, MEDIUM: 0, HIGH: 0 };
      for (const p of allProfiles) churnCounts[p.churn_risk]++;

      return {
        profiles_generated: allProfiles.length,
        shards_processed: NUM_SHARDS,
        sample_profile: allProfiles[0] ?? null,
        statistics: {
          avg_health_score:
            healthScores.length > 0
              ? Math.round((healthScores.reduce((a, b) => a + b, 0) / healthScores.length) * 10) / 10
              : 0,
          churn_distribution: churnCounts,
        },
      };
    }

    export const mergeCustomerData = task(
      { name: "merge_customer_data" },
      async function mergeCustomerData() {
        const shardResults = await Promise.all(
          Array.from({ length: NUM_SHARDS }, (_, shardId) => processShard(shardId)),
        );
        return aggregateResults(shardResults);
      },
    );

The spawn primitive is calling the task function. `process_shard(0)` returns an awaitable handle. `asyncio.gather` (Python) or `Promise.all` (TS) is the join. There’s no separate `start_subtask` API.
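The join behaviour can be tried locally without the SDK. The sketch below uses a plain coroutine, `fake_process_shard`, as a hypothetical stand-in for the real `process_shard` task, and an assumed `NUM_SHARDS` of 4. It only demonstrates `asyncio.gather` semantics, not how the Workflows runtime schedules subtasks. Higher-numbered shards finish first here, yet `asyncio.gather` still returns results in the order the awaitables were passed, so `shard_results[i]` belongs to shard `i`.

```python
import asyncio

NUM_SHARDS = 4  # assumed value for this demo


async def fake_process_shard(shard_id: int) -> dict:
    # Hypothetical stand-in for process_shard: higher shard ids finish sooner.
    await asyncio.sleep(0.01 * (NUM_SHARDS - shard_id))
    return {"shard_id": shard_id, "profiles": [], "count": 0}


async def orchestrate() -> list[dict]:
    # Calling the coroutine function creates one awaitable per shard.
    handles = [fake_process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
    # gather is the join: it waits for all of them and preserves input order.
    return await asyncio.gather(*handles)


shard_results = asyncio.run(orchestrate())
print([r["shard_id"] for r in shard_results])  # [0, 1, 2, 3]
```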
What this looks like at runtime
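As a rough local picture of the fan-out, the sketch below logs when each simulated subtask starts and finishes. It uses plain `asyncio` on one machine, whereas this tutorial describes the real subtasks as running on separate instances. `simulated_subtask`, `simulated_orchestrator`, and the `events` log are illustrative names, not part of the SDK, and `NUM_SHARDS = 3` is an assumed value.

```python
import asyncio

NUM_SHARDS = 3  # assumed value for this demo
events: list[str] = []


async def simulated_subtask(shard_id: int) -> int:
    events.append(f"start {shard_id}")
    await asyncio.sleep(0.05)  # stands in for CSV loading and enrichment
    events.append(f"end {shard_id}")
    return shard_id


async def simulated_orchestrator() -> list[int]:
    # Fan out one subtask per shard, then join on all of them.
    return await asyncio.gather(*(simulated_subtask(i) for i in range(NUM_SHARDS)))


results = asyncio.run(simulated_orchestrator())
for line in events:
    print(line)
```

Every subtask starts before any of them finishes. A sequential loop would instead alternate between start and end lines.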
Why subtasks load their own data
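The orchestrator passes a single integer (`shard_id`) to each subtask. It does not pass pre-built shard payloads. The reason is the 4 MB task-argument limit: a pre-built shard payload can exceed it, while a single integer never will, so each subtask loads its own data.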
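A back-of-the-envelope sketch of the size argument follows. The 4 MB task-argument limit is the figure this step cites. Treating it as 4 × 1024 × 1024 bytes of JSON-encoded arguments is an assumption made here for illustration, since the actual encoding and exact byte count may differ. The row shape and the row count are invented.

```python
import json

# Assumption: the limit applies to JSON-encoded bytes and equals 4 MiB.
ARG_LIMIT_BYTES = 4 * 1024 * 1024


def encoded_size(argument) -> int:
    """Size in bytes of an argument once JSON-encoded as UTF-8."""
    return len(json.dumps(argument).encode("utf-8"))


# What the orchestrator actually passes: one integer.
shard_id_arg = 7

# What it avoids passing: a pre-built shard (invented rows).
prebuilt_shard = [
    {
        "customer_id": f"cust-{i:06d}",
        "email": f"user{i}@example.com",
        "plan": "pro",
        "mrr": 49.0,
    }
    for i in range(100_000)
]

id_size = encoded_size(shard_id_arg)
payload_size = encoded_size(prebuilt_shard)

print(f"shard_id: {id_size} B, fits: {id_size <= ARG_LIMIT_BYTES}")
print(
    f"pre-built shard: {payload_size / 1_048_576:.1f} MiB, "
    f"fits: {payload_size <= ARG_LIMIT_BYTES}"
)
```

Under these assumptions the integer argument is a single byte, while the pre-built shard is well over the limit. The integer's size also stays constant as the dataset grows.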
What you learned
- The orchestrator spawns subtasks by calling the task function. The handle is awaitable.
- `asyncio.gather` (Python) or `Promise.all` (TS) is the join. Subtasks run in parallel on separate instances.
- Subtasks own I/O to stay inside the 4 MB task-argument limit. The orchestrator passes one integer per call.
- Aggregation runs in the orchestrator process. The return to the caller is a stats summary, not the raw profiles.