Long-Running Server
The Process Jobs page covers processing jobs in a serverless environment using cron-triggered API routes. If you're running a long-lived server (Express, Fastify, plain Node.js, etc.), you can instead run the processor continuously in the background and handle lifecycle management yourself.
Starting the Processor in the Background
Use startInBackground() to run the processor as a continuous polling loop. It will check for new jobs every pollInterval milliseconds (default: 5 seconds) and process them automatically.
import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';
const jobQueue = initJobQueue({
databaseConfig: {
connectionString: process.env.PG_DATAQUEUE_DATABASE,
},
});
const processor = jobQueue.createProcessor(jobHandlers, {
workerId: `server-${process.pid}`,
batchSize: 10,
concurrency: 3,
pollInterval: 5000, // check for new jobs every 5 seconds
onError: (error) => {
// Called when an unexpected error occurs during batch processing.
// Use this to send errors to your monitoring service.
console.error('Processor error:', error);
},
});
processor.startInBackground();

When a full batch is returned (i.e., the number of processed jobs equals batchSize), the processor fetches the next batch as soon as the current one finishes, so it can drain a large backlog quickly. Once a batch returns fewer jobs than batchSize, the processor waits pollInterval before fetching again.
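The drain behavior above can be sketched as a plain polling loop. This is a simplified illustration, not the library's actual implementation; `fetchBatch` stands in for the processor's claim-and-process step, and `maxCycles` exists only so the example terminates.

```typescript
type Batch = number[];

// Fetch batches back-to-back while they come back full, then fall back
// to waiting pollInterval once a partial batch signals an empty backlog.
async function pollLoop(
  fetchBatch: (batchSize: number) => Promise<Batch>,
  opts: { batchSize: number; pollInterval: number; maxCycles: number },
): Promise<number> {
  let processed = 0;
  for (let cycle = 0; cycle < opts.maxCycles; cycle++) {
    let batch = await fetchBatch(opts.batchSize);
    processed += batch.length;
    // Full batch: more work may be queued, so fetch again immediately.
    while (batch.length === opts.batchSize) {
      batch = await fetchBatch(opts.batchSize);
      processed += batch.length;
    }
    // Partial batch: the backlog is drained, wait before polling again.
    await new Promise((resolve) => setTimeout(resolve, opts.pollInterval));
  }
  return processed;
}
```

With batchSize 10 and 25 queued jobs, a single cycle fetches batches of 10, 10, and 5 back-to-back before sleeping.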
Configuration Tips
- pollInterval -- Lower values (e.g., 1000) reduce latency for new jobs but increase database load. Higher values (e.g., 10000) are gentler on the database but introduce more delay. 5 seconds is a good default.
- concurrency -- Keep this proportional to your server's resources. If jobs call external APIs with rate limits, keep it low.
- batchSize -- Larger batches reduce polling overhead but hold a database lock longer during the claim. 10-20 is typical.
- onError -- Always set this in production. Without it, errors default to console.error, which is easy to miss.
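To make the pollInterval trade-off concrete, here is a back-of-envelope helper. It is illustrative only, not part of the library:

```typescript
// Rough estimates for the polling trade-off described above.
function pollingEstimates(pollIntervalMs: number) {
  return {
    // Once the backlog is drained, a newly enqueued job waits at most
    // one poll interval before being picked up.
    worstCasePickupDelayMs: pollIntervalMs,
    // Each idle poll costs one query against the jobs table.
    idlePollsPerMinute: Math.round(60_000 / pollIntervalMs),
  };
}
```

At the default 5000 ms that is 12 idle polls per minute with at most 5 seconds of pickup delay; at 1000 ms, 60 polls per minute with at most 1 second of delay.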
Graceful Shutdown
When your server receives a termination signal (e.g., SIGTERM from a container orchestrator), you should stop the processor and wait for in-flight jobs to finish before exiting. Use stopAndDrain() for this.
async function shutdown() {
console.log('Shutting down...');
// Stop polling and wait for the current batch to finish (up to 30 seconds)
await processor.stopAndDrain(30000);
// Close the database connection pool
// PostgreSQL:
await jobQueue.getPool().end();
// Redis:
// jobQueue.getRedisClient().quit();
console.log('Shutdown complete');
process.exit(0);
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

stopAndDrain() accepts an optional timeout in milliseconds (default: 30000). If the current batch does not finish within that time, the promise resolves anyway so your process is not stuck indefinitely.
Use stopAndDrain() instead of stop() for graceful shutdown. stop() halts
the polling loop immediately without waiting for in-flight jobs, which can
leave jobs stuck in the processing state until they are reclaimed.
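For intuition about how a bounded drain avoids hanging forever, here is one way to race in-flight work against a timeout. dataqueue's stopAndDrain() handles this internally; the helper below is a generic sketch, not the library's code.

```typescript
// Resolves when `inFlight` settles, or after `timeoutMs`, whichever
// comes first, so shutdown is never stuck indefinitely.
async function drainWithTimeout(
  inFlight: Promise<void>,
  timeoutMs: number,
): Promise<'drained' | 'timed-out'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timed-out'>((resolve) => {
    timer = setTimeout(() => resolve('timed-out'), timeoutMs);
  });
  const result = await Promise.race([
    inFlight.then(() => 'drained' as const),
    timeout,
  ]);
  // Clear the timer so it does not keep the process alive after a drain.
  if (timer !== undefined) clearTimeout(timer);
  return result;
}
```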
Background Supervisor
In production, you need periodic maintenance: reclaiming stuck jobs, cleaning up old completed jobs/events, and expiring timed-out waitpoint tokens. The supervisor automates all of this.
const supervisor = jobQueue.createSupervisor({
intervalMs: 60_000, // run maintenance every 60 seconds
stuckJobsTimeoutMinutes: 10, // reclaim jobs stuck > 10 minutes
cleanupJobsDaysToKeep: 30, // delete completed jobs older than 30 days
cleanupEventsDaysToKeep: 30, // delete job events older than 30 days
onError: (error) => {
console.error('Supervisor error:', error);
},
});
supervisor.startInBackground();

The supervisor runs independently from the processor. Each maintenance task is isolated -- if one fails, the others still run. All operations are idempotent, so it's safe to run multiple supervisor instances across a cluster.
Supervisor Options
| Option | Default | Description |
|---|---|---|
| intervalMs | 60000 | How often the maintenance loop runs (ms) |
| stuckJobsTimeoutMinutes | 10 | Reclaim jobs stuck in processing longer than this |
| cleanupJobsDaysToKeep | 30 | Delete completed jobs older than this (days); 0 to disable |
| cleanupEventsDaysToKeep | 30 | Delete job events older than this (days); 0 to disable |
| cleanupBatchSize | 1000 | Batch size for cleanup deletions |
| reclaimStuckJobs | true | Enable/disable stuck-job reclaiming |
| expireTimedOutTokens | true | Enable/disable waitpoint token expiry |
| onError | console.error | Called when a maintenance task throws |
| verbose | false | Enable verbose logging |
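For example, retention can be tuned or disabled entirely through the cleanup options above. The option names come from the table; the values here are illustrative assumptions, not recommendations.

```typescript
// Keep all history: 0 disables the cleanup tasks entirely.
const keepEverything = {
  cleanupJobsDaysToKeep: 0,
  cleanupEventsDaysToKeep: 0,
};

// Trim history aggressively on a busier queue, with larger delete batches.
const shortRetention = {
  intervalMs: 30_000,
  cleanupJobsDaysToKeep: 7,
  cleanupEventsDaysToKeep: 7,
  cleanupBatchSize: 5_000,
};
```

Either object would be passed to jobQueue.createSupervisor(...) in place of the options shown earlier.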
Graceful Shutdown with the Supervisor
When shutting down, drain both the processor and the supervisor:
async function shutdown() {
console.log('Shutting down...');
await Promise.all([
processor.stopAndDrain(30000),
supervisor.stopAndDrain(30000),
]);
await jobQueue.getPool().end();
console.log('Shutdown complete');
process.exit(0);
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

One-Shot Mode (Serverless)
If you prefer to run maintenance from a cron-triggered API route or a serverless function, use start() instead of startInBackground(). It runs all tasks once and returns the results:
const supervisor = jobQueue.createSupervisor();
const result = await supervisor.start();
// { reclaimedJobs: 3, cleanedUpJobs: 120, cleanedUpEvents: 45, expiredTokens: 0 }

Manual Maintenance
The supervisor above is the recommended approach. Manual maintenance is still supported if you need fine-grained control over individual tasks.
In a serverless setup, you use cron-triggered API routes for cleanup and reclaim. In a long-running server, you can use setInterval instead.
// Reclaim stuck jobs every 10 minutes
const reclaimInterval = setInterval(
async () => {
try {
const reclaimed = await jobQueue.reclaimStuckJobs(10);
if (reclaimed > 0) console.log(`Reclaimed ${reclaimed} stuck jobs`);
} catch (error) {
console.error('Reclaim error:', error);
}
},
10 * 60 * 1000,
);
// Clean up completed jobs older than 30 days, once per day
const cleanupInterval = setInterval(
async () => {
try {
const deleted = await jobQueue.cleanupOldJobs(30);
if (deleted > 0) console.log(`Cleaned up ${deleted} old jobs`);
const deletedEvents = await jobQueue.cleanupOldJobEvents(30);
if (deletedEvents > 0)
console.log(`Cleaned up ${deletedEvents} old job events`);
} catch (error) {
console.error('Cleanup error:', error);
}
},
24 * 60 * 60 * 1000,
);

Make sure to clear these intervals during shutdown:
async function shutdown() {
clearInterval(reclaimInterval);
clearInterval(cleanupInterval);
await processor.stopAndDrain(30000);
await jobQueue.getPool().end();
process.exit(0);
}

Full Example
Here is a complete Express server that ties everything together:
import express from 'express';
import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';
// --- Initialize the queue ---
const jobQueue = initJobQueue({
databaseConfig: {
connectionString: process.env.PG_DATAQUEUE_DATABASE!,
},
});
// --- Create and start the processor ---
const processor = jobQueue.createProcessor(jobHandlers, {
workerId: `server-${process.pid}`,
batchSize: 10,
concurrency: 3,
pollInterval: 5000,
onError: (error) => {
console.error('Processor error:', error);
},
});
processor.startInBackground();
// --- Start the background supervisor ---
const supervisor = jobQueue.createSupervisor({
intervalMs: 60_000,
stuckJobsTimeoutMinutes: 10,
cleanupJobsDaysToKeep: 30,
cleanupEventsDaysToKeep: 30,
onError: (error) => {
console.error('Supervisor error:', error);
},
});
supervisor.startInBackground();
// --- Express app ---
const app = express();
app.use(express.json());
app.post('/jobs', async (req, res) => {
const { jobType, payload } = req.body;
const jobId = await jobQueue.addJob({ jobType, payload });
res.json({ jobId });
});
app.get('/jobs/:id', async (req, res) => {
const job = await jobQueue.getJob(Number(req.params.id));
if (!job) return res.status(404).json({ error: 'Not found' });
res.json(job);
});
const server = app.listen(3000, () => {
console.log('Server running on port 3000');
});
// --- Graceful shutdown ---
async function shutdown() {
console.log('Shutting down gracefully...');
// Stop accepting new HTTP connections
server.close();
// Wait for in-flight work to finish
await Promise.all([
processor.stopAndDrain(30000),
supervisor.stopAndDrain(30000),
]);
// Close the database pool
await jobQueue.getPool().end();
console.log('Shutdown complete');
process.exit(0);
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);