
dbbasic-queue Specification

Version: 1.0 Status: Specification Author: DBBasic Project Date: October 2025

Links: - PyPI: https://pypi.org/project/dbbasic-queue/ - GitHub: https://github.com/askrobots/dbbasic-queue - Specification: http://dbbasic.com/queue-spec


Philosophy

"Store work, not workers. Queue jobs, not processes."

Background jobs are actual work to be done, not temporary state. Unlike sessions (which are ephemeral authentication), jobs need persistent storage, retry logic, and failure handling.

Design Principles

  1. Foundation-First: Build on dbbasic-tsv
  2. Simple: ~100 lines of code
  3. Reliable: Retry on failure, preserve failed jobs
  4. Debuggable: Plain text TSV, inspect with cat/grep
  5. Unix-Compatible: Cron-based workers, no daemon

Architecture Decision History

The Unix/CGI Background Job Pattern

Traditional CGI approach:

#!/bin/bash
# CGI script

# Queue job by backgrounding
{
    sleep 5
    send_email user@example.com
} &

# Return immediately
echo "Content-Type: text/html"
echo ""
echo "Email queued!"

Why this was common: - Simple (just add &) - No setup needed - Works for light load

Why this fails:

Problems:
- 1000 concurrent jobs = 1000 background processes
- No retry on failure
- No logging of failures
- No backoff/throttling
- Process table fills up
- No job inspection
- No way to cancel jobs

This worked fine for low-traffic sites but breaks under load.


The Distributed Computing Problem

Unix queue patterns (1970s-1990s):

Mail queue (mbox format):

/var/mail/username  - Single file, append messages

Mail queue (Maildir format):

~/Maildir/
  new/      - One file per message
  cur/
  tmp/

At/batch queues:

/var/spool/at/  - One file per scheduled job

Why these work: - Single server - Local filesystem - Local cron - No coordination needed

When the web scaled (2000s):

Problem: Multiple servers

Server 1: /queue/pending/job1.json
Server 2: /queue/pending/job2.json
Server 3: ???

Where is the queue?

Attempted solutions:

1. NFS: Shared /queue/ across servers
   - Problem: File locking over network
   - Problem: Slow, race conditions
   - Problem: Complex, unreliable

2. Database: Jobs table
   - Problem: Connection pooling overhead
   - Problem: Locking, transactions
   - Problem: Heavier than needed

3. Redis/RabbitMQ: Network queue
   - Solution: Actually works
   - Problem: Another service to run
   - Problem: More complexity than single-server needs

The Unix failure: Filesystem-based patterns don't distribute. We abandoned them entirely, even for single-server apps.

The lesson: Unix patterns are perfect for single-server. Graduate to distributed systems when you actually need multiple servers.


Approaches Considered

Option 1: Maildir Pattern (One File Per Job)

/queue/pending/job1.json
/queue/pending/job2.json
/queue/processing/job3.json

Problems: - Filesystem inflation (1000s of small files) - Directory bloat - locatedb indexing overhead - No easy "list all failed jobs" - File cleanup complexity

Option 2: Background Processes (CGI Pattern)

{ do_work } &  # Just background it

Problems: - No retry logic - No failure logging - Process table fills up - Can't inspect queued jobs - No throttling

Option 3: Redis/Celery

from celery import Celery
@app.task
def send_email(user): ...
send_email.delay(user)

Problems: - Requires Redis server - Complex setup - Overkill for single server - Not building on dbbasic foundation

Option 4: TSV File (CHOSEN)

data/queue.tsv
id  type    payload status  created_at  run_at  attempts    error   result

Why this wins: - One file, not thousands - Builds on dbbasic-tsv - Plain text, debuggable - Query/filter easily - Retry logic built in - No filesystem bloat - No locatedb impact


What Is a Queue?

Not like sessions: - Sessions = temporary auth state ("are you logged in?") - Queue = actual work to do ("send this email")

Jobs are persistent data: - Job definition (what to do) - Payload (parameters) - Status (pending/processing/completed/failed) - Retry count - Error messages - Results

You can't "compute" a job - you must store it.


Storage Format

Single TSV File

data/queue.tsv

Columns:

id  type    payload status  created_at  run_at  attempts    error   result

Example:

1   send_email  {"to":"user@example.com","subject":"Welcome"}   completed   1696886400  1696886400  1   null    sent
2   process_video   {"video_id":42} processing  1696886401  1696886401  1   null    null
3   generate_report {"user_id":99}  failed  1696886000  1696886000  3   Timeout null
4   send_sms    {"phone":"+1234567890"} pending 1696886500  1696886500  0   null    null

Status values: - pending - Not started yet - processing - Currently running - completed - Success - failed - Failed after max retries - cancelled - Cancelled via cancel_job before processing
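
The lifecycle, as a transition table (an illustrative sketch; 'processing' back to 'pending' is the retry-with-backoff path, and 'cancelled' is set only by cancel_job):

TRANSITIONS = {
    'pending':    {'processing', 'cancelled'},
    'processing': {'completed', 'pending', 'failed'},  # 'pending' = retry with backoff
    'completed':  set(),  # terminal
    'failed':     set(),  # terminal, until manually reset
    'cancelled':  set(),  # terminal
}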

Why TSV: - One file (no filesystem bloat) - Append new jobs (fast) - Query by status (dbbasic-tsv indexed) - Plain text (debug with cat/grep) - Update status in place
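
Because everything lives in one plain-text file, ad-hoc reporting needs no library at all. A minimal sketch (assumes the column layout above with status in column 4, and no header row):

from collections import Counter

# Tally jobs by status straight from the TSV (status = 4th column)
with open('data/queue.tsv') as f:
    counts = Counter(line.split('\t')[3] for line in f if line.strip())

print(counts)  # e.g. Counter({'completed': 2, 'pending': 1, 'failed': 1})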


API Specification

Function: enqueue(job_type, payload, run_at=None)

Purpose: Add job to queue

Parameters: - job_type (str): Job handler name (e.g., 'send_email') - payload (dict): Job parameters - run_at (int, optional): Unix timestamp to run job (default: now)

Returns: - job_id (str): Unique job identifier

Behavior: 1. Generate unique job ID 2. Serialize payload to JSON 3. Append row to queue.tsv: [id, type, payload, 'pending', now, run_at, 0, null, null] 4. Return job_id

Example:

import time
from dbbasic_queue import enqueue

# Run immediately
job_id = enqueue('send_email', {
    'to': 'user@example.com',
    'subject': 'Welcome',
    'body': 'Thanks for signing up!'
})

# Run later (delayed job)
job_id = enqueue('generate_report',
    {'user_id': 42},
    run_at=int(time.time()) + 3600  # 1 hour from now
)

Function: process_jobs(handlers, max_attempts=3)

Purpose: Process pending jobs (run by worker)

Parameters: - handlers (dict): Map of job_type → handler function - max_attempts (int): Max retry attempts before marking failed

Returns: - None (processes jobs until none pending)

Behavior:
1. Query queue.tsv for jobs where status='pending' AND run_at <= now
2. For each job:
   - Update status to 'processing'
   - Call handler function with payload
   - If success: update status to 'completed', store result
   - If failure: increment attempts
     - If attempts < max_attempts: reset to 'pending' with backoff
     - If attempts >= max_attempts: update status to 'failed', store error
3. Repeat until no pending jobs

Example:

from dbbasic_queue import process_jobs

def send_email_handler(payload):
    """Handler for send_email jobs"""
    import smtplib
    # ... send email using payload['to'], payload['subject'], etc.
    return True  # Success

def process_video_handler(payload):
    """Handler for process_video jobs"""
    video_id = payload['video_id']
    # ... process video
    return {'output_url': 'https://...'}

# Worker script (run by cron)
if __name__ == '__main__':
    handlers = {
        'send_email': send_email_handler,
        'process_video': process_video_handler,
    }
    process_jobs(handlers, max_attempts=3)

Function: get_job(job_id)

Purpose: Get job status and details

Parameters: - job_id (str): Job identifier

Returns: - job (dict): Job details or None if not found

Example:

job = get_job(job_id)
if job['status'] == 'completed':
    print(f"Result: {job['result']}")
elif job['status'] == 'failed':
    print(f"Error: {job['error']}")

Function: cancel_job(job_id)

Purpose: Cancel pending job

Parameters: - job_id (str): Job identifier

Returns: - bool: True if cancelled, False if already processing/completed

Example:

if cancel_job(job_id):
    print("Job cancelled")

Implementation

Core Implementation (~100 lines)

import time
import json
import secrets
from dbbasic_tsv import append, query, update

QUEUE_FILE = 'data/queue.tsv'
MAX_ATTEMPTS = 3

def enqueue(job_type, payload, run_at=None):
    """Add job to queue"""
    job_id = secrets.token_hex(8)
    now = int(time.time())
    run_at = int(run_at) if run_at is not None else now

    append(QUEUE_FILE, [
        job_id,
        job_type,
        json.dumps(payload),
        'pending',
        str(now),
        str(run_at),
        '0',  # attempts
        '',   # error
        ''    # result
    ])
    return job_id

def process_jobs(handlers, max_attempts=MAX_ATTEMPTS):
    """Process pending jobs (worker)"""
    now = int(time.time())

    # Get pending jobs that are ready to run
    jobs = query(QUEUE_FILE, lambda row:
        row[3] == 'pending' and int(row[5]) <= now
    )

    for job in jobs:
        job_id, job_type, payload_json, status, created, run_at, attempts, error, result = job
        attempts = int(attempts)

        # Update to processing
        update(QUEUE_FILE,
            lambda r: r[0] == job_id,
            lambda r: [r[0], r[1], r[2], 'processing', r[4], r[5], r[6], r[7], r[8]]
        )

        # Execute job
        try:
            handler = handlers.get(job_type)
            if not handler:
                raise Exception(f"No handler for {job_type}")

            payload = json.loads(payload_json)
            result = handler(payload)

            # Success
            update(QUEUE_FILE,
                lambda r: r[0] == job_id,
                lambda r: [r[0], r[1], r[2], 'completed', r[4], r[5], str(attempts + 1), '', json.dumps(result)]
            )
        except Exception as e:
            attempts += 1

            if attempts >= max_attempts:
                # Failed permanently
                update(QUEUE_FILE,
                    lambda r: r[0] == job_id,
                    lambda r: [r[0], r[1], r[2], 'failed', r[4], r[5], str(attempts), str(e), '']
                )
            else:
                # Retry with backoff
                backoff = 60 * (2 ** attempts)  # Exponential backoff
                new_run_at = int(time.time()) + backoff
                update(QUEUE_FILE,
                    lambda r: r[0] == job_id,
                    lambda r: [r[0], r[1], r[2], 'pending', r[4], str(new_run_at), str(attempts), str(e), '']
                )

def get_job(job_id):
    """Get job details"""
    jobs = query(QUEUE_FILE, lambda row: row[0] == job_id)
    if not jobs:
        return None
    job = jobs[0]
    return {
        'id': job[0],
        'type': job[1],
        'payload': json.loads(job[2]),
        'status': job[3],
        'created_at': int(job[4]),
        'run_at': int(job[5]),
        'attempts': int(job[6]),
        'error': job[7] or None,
        'result': json.loads(job[8]) if job[8] else None
    }

def cancel_job(job_id):
    """Cancel pending job"""
    jobs = query(QUEUE_FILE, lambda row: row[0] == job_id and row[3] == 'pending')
    if not jobs:
        return False

    update(QUEUE_FILE,
        lambda r: r[0] == job_id,
        lambda r: [r[0], r[1], r[2], 'cancelled', r[4], r[5], r[6], 'Cancelled by user', '']
    )
    return True

That's it. About 100 lines.


Dependencies

- dbbasic-tsv (storage layer; the only dependency)
- Python standard library: time, json, secrets


Usage Examples

Web Application Integration

from flask import Flask, request, jsonify
from dbbasic_queue import enqueue, get_job

app = Flask(__name__)

@app.route('/api/send-email', methods=['POST'])
def send_email():
    """API endpoint that queues email job"""
    data = request.json

    # Queue job instead of sending immediately
    job_id = enqueue('send_email', {
        'to': data['to'],
        'subject': data['subject'],
        'body': data['body']
    })

    # Return immediately (don't wait for email to send)
    return jsonify({'job_id': job_id, 'status': 'queued'})

@app.route('/api/jobs/<job_id>')
def job_status(job_id):
    """Check job status"""
    job = get_job(job_id)
    if not job:
        return jsonify({'error': 'Not found'}), 404
    return jsonify(job)

Worker Script

#!/usr/bin/env python3
# workers/queue_worker.py

import time
import smtplib
from email.message import EmailMessage
from dbbasic_queue import process_jobs

def send_email_handler(payload):
    """Send email via SMTP"""
    msg = EmailMessage()
    msg['From'] = 'noreply@example.com'
    msg['To'] = payload['to']
    msg['Subject'] = payload['subject']
    msg.set_content(payload['body'])

    with smtplib.SMTP('localhost') as smtp:
        smtp.send_message(msg)

    return {'sent_at': time.time()}

def process_video_handler(payload):
    """Process video (placeholder)"""
    video_id = payload['video_id']
    # ... video processing logic
    return {'output_url': f'https://cdn.example.com/videos/{video_id}.mp4'}

if __name__ == '__main__':
    handlers = {
        'send_email': send_email_handler,
        'process_video': process_video_handler,
    }

    # Process all pending jobs
    process_jobs(handlers, max_attempts=3)

Cron Setup

# /etc/cron.d/dbbasic-queue
# Run worker every minute

* * * * * cd /app && python3 workers/queue_worker.py >> /var/log/queue.log 2>&1

CGI Script Integration

#!/usr/bin/env python3
# cgi-bin/submit-form.cgi

import cgi
from dbbasic_queue import enqueue

form = cgi.FieldStorage()

# Queue email notification (don't block response)
enqueue('send_email', {
    'to': form.getvalue('email'),
    'subject': 'Form submitted',
    'body': 'Thanks for your submission!'
})

# Return immediately
print("Content-Type: text/html\n")
print("<h1>Thanks! You'll receive a confirmation email shortly.</h1>")

Performance Characteristics

Benchmarks

Operation             Time    Notes
Enqueue job           0.1ms   TSV append
Process job (query)   0.5ms   Find pending jobs
Update status         0.2ms   TSV update
Get job details       0.1ms   Indexed lookup

Scaling

Queued Jobs   TSV Size   Worker Time   Notes
100           10KB       <1s           Instant
1,000         100KB      ~5s           Fast
10,000        1MB        ~30s          Acceptable
100,000       10MB       ~5min         May need optimization

When to graduate to Redis: - 100K+ queued jobs - Multiple worker servers - Sub-second job pickup required - Measured performance issues

For single-server apps up to 10K jobs: TSV is perfect.


Features

Retry Logic

Exponential backoff (60 * 2^attempts after each failure):

Attempt 1: Immediate
Attempt 2: 120s after failure 1 (60 * 2^1)
Attempt 3: 240s after failure 2 (60 * 2^2)

After the third failed attempt (default max_attempts=3): Status set to 'failed', error preserved
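
A quick sanity check of that schedule, using the same formula the worker applies:

MAX_ATTEMPTS = 3

for attempts in range(1, MAX_ATTEMPTS):
    print(f"after failure {attempts}: retry in {60 * 2**attempts}s")
print(f"after failure {MAX_ATTEMPTS}: marked failed")

# after failure 1: retry in 120s
# after failure 2: retry in 240s
# after failure 3: marked failed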

Delayed Jobs

import time

# Run in 1 hour
enqueue('cleanup_old_data', {}, run_at=int(time.time()) + 3600)

# Run at a specific Unix timestamp
enqueue('send_reminder', {'user_id': 42}, run_at=1696900000)
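
Since run_at is a plain Unix timestamp, any datetime arithmetic works. For example, scheduling tomorrow at 09:00 local time (a sketch):

from datetime import datetime, timedelta

# Tomorrow at 09:00 local time, converted to a Unix timestamp
tomorrow_9am = (datetime.now() + timedelta(days=1)).replace(
    hour=9, minute=0, second=0, microsecond=0)

enqueue('send_reminder', {'user_id': 42}, run_at=int(tomorrow_9am.timestamp()))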

Job Inspection

# View all failed jobs
grep failed data/queue.tsv

# Count pending jobs
grep -c pending data/queue.tsv

# Find specific job
grep "job_id_here" data/queue.tsv

Error Handling

Failed jobs preserved with error message:

id123   send_email  {"to":"invalid"}    failed  1696886400  1696886400  3   SMTP error: invalid address null

Can retry manually:

# Reset failed job to pending
update(QUEUE_FILE,
    lambda r: r[0] == 'id123',
    lambda r: [r[0], r[1], r[2], 'pending', r[4], str(int(time.time())), '0', '', '']
)
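
If manual retries are frequent, the same update is worth wrapping in a small helper (a sketch using the module's own QUEUE_FILE and the dbbasic-tsv update call):

import time
from dbbasic_tsv import update
from dbbasic_queue import QUEUE_FILE

def retry_job(job_id):
    """Reset a failed job to pending, clearing attempts and error."""
    update(QUEUE_FILE,
        lambda r: r[0] == job_id and r[3] == 'failed',
        lambda r: [r[0], r[1], r[2], 'pending', r[4], str(int(time.time())), '0', '', '']
    )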

Deployment

Production Setup

1. Create queue file:

mkdir -p data
touch data/queue.tsv

2. Create worker script:

mkdir -p workers
# Create workers/queue_worker.py (see example above)
chmod +x workers/queue_worker.py

3. Set up cron:

# Edit crontab
crontab -e

# Add worker
* * * * * cd /app && python3 workers/queue_worker.py >> /var/log/queue.log 2>&1

4. Use in application:

from dbbasic_queue import enqueue

enqueue('send_email', {'to': 'user@example.com'})

That's it. No Redis, no Celery, no daemon.

Monitoring

View queue status:

# Pending jobs
grep pending data/queue.tsv | wc -l

# Failed jobs
grep failed data/queue.tsv

# Recent activity
tail -f /var/log/queue.log

Admin endpoint:

from dbbasic_tsv import query
from dbbasic_queue import QUEUE_FILE

@app.route('/admin/queue')
def queue_status():
    stats = {
        'pending': len(query(QUEUE_FILE, lambda r: r[3] == 'pending')),
        'processing': len(query(QUEUE_FILE, lambda r: r[3] == 'processing')),
        'completed': len(query(QUEUE_FILE, lambda r: r[3] == 'completed')),
        'failed': len(query(QUEUE_FILE, lambda r: r[3] == 'failed')),
    }
    return jsonify(stats)

Cleanup Strategy

Old Completed Jobs

# Clean completed/cancelled jobs older than 7 days
# (assumes dbbasic-tsv exposes a filter_file helper that rewrites the
#  file, keeping only rows for which the predicate returns True)
import time
from dbbasic_tsv import filter_file
from dbbasic_queue import QUEUE_FILE

def cleanup_old_jobs():
    cutoff = int(time.time()) - (7 * 86400)
    filter_file(QUEUE_FILE, lambda row:
        row[3] not in ('completed', 'cancelled') or int(row[4]) > cutoff
    )

# Run weekly
# crontab: 0 2 * * 0 python3 workers/cleanup_jobs.py

Keep failed jobs longer for debugging.
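
One way to express that policy, assuming the same filter_file helper as above: keep completed and cancelled jobs 7 days, failed jobs 30 days, and live jobs always.

def cleanup_jobs_tiered():
    now = int(time.time())
    week_ago = now - 7 * 86400
    month_ago = now - 30 * 86400
    filter_file(QUEUE_FILE, lambda row:
        row[3] in ('pending', 'processing')                                 # live jobs
        or (row[3] in ('completed', 'cancelled') and int(row[4]) > week_ago)
        or (row[3] == 'failed' and int(row[4]) > month_ago)
    )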


Architectural Decisions

Why TSV Instead of Individual Files?

Individual files (Maildir pattern):

/queue/pending/job1.json
/queue/pending/job2.json
... (10,000 files)

Problems:
- Filesystem bloat
- Directory inflation
- locatedb overhead
- Hard to query "all failed jobs"
- File cleanup complexity

TSV file:

data/queue.tsv (one file)

Benefits:
- One file, not thousands
- No filesystem bloat
- Query with dbbasic-tsv
- Plain text debugging
- Simple cleanup

Why TSV Instead of Background Processes?

Background process pattern:

{ send_email user@example.com } &

Problems: - No retry on failure - No failure logging - Process table fills up (1000 jobs = 1000 processes) - Can't inspect queued jobs - No throttling/backoff


TSV queue:
- Retry logic built in
- Error logging preserved
- One worker process
- Inspect jobs with cat/grep
- Exponential backoff

Why TSV Instead of Redis/Celery?

Redis/Celery:
- Redis server required
- Complex setup
- Network overhead
- Overkill for single server

TSV queue:
- No additional services
- Simple setup (cron + worker script)
- File I/O (fast on single server)
- Perfect for < 10K jobs

Upgrade path: When you need multiple servers or > 100K jobs, migrate to Redis/Celery.

Why Cron Instead of Daemon?

Daemon worker:

while True:
    process_jobs(handlers)
    time.sleep(60)

Problems: - Another process to monitor - Restart on crash - systemd/supervisor setup - Memory leaks over time

Cron worker:

* * * * * python3 workers/queue_worker.py

Benefits: - Unix handles scheduling - Automatic restart - No memory leaks (fresh process) - Simple monitoring (cron logs) - No daemon complexity
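
One caveat: if a run ever takes longer than a minute, cron starts a second worker alongside the first, and both can pick up the same pending jobs. A common guard is flock(1), which skips the run while the previous one still holds the lock:

* * * * * cd /app && flock -n /tmp/dbbasic-queue.lock python3 workers/queue_worker.py >> /var/log/queue.log 2>&1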


Security Considerations

Job Payload Validation

Always validate handler inputs:

def send_email_handler(payload):
    # Validate required fields
    if 'to' not in payload or 'subject' not in payload:
        raise ValueError("Missing required fields")

    # Sanitize email address (validate_email here stands in for your
    # validation helper of choice, e.g. the email-validator package)
    email = validate_email(payload['to'])

    # ... send email

File Permissions

# Restrict queue file access
chmod 600 data/queue.tsv
chown www-data:www-data data/queue.tsv

Handler Security

Don't execute arbitrary code:

# BAD: Don't do this
def dangerous_handler(payload):
    eval(payload['code'])  # NEVER DO THIS

# GOOD: Predefined handlers only
handlers = {
    'send_email': send_email_handler,
    'process_video': process_video_handler,
}

Testing Requirements

Unit Tests

import time
from dbbasic_queue import enqueue, process_jobs, get_job, cancel_job

def test_enqueue():
    job_id = enqueue('test_job', {'foo': 'bar'})
    assert len(job_id) == 16  # 8 bytes = 16 hex chars

    job = get_job(job_id)
    assert job['type'] == 'test_job'
    assert job['status'] == 'pending'

def test_process_job():
    results = []

    def test_handler(payload):
        results.append(payload)
        return 'success'

    job_id = enqueue('test', {'data': 123})
    process_jobs({'test': test_handler})

    job = get_job(job_id)
    assert job['status'] == 'completed'
    assert results == [{'data': 123}]

def test_retry_logic():
    attempts = []

    def failing_handler(payload):
        attempts.append(1)
        raise Exception("Test failure")

    job_id = enqueue('fail_test', {})

    # Process 3 times (max attempts). Exponential backoff pushes run_at
    # into the future after each failure, so rewind it between runs.
    from dbbasic_tsv import update
    from dbbasic_queue import QUEUE_FILE
    for _ in range(3):
        process_jobs({'fail_test': failing_handler}, max_attempts=3)
        update(QUEUE_FILE,
            lambda r: r[0] == job_id,
            lambda r: [r[0], r[1], r[2], r[3], r[4], str(int(time.time())), r[6], r[7], r[8]]
        )

    job = get_job(job_id)
    assert job['status'] == 'failed'
    assert len(attempts) == 3

def test_cancel_job():
    job_id = enqueue('test', {})
    assert cancel_job(job_id)

    job = get_job(job_id)
    assert job['status'] == 'cancelled'

Integration Tests

import time
from dbbasic_queue import enqueue, process_jobs, get_job

def test_email_workflow():
    def mock_email_handler(payload):
        """Stand-in for the real SMTP send"""
        return {'sent_at': time.time()}

    # Queue email
    job_id = enqueue('send_email', {
        'to': 'test@example.com',
        'subject': 'Test',
        'body': 'Hello'
    })

    # Process
    process_jobs({'send_email': mock_email_handler})

    # Verify
    job = get_job(job_id)
    assert job['status'] == 'completed'
    assert 'sent_at' in job['result']

Comparison to Alternatives

Approach           Setup           Lines   Retry    Logging   Multi-Server    Speed
TSV Queue          Cron            ~100    Yes      Yes       No              0.5ms
Background (&)     None            5       No       No        No              0ms
Individual Files   Cron            80      Manual   Limited   NFS (fragile)   0.1ms
Redis/Celery       Redis+Workers   200     Yes      Yes       Yes             1ms
Database Queue     DB              100     Yes      Yes       Yes             5ms

TSV wins for single-server apps needing reliability.


When to Use Something Else

Use Redis/Celery if: - Multiple worker servers - > 100K queued jobs - Sub-second job pickup required - Already running Redis - Need advanced features (priorities, chains, etc.)

Use background process (&) if: - Fire-and-forget only - Don't care about failures - Very low traffic

Otherwise, use TSV queue.


Migration Guide

From Background Processes

Old:

{ send_email user@example.com } &

New:

enqueue('send_email', {'to': 'user@example.com'})

From Redis/Celery

Old:

@app.task
def send_email(to, subject, body):
    # ...

send_email.delay(to='user@example.com', subject='Hi', body='Hello')

New:

def send_email_handler(payload):
    ...  # same logic, reading payload['to'], payload['subject'], payload['body']

enqueue('send_email', {'to': 'user@example.com', 'subject': 'Hi', 'body': 'Hello'})


Summary

dbbasic-queue learns from Unix queue patterns and their limitations:

What Unix got right: - Simple storage (spool directories, mbox files) - Cron-based processing - Plain text, debuggable - No daemons needed

Where Unix failed: - Single-server only - Didn't scale to multiple servers - Led to abandoning Unix patterns entirely

dbbasic-queue approach: - Embrace Unix patterns for single server - TSV storage (one file, not thousands) - Cron-based workers - Built-in retry and logging - Clear upgrade path to Redis when needed

Use it when: - Single server deployment - < 10K queued jobs - Want simplicity over distribution - Building on dbbasic ecosystem

Graduate to Redis/Celery when: - Multiple worker servers - > 100K jobs - Need advanced features

Until then, use TSV. It's reliable, simple, and debuggable.


Next Steps: Implement, test, deploy, ship.

No Redis. No Celery. No daemon. Just cron + TSV.

About 100 lines of code.