Version: 1.0
Status: Specification
Author: DBBasic Project
Date: October 2025
Links:
- PyPI: https://pypi.org/project/dbbasic-queue/
- GitHub: https://github.com/askrobots/dbbasic-queue
- Specification: http://dbbasic.com/queue-spec
"Store work, not workers. Queue jobs, not processes."
Background jobs are actual work to be done, not temporary state. Unlike sessions (which are ephemeral authentication), jobs need persistent storage, retry logic, and failure handling.
Traditional CGI approach:
#!/bin/bash
# CGI script
# Queue job by backgrounding
{
sleep 5
send_email user@example.com
} &
# Return immediately
echo "Content-Type: text/html"
echo ""
echo "Email queued!"
Why this was common:
- Simple (just add &)
- No setup needed
- Works for light load
Why this fails:
- 1000 concurrent jobs = 1000 background processes
- No retry on failure
- No logging of failures
- No backoff/throttling
- Process table fills up
- No job inspection
- No way to cancel jobs
This worked fine for low-traffic sites but breaks under load.
Unix queue patterns (1970s-1990s):
Mail queue (mbox format):
/var/mail/username - Single file, append messages
Mail queue (Maildir format):
~/Maildir/
new/ - One file per message
cur/
tmp/
At/batch queues:
/var/spool/at/ - One file per scheduled job
Why these work:
- Single server
- Local filesystem
- Local cron
- No coordination needed
When the web scaled (2000s):
Problem: Multiple servers
Server 1: /queue/pending/job1.json
Server 2: /queue/pending/job2.json
Server 3: ???
Where is the queue?
Attempted solutions:
1. NFS: Shared /queue/ across servers
   - Problem: File locking over network
   - Problem: Slow, race conditions
   - Problem: Complex, unreliable
2. Redis/RabbitMQ: Network queue
   - Problem: Heavier than needed
The Unix failure: Filesystem-based patterns don't distribute. We abandoned them entirely, even for single-server apps.
The lesson: Unix patterns are perfect for single-server. Graduate to distributed systems when you actually need multiple servers.
/queue/pending/job1.json
/queue/pending/job2.json
/queue/processing/job3.json
Problems:
- Filesystem inflation (1000s of small files)
- Directory bloat
- locatedb indexing overhead
- No easy "list all failed jobs"
- File cleanup complexity
{ do_work } & # Just background it
Problems:
- No retry logic
- No failure logging
- Process table fills up
- Can't inspect queued jobs
- No throttling
from celery import Celery
app = Celery('tasks', broker='redis://localhost')
# ... define tasks with @app.task, then enqueue with:
send_email.delay(user)
Problems:
- Requires Redis server
- Complex setup
- Overkill for single server
- Not building on dbbasic foundation
data/queue.tsv
id type payload status created_at run_at attempts error result
Why this wins:
- One file, not thousands
- Builds on dbbasic-tsv
- Plain text, debuggable
- Query/filter easily
- Retry logic built in
- No filesystem bloat
- No locatedb impact
Not like sessions:
- Sessions = temporary auth state ("are you logged in?")
- Queue = actual work to do ("send this email")
Jobs are persistent data:
- Job definition (what to do)
- Payload (parameters)
- Status (pending/processing/completed/failed)
- Retry count
- Error messages
- Results
You can't "compute" a job - you must store it.
data/queue.tsv
Columns:
id type payload status created_at run_at attempts error result
Example:
1 send_email {"to":"user@example.com","subject":"Welcome"} completed 1696886400 1696886400 1 null sent
2 process_video {"video_id":42} processing 1696886401 1696886401 1 null null
3 generate_report {"user_id":99} failed 1696886000 1696886000 3 Timeout null
4 send_sms {"phone":"+1234567890"} pending 1696886500 1696886500 0 null null
Status values:
- pending - Not started yet
- processing - Currently running
- completed - Success
- failed - Failed after max retries
- cancelled - Cancelled before processing started
Why TSV:
- One file (no filesystem bloat)
- Append new jobs (fast)
- Query by status (dbbasic-tsv indexed)
- Plain text (debug with cat/grep)
- Update status in place
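For example, a status query is a one-liner using the same query(file, predicate) call the reference implementation below relies on (a sketch; the dbbasic-tsv signature is assumed from that implementation):

```python
from dbbasic_tsv import query  # query(file, predicate), as used in the implementation below

# Status is column 4 (index 3) of queue.tsv
failed_jobs = query('data/queue.tsv', lambda row: row[3] == 'failed')
for job in failed_jobs:
    print(job[0], job[7])  # job id and stored error message
```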
enqueue(job_type, payload, run_at=None)

Purpose: Add job to queue
Parameters:
- job_type (str): Job handler name (e.g., 'send_email')
- payload (dict): Job parameters
- run_at (int, optional): Unix timestamp to run job (default: now)
Returns:
- job_id (str): Unique job identifier
Behavior:
1. Generate unique job ID
2. Serialize payload to JSON
3. Append row to queue.tsv: [id, type, payload, 'pending', now, run_at, 0, null, null]
4. Return job_id
Example:
import time
from dbbasic_queue import enqueue
# Run immediately
job_id = enqueue('send_email', {
'to': 'user@example.com',
'subject': 'Welcome',
'body': 'Thanks for signing up!'
})
# Run later (delayed job)
job_id = enqueue('generate_report',
{'user_id': 42},
run_at=time.time() + 3600 # 1 hour from now
)
process_jobs(handlers, max_attempts=3)

Purpose: Process pending jobs (run by worker)
Parameters:
- handlers (dict): Map of job_type → handler function
- max_attempts (int): Max retry attempts before marking failed
Returns:
- None (processes every job that is ready at call time)

Behavior:
1. Query queue.tsv for jobs where status='pending' AND run_at <= now
2. For each job:
   - Update status to 'processing'
   - Call handler function with payload
   - If success: Update status to 'completed', store result
   - If failure: Increment attempts
     - If attempts < max_attempts: Reset to 'pending' with backoff
     - If attempts >= max_attempts: Update status to 'failed', store error
3. Return when the batch is done (cron runs the worker again for the next batch)
Example:
from dbbasic_queue import process_jobs
def send_email_handler(payload):
"""Handler for send_email jobs"""
import smtplib
# ... send email using payload['to'], payload['subject'], etc.
return True # Success
def process_video_handler(payload):
"""Handler for process_video jobs"""
video_id = payload['video_id']
# ... process video
return {'output_url': 'https://...'}
# Worker script (run by cron)
if __name__ == '__main__':
handlers = {
'send_email': send_email_handler,
'process_video': process_video_handler,
}
process_jobs(handlers, max_attempts=3)
get_job(job_id)

Purpose: Get job status and details
Parameters:
- job_id (str): Job identifier
Returns:
- job (dict): Job details or None if not found
Example:
job = get_job(job_id)
if job['status'] == 'completed':
print(f"Result: {job['result']}")
elif job['status'] == 'failed':
print(f"Error: {job['error']}")
cancel_job(job_id)

Purpose: Cancel pending job
Parameters:
- job_id (str): Job identifier
Returns:
- bool: True if cancelled, False if already processing/completed
Example:
if cancel_job(job_id):
print("Job cancelled")
import time
import json
import secrets
from dbbasic_tsv import append, query, update
QUEUE_FILE = 'data/queue.tsv'
MAX_ATTEMPTS = 3
def enqueue(job_type, payload, run_at=None):
"""Add job to queue"""
job_id = secrets.token_hex(8)
now = int(time.time())
    run_at = int(run_at) if run_at is not None else now  # coerce floats (e.g. time.time()) so int(row[5]) parses later
append(QUEUE_FILE, [
job_id,
job_type,
json.dumps(payload),
'pending',
str(now),
str(run_at),
'0', # attempts
'', # error
'' # result
])
return job_id
def process_jobs(handlers, max_attempts=MAX_ATTEMPTS):
"""Process pending jobs (worker)"""
now = int(time.time())
# Get pending jobs that are ready to run
jobs = query(QUEUE_FILE, lambda row:
row[3] == 'pending' and int(row[5]) <= now
)
for job in jobs:
job_id, job_type, payload_json, status, created, run_at, attempts, error, result = job
attempts = int(attempts)
# Update to processing
update(QUEUE_FILE,
lambda r: r[0] == job_id,
lambda r: [r[0], r[1], r[2], 'processing', r[4], r[5], r[6], r[7], r[8]]
)
# Execute job
try:
handler = handlers.get(job_type)
if not handler:
raise Exception(f"No handler for {job_type}")
payload = json.loads(payload_json)
result = handler(payload)
# Success
update(QUEUE_FILE,
lambda r: r[0] == job_id,
lambda r: [r[0], r[1], r[2], 'completed', r[4], r[5], str(attempts + 1), '', json.dumps(result)]
)
except Exception as e:
attempts += 1
if attempts >= max_attempts:
# Failed permanently
update(QUEUE_FILE,
lambda r: r[0] == job_id,
lambda r: [r[0], r[1], r[2], 'failed', r[4], r[5], str(attempts), str(e), '']
)
else:
# Retry with backoff
backoff = 60 * (2 ** attempts) # Exponential backoff
new_run_at = int(time.time()) + backoff
update(QUEUE_FILE,
lambda r: r[0] == job_id,
lambda r: [r[0], r[1], r[2], 'pending', r[4], str(new_run_at), str(attempts), str(e), '']
)
def get_job(job_id):
"""Get job details"""
jobs = query(QUEUE_FILE, lambda row: row[0] == job_id)
if not jobs:
return None
job = jobs[0]
return {
'id': job[0],
'type': job[1],
'payload': json.loads(job[2]),
'status': job[3],
'created_at': int(job[4]),
'run_at': int(job[5]),
'attempts': int(job[6]),
'error': job[7] or None,
'result': json.loads(job[8]) if job[8] else None
}
def cancel_job(job_id):
"""Cancel pending job"""
jobs = query(QUEUE_FILE, lambda row: row[0] == job_id and row[3] == 'pending')
if not jobs:
return False
update(QUEUE_FILE,
lambda r: r[0] == job_id,
lambda r: [r[0], r[1], r[2], 'cancelled', r[4], r[5], r[6], 'Cancelled by user', '']
)
return True
That's it. About 90 lines.
Requires dbbasic-tsv (pip install dbbasic-tsv).

from flask import Flask, request, jsonify
from dbbasic_queue import enqueue, get_job
app = Flask(__name__)
@app.route('/api/send-email', methods=['POST'])
def send_email():
"""API endpoint that queues email job"""
data = request.json
# Queue job instead of sending immediately
job_id = enqueue('send_email', {
'to': data['to'],
'subject': data['subject'],
'body': data['body']
})
# Return immediately (don't wait for email to send)
return jsonify({'job_id': job_id, 'status': 'queued'})
@app.route('/api/jobs/<job_id>')
def job_status(job_id):
"""Check job status"""
job = get_job(job_id)
if not job:
return jsonify({'error': 'Not found'}), 404
return jsonify(job)
#!/usr/bin/env python3
# workers/queue_worker.py
import time
import smtplib
from email.message import EmailMessage
from dbbasic_queue import process_jobs
def send_email_handler(payload):
"""Send email via SMTP"""
msg = EmailMessage()
msg['From'] = 'noreply@example.com'
msg['To'] = payload['to']
msg['Subject'] = payload['subject']
msg.set_content(payload['body'])
with smtplib.SMTP('localhost') as smtp:
smtp.send_message(msg)
return {'sent_at': time.time()}
def process_video_handler(payload):
"""Process video (placeholder)"""
video_id = payload['video_id']
# ... video processing logic
return {'output_url': f'https://cdn.example.com/videos/{video_id}.mp4'}
if __name__ == '__main__':
handlers = {
'send_email': send_email_handler,
'process_video': process_video_handler,
}
# Process all pending jobs
process_jobs(handlers, max_attempts=3)
# /etc/cron.d/dbbasic-queue
# Run worker every minute
* * * * * cd /app && python3 workers/queue_worker.py >> /var/log/queue.log 2>&1
#!/usr/bin/env python3
# cgi-bin/submit-form.cgi
import cgi
import os
from dbbasic_queue import enqueue
form = cgi.FieldStorage()
# Queue email notification (don't block response)
enqueue('send_email', {
'to': form.getvalue('email'),
'subject': 'Form submitted',
'body': 'Thanks for your submission!'
})
# Return immediately
print("Content-Type: text/html\n")
print("<h1>Thanks! You'll receive a confirmation email shortly.</h1>")
| Operation | Time | Notes |
|---|---|---|
| Enqueue job | 0.1ms | TSV append |
| Process job (query) | 0.5ms | Find pending jobs |
| Update status | 0.2ms | TSV update |
| Get job details | 0.1ms | Indexed lookup |
| Queued Jobs | TSV Size | Worker Time | Notes |
|---|---|---|---|
| 100 | 10KB | <1s | Instant |
| 1,000 | 100KB | ~5s | Fast |
| 10,000 | 1MB | ~30s | Acceptable |
| 100,000 | 10MB | ~5min | May need optimization |
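These numbers are ballpark figures; a quick way to sanity-check the enqueue cost on your own hardware is a small timing loop (a sketch, assuming dbbasic-queue is installed and data/queue.tsv exists; 'noop' is a placeholder job type with no handler):

```python
import time
from dbbasic_queue import enqueue

N = 1000
start = time.time()
for i in range(N):
    enqueue('noop', {'i': i})  # 'noop' is a placeholder job type, never processed
elapsed = time.time() - start
print(f"{elapsed / N * 1000:.2f} ms per enqueue")
```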
When to graduate to Redis:
- 100K+ queued jobs
- Multiple worker servers
- Sub-second job pickup required
- Measured performance issues
For single-server apps up to 10K jobs: TSV is perfect.
Exponential backoff (matching the reference implementation's 60 * 2^attempts formula):
Attempt 1: Immediate
Attempt 2: 120s after the first failure (60 * 2^1)
Attempt 3: 240s after the second failure (60 * 2^2)
After max attempts (3 by default): Status set to 'failed', error preserved
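The delay comes straight from the attempts counter after it has been incremented for the failed run, so the schedule can be checked with a tiny loop (illustrative only):

```python
# Backoff delays produced by the implementation's formula (60 * 2 ** attempts),
# where attempts has already been incremented for the failed run
for attempts in range(1, 3):
    print(f"retry after failure #{attempts}: {60 * 2 ** attempts}s")
# -> retry after failure #1: 120s
# -> retry after failure #2: 240s
```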
# Run in 1 hour
enqueue('cleanup_old_data', {}, run_at=time.time() + 3600)
# Run at specific time
enqueue('send_reminder', {'user_id': 42}, run_at=1696900000)
# View all failed jobs
cat data/queue.tsv | grep failed
# Count pending jobs
cat data/queue.tsv | grep pending | wc -l
# Find specific job
grep "job_id_here" data/queue.tsv
Failed jobs preserved with error message:
id123 send_email {"to":"invalid"} failed 1696886400 1696886400 3 SMTP error: invalid address
Can retry manually:
# Reset failed job to pending
update(QUEUE_FILE,
lambda r: r[0] == 'id123',
    lambda r: [r[0], r[1], r[2], 'pending', r[4], str(int(time.time())), '0', '', '']
)
1. Create queue file:
mkdir -p data
touch data/queue.tsv
2. Create worker script:
mkdir -p workers
# Create workers/queue_worker.py (see example above)
chmod +x workers/queue_worker.py
3. Set up cron:
# Edit crontab
crontab -e
# Add worker
* * * * * cd /app && python3 workers/queue_worker.py >> /var/log/queue.log 2>&1
4. Use in application:
from dbbasic_queue import enqueue
enqueue('send_email', {'to': 'user@example.com'})
That's it. No Redis, no Celery, no daemon.
View queue status:
# Pending jobs
grep pending data/queue.tsv | wc -l
# Failed jobs
grep failed data/queue.tsv
# Recent activity
tail -f /var/log/queue.log
Admin endpoint:
@app.route('/admin/queue')
def queue_status():
stats = {
'pending': len(query(QUEUE_FILE, lambda r: r[3] == 'pending')),
'processing': len(query(QUEUE_FILE, lambda r: r[3] == 'processing')),
'completed': len(query(QUEUE_FILE, lambda r: r[3] == 'completed')),
'failed': len(query(QUEUE_FILE, lambda r: r[3] == 'failed')),
}
return jsonify(stats)
# Clean jobs older than 7 days
def cleanup_old_jobs():
cutoff = int(time.time()) - (7 * 86400)
filter_file(QUEUE_FILE, lambda row:
row[3] not in ['completed', 'cancelled'] or int(row[4]) > cutoff
)
# Run weekly
# crontab: 0 2 * * 0 python3 workers/cleanup_jobs.py
Keep failed jobs longer for debugging.
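For instance, a variant of the cleanup above that drops completed/cancelled rows after 7 days but keeps failed rows for 30 (a sketch; filter_file is the same assumed dbbasic-tsv call used above, and the 30-day window is an arbitrary choice):

```python
import time
from dbbasic_tsv import filter_file  # assumed API, as in the cleanup above

def cleanup_old_jobs():
    now = int(time.time())
    def keep(row):
        status, created = row[3], int(row[4])
        if status in ('completed', 'cancelled'):
            return created > now - 7 * 86400    # drop after a week
        if status == 'failed':
            return created > now - 30 * 86400   # keep failures a month for debugging
        return True                             # never drop pending/processing rows
    filter_file('data/queue.tsv', keep)
```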
Individual files (Maildir pattern):
/queue/pending/job1.json
/queue/pending/job2.json
... (10,000 files)
Problems:
- Filesystem bloat
- Directory inflation
- locatedb overhead
- Hard to query "all failed jobs"
- File cleanup complexity
TSV file:
data/queue.tsv (one file)
Benefits:
- One file, not thousands
- No filesystem bloat
- Query with dbbasic-tsv
- Plain text debugging
- Simple cleanup
Background process pattern:
{ send_email user@example.com } &
Problems:
- No retry on failure
- No failure logging
- Process table fills up (1000 jobs = 1000 processes)
- Can't inspect queued jobs
- No throttling/backoff
**TSV queue:**
- Retry logic built in
- Error logging preserved
- One worker process
- Inspect jobs with cat/grep
- Exponential backoff
### Why TSV Instead of Redis/Celery?
**Redis/Celery:**
- Redis server required
- Complex setup
- Network overhead
- Overkill for single server
**TSV queue:**
- No additional services
- Simple setup (cron + worker script)
- File I/O (fast on single server)
- Perfect for < 10K jobs
**Upgrade path:** When you need multiple servers or > 100K jobs, migrate to Redis/Celery.
### Why Cron Instead of Daemon?
**Daemon worker:**
```python
while True:
    process_jobs(handlers)
    time.sleep(60)
```

Problems:
- Another process to monitor
- Restart on crash
- systemd/supervisor setup
- Memory leaks over time
Cron worker:
* * * * * python3 workers/queue_worker.py
Benefits:
- Unix handles scheduling
- Automatic restart
- No memory leaks (fresh process)
- Simple monitoring (cron logs)
- No daemon complexity
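One gap with a once-a-minute cron worker is overlap: if a batch takes longer than a minute, the next run starts while the first is still going. A simple guard (not part of the spec, just a sketch using the standard-library fcntl module, which is Unix-only and so matches the cron-based design) is a non-blocking lock at the top of the worker script:

```python
# Sketch: skip this worker run if a previous one is still processing jobs.
import fcntl
import sys

lock_file = open('/tmp/dbbasic-queue.lock', 'w')  # hypothetical lock path
try:
    fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
    sys.exit(0)  # another worker run holds the lock; let it finish

# ... then call process_jobs(handlers) as in the worker example above
```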
Always validate handler inputs:
def send_email_handler(payload):
# Validate required fields
if 'to' not in payload or 'subject' not in payload:
raise ValueError("Missing required fields")
# Sanitize email address
email = validate_email(payload['to'])
# ... send email
# Restrict queue file access
chmod 600 data/queue.tsv
chown www-data:www-data data/queue.tsv
Don't execute arbitrary code:
# BAD: Don't do this
def dangerous_handler(payload):
eval(payload['code']) # NEVER DO THIS
# GOOD: Predefined handlers only
handlers = {
'send_email': send_email_handler,
'process_video': process_video_handler,
}
import time
from dbbasic_tsv import update
from dbbasic_queue import enqueue, process_jobs, get_job, cancel_job, QUEUE_FILE
def test_enqueue():
job_id = enqueue('test_job', {'foo': 'bar'})
assert len(job_id) == 16 # 8 bytes = 16 hex chars
job = get_job(job_id)
assert job['type'] == 'test_job'
assert job['status'] == 'pending'
def test_process_job():
results = []
def test_handler(payload):
results.append(payload)
return 'success'
job_id = enqueue('test', {'data': 123})
process_jobs({'test': test_handler})
job = get_job(job_id)
assert job['status'] == 'completed'
assert results == [{'data': 123}]
def test_retry_logic():
attempts = []
def failing_handler(payload):
attempts.append(1)
raise Exception("Test failure")
job_id = enqueue('fail_test', {})
    # Process 3 times (max attempts); after each failure, pull run_at back to
    # now so the test doesn't wait out the exponential backoff (120s, 240s)
    for _ in range(3):
        process_jobs({'fail_test': failing_handler}, max_attempts=3)
        update(QUEUE_FILE,
            lambda r: r[0] == job_id,
            lambda r: [r[0], r[1], r[2], r[3], r[4], str(int(time.time())), r[6], r[7], r[8]]
        )
job = get_job(job_id)
assert job['status'] == 'failed'
assert len(attempts) == 3
def test_cancel_job():
job_id = enqueue('test', {})
assert cancel_job(job_id) == True
job = get_job(job_id)
assert job['status'] == 'cancelled'
def mock_email_handler(payload):
    """Stand-in for the SMTP handler so the workflow test needs no mail server"""
    return {'sent_at': time.time()}

def test_email_workflow():
# Queue email
job_id = enqueue('send_email', {
'to': 'test@example.com',
'subject': 'Test',
'body': 'Hello'
})
# Process
process_jobs({'send_email': mock_email_handler})
# Verify
job = get_job(job_id)
assert job['status'] == 'completed'
assert 'sent_at' in job['result']
| Approach | Setup | Lines | Retry | Logging | Multi-Server | Speed |
|---|---|---|---|---|---|---|
| TSV Queue | Cron | ~90 | ✅ | ✅ | ❌ | 0.5ms |
| Background (&) | None | 5 | ❌ | ❌ | ❌ | 0ms |
| Individual Files | Cron | 80 | ✅ | ✅ | NFS | 0.1ms |
| Redis/Celery | Redis+Workers | 200 | ✅ | ✅ | ✅ | 1ms |
| Database Queue | DB | 100 | ✅ | ✅ | ✅ | 5ms |
TSV wins for single-server apps needing reliability.
Use Redis/Celery if:
- Multiple worker servers
- > 100K queued jobs
- Sub-second job pickup required
- Already running Redis
- Need advanced features (priorities, chains, etc.)
Use background process (&) if:
- Fire-and-forget only
- Don't care about failures
- Very low traffic
Otherwise, use TSV queue.
Old:
{ send_email user@example.com } &
New:
enqueue('send_email', {'to': 'user@example.com'})
Old:
@app.task
def send_email(to, subject, body):
    ...  # existing Celery task body
send_email.delay(to='user@example.com', subject='Hi', body='Hello')
New:
def send_email_handler(payload):
    ...  # same logic, reading payload['to'], payload['subject'], payload['body']
enqueue('send_email', {'to': 'user@example.com', 'subject': 'Hi', 'body': 'Hello'})
dbbasic-queue learns from Unix queue patterns and their limitations:
What Unix got right:
- Simple storage (spool directories, mbox files)
- Cron-based processing
- Plain text, debuggable
- No daemons needed
Where Unix failed:
- Single-server only
- Didn't scale to multiple servers
- Led to abandoning Unix patterns entirely
dbbasic-queue approach:
- Embrace Unix patterns for single server
- TSV storage (one file, not thousands)
- Cron-based workers
- Built-in retry and logging
- Clear upgrade path to Redis when needed
Use it when:
- Single server deployment
- < 10K queued jobs
- Want simplicity over distribution
- Building on dbbasic ecosystem
Graduate to Redis/Celery when:
- Multiple worker servers
- > 100K jobs
- Need advanced features
Until then, use TSV. It's reliable, simple, and debuggable.
Next Steps: Implement, test, deploy, ship.
No Redis. No Celery. No daemon. Just cron + TSV.
About 90 lines of code.