Faculytics Docs

Analysis Job Processing

BullMQ job processing flows — pipeline-driven, ad-hoc, batch enqueue, sentiment gate, and resilience.

The analysis system processes qualitative feedback through a multi-stage pipeline, dispatching batch jobs to external HTTP workers via BullMQ queues.

Pipeline-Driven Flow

Ad-Hoc Job Flow

Individual jobs (e.g., embedding backfill) still use the original single-item AnalysisService.EnqueueJob() path:

Batch Enqueue Flow

Sentiment Gate

Deduplication

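Pipeline jobs use deterministic IDs of the form ${pipelineId}--${type}; ad-hoc jobs use ${submissionId}--${type}. If the same pipeline (or submission) and analysis type is enqueued twice, BullMQ silently ignores the second add (no error, no second job) for as long as the first job with that ID is still stored in the queue.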

Note: BullMQ does not allow : in custom job IDs. The -- separator is used instead. Chunked sentiment jobs append the chunk index — see § Chunked Sentiment Dispatch.
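The ID scheme and the de-duplication it enables can be sketched in a few lines. The helper names and analysis-type strings are hypothetical, the "--<chunkIndex>" suffix for chunked jobs is an assumed format (the text only says the index is appended), and FakeQueue is an in-memory stand-in that mimics BullMQ ignoring a job whose ID already exists; it is not the BullMQ API.

```typescript
// Hypothetical helpers for the deterministic job-ID scheme.
function pipelineJobId(pipelineId: string, type: string): string {
  return `${pipelineId}--${type}`;
}

function adHocJobId(submissionId: string, type: string): string {
  return `${submissionId}--${type}`;
}

// Assumed suffix format; the doc only says the chunk index is appended.
function chunkJobId(pipelineId: string, type: string, chunkIndex: number): string {
  return `${pipelineJobId(pipelineId, type)}--${chunkIndex}`;
}

// BullMQ rejects ':' in custom job IDs, so fail fast before enqueueing.
function assertValidCustomId(jobId: string): string {
  if (jobId.includes(":")) {
    throw new Error(`Custom job ID must not contain ':' (got "${jobId}")`);
  }
  return jobId;
}

// In-memory stand-in (not BullMQ): adding an ID that already exists is a
// silent no-op, mirroring the de-duplication behaviour described above.
class FakeQueue {
  private readonly jobs = new Map<string, unknown>();

  add(jobId: string, data: unknown): boolean {
    assertValidCustomId(jobId);
    if (this.jobs.has(jobId)) return false; // duplicate ignored, no error
    this.jobs.set(jobId, data);
    return true;
  }

  get size(): number {
    return this.jobs.size;
  }
}

const queue = new FakeQueue();
queue.add(pipelineJobId("pipe-1", "topic-model"), { pipelineId: "pipe-1" });
queue.add(pipelineJobId("pipe-1", "topic-model"), { pipelineId: "pipe-1" }); // ignored
console.log(queue.size); // 1
```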

Chunked Sentiment Dispatch

When a pipeline is confirmed, the orchestrator uses chunkSubmissionsForSentiment() to split the in-scope submissions into N = ceil(total / SENTIMENT_CHUNK_SIZE) chunks of at most SENTIMENT_CHUNK_SIZE submissions each (default 50), then enqueues one BullMQ job per chunk. This avoids 504 timeouts on large scopes (e.g., 800+ comments at the campus tier, which at the default size means 16 or more jobs). The previous single-batch dispatch sent the whole scope in one request, which could exceed the worker's HTTP timeout.
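A minimal sketch of the chunking step, assuming a generic signature for chunkSubmissionsForSentiment() (its real parameters are not documented here). It makes the chunk count explicit: N = ceil(total / chunkSize), and only the final chunk can be shorter than the chunk size.

```typescript
const DEFAULT_SENTIMENT_CHUNK_SIZE = 50;

// Assumed signature: splits any list of submissions into fixed-size chunks.
function chunkSubmissionsForSentiment<T>(
  submissions: readonly T[],
  chunkSize: number = DEFAULT_SENTIMENT_CHUNK_SIZE,
): T[][] {
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError(`chunkSize must be a positive integer, got ${chunkSize}`);
  }
  const chunks: T[][] = [];
  for (let start = 0; start < submissions.length; start += chunkSize) {
    // slice() clamps the end index, so the final chunk may be shorter.
    chunks.push(submissions.slice(start, start + chunkSize));
  }
  return chunks;
}

// 801 submissions at the default size: 16 full chunks plus one of size 1.
const ids = Array.from({ length: 801 }, (_, i) => `sub-${i}`);
const chunks = chunkSubmissionsForSentiment(ids);
console.log(chunks.length, chunks[chunks.length - 1]!.length); // 17 1
```

Each chunk would then become one job, with its position and the total recorded as chunkIndex and chunkCount.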

Job envelope

Each chunk job carries the same BatchAnalysisJobMessage envelope, with two additional optional metadata fields:

{
  // ...standard fields
  metadata: {
    pipelineId: string;
    runId: string;
    chunkIndex?: number;  // 0-based
    chunkCount?: number;  // total chunks for this run
  },
  vllmConfig?: { url, model, enabled };  // optional, see ai-inference-pipeline.md
}

The envelope is .strict() — any unexpected key fails validation at dispatch.
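To illustrate what .strict() buys here, the following sketch hand-rolls the same unknown-key rejection for the metadata object only, so it runs without dependencies. The .strict() naming suggests a Zod-style schema, but the real schema is not shown; the function names and the extra chunkIndex < chunkCount rule are assumptions.

```typescript
// Declared metadata keys; anything else is rejected (strict mode).
const METADATA_KEYS = new Set(["pipelineId", "runId", "chunkIndex", "chunkCount"]);

type ValidationResult = { ok: true } | { ok: false; errors: string[] };

function isNonNegativeInteger(value: unknown): value is number {
  return typeof value === "number" && Number.isInteger(value) && value >= 0;
}

function validateChunkMetadata(metadata: Record<string, unknown>): ValidationResult {
  const errors: string[] = [];

  // Strict mode: undeclared keys are errors, not silently stripped.
  for (const key of Object.keys(metadata)) {
    if (!METADATA_KEYS.has(key)) errors.push(`unexpected key: metadata.${key}`);
  }

  // Required string fields.
  for (const key of ["pipelineId", "runId"]) {
    if (typeof metadata[key] !== "string") errors.push(`metadata.${key} must be a string`);
  }

  // Optional chunk fields are checked only when present.
  const { chunkIndex, chunkCount } = metadata;
  if (chunkIndex !== undefined && !isNonNegativeInteger(chunkIndex)) {
    errors.push("metadata.chunkIndex must be a non-negative integer (0-based)");
  }
  if (chunkCount !== undefined && !(isNonNegativeInteger(chunkCount) && chunkCount > 0)) {
    errors.push("metadata.chunkCount must be a positive integer");
  }
  // Assumed cross-field rule: a 0-based index must be below the total.
  if (isNonNegativeInteger(chunkIndex) && isNonNegativeInteger(chunkCount) && chunkIndex >= chunkCount) {
    errors.push("metadata.chunkIndex must be less than metadata.chunkCount");
  }

  return errors.length === 0 ? { ok: true } : { ok: false, errors };
}
```

Failing loudly on an unknown key at dispatch catches a producer/worker contract drift before the job ever reaches the worker.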

Counter-based completion

SentimentRun gains two columns: expectedChunks (set when the run is created) and completedChunks (incremented per chunk). Each chunk's Persist():

  1. Inserts its SentimentResult rows.
  2. Atomically increments the run's counter (UPDATE sentiment_run SET completed_chunks = completed_chunks + 1 on that run's row) in the same transaction.
  3. If completedChunks === expectedChunks, marks the run COMPLETED and calls OnSentimentComplete() to advance the pipeline.

The processor passes tx.getTransactionContext() into execute() so the entity insert and the counter bump commit together. Partial chunk failures retry per-chunk; a failed chunk does not roll back its peers.
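The three Persist() steps and the per-chunk failure isolation can be sketched in memory. All names here are hypothetical, the draft-copy "transaction" only stands in for the real database transaction, and the sketch assumes OnSentimentComplete() is invoked once the commit has succeeded.

```typescript
interface SentimentRunState {
  expectedChunks: number;
  completedChunks: number;
  status: "RUNNING" | "COMPLETED";
  persistedSubmissionIds: string[];
}

class FakeRunStore {
  private state: SentimentRunState;

  constructor(initial: SentimentRunState) {
    this.state = initial;
  }

  get current(): Readonly<SentimentRunState> {
    return this.state;
  }

  // Work runs against a draft copy; the draft replaces the committed state
  // only if `work` returns normally. A throw discards it (rollback).
  transaction<R>(work: (draft: SentimentRunState) => R): R {
    const draft: SentimentRunState = {
      ...this.state,
      persistedSubmissionIds: [...this.state.persistedSubmissionIds],
    };
    const result = work(draft);
    this.state = draft; // commit
    return result;
  }
}

function persistChunk(
  store: FakeRunStore,
  submissionIds: string[],
  onSentimentComplete: () => void,
  simulateFailure = false,
): void {
  const saturated = store.transaction((tx) => {
    tx.persistedSubmissionIds.push(...submissionIds); // 1. insert result rows
    if (simulateFailure) throw new Error("simulated chunk failure");
    tx.completedChunks += 1; // 2. counter bump in the same transaction
    if (tx.completedChunks === tx.expectedChunks) {
      tx.status = "COMPLETED"; // 3. run is saturated
      return true;
    }
    return false;
  });
  // Advance the pipeline only after the commit succeeded.
  if (saturated) onSentimentComplete();
}
```

A failed chunk leaves neither rows nor a counter bump behind, so its peers' progress stands and only that chunk needs to be retried.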

Duplicate-swallowed retry safety

If OnSentimentComplete() succeeds but the BullMQ job's post-commit acknowledgment fails (e.g., a Redis hiccup), the chunk is retried. The retry's insert hits the full unique index on (run_id, submission_id), which migration 20260417120000 converted from a partial index so that soft-deleted rows are de-duplicated too, and the conflict is treated as duplicate-swallowed rather than as an error. The processor then re-reads the counter and re-fires OnSentimentComplete() only when the run is genuinely saturated. Completion can therefore fire more than once (at-least-once), but its downstream effect is idempotent.
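The retry path can be modelled the same way. In this hypothetical sketch a Set of composite keys plays the unique index, a conflicting insert aborts the chunk without touching the counter, and the pipeline's completion handler is written to be idempotent, so a re-fired OnSentimentComplete() is harmless.

```typescript
const UNIQUE_VIOLATION = "UniqueViolation";

interface RunCounter {
  runId: string;
  expectedChunks: number;
  completedChunks: number;
}

class FakeResultsDb {
  private readonly uniqueIndex = new Set<string>(); // (run_id, submission_id)
  run: RunCounter;

  constructor(run: RunCounter) {
    this.run = run;
  }

  // Rows and counter increment commit together or not at all.
  persistChunk(submissionIds: string[]): void {
    const keys = submissionIds.map((id) => JSON.stringify([this.run.runId, id]));
    if (keys.some((key) => this.uniqueIndex.has(key))) {
      const err = new Error("duplicate (run_id, submission_id)");
      err.name = UNIQUE_VIOLATION;
      throw err; // nothing was written
    }
    keys.forEach((key) => this.uniqueIndex.add(key));
    this.run = { ...this.run, completedChunks: this.run.completedChunks + 1 };
  }
}

// Idempotent downstream effect: the pipeline leaves the sentiment stage once.
class FakePipeline {
  stage: "SENTIMENT" | "NEXT" = "SENTIMENT";
  advances = 0;

  onSentimentComplete(): void {
    if (this.stage !== "SENTIMENT") return; // already advanced: no-op
    this.stage = "NEXT";
    this.advances += 1;
  }
}

function processSentimentChunk(db: FakeResultsDb, pipeline: FakePipeline, submissionIds: string[]): void {
  try {
    db.persistChunk(submissionIds);
  } catch (err) {
    // Any other error propagates so the job is retried normally.
    if (!(err instanceof Error && err.name === UNIQUE_VIOLATION)) throw err;
    // Duplicate-swallowed: this chunk already committed on an earlier attempt.
  }
  // Re-read the counter; only a saturated run (re-)fires completion.
  if (db.run.completedChunks === db.run.expectedChunks) pipeline.onSentimentComplete();
}
```

Swallowing only the unique-violation error is the important design choice: genuine failures still propagate and get BullMQ's per-chunk retry.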

The unique-index conversion required a preflight duplicate check in the migration: if any (run_id, submission_id) pair occurs more than once across live and soft-deleted rows, the migration aborts so the operator can investigate before re-running it.
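A sketch of the preflight logic, with hypothetical row and function names. It deliberately counts soft-deleted rows alongside live ones, since both would collide under the new full index; in SQL the equivalent is a GROUP BY run_id, submission_id with HAVING COUNT(*) > 1 and no soft-delete filter.

```typescript
interface ResultRow {
  runId: string;
  submissionId: string;
  deletedAt: Date | null; // soft-deleted rows are deliberately included
}

interface DuplicatePair {
  runId: string;
  submissionId: string;
  count: number;
}

function findDuplicatePairs(rows: readonly ResultRow[]): DuplicatePair[] {
  const counts = new Map<string, DuplicatePair>();
  for (const row of rows) {
    // JSON-encoding the pair avoids collisions a naive separator could cause.
    const key = JSON.stringify([row.runId, row.submissionId]);
    const entry = counts.get(key);
    if (entry) {
      entry.count += 1;
    } else {
      counts.set(key, { runId: row.runId, submissionId: row.submissionId, count: 1 });
    }
  }
  return Array.from(counts.values()).filter((pair) => pair.count > 1);
}

// Abort before creating the index so the operator can clean up first.
function preflightDuplicateCheck(rows: readonly ResultRow[]): void {
  const duplicates = findDuplicatePairs(rows);
  if (duplicates.length > 0) {
    throw new Error(
      `Aborting migration: ${duplicates.length} duplicate (run_id, submission_id) pair(s); ` +
        "investigate before re-running.",
    );
  }
}
```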

Resilience

| Mechanism | Configuration | Behavior |
| --- | --- | --- |
| Retry | BULLMQ_DEFAULT_ATTEMPTS (default: 3) | Exponential backoff starting at BULLMQ_DEFAULT_BACKOFF_MS |
| HTTP Timeout | BULLMQ_HTTP_TIMEOUT_MS (default: 90s) | AbortController cancels the request; the job retries. The topic model uses 300s via an override |
| Stall Detection | BULLMQ_STALLED_INTERVAL_MS (default: 30s) | Re-queues stalled jobs up to BULLMQ_MAX_STALLED_COUNT times |
| Validation Failure | n/a | Malformed worker responses fail the stage (no retry) |
| LLM Hallucination | n/a | Results for undispatched submissionIds are dropped with a warn log; if all results are dropped, the stage fails terminally via OnStageFailed (no retry, since retrying the LLM is likely to produce more hallucinations) |
| Redis Down | n/a | ServiceUnavailableException is returned to the caller; the API continues serving |
| Stage Failure | n/a | Pipeline moves to FAILED with an error message, which can be inspected via the status endpoint |
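The LLM-hallucination behavior carries the most logic of these mechanisms; a minimal sketch follows. Type names, the outcome shape, and the injected warn logger are hypothetical, and treating an empty response as "not hallucinated" (leaving it to other validation) is an assumption of this sketch.

```typescript
interface WorkerResult {
  submissionId: string;
  [field: string]: unknown;
}

type GuardOutcome =
  | { kind: "accepted"; results: WorkerResult[]; dropped: number }
  | { kind: "terminal-failure"; reason: string };

function guardAgainstHallucinatedIds(
  dispatchedIds: readonly string[],
  results: readonly WorkerResult[],
  warn: (message: string) => void = console.warn,
): GuardOutcome {
  const dispatched = new Set(dispatchedIds);
  const kept: WorkerResult[] = [];
  for (const result of results) {
    if (dispatched.has(result.submissionId)) {
      kept.push(result);
    } else {
      // Drop results for IDs that were never sent to the worker.
      warn(`Dropping result for undispatched submissionId ${result.submissionId}`);
    }
  }
  const dropped = results.length - kept.length;
  if (results.length > 0 && kept.length === 0) {
    // The caller would route this to OnStageFailed rather than throw,
    // so the job is not retried against the same LLM.
    return { kind: "terminal-failure", reason: `all ${dropped} results referenced undispatched submissions` };
  }
  return { kind: "accepted", results: kept, dropped };
}
```

Returning a terminal outcome instead of throwing is the point of the design: a thrown error would fall under the Retry row of the table, which this case explicitly opts out of.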

Adding a New Analysis Type

  1. Create a NewTypeProcessor class that extends BaseBatchProcessor (or RunPodBatchProcessor for RunPod workers) in src/modules/analysis/processors/
  2. Add NEW_TYPE_WORKER_URL and NEW_TYPE_CONCURRENCY to src/configurations/env/bullmq.env.ts
  3. Register the queue in AnalysisModule by adding it to BullModule.registerQueue()
  4. Inject the queue into PipelineOrchestratorService with @InjectQueue('new-type')
  5. Add dispatch and completion methods to PipelineOrchestratorService
  6. Add the new stage to the PipelineStatus enum
  7. Add a worker contract doc in docs/worker-contracts/
  8. Add a mock endpoint in mock-worker/server.ts
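Step 2 can be sketched as a plain validation function. The real src/configurations/env/bullmq.env.ts is not shown and may well use a schema library; the function name, the http(s) check, and the default concurrency of 1 are assumptions.

```typescript
interface NewTypeWorkerConfig {
  workerUrl: string;
  concurrency: number;
}

function loadNewTypeWorkerConfig(env: Record<string, string | undefined>): NewTypeWorkerConfig {
  // The worker URL is required and must be an http(s) endpoint.
  const workerUrl = env["NEW_TYPE_WORKER_URL"];
  if (!workerUrl || !/^https?:\/\//.test(workerUrl)) {
    throw new Error("NEW_TYPE_WORKER_URL must be set to an http(s) URL");
  }

  // Concurrency is optional; the default of 1 is an assumption.
  const rawConcurrency = env["NEW_TYPE_CONCURRENCY"] ?? "1";
  const concurrency = Number(rawConcurrency);
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new Error(`NEW_TYPE_CONCURRENCY must be a positive integer, got "${rawConcurrency}"`);
  }

  return { workerUrl, concurrency };
}
```

Validating at startup means a missing or mistyped variable fails the boot instead of the first dispatched job.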