Analysis Job Processing
BullMQ job processing flows — pipeline-driven, ad-hoc, batch enqueue, sentiment gate, and resilience.
The analysis system processes qualitative feedback through a multi-stage pipeline, dispatching batch jobs to external HTTP workers via BullMQ queues.
Pipeline-Driven Flow
Ad-Hoc Job Flow
Individual jobs (e.g., embedding backfill) still use the original single-item AnalysisService.EnqueueJob() path:
Batch Enqueue Flow
Sentiment Gate
Deduplication
Pipeline jobs use deterministic IDs: ${pipelineId}:${type}. If the same pipeline + analysis type is enqueued twice, BullMQ silently rejects the duplicate.
Ad-hoc jobs use: ${submissionId}:${type}.
Resilience
| Mechanism | Configuration | Behavior |
|---|---|---|
| Retry | BULLMQ_DEFAULT_ATTEMPTS (default: 3) | Exponential backoff starting at BULLMQ_DEFAULT_BACKOFF_MS |
| HTTP Timeout | BULLMQ_HTTP_TIMEOUT_MS (default: 90s) | AbortController cancels request; job retries. Topic model uses 300s via override |
| Stall Detection | BULLMQ_STALLED_INTERVAL_MS (default: 30s) | Re-queues stalled jobs up to BULLMQ_MAX_STALLED_COUNT times |
| Validation Failure | — | Malformed worker responses fail the stage (no retry) |
| Redis Down | — | ServiceUnavailableException returned to caller; API continues serving |
| Stage Failure | — | Pipeline moves to FAILED with error message; can be inspected via status endpoint |
Adding a New Analysis Type
- Create
NewTypeProcessor extends BaseBatchProcessor(orRunPodBatchProcessorfor RunPod workers) insrc/modules/analysis/processors/ - Add
NEW_TYPE_WORKER_URLandNEW_TYPE_CONCURRENCYtosrc/configurations/env/bullmq.env.ts - Register queue in
AnalysisModule: add toBullModule.registerQueue() - Add
@InjectQueue('new-type')toPipelineOrchestratorService - Add dispatch and completion methods in
PipelineOrchestratorService - Update
PipelineStatusenum with new stage - Add worker contract doc in
docs/worker-contracts/ - Add mock endpoint in
mock-worker/server.ts