Long-Running Server

The Process Jobs page covers processing jobs in a serverless environment using cron-triggered API routes. If you're running a long-lived server (Express, Fastify, plain Node.js, etc.), you can instead run the processor continuously in the background and handle lifecycle management yourself.

Starting the Processor in the Background

Use startInBackground() to run the processor as a continuous polling loop. It will check for new jobs every pollInterval milliseconds (default: 5 seconds) and process them automatically.

import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';

const jobQueue = initJobQueue({
  databaseConfig: {
    connectionString: process.env.PG_DATAQUEUE_DATABASE,
  },
});

const processor = jobQueue.createProcessor(jobHandlers, {
  workerId: `server-${process.pid}`,
  batchSize: 10,
  concurrency: 3,
  pollInterval: 5000, // check for new jobs every 5 seconds
  onError: (error) => {
    // Called when an unexpected error occurs during batch processing.
    // Use this to send errors to your monitoring service.
    console.error('Processor error:', error);
  },
});

processor.startInBackground();

When a batch comes back full (the number of processed jobs equals batchSize), the processor fetches the next batch as soon as the current one finishes, without waiting, so it can drain a large backlog quickly. Once a batch returns fewer jobs than batchSize, the processor waits pollInterval milliseconds before fetching again.
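
The drain-then-wait policy described above can be pictured with a small simulation. This is only a sketch of the described behavior, not DataQueue's actual implementation: pollLoop, makeBacklog, the fetchBatch callback, and the maxBatches bound are hypothetical names introduced for illustration.

```typescript
// Hypothetical stand-in for "claim and process up to batchSize jobs".
// Resolves to the number of jobs actually processed.
type FetchBatch = (batchSize: number) => Promise<number>;

interface PollStats {
  batches: number; // how many batches were fetched
  waits: number; // how many times the loop waited for pollInterval
}

async function pollLoop(
  fetchBatch: FetchBatch,
  batchSize: number,
  pollInterval: number,
  maxBatches: number, // bounds the loop so the sketch terminates
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise((resolve) => setTimeout(resolve, ms)),
): Promise<PollStats> {
  const stats: PollStats = { batches: 0, waits: 0 };
  while (stats.batches < maxBatches) {
    const processed = await fetchBatch(batchSize);
    stats.batches += 1;
    // A full batch suggests more work is queued: fetch again right away.
    if (processed === batchSize) continue;
    // A partial or empty batch means the backlog is drained: back off.
    stats.waits += 1;
    await sleep(pollInterval);
  }
  return stats;
}

// Simulated queue holding `total` jobs.
function makeBacklog(total: number): FetchBatch {
  let remaining = total;
  return async (batchSize) => {
    const taken = Math.min(batchSize, remaining);
    remaining -= taken;
    return taken;
  };
}
```

With a backlog of 25 jobs and a batchSize of 10, the first two batches are full and run back to back; the third (5 jobs) and any later empty batches are each followed by a wait.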

Configuration Tips

  • pollInterval -- Lower values (e.g., 1000) reduce latency for new jobs but increase database load. Higher values (e.g., 10000) are gentler on the database but introduce more delay. 5 seconds is a good default.
  • concurrency -- Keep this proportional to your server's resources. If jobs call external APIs with rate limits, keep it low.
  • batchSize -- Larger batches reduce polling overhead but hold a database lock longer during claim. 10-20 is typical.
  • onError -- Always set this in production. Without it, errors are only written to console.error, where they are easy to miss.
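
If you want to tune these values per environment without redeploying code, one option is to read them from environment variables and fall back to the defaults used on this page. The sketch below is an assumption about how you might wire this up; the variable names (JOB_POLL_INTERVAL_MS, JOB_CONCURRENCY, JOB_BATCH_SIZE) and the helper functions are hypothetical and not part of DataQueue.

```typescript
interface ProcessorTuning {
  pollInterval: number;
  concurrency: number;
  batchSize: number;
}

// Parse a positive integer, falling back to a default on missing or invalid input.
function positiveInt(raw: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(raw ?? '', 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Hypothetical environment variable names; the defaults mirror the example above.
function readTuning(env: Record<string, string | undefined>): ProcessorTuning {
  return {
    pollInterval: positiveInt(env.JOB_POLL_INTERVAL_MS, 5000),
    concurrency: positiveInt(env.JOB_CONCURRENCY, 3),
    batchSize: positiveInt(env.JOB_BATCH_SIZE, 10),
  };
}
```

The result of readTuning(process.env) could then be spread into the options object passed to createProcessor alongside workerId and onError.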

Graceful Shutdown

When your server receives a termination signal (e.g., SIGTERM from a container orchestrator), you should stop the processor and wait for in-flight jobs to finish before exiting. Use stopAndDrain() for this.

async function shutdown() {
  console.log('Shutting down...');

  // Stop polling and wait for the current batch to finish (up to 30 seconds)
  await processor.stopAndDrain(30000);

  // Close the database connection pool and wait for it to finish
  // before exiting, so connections are released cleanly.
  // PostgreSQL:
  await jobQueue.getPool().end();
  // Redis:
  // await jobQueue.getRedisClient().quit();

  console.log('Shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

stopAndDrain() accepts an optional timeout in milliseconds (default: 30000). If the current batch does not finish within that time, the promise resolves anyway so your process is not stuck indefinitely.
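
The "resolve anyway after the timeout" behavior is a common pattern: race the pending work against a timer. The following is a generic sketch of that pattern, not the library's code; settleWithin is a hypothetical helper name.

```typescript
// Resolves to true if `work` settled before the deadline, false if the
// deadline passed first. It never rejects, so the caller is never stuck.
async function settleWithin(
  work: Promise<unknown>,
  timeoutMs: number,
): Promise<boolean> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timedOut = new Promise<false>((resolve) => {
    timer = setTimeout(() => resolve(false), timeoutMs);
  });
  // Treat both fulfillment and rejection as "the work is over".
  const finished = work.then(
    () => true as const,
    () => true as const,
  );
  try {
    return await Promise.race([finished, timedOut]);
  } finally {
    // Clear the timer so it does not keep the event loop alive.
    clearTimeout(timer);
  }
}
```

Note that a timeout in this pattern only stops the waiting; the work that is still running is not cancelled.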

Use stopAndDrain() instead of stop() for graceful shutdown. stop() halts the polling loop immediately without waiting for in-flight jobs, which can leave jobs stuck in the processing state until they are reclaimed.

Background Supervisor

In production, you need periodic maintenance: reclaiming stuck jobs, cleaning up old completed jobs/events, and expiring timed-out waitpoint tokens. The supervisor automates all of this.

const supervisor = jobQueue.createSupervisor({
  intervalMs: 60_000, // run maintenance every 60 seconds
  stuckJobsTimeoutMinutes: 10, // reclaim jobs stuck > 10 minutes
  cleanupJobsDaysToKeep: 30, // delete completed jobs older than 30 days
  cleanupEventsDaysToKeep: 30, // delete job events older than 30 days
  onError: (error) => {
    console.error('Supervisor error:', error);
  },
});

supervisor.startInBackground();

The supervisor runs independently from the processor. Each maintenance task is isolated -- if one fails, the others still run. All operations are idempotent, so it's safe to run multiple supervisor instances across a cluster.
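
Task isolation of this kind is typically achieved by catching errors per task rather than around the whole run. The sketch below illustrates the idea under that assumption; MaintenanceTask and runIsolated are hypothetical names, and this is not the supervisor's actual source.

```typescript
interface MaintenanceTask {
  name: string;
  run: () => Promise<number>; // resolves to the number of affected rows
}

// Run every task in order. A failing task is reported through onError and
// recorded as 0; the remaining tasks still run.
async function runIsolated(
  tasks: MaintenanceTask[],
  onError: (error: unknown, taskName: string) => void,
): Promise<Record<string, number>> {
  const results: Record<string, number> = {};
  for (const task of tasks) {
    try {
      results[task.name] = await task.run();
    } catch (error) {
      onError(error, task.name);
      results[task.name] = 0;
    }
  }
  return results;
}
```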

Supervisor Options

Option                   Default        Description
intervalMs               60000          How often the maintenance loop runs (ms)
stuckJobsTimeoutMinutes  10             Reclaim jobs stuck in processing longer than this (minutes)
cleanupJobsDaysToKeep    30             Delete completed jobs older than this (days). 0 to disable
cleanupEventsDaysToKeep  30             Delete job events older than this (days). 0 to disable
cleanupBatchSize         1000           Batch size for cleanup deletions
reclaimStuckJobs         true           Enable/disable stuck job reclaiming
expireTimedOutTokens     true           Enable/disable waitpoint token expiry
onError                  console.error  Called when a maintenance task throws
verbose                  false          Enable verbose logging

Graceful Shutdown with the Supervisor

When shutting down, drain both the processor and the supervisor:

async function shutdown() {
  console.log('Shutting down...');

  await Promise.all([
    processor.stopAndDrain(30000),
    supervisor.stopAndDrain(30000),
  ]);

  // Wait for the pool to close before exiting
  await jobQueue.getPool().end();
  console.log('Shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
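
Because the same handler is registered for both SIGTERM and SIGINT, it can be invoked more than once (for example, when Ctrl+C is pressed twice), which would run the drain and pool cleanup again while the first run is still in progress. One way to guard against this is to wrap the handler so that repeated calls share a single run. The once helper below is a hypothetical utility, not part of DataQueue.

```typescript
// Wrap an async function so it runs at most once. Later calls get the
// promise of the first run instead of starting a new one.
function once<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (!pending) pending = fn();
    return pending;
  };
}

// Usage sketch, assuming `shutdown` is the function defined above:
//   const shutdownOnce = once(shutdown);
//   process.on('SIGTERM', shutdownOnce);
//   process.on('SIGINT', shutdownOnce);
```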

One-Shot Mode (Serverless)

If you prefer to run maintenance from a cron-triggered API route or a serverless function, use start() instead of startInBackground(). It runs all tasks once and returns the results:

const supervisor = jobQueue.createSupervisor();
const result = await supervisor.start();
// { reclaimedJobs: 3, cleanedUpJobs: 120, cleanedUpEvents: 45, expiredTokens: 0 }

Manual Maintenance

The supervisor above is the recommended approach. Manual maintenance is still supported if you need fine-grained control over individual tasks.

In a serverless setup, you use cron-triggered API routes for cleanup and reclaim. In a long-running server, you can use setInterval instead.

// Reclaim stuck jobs every 10 minutes
const reclaimInterval = setInterval(
  async () => {
    try {
      const reclaimed = await jobQueue.reclaimStuckJobs(10);
      if (reclaimed > 0) console.log(`Reclaimed ${reclaimed} stuck jobs`);
    } catch (error) {
      console.error('Reclaim error:', error);
    }
  },
  10 * 60 * 1000,
);

// Clean up completed jobs older than 30 days, once per day
const cleanupInterval = setInterval(
  async () => {
    try {
      const deleted = await jobQueue.cleanupOldJobs(30);
      if (deleted > 0) console.log(`Cleaned up ${deleted} old jobs`);

      const deletedEvents = await jobQueue.cleanupOldJobEvents(30);
      if (deletedEvents > 0)
        console.log(`Cleaned up ${deletedEvents} old job events`);
    } catch (error) {
      console.error('Cleanup error:', error);
    }
  },
  24 * 60 * 60 * 1000,
);

Make sure to clear these intervals during shutdown:

async function shutdown() {
  clearInterval(reclaimInterval);
  clearInterval(cleanupInterval);

  await processor.stopAndDrain(30000);

  // Wait for the pool to close before exiting
  await jobQueue.getPool().end();
  process.exit(0);
}
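
One caveat with setInterval and async callbacks: the timer does not wait for the previous run to finish, so a slow cleanup could overlap with the next tick. If that matters for your workload, a self-rescheduling setTimeout avoids it. The helper below is a sketch under that assumption; startNonOverlapping is a hypothetical name.

```typescript
// Run `task` repeatedly, scheduling the next run only after the previous
// one has finished. Returns a function that stops future runs.
function startNonOverlapping(
  task: () => Promise<void>,
  intervalMs: number,
  onError: (error: unknown) => void,
): () => void {
  let stopped = false;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const tick = async () => {
    try {
      await task();
    } catch (error) {
      onError(error);
    }
    // Schedule the next run only if stop() was not called in the meantime.
    if (!stopped) timer = setTimeout(tick, intervalMs);
  };

  timer = setTimeout(tick, intervalMs);

  return () => {
    stopped = true;
    clearTimeout(timer);
  };
}
```

During shutdown you would call the returned stop function in place of clearInterval. A run that is already in flight is not cancelled; it simply is not rescheduled.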

Full Example

Here is a complete Express server that ties everything together:

server.ts
import express from 'express';
import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';

// --- Initialize the queue ---
const jobQueue = initJobQueue({
  databaseConfig: {
    connectionString: process.env.PG_DATAQUEUE_DATABASE!,
  },
});

// --- Create and start the processor ---
const processor = jobQueue.createProcessor(jobHandlers, {
  workerId: `server-${process.pid}`,
  batchSize: 10,
  concurrency: 3,
  pollInterval: 5000,
  onError: (error) => {
    console.error('Processor error:', error);
  },
});

processor.startInBackground();

// --- Start the background supervisor ---
const supervisor = jobQueue.createSupervisor({
  intervalMs: 60_000,
  stuckJobsTimeoutMinutes: 10,
  cleanupJobsDaysToKeep: 30,
  cleanupEventsDaysToKeep: 30,
  onError: (error) => {
    console.error('Supervisor error:', error);
  },
});

supervisor.startInBackground();

// --- Express app ---
const app = express();
app.use(express.json());

app.post('/jobs', async (req, res) => {
  const { jobType, payload } = req.body;
  const jobId = await jobQueue.addJob({ jobType, payload });
  res.json({ jobId });
});

app.get('/jobs/:id', async (req, res) => {
  const job = await jobQueue.getJob(Number(req.params.id));
  if (!job) return res.status(404).json({ error: 'Not found' });
  res.json(job);
});

const server = app.listen(3000, () => {
  console.log('Server running on port 3000');
});

// --- Graceful shutdown ---
async function shutdown() {
  console.log('Shutting down gracefully...');

  // Stop accepting new HTTP connections
  server.close();

  // Wait for in-flight work to finish
  await Promise.all([
    processor.stopAndDrain(30000),
    supervisor.stopAndDrain(30000),
  ]);

  // Close the database pool and wait for it to finish
  await jobQueue.getPool().end();

  console.log('Shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
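
In the example above, server.close() is called without waiting for it, so process.exit(0) may run while HTTP responses are still being sent. Node's server.close() accepts a callback that fires once the server has closed, which makes it straightforward to wrap in a promise. The sketch below assumes only that callback contract; closeServer and the Closable interface are hypothetical names.

```typescript
// Minimal structural type: anything with a Node-style close(callback),
// such as the http.Server returned by app.listen().
interface Closable {
  close(callback: (error?: Error) => void): unknown;
}

// Resolve once the server has closed, or reject if closing fails
// (for example, because the server was not listening).
function closeServer(server: Closable): Promise<void> {
  return new Promise((resolve, reject) => {
    server.close((error) => (error ? reject(error) : resolve()));
  });
}

// Usage sketch inside shutdown(), before closing the pool:
//   await closeServer(server);
```

Keep in mind that open keep-alive connections can delay the callback, so you may want to combine this with a timeout of your own.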