Analysis Job Processing
BullMQ job processing flows — pipeline-driven, ad-hoc, batch enqueue, sentiment gate, and resilience.
The analysis system processes qualitative feedback through a multi-stage pipeline, dispatching batch jobs to external HTTP workers via BullMQ queues.
Pipeline-Driven Flow
Ad-Hoc Job Flow
Individual jobs (e.g., embedding backfill) still use the original single-item AnalysisService.EnqueueJob() path:
Batch Enqueue Flow
Sentiment Gate
Deduplication
Pipeline jobs use deterministic IDs: ${pipelineId}--${type}. Ad-hoc jobs use ${submissionId}--${type}. If the same pipeline/submission and analysis type is enqueued twice while the first job still exists in the queue, BullMQ silently ignores the duplicate add.
Note: BullMQ does not allow : in custom job IDs, so the -- separator is used instead. Chunked sentiment jobs append the chunk index; see § Chunked Sentiment Dispatch.
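A minimal sketch of the ID scheme, assuming a hypothetical buildJobId() helper. The exact suffix used for chunked sentiment jobs is not specified here, so the "--<chunkIndex>" format below is an assumption.

```typescript
// Hypothetical helper illustrating the deterministic job-ID scheme.
// The "--<chunkIndex>" suffix for chunked sentiment jobs is an assumed format.
const SEPARATOR = "--";

function buildJobId(ownerId: string, analysisType: string, chunkIndex?: number): string {
  // ownerId is the pipelineId for pipeline jobs, or the submissionId for ad-hoc jobs.
  const parts: string[] = [ownerId, analysisType];
  if (chunkIndex !== undefined) {
    parts.push(String(chunkIndex));
  }
  const jobId = parts.join(SEPARATOR);
  // BullMQ does not allow ":" in custom job IDs, so fail fast before enqueueing.
  if (jobId.includes(":")) {
    throw new Error(`Custom job ID must not contain ":" (got ${jobId})`);
  }
  return jobId;
}

console.log(buildJobId("pipe-42", "topic-model")); // pipe-42--topic-model
console.log(buildJobId("sub-7", "embedding")); // sub-7--embedding
console.log(buildJobId("pipe-42", "sentiment", 3)); // pipe-42--sentiment--3
```

Because the ID is a pure function of its inputs, re-running the same enqueue path reproduces the same ID, which is what lets BullMQ drop the second add.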
Chunked Sentiment Dispatch
When a pipeline confirms, the orchestrator splits the in-scope submissions into chunks of at most SENTIMENT_CHUNK_SIZE (default 50) using chunkSubmissionsForSentiment() and enqueues one BullMQ job per chunk. This avoids 504 timeouts on large scopes (e.g., 800+ comments at the campus tier, which is 16+ chunks at the default size): the previous single-batch dispatch sent the whole scope in one request, which could exceed the worker's HTTP timeout.
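A minimal sketch of the chunking step, assuming chunkSubmissionsForSentiment() takes an array plus an optional chunk size (its real signature is not shown in this document). For n submissions it yields ceil(n / SENTIMENT_CHUNK_SIZE) chunks, and only the last one can be short.

```typescript
// Hypothetical sketch of chunkSubmissionsForSentiment(); the real signature may differ.
const SENTIMENT_CHUNK_SIZE = 50; // documented default

function chunkSubmissionsForSentiment<T>(
  submissions: readonly T[],
  chunkSize: number = SENTIMENT_CHUNK_SIZE,
): T[][] {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new Error(`chunkSize must be a positive integer, got ${chunkSize}`);
  }
  const chunks: T[][] = [];
  for (let start = 0; start < submissions.length; start += chunkSize) {
    // slice() clamps at the end of the array, so the final chunk may be shorter.
    chunks.push(submissions.slice(start, start + chunkSize));
  }
  return chunks;
}

// 820 in-scope submissions -> ceil(820 / 50) = 17 jobs: 16 chunks of 50 and one of 20.
const ids = Array.from({ length: 820 }, (_, i) => `sub-${i}`);
const chunks = chunkSubmissionsForSentiment(ids);
console.log(chunks.length, chunks[chunks.length - 1].length); // 17 20
```

Each chunk i would then be enqueued with metadata.chunkIndex = i and metadata.chunkCount = chunks.length; presumably the same count is what the run records as expectedChunks.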
Job envelope
Each chunk job carries the same BatchAnalysisJobMessage envelope, with two additional optional metadata fields:
```ts
{
  // ...standard fields
  metadata: {
    pipelineId: string;
    runId: string;
    chunkIndex?: number; // 0-based
    chunkCount?: number; // total chunks for this run
  };
  vllmConfig?: { url, model, enabled }; // optional, see ai-inference-pipeline.md
}
```

The envelope is .strict(): any unexpected key fails validation at dispatch.
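To show what strict validation means for the chunk metadata, here is a dependency-free sketch. It is not the project's schema: validateChunkMetadata() and the rule that chunkIndex and chunkCount travel together with 0 <= chunkIndex < chunkCount are assumptions made for illustration.

```typescript
// Hypothetical, dependency-free stand-in for the strict metadata check.
// The real envelope uses a .strict() schema; this only mimics its key handling.
interface ChunkMetadata {
  pipelineId: string;
  runId: string;
  chunkIndex?: number; // 0-based
  chunkCount?: number; // total chunks for this run
}

const ALLOWED_METADATA_KEYS = new Set(["pipelineId", "runId", "chunkIndex", "chunkCount"]);

function validateChunkMetadata(input: Record<string, unknown>): ChunkMetadata {
  // Like .strict(): unknown keys are an error rather than being silently stripped.
  const unexpected = Object.keys(input).filter((key) => !ALLOWED_METADATA_KEYS.has(key));
  if (unexpected.length > 0) {
    throw new Error(`Unexpected metadata key(s): ${unexpected.join(", ")}`);
  }
  const { pipelineId, runId, chunkIndex, chunkCount } = input;
  if (typeof pipelineId !== "string" || typeof runId !== "string") {
    throw new Error("pipelineId and runId must be strings");
  }
  // Non-chunked jobs omit both chunk fields.
  if (chunkIndex === undefined && chunkCount === undefined) {
    return { pipelineId, runId };
  }
  // Assumed invariant: both fields present, integers, and 0 <= chunkIndex < chunkCount.
  if (
    typeof chunkIndex !== "number" ||
    typeof chunkCount !== "number" ||
    !Number.isInteger(chunkIndex) ||
    !Number.isInteger(chunkCount) ||
    chunkIndex < 0 ||
    chunkIndex >= chunkCount
  ) {
    throw new Error(`Invalid chunk position: chunkIndex=${String(chunkIndex)}, chunkCount=${String(chunkCount)}`);
  }
  return { pipelineId, runId, chunkIndex, chunkCount };
}
```

Rejecting unknown keys, rather than stripping them, surfaces producer/consumer drift at dispatch time instead of letting an unexpected field pass silently to the worker.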
Counter-based completion
SentimentRun gains two columns: expectedChunks (set when the run is created) and completedChunks (incremented per chunk). Each chunk's Persist():
- Inserts its SentimentResult rows.
- Runs UPDATE sentiment_run SET completed_chunks = completed_chunks + 1, an atomic counter increment in the same transaction.
- If completedChunks === expectedChunks, marks the run COMPLETED and calls OnSentimentComplete() to advance the pipeline.
The processor passes tx.getTransactionContext() into execute() so the entity insert and the counter bump commit together. Partial chunk failures retry per-chunk; a failed chunk does not roll back its peers.
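The completion protocol can be modelled in memory to show why exactly one chunk fires OnSentimentComplete(). Everything below is a hypothetical stand-in: SentimentRunState and persistChunk() are illustrative names, and each call runs without interleaving to mimic one transaction. Against a real database, taking the incremented value from the UPDATE itself (for example with PostgreSQL's RETURNING clause) rather than from a separate read is one way to keep that guarantee under concurrency; this document does not say which approach the processor uses.

```typescript
// In-memory model of counter-based completion; names are hypothetical.
// Each persistChunk() call runs to completion without interleaving, standing in
// for one database transaction (row inserts + counter bump commit together).
interface SentimentRunState {
  runId: string;
  expectedChunks: number;
  completedChunks: number;
  status: "RUNNING" | "COMPLETED";
  results: Map<string, string>; // submissionId -> sentiment label
}

function persistChunk(
  run: SentimentRunState,
  rows: ReadonlyArray<{ submissionId: string; label: string }>,
  onSentimentComplete: (runId: string) => void,
): void {
  for (const row of rows) run.results.set(row.submissionId, row.label);
  // Assumed SQL equivalent:
  //   UPDATE sentiment_run SET completed_chunks = completed_chunks + 1
  //   WHERE id = $1 RETURNING completed_chunks
  run.completedChunks += 1;
  // Only the chunk whose increment reaches expectedChunks completes the run.
  if (run.completedChunks === run.expectedChunks) {
    run.status = "COMPLETED";
    onSentimentComplete(run.runId);
  }
}

const run: SentimentRunState = {
  runId: "run-1",
  expectedChunks: 3,
  completedChunks: 0,
  status: "RUNNING",
  results: new Map(),
};
const completions: string[] = [];
const record = (runId: string): void => {
  completions.push(runId);
};
persistChunk(run, [{ submissionId: "s1", label: "positive" }], record);
persistChunk(run, [{ submissionId: "s2", label: "neutral" }], record);
console.log(run.status, completions.length); // RUNNING 0
persistChunk(run, [{ submissionId: "s3", label: "negative" }], record);
console.log(run.status, completions.length); // COMPLETED 1
```

Chunks may finish in any order; completion depends only on the count, not on which chunk arrives last.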
Duplicate-swallowed retry safety
If OnSentimentComplete() succeeds but the BullMQ job's post-commit acknowledgment fails (e.g., a Redis hiccup), the chunk retries. The retry's insert hits the run's full unique index on (run_id, submission_id), which migration 20260417120000 converted from a partial index so that soft-deleted rows are also de-duplicated, and is treated as duplicate-swallowed. The processor re-reads the counter and re-fires OnSentimentComplete() only when the run is genuinely saturated, so completion is at-least-once while its downstream effect stays idempotent.
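A minimal in-memory sketch of the duplicate-swallowed path. All names are hypothetical, and the error code assumes a PostgreSQL backend, where SQLSTATE 23505 means unique_violation. It shows the two properties described above: a retried chunk never bumps the counter twice, and completion is re-fired only when the run is already saturated.

```typescript
// In-memory sketch of duplicate-swallowed retry handling; all names are hypothetical.
// "23505" is PostgreSQL's SQLSTATE for unique_violation (assumed database).
const UNIQUE_VIOLATION = "23505";

interface RunState {
  runId: string;
  expectedChunks: number;
  completedChunks: number;
  resultKeys: Set<string>; // models the full unique index on (run_id, submission_id)
}

// One "transaction": insert the chunk's rows and bump the counter, or change nothing.
function insertChunkTx(run: RunState, submissionIds: readonly string[]): void {
  const keys = submissionIds.map((id) => `${run.runId}/${id}`);
  const clash = keys.find((key) => run.resultKeys.has(key));
  if (clash !== undefined) {
    // Nothing has been written yet, so throwing here behaves like a rollback.
    throw Object.assign(new Error(`duplicate key ${clash}`), { code: UNIQUE_VIOLATION });
  }
  for (const key of keys) run.resultKeys.add(key);
  run.completedChunks += 1;
}

function processChunk(
  run: RunState,
  submissionIds: readonly string[],
  onSentimentComplete: (runId: string) => void,
): void {
  try {
    insertChunkTx(run, submissionIds);
  } catch (err) {
    const code = typeof err === "object" && err !== null ? (err as { code?: unknown }).code : undefined;
    if (code !== UNIQUE_VIOLATION) throw err; // genuine failures propagate so BullMQ retries
    // Duplicate-swallowed: an earlier attempt of this chunk already committed.
  }
  // Re-read the counter; fire only if the run is genuinely saturated.
  if (run.completedChunks === run.expectedChunks) {
    onSentimentComplete(run.runId);
  }
}

const run: RunState = { runId: "run-1", expectedChunks: 2, completedChunks: 0, resultKeys: new Set() };
const fired: string[] = [];
const notify = (runId: string): void => {
  fired.push(runId);
};
processChunk(run, ["s1", "s2"], notify); // chunk 0
processChunk(run, ["s3", "s4"], notify); // chunk 1 saturates the run
processChunk(run, ["s3", "s4"], notify); // chunk 1 retried after a lost ack
console.log(run.completedChunks, fired.length); // 2 2
```

The second notification in the usage above is the "at-least-once" case, which is why whatever OnSentimentComplete() triggers downstream has to tolerate being invoked twice.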
The unique-index conversion required a preflight duplicate check in the migration: if any (run_id, submission_id) pair appears more than once across live and soft-deleted rows, the migration aborts so the operator can investigate before re-running.
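The preflight amounts to a GROUP BY over both live and soft-deleted rows. Below is a hypothetical in-memory version; the table name sentiment_result, the deleted_at column, and the idea that the old partial index excluded soft-deleted rows are all assumptions. A pair that was legal under such a partial index (one live row plus one soft-deleted row) is exactly what would make the new full unique index fail to build.

```typescript
// Hypothetical in-memory version of the migration preflight.
// Roughly equivalent SQL (table and column names assumed):
//   SELECT run_id, submission_id, COUNT(*) FROM sentiment_result
//   GROUP BY run_id, submission_id HAVING COUNT(*) > 1;
// Note there is no "deleted_at IS NULL" filter: soft-deleted rows count too.
interface ResultRow {
  runId: string;
  submissionId: string;
  deletedAt: Date | null; // soft-delete marker
}

interface DuplicatePair {
  runId: string;
  submissionId: string;
  count: number;
}

function findDuplicatePairs(rows: readonly ResultRow[]): DuplicatePair[] {
  const counts = new Map<string, DuplicatePair>();
  for (const row of rows) {
    // deletedAt is deliberately ignored: the full index covers live and soft-deleted rows.
    const key = JSON.stringify([row.runId, row.submissionId]);
    const entry = counts.get(key) ?? { runId: row.runId, submissionId: row.submissionId, count: 0 };
    entry.count += 1;
    counts.set(key, entry);
  }
  return Array.from(counts.values()).filter((entry) => entry.count > 1);
}

function preflightOrAbort(rows: readonly ResultRow[]): void {
  const duplicates = findDuplicatePairs(rows);
  if (duplicates.length > 0) {
    // Abort rather than delete anything, leaving the data for an operator to inspect.
    const sample = duplicates.slice(0, 5).map((d) => `(${d.runId}, ${d.submissionId}) x${d.count}`);
    throw new Error(`Preflight failed: ${duplicates.length} duplicate pair(s), e.g. ${sample.join("; ")}`);
  }
}
```

Failing closed here is the conservative choice: automatically deleting one row of each pair would have to guess which sentiment result is authoritative.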
Resilience
| Mechanism | Configuration | Behavior |
|---|---|---|
| Retry | BULLMQ_DEFAULT_ATTEMPTS (default: 3) | Exponential backoff starting at BULLMQ_DEFAULT_BACKOFF_MS |
| HTTP Timeout | BULLMQ_HTTP_TIMEOUT_MS (default: 90s) | AbortController cancels request; job retries. Topic model uses 300s via override |
| Stall Detection | BULLMQ_STALLED_INTERVAL_MS (default: 30s) | Re-queues stalled jobs up to BULLMQ_MAX_STALLED_COUNT times |
| Validation Failure | — | Malformed worker responses fail the stage (no retry) |
| LLM Hallucination | — | Results for undispatched submissionIds are dropped with a warn log; if all results are dropped, the stage fails terminally via OnStageFailed (no retry, since retrying the LLM is likely to produce more hallucinations) |
| Redis Down | — | ServiceUnavailableException returned to caller; API continues serving |
| Stage Failure | — | Pipeline moves to FAILED with error message; can be inspected via status endpoint |
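The LLM Hallucination row describes a filter with a terminal-failure edge case. A minimal sketch follows; filterToDispatched(), the result shape, and the outcome type are hypothetical, and a caller would presumably map a terminal-failure outcome to OnStageFailed without retrying.

```typescript
// Hypothetical sketch of the "LLM Hallucination" row; names and shapes are assumed.
interface SentimentResult {
  submissionId: string;
  label: string;
}

type FilterOutcome =
  | { kind: "ok"; kept: SentimentResult[]; dropped: number }
  | { kind: "terminal-failure"; reason: string };

function filterToDispatched(
  dispatchedIds: ReadonlySet<string>,
  results: readonly SentimentResult[],
  warn: (message: string) => void = console.warn,
): FilterOutcome {
  // Keep only results whose submissionId was actually sent to the worker.
  const kept = results.filter((r) => dispatchedIds.has(r.submissionId));
  const dropped = results.length - kept.length;
  if (dropped > 0) {
    warn(`Dropping ${dropped} result(s) for undispatched submissionIds`);
  }
  // Every result was hallucinated: fail terminally, since a retry would likely do the same.
  // (An empty response is not treated as a hallucination in this sketch.)
  if (kept.length === 0 && results.length > 0) {
    return { kind: "terminal-failure", reason: "all results referenced undispatched submissionIds" };
  }
  return { kind: "ok", kept, dropped };
}

const dispatched = new Set(["s1", "s2"]);
const outcome = filterToDispatched(dispatched, [
  { submissionId: "s1", label: "positive" },
  { submissionId: "s9", label: "negative" }, // never dispatched: dropped with a warning
]);
console.log(outcome.kind); // ok
```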
Adding a New Analysis Type
- Create NewTypeProcessor extends BaseBatchProcessor (or RunPodBatchProcessor for RunPod workers) in src/modules/analysis/processors/
- Add NEW_TYPE_WORKER_URL and NEW_TYPE_CONCURRENCY to src/configurations/env/bullmq.env.ts
- Register the queue in AnalysisModule: add it to BullModule.registerQueue()
- Add @InjectQueue('new-type') to PipelineOrchestratorService
- Add dispatch and completion methods in PipelineOrchestratorService
- Update the PipelineStatus enum with the new stage
- Add a worker contract doc in docs/worker-contracts/
- Add a mock endpoint in mock-worker/server.ts
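For the configuration step, a hypothetical sketch of reading and validating the two new variables. The real bullmq.env.ts may use a schema library and different defaults; readWorkerQueueConfig() and the default concurrency of 1 are assumptions. In the service it would be called with process.env, and the concurrency value would typically feed the concurrency option that BullMQ workers accept.

```typescript
// Hypothetical sketch of reading a new worker's settings; the real
// src/configurations/env/bullmq.env.ts may be structured differently.
interface WorkerQueueConfig {
  workerUrl: string;
  concurrency: number;
}

function readWorkerQueueConfig(
  env: Record<string, string | undefined>,
  prefix: string, // e.g. "NEW_TYPE" -> NEW_TYPE_WORKER_URL, NEW_TYPE_CONCURRENCY
  defaultConcurrency = 1, // assumed default
): WorkerQueueConfig {
  const workerUrl = env[`${prefix}_WORKER_URL`];
  if (!workerUrl) {
    throw new Error(`${prefix}_WORKER_URL is required`);
  }
  new URL(workerUrl); // throws TypeError on a malformed URL
  const rawConcurrency = env[`${prefix}_CONCURRENCY`];
  const concurrency = rawConcurrency === undefined ? defaultConcurrency : Number(rawConcurrency);
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new Error(`${prefix}_CONCURRENCY must be a positive integer, got ${rawConcurrency}`);
  }
  return { workerUrl, concurrency };
}

// Example values are placeholders, not real endpoints.
const example = readWorkerQueueConfig(
  { NEW_TYPE_WORKER_URL: "http://localhost:8000/new-type", NEW_TYPE_CONCURRENCY: "4" },
  "NEW_TYPE",
);
console.log(example.concurrency); // 4
```

Validating at startup means a missing or mistyped variable fails the boot rather than surfacing later as failed jobs in the new queue.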