Job Queue

How the PostgreSQL-backed job queue works, including LISTEN/NOTIFY wakeup and retry logic.

Design Principles

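  • PostgreSQL-native: no Redis, no RabbitMQ; one fewer dependency, and one more thing Postgres does well
  • At-least-once delivery: a failed job is retried, so a handler may run more than once for the same job and must be idempotent; dedupeKey additionally prevents the same work from being enqueued twice
  • Instant wakeup: a trigger issues NOTIFY whenever a job is inserted and workers LISTEN for it, so an idle worker wakes as soon as the enqueueing transaction commits instead of waiting for the next poll
  • Concurrency-safe: SELECT ... FOR UPDATE SKIP LOCKED makes each worker skip rows another worker has already locked, so a job is claimed by exactly one worker at a time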
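The wakeup side can be sketched as a promise that settles on whichever comes first: a NOTIFY on the queue's channel or the idle timeout. This is a minimal sketch assuming a dedicated node-postgres client that has already run `LISTEN` on the channel (node-postgres emits a `notification` event per NOTIFY). The function name `waitForNotifyOrTimeout` and the channel name `jobs_enqueued` are hypothetical, and the 5 s default simply mirrors the idle timeout in the claim loop.

```typescript
import { EventEmitter } from "node:events";

// Why an idle worker woke up.
type WakeReason = "notify" | "timeout";

// Assumption: `client` is a connection that has already executed `LISTEN <channel>`.
// Any EventEmitter that emits "notification" with { channel } works, which keeps it testable.
function waitForNotifyOrTimeout(
  client: EventEmitter,
  channel: string,
  idleMs = 5_000,
): Promise<WakeReason> {
  return new Promise((resolve) => {
    const onNotification = (msg: { channel: string }) => {
      if (msg.channel !== channel) return; // ignore NOTIFYs for other channels
      cleanup();
      resolve("notify");
    };
    const timer = setTimeout(() => {
      cleanup();
      resolve("timeout");
    }, idleMs);
    // Remove both the listener and the timer so repeated waits do not leak.
    function cleanup() {
      clearTimeout(timer);
      client.off("notification", onNotification);
    }
    client.on("notification", onNotification);
  });
}
```

Because the listener and timer are both removed when the promise settles, a worker can call this on every idle iteration without accumulating listeners.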

Job Kinds

Kind                     Triggered by                Handler
agent-turn               Gateway inbound message     agent-turn.ts
trigger-fire             Cron or webhook trigger     trigger-fire.ts
analytics-write          Tool execution events       analytics-write.ts
provider-refresh         Scheduled provider check    provider-refresh.ts
orchestrator-plan        POST /v1/orchestrate        orchestrator-plan.ts
orchestrator-subtask     After plan creation         orchestrator-subtask.ts
orchestrator-synthesize  All subtasks done           orchestrator-synthesize.ts
dispatch-execute         Agent dispatch tool call    dispatch-execute.ts
dispatch-synthesize      All remote agents returned  dispatch-synthesize.ts
embed-memories           Manual or cron              embed-memories.ts
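One way to keep this list of kinds and a handler registry in sync is to derive a TypeScript union from a single list. This is a hypothetical sketch: `JOB_KINDS`, `isJobKind`, `makeRegistry`, and the simplified one-argument `Handler` type are illustrative names, not the @orginabox/core API (the real handlers are called as `handler(payload, record, ctx)`).

```typescript
// The ten job kinds listed in the table, as a readonly tuple.
const JOB_KINDS = [
  "agent-turn", "trigger-fire", "analytics-write", "provider-refresh",
  "orchestrator-plan", "orchestrator-subtask", "orchestrator-synthesize",
  "dispatch-execute", "dispatch-synthesize", "embed-memories",
] as const;

type JobKind = (typeof JOB_KINDS)[number];

// Hypothetical, simplified handler type for illustration only.
type Handler = (payload: unknown) => Promise<unknown>;

// Narrows an untrusted `kind` string (e.g. read back from the jobs table).
function isJobKind(kind: string): kind is JobKind {
  return (JOB_KINDS as readonly string[]).includes(kind);
}

// Record<JobKind, Handler> makes the compiler reject a registry that omits a kind.
function makeRegistry(handlers: Record<JobKind, Handler>) {
  return (kind: string): Handler => {
    if (!isJobKind(kind)) throw new Error(`unknown job kind: ${kind}`);
    return handlers[kind];
  };
}
```

Adding an eleventh kind to `JOB_KINDS` then fails to compile until a handler is registered for it.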

Enqueueing

import { JobsClient } from "@orginabox/core/jobs"

const jobs = new JobsClient(db)

await jobs.enqueue({
  kind: "agent-turn",
  payload: { userId, sessionId, text, channel: "slack" },
  dedupeKey: `slack:${externalId}:${messageTs}`,  // optional
  runAt: new Date(),                               // optional; default now()
  maxAttempts: 3,                                  // optional; default 3
})

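The dedupeKey is backed by a partial unique index over pending jobs: if a pending job with the same dedupeKey already exists, the insert is a no-op. This only prevents duplicate enqueues; a retried job still runs its handler again, which is why handlers must be idempotent.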
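At the SQL level, "insert unless a pending duplicate exists" maps naturally onto `INSERT ... ON CONFLICT ... DO NOTHING` with an index predicate, which lets Postgres match the partial unique index. This is a sketch under assumptions: the table and column names (`dedupe_key`, `run_at`, `max_attempts`), the index definition in the comment, and the `Queryable` interface (intended to be satisfied by a node-postgres Pool or Client) are hypothetical, not taken from JobsClient.

```typescript
// Minimal structural type for something that can run a parameterized query.
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rowCount: number | null }>;
}

interface EnqueueInput {
  kind: string;
  payload: unknown;
  dedupeKey?: string;
  runAt?: Date;
  maxAttempts?: number;
}

// Hypothetical schema. Assumes a partial unique index such as:
//   CREATE UNIQUE INDEX jobs_pending_dedupe ON jobs (dedupe_key) WHERE status = 'pending';
// The WHERE clause after ON CONFLICT lets Postgres infer that partial index.
const ENQUEUE_SQL = `
  INSERT INTO jobs (kind, payload, dedupe_key, run_at, max_attempts, status)
  VALUES ($1, $2, $3, $4, $5, 'pending')
  ON CONFLICT (dedupe_key) WHERE status = 'pending' DO NOTHING`;

// Resolves to true if a row was inserted, false if a pending duplicate already existed.
async function enqueueDeduped(db: Queryable, job: EnqueueInput): Promise<boolean> {
  const res = await db.query(ENQUEUE_SQL, [
    job.kind,
    JSON.stringify(job.payload),
    job.dedupeKey ?? null, // NULLs never collide in a unique index, so un-keyed jobs always insert
    job.runAt ?? new Date(),
    job.maxAttempts ?? 3,
  ]);
  return (res.rowCount ?? 0) > 0;
}
```

Returning a boolean lets a caller such as a webhook receiver log or skip work when the enqueue turned out to be a duplicate.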

Claim & Execute Loop

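while not stopping:
  BEGIN
  job = SELECT id, payload FROM jobs
        WHERE status = 'pending' AND run_at <= now()
        ORDER BY run_at ASC
        LIMIT 1
        FOR UPDATE SKIP LOCKED        # skip rows other workers have locked

  if no job:
    COMMIT
    wait for NOTIFY or idle timeout (5s)
    continue

  UPDATE jobs SET status = 'running', locked_by = workerId, attempts = attempts + 1
    WHERE id = job.id
    RETURNING attempts, max_attempts
  COMMIT                              # status = 'running' now keeps other workers off this job

  try:
    result = handler(payload, record, ctx)
    UPDATE jobs SET status = 'completed', result = result WHERE id = job.id
  except error:
    if attempts >= maxAttempts:
      UPDATE jobs SET status = 'failed', error = error.message WHERE id = job.id
    else:
      delay = base * attempts^2       # quadratic backoff: 1x, 4x, 9x ... the base delay
      UPDATE jobs SET status = 'pending', run_at = now() + delay WHERE id = job.id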
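The failure branch of the loop reduces to a small pure function, which makes the backoff schedule easy to test. This is a sketch assuming `attempts` is the count after the claim incremented it, as in the loop; the function name `onFailure` and the 1 s default for `baseMs` are hypothetical, since the actual base delay is not stated here.

```typescript
// Outcome of a failed attempt: give up, or reschedule after a delay.
type RetryDecision =
  | { status: "failed" }
  | { status: "pending"; delayMs: number };

// `attempts` already includes the attempt that just failed.
// `baseMs` is a hypothetical default, not the documented base delay.
function onFailure(attempts: number, maxAttempts: number, baseMs = 1_000): RetryDecision {
  if (attempts >= maxAttempts) return { status: "failed" };
  // Quadratic backoff: base * attempts^2.
  return { status: "pending", delayMs: baseMs * attempts ** 2 };
}
```

With the default `maxAttempts` of 3, a job that keeps failing is retried after 1x and then 4x the base delay, and is marked failed after its third attempt. Swapping the formula for `baseMs * 2 ** attempts` would give true exponential backoff if faster growth is wanted.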

Recovery

When a worker boots, any job that has been stuck in the running state for more than 5 minutes (the sign of a worker that crashed mid-job) is automatically reset to pending so another worker can claim it.
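The boot-time reset can be expressed as a single UPDATE. This is a sketch under assumptions: it relies on a hypothetical `locked_at` timestamp set when a job is claimed (the page does not say which column records claim time), and `recoverStaleJobs` and `Queryable` are illustrative names.

```typescript
// Minimal structural type for something that can run a parameterized query.
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rowCount: number | null }>;
}

// Hypothetical columns: `locked_at` (claim time) and `locked_by` (worker id).
const RECOVER_SQL = `
  UPDATE jobs
     SET status = 'pending', locked_by = NULL
   WHERE status = 'running'
     AND locked_at < now() - make_interval(secs => $1::float8)`;

// Resets jobs left 'running' by a crashed worker; resolves to the number reset.
async function recoverStaleJobs(db: Queryable, staleAfterMs = 5 * 60_000): Promise<number> {
  const res = await db.query(RECOVER_SQL, [staleAfterMs / 1000]);
  return res.rowCount ?? 0;
}
```

Running this once before entering the claim loop matches the "on worker boot" behavior described above.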

Scaling

Multiple workers can run in parallel. Because the claim query uses SKIP LOCKED, each worker skips rows another worker has already locked, so concurrent workers always claim different jobs. The lockedBy column records which worker holds each job.
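A common single-statement variant of the claim step folds the locking SELECT and the status UPDATE into one query, so it can run on a pooled connection without an explicit transaction. This is a sketch of that general Postgres idiom, not necessarily what JobsClient does; `claimNext`, `ClaimedJob`, `Queryable`, and the `locked_at` column are hypothetical.

```typescript
interface ClaimedJob {
  id: string;
  kind: string;
  payload: unknown;
  attempts: number;
  max_attempts: number;
}

// Minimal structural type for something that can run a parameterized query.
interface Queryable {
  query(text: string, values: unknown[]): Promise<{ rows: ClaimedJob[] }>;
}

// The subquery locks the oldest due job, skipping rows other workers hold;
// the outer UPDATE marks it running and returns it in the same statement.
const CLAIM_SQL = `
  UPDATE jobs
     SET status = 'running', locked_by = $1, locked_at = now(), attempts = attempts + 1
   WHERE id = (
     SELECT id FROM jobs
      WHERE status = 'pending' AND run_at <= now()
      ORDER BY run_at
      LIMIT 1
      FOR UPDATE SKIP LOCKED
   )
  RETURNING id, kind, payload, attempts, max_attempts`;

// Resolves to the claimed job, or null when nothing is due.
async function claimNext(db: Queryable, workerId: string): Promise<ClaimedJob | null> {
  const { rows } = await db.query(CLAIM_SQL, [workerId]);
  return rows[0] ?? null;
}
```

Two workers running this at the same moment lock different rows, because each one's subquery skips the row the other has already locked.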

In Azure Container Apps, the worker is configured to scale from 1 to 3 replicas in production.

Monitoring

GET /v1/jobs?status=failed&limit=20

Returns the most recent failed jobs with their error messages. Use this to diagnose repeated failures.
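A small client for this endpoint might look like the following. It is a sketch with several assumptions: `baseUrl` is wherever the API is served, the response body is assumed to be a JSON array, and the `FailedJob` fields and function names are hypothetical, since the response shape is not documented here. Authentication headers are omitted and would need to be added for a real deployment.

```typescript
// Hypothetical response shape; the fields actually returned are not documented here.
interface FailedJob {
  id: string;
  kind: string;
  error: string;
}

// Builds GET /v1/jobs?status=failed&limit=N against an assumed base URL.
function failedJobsUrl(baseUrl: string, limit = 20): string {
  const url = new URL("/v1/jobs", baseUrl);
  url.searchParams.set("status", "failed");
  url.searchParams.set("limit", String(limit));
  return url.toString();
}

// Uses the global fetch available in Node 18+; assumes the body is a JSON array.
async function fetchFailedJobs(baseUrl: string, limit = 20): Promise<FailedJob[]> {
  const res = await fetch(failedJobsUrl(baseUrl, limit));
  if (!res.ok) throw new Error(`GET /v1/jobs failed with HTTP ${res.status}`);
  return (await res.json()) as FailedJob[];
}
```

Grouping the returned jobs by `kind` and `error` is a quick way to spot a handler that fails the same way repeatedly.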
