In this step you’ll add the three production guarantees that separate a demo workflow from a real one: retries so transient failures recover on their own, idempotency so re-runs don’t double-count, and structured logs so you can see what each shard did.
Add retry to the shard task
Put retry on process_shard, not on merge_customer_data. A shard is the smallest unit that can fail and recover without redoing the whole run. For this workload, use 3 retries with a short initial wait (1 to 2 seconds) and exponential backoff. That gives transient CSV-read, network, or provider errors time to clear, while a shard that keeps failing still fails fast instead of retrying for minutes.
```diff
-@app.task
+@app.task(retry=Retry(max_retries=3, wait_duration_ms=2000))
 def process_shard(shard_id: int) -> dict:
     # ... merge logic ...
     return enriched
```
```diff
 const processShard = task(
-  { name: "process_shard" },
+  {
+    name: "process_shard",
+    retry: { maxRetries: 3, waitDurationMs: 2000 },
+  },
   function processShard(shardId: number) {
     // ... merge logic ...
     return enriched;
   }
 );
```
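Render runs the retries for you once `Retry` is configured, so you never write this loop yourself. To see what "3 retries, 2-second initial wait, exponential backoff" means in practice, here is a minimal local sketch. It assumes the wait doubles on each attempt (the `factor=2.0` default is an assumption, not Render's documented schedule), and `retry_with_backoff` is a hypothetical helper that takes an injectable `sleep` so it can be exercised without actually waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    initial_wait_s: float = 2.0,
    factor: float = 2.0,  # assumption: wait doubles after each failure
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; on an exception, wait and retry up to max_retries more times."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the real failure
            # Waits grow as initial_wait_s, initial_wait_s * factor, ...
            sleep(initial_wait_s * factor**attempt)
    raise AssertionError("unreachable")


# A hypothetical shard that fails twice with a transient error, then succeeds.
calls = {"n": 0}


def flaky_shard() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


waits: list[float] = []
result = retry_with_backoff(flaky_shard, sleep=waits.append)
print(result, waits)  # ok [2.0, 4.0]
```

A shard that never recovers would wait 2 + 4 + 8 = 14 seconds in total under this assumed schedule before the failure surfaces, which is the "fail fast" property the retry settings aim for.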
Make each shard idempotent
The contract is simple: the same (shard_id, source_data_hash) input must produce the same enriched profiles every time. If a retry happens, the second attempt should overwrite or de-dupe the first attempt’s output with byte-identical records. Add a deterministic key to each enriched profile, then have the aggregator keep one record per key.
```diff
 def enriched_profile(customer_id, sources):
     return {
         "customer_id": customer_id,
         "health_score": score(sources),
         # ...
+        "idempotency_key": f"{customer_id}:{stable_hash(sources)}",
     }
```
```diff
 function enrichedProfile(customerId: string, sources: Sources) {
   return {
     customerId,
     healthScore: score(sources),
     // ...
+    idempotencyKey: `${customerId}:${stableHash(sources)}`,
   };
 }
```
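The snippets above call `stable_hash` without defining it, and the aggregator's de-dupe step isn't shown. Here is one possible sketch of both, assuming `sources` is JSON-serializable. Canonical JSON (sorted keys, fixed separators) hashed with SHA-256 gives the same digest for the same content regardless of dict ordering. `stable_hash` and `dedupe_profiles` are hypothetical names for illustration, not part of any SDK.

```python
import hashlib
import json
from typing import Any, Iterable


def stable_hash(sources: Any) -> str:
    """Hash `sources` so equal content always yields the same digest.

    Assumes `sources` is JSON-serializable. Sorting keys and fixing the
    separators removes dict-ordering and whitespace differences.
    """
    canonical = json.dumps(sources, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def dedupe_profiles(profiles: Iterable[dict]) -> list[dict]:
    """Keep one profile per idempotency_key; a later attempt overwrites an earlier one."""
    by_key: dict[str, dict] = {}
    for profile in profiles:
        by_key[profile["idempotency_key"]] = profile
    return list(by_key.values())


a = stable_hash({"crm": {"plan": "pro"}, "billing": [120, 80]})
b = stable_hash({"billing": [120, 80], "crm": {"plan": "pro"}})
print(a == b)  # True: key order doesn't change the hash

first = {"customer_id": "c-1", "idempotency_key": f"c-1:{a}", "health_score": 0.9}
retried = {"customer_id": "c-1", "idempotency_key": f"c-1:{b}", "health_score": 0.9}
print(len(dedupe_profiles([first, retried])))  # 1
```

Because a retry of the same shard produces the same key for each customer, the aggregator ends up with one record per customer no matter how many attempts ran.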
Emit per-shard timing logs
Emit one JSON line when each shard starts and one when it ends. Include shard_id, record count, and elapsed time. The Render Dashboard keeps stdout with each task run, so the chaos drill in step 6 can prove which shard failed, which retry recovered, and how much work it did.
```python
import json
import time


def log(**fields: object) -> None:
    # One JSON object per line; flush so the line reaches the log stream
    # even if the task process dies right after writing it.
    print(json.dumps(fields, sort_keys=True), flush=True)


@app.task(retry=Retry(max_retries=3, wait_duration_ms=2000))
def process_shard(shard_id: int) -> dict:
    started = time.perf_counter()
    log(event="shard_start", shard_id=shard_id)

    profiles = build_profiles_for_shard(shard_id)

    log(
        event="shard_end",
        shard_id=shard_id,
        record_count=len(profiles),
        elapsed_ms=round((time.perf_counter() - started) * 1000),
    )
    return {"shard_id": shard_id, "profiles": profiles, "count": len(profiles)}
```

```typescript
function log(fields: Record<string, unknown>): void {
  // One JSON object per line on stdout.
  console.log(JSON.stringify(fields));
}

const processShard = task(
  {
    name: "process_shard",
    retry: { maxRetries: 3, waitDurationMs: 2000 },
  },
  function processShard(shardId: number): ShardResult {
    const started = performance.now();
    log({ event: "shard_start", shard_id: shardId });

    const result = buildProfilesForShard(shardId);

    log({
      event: "shard_end",
      shard_id: shardId,
      record_count: result.count,
      elapsed_ms: Math.round(performance.now() - started),
    });
    return result;
  }
);
```
Use the Workflow service’s Logs tab for the full stream. For a single run, open Runs, expand the parent row, then click a process_shard child row to see that subtask’s stdout inline.
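Once the JSON lines are in the log stream, a small script can turn them into the per-shard story the chaos drill needs. This is a hypothetical helper that assumes you've collected the raw stdout lines (with any timestamp prefixes stripped) into a list; it skips non-JSON output such as tracebacks. A shard with more `shard_start` events than `shard_end` events had at least one attempt that never finished.

```python
import json
from collections import defaultdict
from typing import Iterable


def summarize_shard_logs(lines: Iterable[str]) -> dict[int, dict]:
    """Group shard_start / shard_end events by shard_id."""
    summary: dict[int, dict] = defaultdict(
        lambda: {"starts": 0, "ends": 0, "record_count": None, "elapsed_ms": None}
    )
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # not one of our JSON log lines (e.g. a traceback)
        if not isinstance(event, dict) or "shard_id" not in event:
            continue
        entry = summary[event["shard_id"]]
        if event.get("event") == "shard_start":
            entry["starts"] += 1
        elif event.get("event") == "shard_end":
            entry["ends"] += 1
            entry["record_count"] = event.get("record_count")
            entry["elapsed_ms"] = event.get("elapsed_ms")
    return dict(summary)


logs = [
    '{"event": "shard_start", "shard_id": 2}',
    "Traceback (most recent call last): ...",
    '{"event": "shard_start", "shard_id": 2}',
    '{"elapsed_ms": 812, "event": "shard_end", "record_count": 250, "shard_id": 2}',
]
summary = summarize_shard_logs(logs)
print(summary[2])  # {'starts': 2, 'ends': 1, 'record_count': 250, 'elapsed_ms': 812}
```

Here shard 2 started twice but ended once: the first attempt failed, and the retry recovered and processed 250 records.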
What you learned
- Retry belongs on the worker task, not the orchestrator
- Retries are only safe when the task is idempotent, so always pair the two
- An idempotency key per record gives the aggregator a deterministic way to de-dupe
- Per-shard JSON logs make the chaos drill in the next step observable from the Render Dashboard