Version: 1.0
Status: Specification
Author: DBBasic Project
Date: October 2025
"Append events. Replay state. Scale when measured."
Events are the permanent record of what happened. Unlike logs (rotated), queue jobs (deleted), or sessions (ephemeral), events are kept forever and become the source of truth.
Critical observation: Systems are getting more powerful faster than user bases are growing.

2010:
- Average server: 4 cores, 8GB RAM
- Average site: 10K users
- Ratio: Barely keeping up

2025:
- Average server: 32 cores, 128GB RAM
- Average site: Still ~10K users (most sites)
- Ratio: Massive overcapacity

The implication:
- A single server handles what needed 10 servers in 2010
- TSV can handle what needed Kafka in 2010
- Start simple, measure, upgrade only if needed
Most apps will never need to upgrade.
Queue (dbbasic-queue):
enqueue('send_email', {'to': 'user@example.com'})
# Executed once, then deleted
# Lifespan: minutes to hours
Logs (dbbasic-logs):
log.info("Email sent", to='user@example.com')
# Recorded for debugging
# Lifespan: 30-90 days, then deleted
Events (dbbasic-events):
publish('email.sent', {'to': 'user@example.com', 'message_id': 'abc'})
# Permanent record
# Lifespan: Forever (source of truth)
Traditional state storage:
# Database stores current state
users.update(id=42, email='new@example.com')
# Old email lost forever
Event sourcing:
# Store the change as an event
publish('user.email_changed', {
    'user_id': 42,
    'old_email': 'old@example.com',
    'new_email': 'new@example.com'
}, aggregate_id='user:42')
# Current state = replay all events
def get_user_email(user_id):
    events = list(subscribe('user.email_changed', aggregate_id=f'user:{user_id}'))
    return events[-1]['data']['new_email']  # Latest email
Benefits:
- Complete audit trail
- Time travel (state at any point in history)
- Replay to fix bugs
- Analytics gold mine
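"Time travel" is just a bounded replay. A minimal sketch, assuming the subscribe() and handler shapes specified later in this document (state_at, as_of, and thirty_days_ago are illustrative names, not part of the library):

import time
from dbbasic_events import subscribe

def state_at(aggregate_id, handlers, as_of):
    """Rebuild an aggregate's state as it existed at timestamp `as_of`."""
    state = {}
    for event in subscribe(aggregate_id=aggregate_id):
        if event['timestamp'] > as_of:
            break  # events come back in append order, so nothing older follows
        handler = handlers.get(event['type'])
        if handler:
            state = handler(state, event)
    return state

# State of user 42 as of 30 days ago (handlers as in the replay() examples below)
thirty_days_ago = int(time.time()) - 30 * 86400
# past_state = state_at('user:42', handlers, as_of=thirty_days_ago)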
Financial transactions:
publish('payment.received', {'order_id': 123, 'amount': 99.99})
publish('payment.refunded', {'order_id': 123, 'amount': 99.99})
# Balance = sum of all payment events
Inventory management:
publish('inventory.added', {'sku': 'ABC', 'quantity': 100})
publish('inventory.sold', {'sku': 'ABC', 'quantity': 1})
publish('inventory.returned', {'sku': 'ABC', 'quantity': 1})
# Current stock = replay all inventory events
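A minimal sketch of that replay, assuming the subscribe() API specified later in this document (current_stock and DELTAS are illustrative, not part of the library):

from dbbasic_events import subscribe

# Signed quantity per event type (illustrative)
DELTAS = {'inventory.added': +1, 'inventory.sold': -1, 'inventory.returned': +1}

def current_stock(sku):
    """Current stock for one SKU = sum of signed quantities across its events."""
    stock = 0
    for event in subscribe():  # scan all events; filter by SKU below
        delta = DELTAS.get(event['type'])
        if delta and event['data'].get('sku') == sku:
            stock += delta * event['data']['quantity']
    return stock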
User activity tracking:
publish('page.viewed', {'user_id': 42, 'page': '/products'})
publish('product.clicked', {'user_id': 42, 'product_id': 99})
publish('cart.added', {'user_id': 42, 'product_id': 99})
# User journey = all their events in order
data/events/
2025/
10.tsv (current month - active, ~100MB)
09.tsv.gz (compressed, ~10MB)
08.tsv.gz
2024/
12.tsv.gz
11.tsv.gz
Why monthly partitions:
- Current month uncompressed (fast writes, fast queries)
- Old months compressed (10:1 space savings)
- Easy to archive old years
- ~1M events per month = manageable file size
Columns:
event_id type timestamp aggregate_id data version
Example:
1 user.registered 1696886400 user:42 {"email":"user@example.com","name":"Alice"} 1
2 user.email_changed 1696886401 user:42 {"old":"user@example.com","new":"alice@example.com"} 2
3 post.created 1696886402 post:123 {"title":"Hello","author":42} 1
4 post.published 1696886403 post:123 {"published_at":1696886403} 2
Fields:
- event_id: Globally unique, sequential
- type: Event type (e.g., 'user.registered')
- timestamp: When event occurred
- aggregate_id: Entity this event relates to (e.g., 'user:42')
- data: Event payload (JSON)
- version: Event version for this aggregate (1, 2, 3...)
Why this schema:
- aggregate_id enables "all events for user 42"
- version tracks sequence for each entity
- type enables "all registration events"
- timestamp enables time-based queries
publish(event_type, data, aggregate_id=None)

Purpose: Publish new event to event store
Parameters:
- event_type (str): Event type (e.g., 'user.registered')
- data (dict): Event data/payload
- aggregate_id (str, optional): Entity ID (e.g., 'user:42')
Returns:
- event_id (int): Unique event identifier
Behavior:
1. Generate sequential event_id
2. Get current timestamp
3. Calculate version for aggregate (if aggregate_id provided)
4. Append to current month's TSV file
5. Return event_id
Example:
from dbbasic_events import publish
# User registration
event_id = publish('user.registered', {
'user_id': 42,
'email': 'user@example.com',
'name': 'Alice'
}, aggregate_id='user:42')
# Post creation
publish('post.created', {
'post_id': 123,
'title': 'Hello World',
'author': 42
}, aggregate_id='post:123')
# Generic event (no aggregate)
publish('system.backup_completed', {
'duration': 120,
'size_mb': 500
})
subscribe(event_type=None, aggregate_id=None, since=None)

Purpose: Query events from event store
Parameters:
- event_type (str, optional): Filter by event type
- aggregate_id (str, optional): Filter by aggregate
- since (int, optional): Unix timestamp to start from
Returns:
- Iterator of events (dicts)
Behavior:
1. Determine which TSV files to read (based on since)
2. Read events (using grep/zgrep for filtering)
3. Parse and yield events in order
Example:
from dbbasic_events import subscribe
# All user registration events
for event in subscribe('user.registered'):
print(f"User {event['data']['user_id']} registered")
# All events for specific user
for event in subscribe(aggregate_id='user:42'):
print(f"{event['type']}: {event['data']}")
# Recent events (last 24 hours)
yesterday = int(time.time()) - 86400
for event in subscribe(since=yesterday):
process_event(event)
# All events of type since timestamp
for event in subscribe('payment.received', since=start_of_month):
total += event['data']['amount']
replay(aggregate_id, handlers)

Purpose: Rebuild state by replaying events
Parameters:
- aggregate_id (str): Entity to replay
- handlers (dict): Event type → handler function
Returns:
- Final state after replaying all events

Behavior:
1. Get all events for aggregate (in order)
2. For each event, call handler
3. Handler updates state
4. Return final state
Example:
from dbbasic_events import replay
# Rebuild user state from events
def handle_user_registered(state, event):
state['email'] = event['data']['email']
state['name'] = event['data']['name']
return state
def handle_email_changed(state, event):
state['email'] = event['data']['new_email']
return state
user_state = replay('user:42', {
'user.registered': handle_user_registered,
'user.email_changed': handle_email_changed
})
# user_state = {'email': 'current@example.com', 'name': 'Alice'}
project(event_type, reducer, initial=None)

Purpose: Aggregate events into projection/view
Parameters:
- event_type (str): Event type to aggregate
- reducer (function): Reducer function (state, event) → new_state
- initial (any): Initial state
Returns:
- Final aggregated state

Behavior:
1. Subscribe to event_type
2. Reduce events using reducer function
3. Return final state
Example:
from dbbasic_events import project
# Calculate total revenue
def sum_payments(total, event):
return total + event['data']['amount']
total_revenue = project('payment.received', sum_payments, initial=0.0)
# Count active users
def count_users(state, event):
if event['type'] == 'user.registered':
state['total'] += 1
elif event['type'] == 'user.deleted':
state['total'] -= 1
return state
# The reference TSV backend matches event types exactly (no wildcards),
# so pass event_type=None and let the reducer filter by type:
user_count = project(None, count_users, initial={'total': 0})
import os
import time
import json
from datetime import datetime
from dbbasic_tsv import append, query
EVENTS_DIR = os.getenv('EVENTS_DIR', 'data/events')
def _current_month_file():
"""Get current month's event file"""
year = datetime.now().year
month = datetime.now().strftime('%m')
os.makedirs(f'{EVENTS_DIR}/{year}', exist_ok=True)
return f'{EVENTS_DIR}/{year}/{month}.tsv'
def _next_event_id():
    """Generate sequential event ID (reads the newest event file so IDs stay global)"""
    import glob
    import gzip
    files = sorted(glob.glob(f'{EVENTS_DIR}/**/*.tsv*', recursive=True))
    for path in reversed(files):
        opener = gzip.open if path.endswith('.gz') else open
        try:
            with opener(path, 'rt') as f:
                lines = f.readlines()
            if lines:
                return int(lines[-1].split('\t')[0]) + 1
        except (OSError, ValueError):
            continue
    return 1
def _get_version(aggregate_id):
"""Get next version for aggregate"""
if not aggregate_id:
return 1
# Count events for this aggregate
events = list(subscribe(aggregate_id=aggregate_id))
return len(events) + 1
def publish(event_type, data, aggregate_id=None):
"""Publish event to event store"""
event_id = _next_event_id()
timestamp = int(time.time())
version = _get_version(aggregate_id)
append(_current_month_file(), [
str(event_id),
event_type,
str(timestamp),
aggregate_id or '',
json.dumps(data),
str(version)
])
return event_id
def subscribe(event_type=None, aggregate_id=None, since=None):
"""Query events from event store"""
import subprocess
import glob
    # Determine which files to search. This reference backend scans every
    # event file and filters each event's timestamp below; a production
    # version could skip months that end before `since` (see the
    # partition-by-time sketch later in this spec).
    pattern = f'{EVENTS_DIR}/**/*.tsv*'
files = sorted(glob.glob(pattern, recursive=True))
for file in files:
# Use zgrep for .gz files, grep for .tsv
cmd = ['zgrep' if file.endswith('.gz') else 'grep', '.', file]
try:
output = subprocess.check_output(cmd, text=True)
for line in output.strip().split('\n'):
if not line:
continue
parts = line.split('\t')
event = {
'event_id': int(parts[0]),
'type': parts[1],
'timestamp': int(parts[2]),
'aggregate_id': parts[3] if parts[3] else None,
'data': json.loads(parts[4]),
'version': int(parts[5])
}
# Apply filters
if event_type and event['type'] != event_type:
continue
if aggregate_id and event['aggregate_id'] != aggregate_id:
continue
if since and event['timestamp'] < since:
continue
yield event
except subprocess.CalledProcessError:
continue
def replay(aggregate_id, handlers):
"""Replay events to rebuild state"""
state = {}
for event in subscribe(aggregate_id=aggregate_id):
handler = handlers.get(event['type'])
if handler:
state = handler(state, event)
return state
def project(event_type, reducer, initial=None):
"""Aggregate events into projection"""
state = initial
for event in subscribe(event_type=event_type):
state = reducer(state, event)
return state
~80 lines for complete event sourcing.
from dbbasic_events import publish, subscribe, replay
# Publish events
publish('user.registered', {
'user_id': 42,
'email': 'alice@example.com',
'name': 'Alice'
}, aggregate_id='user:42')
publish('user.email_changed', {
'user_id': 42,
'old_email': 'alice@example.com',
'new_email': 'alice.smith@example.com'
}, aggregate_id='user:42')
publish('user.name_changed', {
'user_id': 42,
'old_name': 'Alice',
'new_name': 'Alice Smith'
}, aggregate_id='user:42')
# Rebuild current state from events
def handle_registered(state, event):
state['user_id'] = event['data']['user_id']
state['email'] = event['data']['email']
state['name'] = event['data']['name']
return state
def handle_email_changed(state, event):
state['email'] = event['data']['new_email']
return state
def handle_name_changed(state, event):
state['name'] = event['data']['new_name']
return state
user = replay('user:42', {
'user.registered': handle_registered,
'user.email_changed': handle_email_changed,
'user.name_changed': handle_name_changed
})
# user = {'user_id': 42, 'email': 'alice.smith@example.com', 'name': 'Alice Smith'}
from dbbasic_events import subscribe, project
# Total revenue
def sum_revenue(total, event):
return total + event['data']['amount']
revenue = project('payment.received', sum_revenue, initial=0.0)
print(f"Total revenue: ${revenue}")
# Active users
active_users = set()
for event in subscribe('page.viewed', since=yesterday):
active_users.add(event['data']['user_id'])
print(f"Active users today: {len(active_users)}")
# Conversion funnel
funnel = {'viewed': 0, 'clicked': 0, 'purchased': 0}
for event in subscribe(since=start_of_month):
if event['type'] == 'product.viewed':
funnel['viewed'] += 1
elif event['type'] == 'product.clicked':
funnel['clicked'] += 1
elif event['type'] == 'payment.received':
funnel['purchased'] += 1
print(f"Conversion: {funnel['purchased'] / funnel['viewed'] * 100}%")
# Who changed what and when
for event in subscribe(aggregate_id='order:123'):
print(f"{event['timestamp']}: {event['type']}")
print(f" Data: {event['data']}")
# Output:
# 1696886400: order.created
# Data: {'customer': 42, 'total': 99.99}
# 1696886401: order.item_added
# Data: {'sku': 'ABC', 'quantity': 2}
# 1696886402: order.paid
# Data: {'amount': 99.99, 'method': 'card'}
# 1696886403: order.shipped
# Data: {'tracking': 'ABC123'}
from dbbasic_events import publish
from dbbasic_queue import enqueue
# When user registers
def register_user(email, password):
user = User.create(email, password)
# Publish event
publish('user.registered', {
'user_id': user.id,
'email': email,
'registered_at': time.time()
}, aggregate_id=f'user:{user.id}')
return user
# Separate worker subscribes to events
def event_processor():
"""Process recent events and trigger actions"""
for event in subscribe(since=last_processed_time):
if event['type'] == 'user.registered':
# Queue welcome email
enqueue('send_email', {
'to': event['data']['email'],
'template': 'welcome'
})
# Update analytics
enqueue('update_analytics', {
'event': 'user_registered',
'user_id': event['data']['user_id']
})
# Cron runs event processor
# */5 * * * * python3 workers/event_processor.py
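The last_processed_time checkpoint above is left undefined; one way to track it, assuming a small state file next to the event store (the file path and helper names are illustrative, and process_event() stands for the dispatch logic in event_processor() above):

from dbbasic_events import subscribe

CHECKPOINT_FILE = 'data/events/.last_processed'  # illustrative location

def load_checkpoint():
    """Timestamp of the last processed event (0 if the processor never ran)."""
    try:
        with open(CHECKPOINT_FILE) as f:
            return int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return 0

def save_checkpoint(timestamp):
    """Persist the high-water mark after a successful run."""
    with open(CHECKPOINT_FILE, 'w') as f:
        f.write(str(timestamp))

def run_event_processor():
    last_processed_time = load_checkpoint()
    newest = last_processed_time
    for event in subscribe(since=last_processed_time):
        process_event(event)  # dispatch as in event_processor() above
        newest = max(newest, event['timestamp'])
    # Events at the boundary timestamp can repeat on the next run,
    # so handlers should be idempotent.
    save_checkpoint(newest)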
| Events | Uncompressed | Compressed | Notes |
|---|---|---|---|
| 1M/month | 100MB | 10MB | Typical mid-size app |
| 12M/year | 1.2GB | 120MB | Annual storage |
| 60M (5 years) | 6GB | 600MB | Long-term storage |
Even at 1M events/month, storage is minimal.
| Operation | Time | Notes |
|---|---|---|
| Publish event | 0.1ms | Append to file |
| Subscribe (current month) | 0.5s | grep 100MB file |
| Subscribe (specific type) | 0.2s | grep filtered |
| Subscribe (aggregate) | 0.1s | grep specific ID |
| Subscribe (compressed month) | 2s | zgrep 10MB file |
| Replay aggregate | 0.5s | Read + process events |
| Events/Month | File Size | Query Time | Notes |
|---|---|---|---|
| 100K | 10MB | <0.1s | Small app |
| 1M | 100MB | ~0.5s | Medium app |
| 10M | 1GB | ~5s | Large app |
| 100M | 10GB | ~50s | Very large (consider upgrade) |
When to upgrade:
- Query time > 5s consistently
- Events > 10M/month
- Multiple servers need event access
- Real-time event processing required
#!/bin/bash
# /etc/cron.daily/dbbasic-events-compress
# Compress last month's events
# Anchor to mid-month so "-1 month" is safe at month ends (GNU date quirk)
YEAR=$(date -d "$(date +%Y-%m-15) -1 month" +%Y)
MONTH=$(date -d "$(date +%Y-%m-15) -1 month" +%m)
EVENT_FILE="data/events/${YEAR}/${MONTH}.tsv"
if [ -f "$EVENT_FILE" ]; then
gzip "$EVENT_FILE"
echo "Compressed $EVENT_FILE"
fi
Run daily - when month changes, previous month gets compressed.
#!/bin/bash
# Archive events older than 2 years to cold storage
CUTOFF_YEAR=$(($(date +%Y) - 2))
mkdir -p archive

for year in data/events/*/; do
    year_num=$(basename "$year")
    if [ "$year_num" -lt "$CUTOFF_YEAR" ]; then
tar czf "archive/events-${year_num}.tar.gz" "data/events/${year_num}"
rm -rf "data/events/${year_num}"
echo "Archived events from ${year_num}"
fi
done
Archive to:
- S3 / object storage
- Backup server
- Tape (for compliance)
Events are permanent, but cold storage is cheap.
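A sketch of the object-storage option, assuming boto3 is installed, AWS credentials are configured, and the bucket name is a placeholder:

import glob
import os
import boto3  # assumes boto3 is installed and credentials are configured

s3 = boto3.client('s3')
BUCKET = 'my-event-archive'  # placeholder bucket name

def upload_archives():
    """Push the yearly tarballs produced by the archive script to S3."""
    for path in glob.glob('archive/events-*.tar.gz'):
        key = f'events/{os.path.basename(path)}'
        s3.upload_file(path, BUCKET, key)
        print(f"Uploaded {path} to s3://{BUCKET}/{key}")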
# Public API (never changes)
from dbbasic_events import publish, subscribe, replay, project
Default: TSV
# dbbasic_events/__init__.py
from .tsv_backend import publish, subscribe, replay, project
PostgreSQL Backend:
# dbbasic_events_postgres/__init__.py
# Same functions, different implementation
def publish(event_type, data, aggregate_id=None):
    cursor.execute("""
        INSERT INTO events (type, timestamp, aggregate_id, data)
        VALUES (%s, %s, %s, %s)
    """, (event_type, int(time.time()), aggregate_id, json.dumps(data)))
Kafka Backend:
# dbbasic_events_kafka/__init__.py
# Same functions, Kafka implementation
def publish(event_type, data, aggregate_id=None):
producer.send('events', {
'type': event_type,
'data': data,
'aggregate_id': aggregate_id
})
Start:
# requirements.txt
dbbasic-events # Uses TSV
Upgrade to PostgreSQL:
# requirements.txt
dbbasic-events-postgres # Drop-in replacement
# config
export EVENTS_BACKEND=postgres
export DATABASE_URL=postgresql://...
Upgrade to Kafka:
# requirements.txt
dbbasic-events-kafka
# config
export EVENTS_BACKEND=kafka
export KAFKA_BROKERS=localhost:9092
Application code unchanged. Just swap package and config.
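One way the package's __init__.py could honor EVENTS_BACKEND (a sketch; module names follow the optional packages listed in this spec):

# dbbasic_events/__init__.py (sketch)
import os

_backend = os.getenv('EVENTS_BACKEND', 'tsv')

if _backend == 'postgres':
    from dbbasic_events_postgres import publish, subscribe, replay, project
elif _backend == 'kafka':
    from dbbasic_events_kafka import publish, subscribe, replay, project
else:
    from .tsv_backend import publish, subscribe, replay, project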
Modern hardware (2025):
- NVMe SSD: 7GB/s read
- 100MB event file: 0.014 seconds to read
- grep: ~500MB/s throughput
- 100MB file: 0.2 seconds to grep

Even with 10M events/month:
- 1GB uncompressed
- grep takes ~2 seconds
- Compressed: 100MB = 0.2s to search
This is acceptable for most queries.
Don't need to scan entire history:
# Recent events (current month, uncompressed)
subscribe('user.registered', since=last_month)
# Fast: grep 100MB file
# Historical analysis (batch job)
subscribe('payment.received', since=start_of_year)
# Slower but acceptable: zgrep 12 compressed files
Real-time = current month (fast) Historical = compressed files (slower but rare)
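A sketch of that optimization for the TSV backend: map since to the monthly files worth scanning instead of globbing everything (the helper name is illustrative):

import glob
import os
from datetime import datetime

def _files_since(since=None, events_dir='data/events'):
    """Return the event files worth scanning, skipping months that end before `since`."""
    files = sorted(glob.glob(f'{events_dir}/**/*.tsv*', recursive=True))
    if not since:
        return files
    cutoff = datetime.fromtimestamp(since)
    keep = []
    for path in files:
        # Paths look like data/events/2025/10.tsv or data/events/2025/09.tsv.gz
        year = int(os.path.basename(os.path.dirname(path)))
        month = int(os.path.basename(path).split('.')[0])
        if (year, month) >= (cutoff.year, cutoff.month):
            keep.append(path)
    return keep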
Measured thresholds:
- Events > 100M/month (10GB files)
- Query time > 10s consistently
- Real-time processing required (< 1s)
- Multiple servers need concurrent access
Then upgrade to PostgreSQL or Kafka.
But most apps never hit this.
# Publish event, queue async actions
from dbbasic_events import publish
from dbbasic_queue import enqueue
def create_post(title, author):
post = Post.create(title, author)
# Publish event (permanent record)
publish('post.created', {
'post_id': post.id,
'title': title,
'author': author
}, aggregate_id=f'post:{post.id}')
# Queue notifications (async work)
enqueue('notify_followers', {'author_id': author, 'post_id': post.id})
return post
# Log important events
from dbbasic_events import publish
from dbbasic_logs import log
def publish_with_logging(event_type, data, aggregate_id=None):
"""Publish event and log it"""
event_id = publish(event_type, data, aggregate_id)
log.info(f"Event published: {event_type}", event_id=event_id, aggregate_id=aggregate_id)
return event_id
Command side (writes):
def update_user_email(user_id, new_email):
# Validate
user = User.get(user_id)
old_email = user.email
# Publish event (source of truth)
publish('user.email_changed', {
'user_id': user_id,
'old_email': old_email,
'new_email': new_email
}, aggregate_id=f'user:{user_id}')
# Update read model (denormalized)
User.update(user_id, email=new_email)
Query side (reads):
def get_user(user_id):
# Read from denormalized model (fast)
return User.get(user_id)
def get_user_history(user_id):
# Replay events for audit trail
return list(subscribe(aggregate_id=f'user:{user_id}'))
Benefits:
- Fast reads (from denormalized model)
- Complete history (from events)
- Can rebuild read model anytime
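Rebuilding the read model can be a small batch job. A sketch, where User.update stands in for whatever your read model exposes (as used elsewhere in this spec):

from dbbasic_events import subscribe

def rebuild_user_read_model():
    """Replay user events and rewrite the denormalized users table."""
    users = {}
    for event in subscribe():
        if event['type'] == 'user.registered':
            users[event['data']['user_id']] = {
                'email': event['data']['email'],
                'name': event['data'].get('name'),
            }
        elif event['type'] == 'user.email_changed':
            users.setdefault(event['data']['user_id'], {})['email'] = event['data']['new_email']
    for user_id, fields in users.items():
        User.update(user_id, **fields)  # or create/upsert, depending on your read model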
from dbbasic_events import publish, subscribe, replay, project
def test_publish_event():
event_id = publish('test.event', {'foo': 'bar'})
assert event_id > 0
events = list(subscribe('test.event'))
assert len(events) > 0
assert events[-1]['data']['foo'] == 'bar'
def test_aggregate_events():
publish('user.registered', {'email': 'old@example.com'}, aggregate_id='user:42')
publish('user.email_changed', {'new_email': 'new@example.com'}, aggregate_id='user:42')
events = list(subscribe(aggregate_id='user:42'))
assert len(events) == 2
assert events[0]['version'] == 1
assert events[1]['version'] == 2
def test_replay():
publish('counter.incremented', {'amount': 5}, aggregate_id='counter:1')
publish('counter.incremented', {'amount': 3}, aggregate_id='counter:1')
def increment(state, event):
state['value'] = state.get('value', 0) + event['data']['amount']
return state
result = replay('counter:1', {'counter.incremented': increment})
assert result['value'] == 8
def test_project():
publish('sale.completed', {'amount': 100})
publish('sale.completed', {'amount': 200})
total = project('sale.completed', lambda sum, e: sum + e['data']['amount'], initial=0)
assert total == 300
import time
def test_large_volume():
# Publish 10,000 events
start = time.time()
for i in range(10000):
publish('test.event', {'index': i})
publish_time = time.time() - start
print(f"Published 10K events in {publish_time:.2f}s")
# Query them
start = time.time()
events = list(subscribe('test.event'))
query_time = time.time() - start
print(f"Queried 10K events in {query_time:.2f}s")
assert len(events) == 10000
assert publish_time < 5.0 # Should be fast
assert query_time < 2.0 # Should be reasonable
Queue:
- Jobs to execute
- Deleted after completion
- Temporary

Logs:
- Debugging information
- Rotated (30-90 days)
- Observability

Events:
- Permanent record
- Never deleted
- Source of truth
Events are data, not diagnostics.
Events need:
- Append-only (TSV perfect for this)
- Queryable (grep/dbbasic-tsv)
- Permanent (disk storage)
- Huge volume (compression helps)

TSV provides:
- Simple appends (fast)
- Plain text (debuggable)
- Compressible (10:1 ratio)
- Standard format (consistent with dbbasic)

Alternatives:
- Daily files (too many files)
- Single file (too large)
- Yearly files (harder to manage)

Monthly wins:
- ~1M events = 100MB (manageable size)
- 12 files/year (not too many)
- Current month uncompressed (fast)
- Old months compressed (space efficient)
Kafka is for:
- Multiple services consuming events in real-time
- Distributed systems
- High throughput (100K+ events/second)
- Multiple servers

Most apps don't need this.

Kafka setup:
- JVM (500MB+ RAM)
- ZooKeeper (another service)
- Kafka broker
- Configuration complexity
- Operational overhead

TSV setup:
- Write to file
- Compress old files
- Done
Start simple. Upgrade when measured.
Why design for swappable backends:
Example progression:
Month 1: TSV (100K events)
Query time: 0.1s ✓
Month 12: TSV (1M events)
Query time: 0.5s ✓
Month 24: TSV (10M events)
Query time: 5s ⚠️
Month 25: PostgreSQL (measured need)
Query time: 0.05s ✓
Month 48: Kafka (multiple services)
Real-time streaming ✓
Each upgrade based on measured need, not imagined scale.
| Feature | TSV | PostgreSQL | Kafka | AWS EventBridge |
|---|---|---|---|---|
| Setup | None | DB setup | Kafka+ZK | AWS account |
| Storage | Files | Database | Topics | Cloud |
| Query | grep | SQL | Stream API | CloudWatch |
| Cost | $0 | $0 (self-host) | $0 (self-host) | $$$$ |
| Compression | gzip (10:1) | Some | Snappy | Managed |
| Retention | Forever | Forever | Days-weeks | 7-90 days |
| Real-time | No (batch) | No (polling) | Yes | Yes |
| Multi-server | NFS | Yes | Yes | Yes |
| Complexity | Very low | Medium | High | Medium |
| Lines of code | 80 | 150 | 300 | 50 (SDK) |
TSV: Start here
PostgreSQL: Upgrade at 10M+ events
Kafka: Upgrade at 100M+ events or multi-server

Stay with TSV when:
- Events < 10M/month
- Query time < 5s
- Batch processing acceptable
- Single server

Upgrade to PostgreSQL when:
- Events > 10M/month
- Need faster queries (< 1s)
- Complex filtering needed
- Already running PostgreSQL

Upgrade to Kafka when:
- Events > 100M/month
- Real-time streaming required
- Multiple services consuming events
- Multi-server architecture

Before upgrading, ask:
- [ ] Did we measure query performance?
- [ ] Is it actually too slow for our use case?
- [ ] Can we optimize TSV queries first?
- [ ] Do we understand the added complexity?
- [ ] Is our team ready to operate Kafka/PostgreSQL?

Don't upgrade because:
- ❌ "We might need it someday"
- ❌ "Everyone else uses Kafka"
- ❌ "It sounds more professional"

Upgrade because:
- ✅ Measured: "Queries taking 10s, users complaining"
- ✅ Measured: "10M events/month, TSV struggling"
- ✅ Requirement: "Multiple services need real-time events"
Q: Can events ever be deleted?
A: Generally, no - events are the permanent record.
Exception: GDPR/privacy compliance
# Remove user's personal data from events
def anonymize_user_events(user_id):
"""Replace user data with anonymized version (GDPR)"""
# filter_file / anonymize_row are sketch helpers that rewrite each row in place
for month_file in glob.glob('data/events/**/*.tsv*', recursive=True):
    # Filter and rewrite with anonymized data
    filter_file(month_file, lambda row:
        anonymize_row(row) if f'user:{user_id}' in row else row
    )
Q: How do I replay the entire event history?
A: Read the files in chronological order:
for event in subscribe(): # No filters = all events
process_event(event)
With compression, reading 60M events (5 years) = ~600MB compressed = minutes.
Q: How are events ordered within an aggregate?
A: The version field tracks the aggregate's version:
# User 42 events:
event_id=1, version=1: user.registered
event_id=5, version=2: user.email_changed
event_id=9, version=3: user.name_changed
Rebuild user state = replay versions 1→2→3 in order.
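A defensive variation of replay() that sorts by the version column before applying handlers (illustrative; the reference backend already yields events in append order):

from dbbasic_events import subscribe

def replay_by_version(aggregate_id, handlers):
    """Replay an aggregate's events strictly in version order."""
    state = {}
    for event in sorted(subscribe(aggregate_id=aggregate_id), key=lambda e: e['version']):
        handler = handlers.get(event['type'])
        if handler:
            state = handler(state, event)
    return state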
Q: Can one event trigger other events?
A: Yes (saga pattern):
# Event processor (cron job)
for event in subscribe('order.paid', since=last_run):
# Trigger next events
publish('inventory.reserved', {
'order_id': event['data']['order_id'],
'items': event['data']['items']
})
publish('shipping.requested', {
'order_id': event['data']['order_id']
})
Q: What happens when the event schema changes?
A: Event data is JSON, so additive changes work:
# Old events
{'user_id': 42, 'email': 'user@example.com'}
# New events (added field)
{'user_id': 42, 'email': 'user@example.com', 'referrer': 'google'}
# Handlers handle both
def handle_registered(state, event):
state['email'] = event['data']['email']
state['referrer'] = event['data'].get('referrer', 'direct') # Default for old events
return state
Never modify old events - they're historical facts.
dbbasic-events/
├── dbbasic_events/
│ ├── __init__.py # Main implementation (80 lines)
│ ├── tsv_backend.py # TSV storage (default)
│ └── compress.py # Compression script
├── tests/
│ ├── test_events.py
│ ├── test_replay.py
│ └── test_performance.py
├── setup.py
├── README.md
├── LICENSE
└── CHANGELOG.md
Optional backends (separate packages):
dbbasic-events-postgres/ # PostgreSQL backend
dbbasic-events-kafka/ # Kafka backend
dbbasic-events-sqlite/ # SQLite backend
This implementation is successful if:
This is the key to dbbasic's 10x advantage:
┌─────────────────┐
│ Your App Code │ (never changes)
└────────┬────────┘
│ (interface)
┌────┴────┐
│ │
┌───▼───┐ ┌──▼──────┐ ┌──▼─────┐
│ TSV │ │ Postgres│ │ Kafka │
│Backend│ │ Backend │ │Backend │
└───────┘ └─────────┘ └────────┘
Day 1 Year 2 Year 5
Your application:
from dbbasic_events import publish, subscribe
# This never changes
Backend swaps:
# .env file
EVENTS_BACKEND=tsv # Start
EVENTS_BACKEND=postgres # Upgrade
EVENTS_BACKEND=kafka # Scale
All modules work this way:
- dbbasic-sessions: Signed cookies → Redis sessions
- dbbasic-queue: TSV → Redis → Kafka
- dbbasic-events: TSV → PostgreSQL → Kafka
- dbbasic-logs: TSV → Elasticsearch
Build features on day 1. Swap infrastructure when measured.
dbbasic-events provides event sourcing without the complexity:
What you get:
- Permanent event log (append-only)
- Event sourcing (replay state)
- CQRS support (command/query separation)
- Analytics (aggregate events)
- Audit trail (full history)

How it works:
- TSV files (monthly partitions)
- Compression (10:1 ratio)
- grep/zgrep (search)
- 80 lines of code

Why it scales:
- Modern hardware is fast
- 12M events/year = 120MB compressed
- grep 100MB in 0.2s
- Good enough for 90% of apps

When to upgrade:
- Measured performance issues
- Events > 10M/month
- Real-time processing required
- Multiple services need events

Upgrade path:
- Same interface
- Swap backend
- Application code unchanged
Start simple. Measure. Swap when needed.
Not because you might need scale. Because you measured you need scale.
Next Steps: Implement TSV backend, test at volume, ship.
No Kafka. No ZooKeeper. No complexity.
Just append-only TSV files and Unix compression.
80 lines of code that grow with your business.