Job Queue
How the PostgreSQL-backed job queue works, including LISTEN/NOTIFY wakeup and retry logic.
Design Principles
- PostgreSQL-native: no Redis, no RabbitMQ. One less dependency, and one more thing Postgres does well.
- At-least-once delivery: jobs can be retried, so handlers must be idempotent (and use `dedupeKey` to avoid enqueueing duplicates).
- Instant wakeup: a `LISTEN/NOTIFY` trigger fires the moment a job is enqueued, so there is no polling lag.
- Concurrency-safe: `SELECT ... FOR UPDATE SKIP LOCKED` ensures each job is claimed by only one worker at a time.
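The instant-wakeup principle comes down to racing a notification against the worker's idle timeout. Below is a minimal sketch of that race in plain TypeScript. It assumes a hypothetical `Wakeup` helper whose `notify()` would be called from the Postgres notification callback, for example a node-postgres client's `notification` event after it has run `LISTEN`. The class and its methods are illustrative, not the project's API.

```typescript
// Hypothetical helper: lets a worker sleep until either a NOTIFY arrives
// or the idle timeout elapses, whichever happens first.
type WakeReason = "notified" | "timeout";

class Wakeup {
  private waiter: ((reason: WakeReason) => void) | null = null;
  private timer: ReturnType<typeof setTimeout> | null = null;
  // A notification that arrives while nobody is waiting is remembered,
  // so a job enqueued between two waits is not missed.
  private pending = false;

  // Call this from the LISTEN callback.
  notify(): void {
    if (this.waiter !== null) {
      this.finish("notified");
    } else {
      this.pending = true;
    }
  }

  // Resolves on the next notification, or after `timeoutMs` with "timeout".
  wait(timeoutMs: number): Promise<WakeReason> {
    if (this.waiter !== null) throw new Error("only one waiter at a time");
    if (this.pending) {
      this.pending = false;
      return Promise.resolve("notified");
    }
    return new Promise<WakeReason>((resolve) => {
      this.waiter = resolve;
      this.timer = setTimeout(() => this.finish("timeout"), timeoutMs);
    });
  }

  private finish(reason: WakeReason): void {
    if (this.timer !== null) clearTimeout(this.timer);
    const resolve = this.waiter;
    this.waiter = null;
    this.timer = null;
    if (resolve !== null) resolve(reason);
  }
}
```

Latching a notification that arrives with no waiter is the detail that makes "no polling lag" hold. Without it, a job enqueued while the worker is busy would wait for the next idle timeout.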
Job Kinds
| Kind | Triggered By | Handler |
|---|---|---|
| `agent-turn` | Gateway inbound message | `agent-turn.ts` |
| `trigger-fire` | Cron or webhook trigger | `trigger-fire.ts` |
| `analytics-write` | Tool execution events | `analytics-write.ts` |
| `provider-refresh` | Scheduled provider check | `provider-refresh.ts` |
| `orchestrator-plan` | `POST /v1/orchestrate` | `orchestrator-plan.ts` |
| `orchestrator-subtask` | After plan creation | `orchestrator-subtask.ts` |
| `orchestrator-synthesize` | All subtasks done | `orchestrator-synthesize.ts` |
| `dispatch-execute` | Agent dispatch tool call | `dispatch-execute.ts` |
| `dispatch-synthesize` | All remote agents returned | `dispatch-synthesize.ts` |
| `embed-memories` | Manual or cron | `embed-memories.ts` |
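One way to keep the kinds table and the worker in sync is a closed union of kind names plus a registry keyed by that union. This is a sketch only. The `Handler` signature, the record shape and `makeRegistry` are hypothetical; the real handlers live in the files listed in the table.

```typescript
// The ten job kinds from the table, as a string-literal union.
const JOB_KINDS = [
  "agent-turn",
  "trigger-fire",
  "analytics-write",
  "provider-refresh",
  "orchestrator-plan",
  "orchestrator-subtask",
  "orchestrator-synthesize",
  "dispatch-execute",
  "dispatch-synthesize",
  "embed-memories",
] as const;

type JobKind = (typeof JOB_KINDS)[number];

// Hypothetical handler shape, mirroring `handler(payload, record, ctx)` in the claim loop.
type Handler = (
  payload: unknown,
  record: { id: string; attempts: number },
  ctx: { workerId: string },
) => Promise<unknown>;

// Type guard for strings read back from the `kind` column.
function isJobKind(value: string): value is JobKind {
  return (JOB_KINDS as readonly string[]).indexOf(value) !== -1;
}

// Taking a Record<JobKind, Handler> makes the compiler reject a registry that forgets a kind.
function makeRegistry(handlers: Record<JobKind, Handler>): (kind: string) => Handler {
  return (kind: string): Handler => {
    if (!isJobKind(kind)) throw new Error(`unknown job kind: ${kind}`);
    return handlers[kind];
  };
}
```

Adding a new row to the table then becomes a compile error until a handler is registered for it.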
Enqueueing
```ts
import { JobsClient } from "@orginabox/core/jobs"

const jobs = new JobsClient(db)

await jobs.enqueue({
  kind: "agent-turn",
  payload: { userId, sessionId, text, channel: "slack" },
  dedupeKey: `slack:${externalId}:${messageTs}`, // optional
  runAt: new Date(),                              // optional; default now()
  maxAttempts: 3,                                 // optional; default 3
})
```
A partial unique index on `dedupeKey` prevents duplicate jobs: if a pending job with the same `dedupeKey` already exists, the insert is a no-op.
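The dedupe rule can be modeled in memory to show exactly which inserts become no-ops. The SQL in the comment is one plausible Postgres form. The index name and the `dedupe_key` column are assumptions, as is the `DedupeQueue` class.

```typescript
// In-memory model of the dedupe rule. A Postgres equivalent might be
// (index and column names are assumptions):
//   CREATE UNIQUE INDEX jobs_dedupe_pending ON jobs (dedupe_key) WHERE status = 'pending';
//   INSERT INTO jobs (...) VALUES (...)
//     ON CONFLICT (dedupe_key) WHERE status = 'pending' DO NOTHING;

type Status = "pending" | "running" | "completed" | "failed";

interface JobRow {
  id: number;
  kind: string;
  dedupeKey: string | null;
  status: Status;
}

class DedupeQueue {
  private rows: JobRow[] = [];
  private nextId = 1;

  // Returns the inserted row, or null when the insert is a no-op.
  enqueue(kind: string, dedupeKey: string | null = null): JobRow | null {
    // As with a unique index, NULL keys never conflict with each other.
    const clash =
      dedupeKey !== null &&
      this.rows.some((r) => r.status === "pending" && r.dedupeKey === dedupeKey);
    if (clash) return null;
    const row: JobRow = { id: this.nextId++, kind, dedupeKey, status: "pending" };
    this.rows.push(row);
    return row;
  }

  setStatus(id: number, status: Status): void {
    const row = this.rows.find((r) => r.id === id);
    if (row) row.status = status;
  }
}
```

If the index predicate is `status = 'pending'` as described, a key is only reserved while its job waits to be claimed. Once the job moves to `running`, the same key can be enqueued again. This is why dedupe complements idempotent handlers rather than replacing them.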
Claim & Execute Loop
```
while not stopping:
    BEGIN
    job = SELECT * FROM jobs
          WHERE status = 'pending' AND run_at <= now()
          ORDER BY run_at ASC
          LIMIT 1
          FOR UPDATE SKIP LOCKED      # skip rows another worker has locked
    if no job:
        COMMIT
        wait for NOTIFY or idle timeout (5s)
        continue
    attempts = UPDATE jobs SET status = 'running', locked_by = workerId,
                               attempts = attempts + 1
               WHERE id = job.id
               RETURNING attempts
    COMMIT                            # releases the row lock; the job is now 'running'
    try:
        result = handler(job.payload, job, ctx)
        UPDATE jobs SET status = 'completed', result = result WHERE id = job.id
    except error:
        if attempts >= job.maxAttempts:
            UPDATE jobs SET status = 'failed', error = error.message WHERE id = job.id
        else:
            delay = base * attempts^2   # quadratic backoff: 1x, 4x, 9x base
            UPDATE jobs SET status = 'pending', run_at = now() + delay WHERE id = job.id
```
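The failure branch of the loop is pure logic, so it can be written and tested without a database. In this sketch, `onFailure` and `FailureOutcome` are hypothetical names, and `baseMs` stands in for `base`, whose actual value is not documented here. Squaring the attempt count grows the delay quadratically (1, 4, 9 times the base), not exponentially.

```typescript
// Result of handling a failed run: either give up, or reschedule.
type FailureOutcome =
  | { status: "failed"; error: string }
  | { status: "pending"; runAt: Date; delayMs: number };

// `attempts` is the count after the claim step's increment.
function onFailure(
  attempts: number,
  maxAttempts: number,
  error: Error,
  now: Date,
  baseMs: number,
): FailureOutcome {
  if (attempts >= maxAttempts) {
    return { status: "failed", error: error.message };
  }
  // delay = base * attempts^2
  const delayMs = baseMs * attempts * attempts;
  return { status: "pending", runAt: new Date(now.getTime() + delayMs), delayMs };
}
```

With the default `maxAttempts: 3`, a job runs at most three times. It is retried after `base`, then after `4 * base`, and is marked `failed` when the third run fails.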
Recovery
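On worker boot, any job that has been stuck in the `running` state for more than 5 minutes (for example, because its worker crashed mid-job) is automatically reset to `pending`. It will then run again, which is one reason handlers must be idempotent.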
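The recovery rule is sketched below as an in-memory function. It assumes a hypothetical `lockedAt` timestamp set when a job is claimed, because the text does not say which column the 5-minute age is measured from. The SQL in the comment carries the same assumption.

```typescript
// Boot-time recovery, modeled in memory. A Postgres equivalent might be
// (the locked_at column is an assumption):
//   UPDATE jobs SET status = 'pending', locked_by = NULL
//   WHERE status = 'running' AND locked_at < now() - interval '5 minutes';
const STALE_AFTER_MS = 5 * 60 * 1000;

interface ClaimedJob {
  id: number;
  status: "pending" | "running" | "completed" | "failed";
  lockedBy: string | null;
  lockedAt: Date | null; // hypothetical claim timestamp
}

// Resets stale running jobs in place and returns their ids for logging.
function recoverStaleJobs(jobs: ClaimedJob[], now: Date): number[] {
  const reset: number[] = [];
  for (const job of jobs) {
    const stale =
      job.status === "running" &&
      job.lockedAt !== null &&
      now.getTime() - job.lockedAt.getTime() > STALE_AFTER_MS;
    if (stale) {
      job.status = "pending";
      job.lockedBy = null;
      job.lockedAt = null;
      reset.push(job.id);
    }
  }
  return reset;
}
```

Under the claim loop as written, `attempts` is incremented at claim time. A run that crashed therefore still counts toward `maxAttempts`.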
Scaling
Multiple workers can run in parallel: because of `SKIP LOCKED`, each worker claims a different job instead of blocking on rows another worker has locked. The `lockedBy` column records which worker holds each job.
In Azure Container Apps, the worker is configured to scale from 1→3 replicas in production.
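The following toy model shows why parallel replicas do not collide. A set of row locks stands in for the locks Postgres holds until a claiming transaction commits, and `SKIP LOCKED` means a worker passes over those rows. `ToyQueue` and its methods are illustrative only. The single-statement claim in the comment is a common Postgres pattern, not necessarily the one this project uses.

```typescript
// Toy model of concurrent claiming with FOR UPDATE SKIP LOCKED.
// A single-statement claim in SQL (table and column names assumed) might be:
//   UPDATE jobs SET status = 'running', locked_by = $1, attempts = attempts + 1
//   WHERE id = (SELECT id FROM jobs
//               WHERE status = 'pending' AND run_at <= now()
//               ORDER BY run_at LIMIT 1
//               FOR UPDATE SKIP LOCKED)
//   RETURNING id;

interface QueuedJob {
  id: number;
  status: "pending" | "running";
  runAt: number; // epoch ms
  lockedBy: string | null;
}

class ToyQueue {
  private rowLocks = new Set<number>();
  private jobs: QueuedJob[];

  constructor(jobs: QueuedJob[]) {
    this.jobs = jobs;
  }

  // Step 1 (inside an open transaction): lock the earliest due pending row
  // that no other transaction has locked; locked rows are skipped, not waited on.
  lockNext(now: number): QueuedJob | null {
    const due = this.jobs
      .filter((j) => j.status === "pending" && j.runAt <= now && !this.rowLocks.has(j.id))
      .sort((a, b) => a.runAt - b.runAt);
    if (due.length === 0) return null;
    this.rowLocks.add(due[0].id);
    return due[0];
  }

  // Step 2: mark the job running and commit, which releases the row lock.
  commitClaim(job: QueuedJob, workerId: string): void {
    job.status = "running";
    job.lockedBy = workerId;
    this.rowLocks.delete(job.id);
  }
}
```

Two workers that both call `lockNext` before either commits get different jobs. That is the property that lets replicas scale out without coordination.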
Monitoring
```
GET /v1/jobs?status=failed&limit=20
```
Returns the most recent failed jobs with their error messages. Use this to diagnose repeated failures.
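When triaging, it often helps to group the returned failures so that a repeated error stands out. This sketch builds the request URL and summarizes a list of failed jobs. The response shape (`FailedJob`), the base URL and both helper names are assumptions, and authentication is not covered.

```typescript
// Hypothetical shape of one item returned by GET /v1/jobs.
interface FailedJob {
  id: string;
  kind: string;
  error: string | null;
}

// Builds the monitoring URL with the query parameters from the example.
function failedJobsUrl(baseUrl: string, limit = 20): string {
  const url = new URL("/v1/jobs", baseUrl);
  url.searchParams.set("status", "failed");
  url.searchParams.set("limit", String(limit));
  return url.toString();
}

// Groups failures by kind and error text, most frequent first.
function summarizeFailures(jobs: FailedJob[]): Array<{ key: string; count: number }> {
  const counts = new Map<string, number>();
  for (const job of jobs) {
    const key = `${job.kind}: ${job.error ?? "(no error message)"}`;
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }
  return Array.from(counts, ([key, count]) => ({ key, count })).sort((a, b) => b.count - a.count);
}
```

A single kind and error pair dominating the summary usually points to a handler bug or a failing dependency, not to transient errors.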
