In this step you’ll walk through the workflow code line by line, then trigger a run from a small SDK client script you write yourself. By the end you’ll know exactly what merge_customer_data does and why it’s faster than a single loop.
The two tasks
merge_customer_data is the orchestrator. It does not read or merge rows itself. It starts one process_shard subtask for each shard id, awaits all of them, and passes the subtask results to aggregate_results. Each process_shard subtask loads the four CSVs, filters to its shard’s customers, merges the matching records, enriches the profiles, and returns its own count.
Python:

    @app.task
    def process_shard(shard_id: int) -> dict[str, Any]:
        crm_shard = filter_records_for_shard(load_csv("crm.csv"), shard_id)
        billing_shard = filter_records_for_shard(load_csv("billing.csv"), shard_id)
        product_shard = filter_records_for_shard(load_csv("product.csv"), shard_id)
        support_shard = filter_records_for_shard(load_csv("support.csv"), shard_id)
        profiles = merge_and_enrich(crm_shard, billing_shard, product_shard, support_shard)
        return {"shard_id": shard_id, "profiles": profiles, "count": len(profiles)}

    @app.task
    async def merge_customer_data() -> dict[str, Any]:
        task_handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
        shard_results = await asyncio.gather(*task_handles)
        return aggregate_results(shard_results)

TypeScript:

    const processShard = task(
      { name: "process_shard" },
      function processShard(shardId: number): ShardResult {
        const crmShard = filterRecordsForShard(loadCsv("crm.csv"), shardId);
        const billingShard = filterRecordsForShard(loadCsv("billing.csv"), shardId);
        const productShard = filterRecordsForShard(loadCsv("product.csv"), shardId);
        const supportShard = filterRecordsForShard(loadCsv("support.csv"), shardId);
        return mergeAndEnrich(shardId, crmShard, billingShard, productShard, supportShard);
      }
    );

    const mergeCustomerData = task(
      { name: "merge_customer_data" },
      async function mergeCustomerData(): Promise<AggregatedResult> {
        const shardResults = await Promise.all(
          Array.from({ length: NUM_SHARDS }, (_, shardId) => processShard(shardId))
        );
        return aggregateResults(shardResults);
      }
    );

Sequential vs sharded
The serial version does all customer merges in one process. The sharded version moves work into process_shard subtasks and keeps the orchestrator focused on fan-out and aggregation.
    - # A single loop over all customers
    - for customer_id, records in grouped.items():
    -     merged.append(merge_one(customer_id, records))
    - return merged
    + # Fan-out: one subtask per shard, all running in parallel
    + task_handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
    + shard_results = await asyncio.gather(*task_handles)
    + return aggregate_results(shard_results)
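To see why fan-out beats a single loop, it can help to time both shapes locally. The sketch below is a simulation only: it uses plain `asyncio` with `asyncio.sleep` standing in for shard work, not the workflow SDK, and `NUM_SHARDS = 10`, `WORK_SECONDS`, `run_serial`, `run_fan_out`, and `timed` are illustrative assumptions rather than names from the tutorial repo.

```python
import asyncio
import time

NUM_SHARDS = 10       # assumed shard count, matching the sample output
WORK_SECONDS = 0.05   # hypothetical stand-in for the time one shard takes


async def process_shard(shard_id: int) -> dict:
    # Simulate one shard's load/filter/merge work with a sleep.
    await asyncio.sleep(WORK_SECONDS)
    return {"shard_id": shard_id, "count": 100}


async def run_serial() -> list:
    # One shard at a time: total time is roughly NUM_SHARDS * WORK_SECONDS.
    results = []
    for shard_id in range(NUM_SHARDS):
        results.append(await process_shard(shard_id))
    return results


async def run_fan_out() -> list:
    # Start every shard, then wait for all of them together.
    # gather returns results in the order the awaitables were passed in.
    handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
    return list(await asyncio.gather(*handles))


def timed(coro_fn):
    # Run an async function to completion and return (results, seconds).
    started = time.perf_counter()
    results = asyncio.run(coro_fn())
    return results, time.perf_counter() - started


if __name__ == "__main__":
    _, serial_s = timed(run_serial)
    _, fan_out_s = timed(run_fan_out)
    print(f"serial: {serial_s:.2f}s, fan-out: {fan_out_s:.2f}s")
```

In this simulation the waits overlap on a single event loop. In the real workflow each `process_shard` call is a separate subtask, so the speedup comes from the shards running in parallel rather than from overlapping sleeps.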
Trigger your first run
Create trigger.py (or trigger.ts) in the repo root. This is the SDK client you’ll keep using for the rest of the tutorial.
Python:

    import os
    import time

    from render_sdk import Render

    render = Render()
    local = os.getenv("RENDER_USE_LOCAL_DEV") == "true"
    workflow_slug = os.getenv("WORKFLOW_SLUG")
    task_name = "merge_customer_data" if local else f"{workflow_slug}/merge_customer_data"

    started = time.perf_counter()
    task_run = render.workflows.run_task(task_name, [])
    elapsed = time.perf_counter() - started

    result = task_run.results[0] if isinstance(task_run.results, list) else task_run.results
    print(
        f"Merged {result['profiles_generated']} enriched profiles "
        f"across {result['shards_processed']} shards in {elapsed:.1f}s"
    )

Run it locally:

    $ RENDER_USE_LOCAL_DEV=true python trigger.py
    Run started: <run-id>
    Merged 1000 enriched profiles across 10 shards in 1.2s
TypeScript:

    import { Render } from "@renderinc/sdk";

    const render = new Render();
    const local = process.env.RENDER_USE_LOCAL_DEV === "true";
    const workflowSlug = process.env.WORKFLOW_SLUG;
    const taskName = local ? "merge_customer_data" : `${workflowSlug}/merge_customer_data`;

    const started = performance.now();
    const taskRun = await render.workflows.runTask(taskName, []);
    const elapsed = (performance.now() - started) / 1000;

    const result = Array.isArray(taskRun.results) ? taskRun.results[0] : taskRun.results;
    console.log(
      `Merged ${result.profiles_generated} enriched profiles ` +
        `across ${result.shards_processed} shards in ${elapsed.toFixed(1)}s`
    );

Run it locally:

    $ RENDER_USE_LOCAL_DEV=true npx tsx trigger.ts
    Run started: <run-id>
    Merged 1000 enriched profiles across 10 shards in 1.2s
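The `profiles_generated` and `shards_processed` fields that the trigger script prints come from `aggregate_results`, whose body isn't shown in this step. A minimal sketch of what it might do, assuming it only sums the per-shard `count` values and counts the shards (the real helper may also return the merged profiles or other fields):

```python
from typing import Any


def aggregate_results(shard_results: list[dict[str, Any]]) -> dict[str, Any]:
    # Hypothetical implementation: roll the per-shard counts into one summary.
    return {
        "profiles_generated": sum(r["count"] for r in shard_results),
        "shards_processed": len(shard_results),
    }


# Ten shards of 100 profiles each, mirroring the sample run.
example = [{"shard_id": i, "profiles": [], "count": 100} for i in range(10)]
summary = aggregate_results(example)
# summary == {"profiles_generated": 1000, "shards_processed": 10}
```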
If the script can’t find the task, confirm RENDER_USE_LOCAL_DEV=true and keep render workflows dev running in another terminal. If the task returns zero profiles, check DATA_DIR: from python/workflows or typescript/workflows, ../../sample_data should contain the four CSVs you generated in step 2. If the SDK complains about input shape, keep the second argument to run_task / runTask as an empty array because merge_customer_data takes no arguments.
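If a run returns zero profiles, a quick way to rule out a data-path problem is to check for the four CSVs directly. This is a small standalone sketch, not part of the tutorial repo: `missing_csvs` and `EXPECTED_CSVS` are hypothetical names, and the file names are taken from the `load_csv` calls in `process_shard`.

```python
from pathlib import Path

# File names used by the load_csv calls in process_shard.
EXPECTED_CSVS = ("crm.csv", "billing.csv", "product.csv", "support.csv")


def missing_csvs(data_dir: str) -> list:
    # Return the expected CSV names that are not present in data_dir.
    root = Path(data_dir)
    return [name for name in EXPECTED_CSVS if not (root / name).is_file()]


if __name__ == "__main__":
    # Assumes you run this from python/workflows or typescript/workflows.
    missing = missing_csvs("../../sample_data")
    print("missing:", missing if missing else "none")
```

An empty list means all four files are where the tasks expect them, so the problem lies elsewhere.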
What you learned
- `merge_customer_data` is the orchestrator; `process_shard` is the worker that runs once per shard
- Hash-based sharding on `customer_id` keeps each customer's records together across source files
- Calling the wrapped task function spawns a subtask that the orchestrator awaits
- Your trigger script is the only client you'll need for the rest of the tutorial
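The hash-based sharding mentioned above lives in `filter_records_for_shard`, which this step calls but doesn't show. The sketch below is one way such a filter could work, under stated assumptions: records are dicts with a `customer_id` string, the shard is chosen with `zlib.crc32`, and `shard_for` is a hypothetical helper. The tutorial's actual hash function may differ.

```python
import zlib

NUM_SHARDS = 10  # assumed shard count, matching the sample output


def shard_for(customer_id: str, num_shards: int = NUM_SHARDS) -> int:
    # crc32 gives the same value in every process. Python's built-in hash()
    # is salted per process for strings, so separate workers could disagree.
    return zlib.crc32(customer_id.encode("utf-8")) % num_shards


def filter_records_for_shard(records, shard_id, num_shards=NUM_SHARDS):
    # Keep only the records whose customer_id maps to this shard.
    return [
        record
        for record in records
        if shard_for(record["customer_id"], num_shards) == shard_id
    ]


crm = [{"customer_id": "c-001", "name": "Ada"}, {"customer_id": "c-002", "name": "Lin"}]
billing = [{"customer_id": "c-001", "plan": "pro"}, {"customer_id": "c-002", "plan": "free"}]

# The same customer lands in the same shard regardless of the source file.
shard = shard_for("c-001")
crm_ids = [r["customer_id"] for r in filter_records_for_shard(crm, shard)]
billing_ids = [r["customer_id"] for r in filter_records_for_shard(billing, shard)]
```

Because the shard depends only on `customer_id`, every subtask can filter each source file independently and still end up with a complete set of records for its customers.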