Render Tutorials
ETL on Workflows, Part 2: Productionize and scale it

Understand the fan-out pattern

⏱ 10 min

In this step you’ll walk through the workflow code line by line, then trigger a run from a small SDK client script you write yourself. By the end you’ll know exactly what merge_customer_data does and why it’s faster than a single loop.

The two tasks

merge_customer_data is the orchestrator. It does not read or merge rows itself. It starts one process_shard subtask for each shard id, awaits all of them, and passes the subtask results to aggregate_results. Each process_shard subtask loads the four CSVs, filters to its shard’s customers, merges the matching records, enriches the profiles, and returns its own count.

python/workflows/main.py
@app.task
def process_shard(shard_id: int) -> dict[str, Any]:
    crm_shard = filter_records_for_shard(load_csv("crm.csv"), shard_id)
    billing_shard = filter_records_for_shard(load_csv("billing.csv"), shard_id)
    product_shard = filter_records_for_shard(load_csv("product.csv"), shard_id)
    support_shard = filter_records_for_shard(load_csv("support.csv"), shard_id)
    profiles = merge_and_enrich(crm_shard, billing_shard, product_shard, support_shard)
    return {"shard_id": shard_id, "profiles": profiles, "count": len(profiles)}

@app.task
async def merge_customer_data() -> dict[str, Any]:
    task_handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
    shard_results = await asyncio.gather(*task_handles)
    return aggregate_results(shard_results)

typescript/workflows/src/main.ts
const processShard = task(
  { name: "process_shard" },
  function processShard(shardId: number): ShardResult {
    const crmShard = filterRecordsForShard(loadCsv("crm.csv"), shardId);
    const billingShard = filterRecordsForShard(loadCsv("billing.csv"), shardId);
    const productShard = filterRecordsForShard(loadCsv("product.csv"), shardId);
    const supportShard = filterRecordsForShard(loadCsv("support.csv"), shardId);
    return mergeAndEnrich(shardId, crmShard, billingShard, productShard, supportShard);
  }
);

const mergeCustomerData = task(
  { name: "merge_customer_data" },
  async function mergeCustomerData(): Promise<AggregatedResult> {
    const shardResults = await Promise.all(
      Array.from({ length: NUM_SHARDS }, (_, shardId) => processShard(shardId))
    );
    return aggregateResults(shardResults);
  }
);
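
To see the fan-out shape without the Workflows runtime, here is a minimal plain-asyncio sketch. It is not the tutorial's code: process_shard is replaced by a stand-in coroutine that sleeps for 0.1 s, the per-shard count of 100 is made up, and this aggregate_results is a hypothetical version that only produces the two fields the trigger script reads (shards_processed and profiles_generated).

```python
import asyncio
import time

NUM_SHARDS = 10  # assumed shard count for this sketch


async def process_shard(shard_id: int) -> dict:
    # Stand-in for the real subtask: pretend each shard takes 0.1 s of work.
    await asyncio.sleep(0.1)
    return {"shard_id": shard_id, "count": 100}  # made-up count


def aggregate_results(shard_results: list[dict]) -> dict:
    # Hypothetical aggregation: count the shards and sum their profile counts.
    return {
        "shards_processed": len(shard_results),
        "profiles_generated": sum(r["count"] for r in shard_results),
    }


async def merge_customer_data() -> dict:
    # Fan out: create one awaitable per shard, then wait for all of them.
    handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
    shard_results = await asyncio.gather(*handles)
    return aggregate_results(shard_results)


started = time.perf_counter()
summary = asyncio.run(merge_customer_data())
elapsed = time.perf_counter() - started
print(summary, f"{elapsed:.2f}s")
```

Because the ten stand-ins wait concurrently, the run takes roughly as long as one shard rather than ten in a row. In the real workflow the speedup depends on how the subtasks are scheduled and how much work each shard carries.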

Sequential vs sharded

The serial version does all customer merges in one process. The sharded version moves work into process_shard subtasks and keeps the orchestrator focused on fan-out and aggregation.

Before
- # A single loop over all customers
- for customer_id, records in grouped.items():
-     merged.append(merge_one(customer_id, records))
- return merged

After
+ # Fan-out: one subtask per shard, all running in parallel
+ task_handles = [process_shard(shard_id) for shard_id in range(NUM_SHARDS)]
+ shard_results = await asyncio.gather(*task_handles)
+ return aggregate_results(shard_results)

If you hash `customer_id` to assign shards and then fan out with 10 subtasks, what happens when the same customer appears in CRM, Billing, Product, and Support?
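
One way to reason about that question is to sketch the shard assignment yourself. The code below is an assumption about how a helper like filter_records_for_shard could work, not the tutorial's implementation: shard_for is a hypothetical function, the sample records are invented, and zlib.crc32 is chosen because Python's built-in hash() for strings can differ between processes.

```python
import zlib

NUM_SHARDS = 10  # assumed shard count for this sketch


def shard_for(customer_id: str, num_shards: int = NUM_SHARDS) -> int:
    # crc32 is deterministic across processes, so every subtask
    # computes the same shard for the same customer_id.
    return zlib.crc32(customer_id.encode("utf-8")) % num_shards


def filter_records_for_shard(records: list[dict], shard_id: int) -> list[dict]:
    # Hypothetical implementation: keep only rows whose customer hashes to this shard.
    return [r for r in records if shard_for(r["customer_id"]) == shard_id]


# Invented sample rows from two of the four sources.
crm = [{"customer_id": "c-001", "name": "Ada"}, {"customer_id": "c-002", "name": "Lin"}]
billing = [{"customer_id": "c-002", "plan": "pro"}, {"customer_id": "c-001", "plan": "free"}]

for customer_id in ("c-001", "c-002"):
    shard_id = shard_for(customer_id)
    crm_rows = filter_records_for_shard(crm, shard_id)
    billing_rows = filter_records_for_shard(billing, shard_id)
    print(customer_id, "-> shard", shard_id, len(crm_rows), "CRM,", len(billing_rows), "billing")
```

Since the shard depends only on customer_id, a customer's rows from every source land in the same subtask, so no merge ever needs data from another shard.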

Trigger your first run

Create trigger.py (or trigger.ts) in the repo root. This is the SDK client you’ll keep using for the rest of the tutorial.

trigger.py
import os
import time

from render_sdk import Render

render = Render()
local = os.getenv("RENDER_USE_LOCAL_DEV") == "true"
workflow_slug = os.getenv("WORKFLOW_SLUG")
task_name = "merge_customer_data" if local else f"{workflow_slug}/merge_customer_data"

started = time.perf_counter()
task_run = render.workflows.run_task(task_name, [])
elapsed = time.perf_counter() - started

result = task_run.results[0] if isinstance(task_run.results, list) else task_run.results
print(
    f"Merged {result['profiles_generated']} enriched profiles "
    f"across {result['shards_processed']} shards in {elapsed:.1f}s"
)
Terminal
$ RENDER_USE_LOCAL_DEV=true python trigger.py
Run started: <run-id>
Merged 1000 enriched profiles across 10 shards in 1.2s
trigger.ts
import { Render } from "@renderinc/sdk";

const render = new Render();
const local = process.env.RENDER_USE_LOCAL_DEV === "true";
const workflowSlug = process.env.WORKFLOW_SLUG;
const taskName = local ? "merge_customer_data" : `${workflowSlug}/merge_customer_data`;

const started = performance.now();
const taskRun = await render.workflows.runTask(taskName, []);
const elapsed = (performance.now() - started) / 1000;

const result = Array.isArray(taskRun.results) ? taskRun.results[0] : taskRun.results;
console.log(
  `Merged ${result.profiles_generated} enriched profiles ` +
    `across ${result.shards_processed} shards in ${elapsed.toFixed(1)}s`
);
Terminal
$ RENDER_USE_LOCAL_DEV=true npx tsx trigger.ts
Run started: <run-id>
Merged 1000 enriched profiles across 10 shards in 1.2s
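
The trigger script builds the task name from two environment variables, and if WORKFLOW_SLUG is unset outside local dev the name becomes "None/merge_customer_data" (or "undefined/merge_customer_data" in TypeScript). A small guard can catch that earlier. This is an optional sketch: resolve_task_name is a hypothetical helper, not part of the Render SDK.

```python
from typing import Optional


def resolve_task_name(local: bool, workflow_slug: Optional[str]) -> str:
    # Local dev addresses the task by its bare name.
    if local:
        return "merge_customer_data"
    # Deployed runs need the "<workflow slug>/<task name>" form.
    if not workflow_slug:
        raise ValueError("Set WORKFLOW_SLUG when RENDER_USE_LOCAL_DEV is not 'true'")
    return f"{workflow_slug}/merge_customer_data"


print(resolve_task_name(True, None))
```

In trigger.py you could pass `os.getenv("RENDER_USE_LOCAL_DEV") == "true"` and `os.getenv("WORKFLOW_SLUG")` to this helper instead of building the string inline.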
Hint

If the script can’t find the task, confirm that RENDER_USE_LOCAL_DEV=true is set and that render workflows dev is still running in another terminal. If the task returns zero profiles, check DATA_DIR: from python/workflows or typescript/workflows, ../../sample_data should contain the four CSVs you generated in step 2. If the SDK complains about the input shape, keep the second argument to run_task / runTask as an empty array, because merge_customer_data takes no arguments.
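
If you suspect the zero-profiles case, a quick check like the one below can confirm whether the four CSVs are where the workflow expects them. It is a sketch under two assumptions: that DATA_DIR is read from the environment, and that it falls back to ../../sample_data when unset. missing_csvs is a hypothetical helper. Run it from python/workflows or typescript/workflows.

```python
import os
from pathlib import Path

# The four source files named in the workflow code.
EXPECTED_CSVS = ("crm.csv", "billing.csv", "product.csv", "support.csv")


def missing_csvs(data_dir: Path) -> list[str]:
    # Return the expected file names that are not present in data_dir.
    return [name for name in EXPECTED_CSVS if not (data_dir / name).is_file()]


# Assumed lookup: DATA_DIR env var, else the relative sample_data folder.
data_dir = Path(os.getenv("DATA_DIR", "../../sample_data"))
missing = missing_csvs(data_dir)
if missing:
    print(f"Missing in {data_dir.resolve()}: {', '.join(missing)}")
else:
    print(f"All four CSVs found in {data_dir.resolve()}")
```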

What you learned

  • `merge_customer_data` is the orchestrator; `process_shard` is the worker that runs once per shard
  • Hash-based sharding on `customer_id` keeps each customer's records together across source files
  • Calling the wrapped task function spawns a subtask that the orchestrator awaits
  • Your trigger script is the only client you'll need for the rest of the tutorial