Render Tutorials
ETL on Workflows, Part 2: Productionize and scale it

Harden the tasks (retries, idempotency, structured logs)

⏱ 10 min

In this step you’ll add the three production guarantees that separate a demo workflow from a real one: retries so transient failures recover on their own, idempotency so re-runs don’t double-count, and structured logs so you can see what each shard did.

Add retry to the shard task

Put retry on `process_shard`, not on `merge_customer_data`. A shard is the smallest unit that can fail and recover without redoing the whole run. For this workload, use 3 retries with a short initial wait (1 to 2 seconds) and exponential backoff. That gives transient CSV, network, or provider errors time to clear, while a shard that is genuinely broken still exhausts its retries quickly instead of hiding for minutes.
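To see what that policy means in wall-clock terms, the sketch below lists the wait before each retry. It assumes the delay doubles after every failed attempt and that `max_retries` counts retries after the first attempt. Both are assumptions for illustration, so check the Render Workflows SDK reference for how `Retry` actually grows the wait. `backoff_schedule` is a hypothetical helper, not an SDK function.

```python
def backoff_schedule(max_retries: int, initial_wait_ms: int, factor: float = 2.0) -> list[int]:
    """Return the wait (ms) before each retry, assuming exponential growth.

    `factor` is an assumed scaling value for illustration, not a documented SDK default.
    """
    # Retry n (0-based) waits initial_wait_ms * factor**n.
    return [round(initial_wait_ms * factor**attempt) for attempt in range(max_retries)]


if __name__ == "__main__":
    waits = backoff_schedule(max_retries=3, initial_wait_ms=2000)
    print(waits)               # [2000, 4000, 8000]
    print(sum(waits) / 1000)   # 14.0 seconds spent waiting across all retries
```

Under those assumptions, a shard that fails all four attempts spends about 14 seconds waiting, plus its own run time, before the run reports it as failed. Raising the initial wait to 10 seconds would stretch the same schedule to 70 seconds, which is where a bad shard starts to hide.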

Before
```python
@app.task
def process_shard(shard_id: int) -> dict:
    # ... merge logic ...
    return enriched
```
After
```python
@app.task(retry=Retry(max_retries=3, wait_duration_ms=2000))  # added: retry policy
def process_shard(shard_id: int) -> dict:
    # ... merge logic ...
    return enriched
```
Before
```typescript
const processShard = task(
  { name: "process_shard" },
  function processShard(shardId: number) {
    // ... merge logic ...
    return enriched;
  }
);
```
After
```typescript
const processShard = task(
  {
    name: "process_shard",
    retry: { maxRetries: 3, waitDurationMs: 2000 }, // added: retry policy
  },
  function processShard(shardId: number) {
    // ... merge logic ...
    return enriched;
  }
);
```

Make each shard idempotent

The contract is simple: the same (shard_id, source_data_hash) input must produce the same enriched profiles every time. If a shard is retried, the second attempt must emit byte-identical records, so the aggregator can safely overwrite or de-dupe whatever the first attempt produced. Add a deterministic key to each enriched profile, then have the aggregator keep one record per key.

Before
```python
def enriched_profile(customer_id, sources):
    return {
        "customer_id": customer_id,
        "health_score": score(sources),
        # ...
    }
```
After
```python
def enriched_profile(customer_id, sources):
    return {
        "customer_id": customer_id,
        "health_score": score(sources),
        # ...
        # added: deterministic key so a retried shard's output can be de-duped
        "idempotency_key": f"{customer_id}:{stable_hash(sources)}",
    }
```
Before
```typescript
function enrichedProfile(customerId: string, sources: Sources) {
  return {
    customerId,
    healthScore: score(sources),
    // ...
  };
}
```
After
```typescript
function enrichedProfile(customerId: string, sources: Sources) {
  return {
    customerId,
    healthScore: score(sources),
    // ...
    // added: deterministic key so a retried shard's output can be de-duped
    idempotencyKey: `${customerId}:${stableHash(sources)}`,
  };
}
```
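The snippets above call `stable_hash` without defining it. One way to build it, sketched here as an assumption rather than the tutorial's actual helper, is to serialize `sources` as canonical JSON (sorted keys, fixed separators) and hash the bytes with SHA-256. Python's built-in `hash()` is the wrong tool: string hashing is salted per process, so a retry running in a fresh process would compute a different key.

```python
import hashlib
import json
from typing import Any


def stable_hash(sources: Any) -> str:
    """Hash `sources` so equal inputs always yield the same hex digest.

    Canonical JSON (sorted keys at every level, no incidental whitespace) makes
    the digest independent of dict insertion order. Assumes `sources` is
    JSON-serializable.
    """
    canonical = json.dumps(sources, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def idempotency_key(customer_id: str, sources: Any) -> str:
    # Same "<customer_id>:<hash>" shape as the key in enriched_profile.
    return f"{customer_id}:{stable_hash(sources)}"


if __name__ == "__main__":
    a = {"crm": {"plan": "pro"}, "billing": {"mrr": 120}}
    b = {"billing": {"mrr": 120}, "crm": {"plan": "pro"}}  # same data, different order
    print(idempotency_key("cus_42", a) == idempotency_key("cus_42", b))  # True
```

`sort_keys` only normalizes dictionaries. List order still changes the hash, so sort any list whose order is incidental (for example, rows read from an unordered source) before hashing.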

Emit per-shard timing logs

Emit one JSON line when each shard starts and one when it ends. Include shard_id, record count, and elapsed time. The Render Dashboard keeps stdout with each task run, so the chaos drill in step 6 can prove which shard failed, which retry recovered, and how much work it did.

python/workflows/main.py
```python
import json
import time

# `app`, `Retry`, and `build_profiles_for_shard` are defined elsewhere in main.py (not shown).


def log(**fields: object) -> None:
    # One JSON object per line. flush=True pushes the line out immediately,
    # so it isn't lost in a stdout buffer if the attempt crashes.
    print(json.dumps(fields, sort_keys=True), flush=True)


@app.task(retry=Retry(max_retries=3, wait_duration_ms=2000))
def process_shard(shard_id: int) -> dict:
    started = time.perf_counter()
    log(event="shard_start", shard_id=shard_id)
    profiles = build_profiles_for_shard(shard_id)
    log(
        event="shard_end",
        shard_id=shard_id,
        record_count=len(profiles),
        elapsed_ms=round((time.perf_counter() - started) * 1000),
    )
    return {"shard_id": shard_id, "profiles": profiles, "count": len(profiles)}
```
typescript/workflows/src/main.ts
```typescript
// `task`, `ShardResult`, and `buildProfilesForShard` are defined elsewhere in main.ts (not shown).

function log(fields: Record<string, unknown>): void {
  // One JSON object per line so each shard event is machine-parseable.
  console.log(JSON.stringify(fields));
}

const processShard = task(
  {
    name: "process_shard",
    retry: { maxRetries: 3, waitDurationMs: 2000 },
  },
  function processShard(shardId: number): ShardResult {
    const started = performance.now();
    log({ event: "shard_start", shard_id: shardId });
    const result = buildProfilesForShard(shardId);
    log({
      event: "shard_end",
      shard_id: shardId,
      record_count: result.count,
      elapsed_ms: Math.round(performance.now() - started),
    });
    return result;
  }
);
```
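Because each log line is self-contained JSON, you can also summarize a run offline. The sketch below assumes you have collected the stdout lines from every attempt into a list of strings; exporting them from the Dashboard isn't covered here. Since `shard_end` is only logged when an attempt finishes, a shard with more `shard_start` than `shard_end` events had at least one failed attempt. `summarize_shards` is a hypothetical helper, not part of the tutorial's code.

```python
import json
from collections import defaultdict


def _empty_shard() -> dict:
    return {"starts": 0, "ends": 0, "record_count": None, "elapsed_ms": None}


def summarize_shards(lines: list[str]) -> dict[int, dict]:
    """Fold shard_start/shard_end JSON log lines into a per-shard summary.

    Non-JSON lines and JSON without a shard_id are skipped, so mixed stdout
    (library warnings, tracebacks) doesn't break the parse.
    """
    summary: dict[int, dict] = defaultdict(_empty_shard)
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict) or "shard_id" not in event:
            continue
        shard = summary[event["shard_id"]]
        if event.get("event") == "shard_start":
            shard["starts"] += 1
        elif event.get("event") == "shard_end":
            shard["ends"] += 1
            # Keep the numbers from the most recent finished attempt.
            shard["record_count"] = event.get("record_count")
            shard["elapsed_ms"] = event.get("elapsed_ms")
    for shard in summary.values():
        shard["failed_attempts"] = shard["starts"] - shard["ends"]
        shard["succeeded"] = shard["ends"] > 0
    return dict(summary)


if __name__ == "__main__":
    logs = [
        '{"event": "shard_start", "shard_id": 3}',
        '{"event": "shard_start", "shard_id": 3}',
        '{"elapsed_ms": 812, "event": "shard_end", "record_count": 250, "shard_id": 3}',
    ]
    s = summarize_shards(logs)[3]
    print(s["failed_attempts"], s["succeeded"])  # 1 True: one retry recovered the shard
```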
Hint

Use the Workflow service’s Logs tab for the full stream. For a single run, open Runs, expand the parent row, then click a process_shard child row to see that subtask’s stdout inline.

You add retry to `process_shard` but skip the idempotency key. What can go wrong on a retry?
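To check your answer, here is a toy simulation, not the tutorial's aggregator. It assumes the shard writes profiles somewhere (a table, a file, a queue) before it finishes, so a failed first attempt can leave partial output behind. A naive aggregator that appends every attempt's records double-counts; one that keeps a single record per `idempotency_key` does not.

```python
def naive_aggregate(batches: list[list[dict]]) -> list[dict]:
    # Appends every record from every attempt, so a retry double-counts.
    return [record for batch in batches for record in batch]


def dedupe_aggregate(batches: list[list[dict]]) -> list[dict]:
    # Keeps one record per idempotency_key; a later attempt overwrites an earlier one.
    by_key: dict[str, dict] = {}
    for batch in batches:
        for record in batch:
            by_key[record["idempotency_key"]] = record
    return list(by_key.values())


if __name__ == "__main__":
    p1 = {"customer_id": "c1", "health_score": 0.8, "idempotency_key": "c1:aa"}
    p2 = {"customer_id": "c2", "health_score": 0.5, "idempotency_key": "c2:bb"}
    attempt_1 = [p1]      # wrote one profile, then failed
    attempt_2 = [p1, p2]  # the retry wrote the full shard
    print(len(naive_aggregate([attempt_1, attempt_2])))   # 3: c1 counted twice
    print(len(dedupe_aggregate([attempt_1, attempt_2])))  # 2
```

The de-dupe is only correct because the retried `p1` is byte-identical to the first one. If the key or the record varied between attempts, the aggregator could not tell a duplicate from a new customer.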

What you learned

  • Retry belongs on the worker task, not the orchestrator
  • Retries are only safe when the task is idempotent, so always pair the two
  • An idempotency key per record gives the aggregator a deterministic way to de-dupe
  • Per-shard JSON logs make the chaos drill in the next step observable from the Render Dashboard