dbbasic-events Specification

Version: 1.0
Status: Specification
Author: DBBasic Project
Date: October 2025


Philosophy

"Append events. Replay state. Scale when measured."

Events are the permanent record of what happened. Unlike logs (rotated after 30-90 days), queue jobs (deleted once processed), or sessions (ephemeral), events are kept forever and become the source of truth.

Design Principles

  1. Append-Only: Never modify or delete events
  2. Immutable: Events are historical facts
  3. Replayable: Rebuild state from events
  4. Swappable: Start TSV, upgrade to DB/Kafka when needed
  5. Simple Interface: Same API regardless of backend

The Modern Context

Critical observation: Systems are getting more powerful faster than user growth.

2010: - Average server: 4 cores, 8GB RAM - Average site: 10K users - Ratio: Barely keeping up

2025: - Average server: 32 cores, 128GB RAM - Average site: Still ~10K users (most sites) - Ratio: Massive overcapacity

The implication: - Single server handles what needed 10 servers in 2010 - TSV can handle what needed Kafka in 2010 - Start simple, measure, upgrade only if needed

Most apps will never need to upgrade.


What Are Events?

Events vs Queue vs Logs

Queue (dbbasic-queue):

enqueue('send_email', {'to': 'user@example.com'})
# Executed once, then deleted
# Lifespan: minutes to hours

Logs (dbbasic-logs):

log.info("Email sent", to='user@example.com')
# Recorded for debugging
# Lifespan: 30-90 days, then deleted

Events (dbbasic-events):

publish('email.sent', {'to': 'user@example.com', 'message_id': 'abc'})
# Permanent record
# Lifespan: Forever (source of truth)

Event Sourcing

Traditional state storage:

# Database stores current state
users.update(id=42, email='new@example.com')
# Old email lost forever

Event sourcing:

# Store the change as event
publish('user.email_changed', {
    'user_id': 42,
    'old_email': 'old@example.com',
    'new_email': 'new@example.com'
})

# Current state = replay all events
def get_user_email(user_id):
    events = list(subscribe('user.email_changed', aggregate_id=f'user:{user_id}'))
    return events[-1]['data']['new_email']  # Latest email

Benefits: - Complete audit trail - Time travel (state at any point in history) - Replay to fix bugs - Analytics gold mine
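
Point-in-time state ("time travel") falls out of replay: fold only the events up to a chosen cutoff. A minimal sketch using the subscribe() API specified below; the cutoff handling is illustrative:

def email_as_of(user_id, cutoff_ts):
    """State at a point in history = replay events up to that timestamp."""
    email = None
    for event in subscribe('user.email_changed', aggregate_id=f'user:{user_id}'):
        if event['timestamp'] > cutoff_ts:
            break  # events are yielded in order, so stop at the cutoff
        email = event['data']['new_email']
    return email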

Use Cases

Financial transactions:

publish('payment.received', {'order_id': 123, 'amount': 99.99})
publish('payment.refunded', {'order_id': 123, 'amount': 99.99})
# Balance = sum of all payment events

Inventory management:

publish('inventory.added', {'sku': 'ABC', 'quantity': 100})
publish('inventory.sold', {'sku': 'ABC', 'quantity': 1})
publish('inventory.returned', {'sku': 'ABC', 'quantity': 1})
# Current stock = replay all inventory events

User activity tracking:

publish('page.viewed', {'user_id': 42, 'page': '/products'})
publish('product.clicked', {'user_id': 42, 'product_id': 99})
publish('cart.added', {'user_id': 42, 'product_id': 99})
# User journey = all their events in order
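
Deriving a current value from a stream like this is a small fold over subscribe(). A sketch for the inventory case; the per-type sign convention is an assumption for illustration:

from dbbasic_events import subscribe

def current_stock(sku):
    """Current stock = net effect of all inventory events for this SKU."""
    deltas = {'inventory.added': +1, 'inventory.returned': +1, 'inventory.sold': -1}
    stock = 0
    for event in subscribe():  # all events; filter by type and SKU below
        if event['type'] not in deltas:
            continue
        if event['data'].get('sku') != sku:
            continue
        stock += deltas[event['type']] * event['data']['quantity']
    return stock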

Storage Format

Monthly TSV Files (Append-Only)

data/events/
  2025/
    10.tsv           (current month - active, ~100MB)
    09.tsv.gz        (compressed, ~10MB)
    08.tsv.gz
  2024/
    12.tsv.gz
    11.tsv.gz

Why monthly partitions: - Current month uncompressed (fast writes, fast queries) - Old months compressed (10:1 space savings) - Easy to archive old years - ~1M events per month = manageable file size

TSV Schema

Columns:

event_id    type    timestamp   aggregate_id    data    version

Example:

1   user.registered 1696886400  user:42 {"email":"user@example.com","name":"Alice"} 1
2   user.email_changed  1696886401  user:42 {"old":"user@example.com","new":"alice@example.com"}    2
3   post.created    1696886402  post:123    {"title":"Hello","author":42}   1
4   post.published  1696886403  post:123    {"published_at":1696886403} 2

Fields: - event_id: Globally unique, sequential - type: Event type (e.g., 'user.registered') - timestamp: When event occurred - aggregate_id: Entity this event relates to (e.g., 'user:42') - data: Event payload (JSON) - version: Event version for this aggregate (1, 2, 3...)

Why this schema: - aggregate_id enables "all events for user 42" - version tracks sequence for each entity - type enables "all registration events" - timestamp enables time-based queries


API Specification

Function: publish(event_type, data, aggregate_id=None)

Purpose: Publish new event to event store

Parameters: - event_type (str): Event type (e.g., 'user.registered') - data (dict): Event data/payload - aggregate_id (str, optional): Entity ID (e.g., 'user:42')

Returns: - event_id (int): Unique event identifier

Behavior: 1. Generate sequential event_id 2. Get current timestamp 3. Calculate version for aggregate (if aggregate_id provided) 4. Append to current month's TSV file 5. Return event_id

Example:

from dbbasic_events import publish

# User registration
event_id = publish('user.registered', {
    'user_id': 42,
    'email': 'user@example.com',
    'name': 'Alice'
}, aggregate_id='user:42')

# Post creation
publish('post.created', {
    'post_id': 123,
    'title': 'Hello World',
    'author': 42
}, aggregate_id='post:123')

# Generic event (no aggregate)
publish('system.backup_completed', {
    'duration': 120,
    'size_mb': 500
})

Function: subscribe(event_type=None, aggregate_id=None, since=None)

Purpose: Query events from event store

Parameters: - event_type (str, optional): Filter by event type - aggregate_id (str, optional): Filter by aggregate - since (int, optional): Unix timestamp to start from

Returns: - Iterator of events (dicts)

Behavior: 1. Determine which TSV files to read (based on since) 2. Read events (using grep/zgrep for filtering) 3. Parse and yield events in order

Example:

import time

from dbbasic_events import subscribe

# All user registration events
for event in subscribe('user.registered'):
    print(f"User {event['data']['user_id']} registered")

# All events for specific user
for event in subscribe(aggregate_id='user:42'):
    print(f"{event['type']}: {event['data']}")

# Recent events (last 24 hours)
yesterday = int(time.time()) - 86400
for event in subscribe(since=yesterday):
    process_event(event)

# All payment events since a given Unix timestamp
start_of_month = int(time.time()) - 30 * 86400  # illustrative cutoff: last 30 days
total = 0.0
for event in subscribe('payment.received', since=start_of_month):
    total += event['data']['amount']

Function: replay(aggregate_id, handlers)

Purpose: Rebuild state by replaying events

Parameters: - aggregate_id (str): Entity to replay - handlers (dict): Event type → handler function

Returns: - Final state after replaying all events

Behavior: 1. Get all events for aggregate (in order) 2. For each event, call handler 3. Handler updates state 4. Return final state

Example:

from dbbasic_events import replay

# Rebuild user state from events
def handle_user_registered(state, event):
    state['email'] = event['data']['email']
    state['name'] = event['data']['name']
    return state

def handle_email_changed(state, event):
    state['email'] = event['data']['new_email']
    return state

user_state = replay('user:42', {
    'user.registered': handle_user_registered,
    'user.email_changed': handle_email_changed
})

# user_state = {'email': 'current@example.com', 'name': 'Alice'}

Function: project(event_type, reducer, initial=None)

Purpose: Aggregate events into projection/view

Parameters: - event_type (str): Event type to aggregate - reducer (function): Reducer function (state, event) → new_state - initial (any): Initial state

Returns: - Final aggregated state

Behavior: 1. Subscribe to event_type 2. Reduce events using reducer function 3. Return final state

Example:

from dbbasic_events import project

# Calculate total revenue
def sum_payments(total, event):
    return total + event['data']['amount']

total_revenue = project('payment.received', sum_payments, initial=0.0)

# Count active users
def count_users(state, event):
    if event['type'] == 'user.registered':
        state['total'] += 1
    elif event['type'] == 'user.deleted':
        state['total'] -= 1
    return state

user_count = project('user.*', count_users, initial={'total': 0})

Implementation

Core Implementation (~80 lines)

import os
import time
import json
from datetime import datetime
from dbbasic_tsv import append

EVENTS_DIR = os.getenv('EVENTS_DIR', 'data/events')

def _current_month_file():
    """Get current month's event file"""
    year = datetime.now().year
    month = datetime.now().strftime('%m')
    os.makedirs(f'{EVENTS_DIR}/{year}', exist_ok=True)
    return f'{EVENTS_DIR}/{year}/{month}.tsv'

def _next_event_id():
    """Generate sequential event ID"""
    # Read last event ID from current month
    try:
        with open(_current_month_file()) as f:
            lines = f.readlines()
            if lines:
                last_id = int(lines[-1].split('\t')[0])
                return last_id + 1
    except (FileNotFoundError, ValueError, IndexError):
        pass
    return 1

def _get_version(aggregate_id):
    """Get next version for aggregate"""
    if not aggregate_id:
        return 1

    # Count events for this aggregate
    events = list(subscribe(aggregate_id=aggregate_id))
    return len(events) + 1

def publish(event_type, data, aggregate_id=None):
    """Publish event to event store"""
    event_id = _next_event_id()
    timestamp = int(time.time())
    version = _get_version(aggregate_id)

    append(_current_month_file(), [
        str(event_id),
        event_type,
        str(timestamp),
        aggregate_id or '',
        json.dumps(data),
        str(version)
    ])

    return event_id

def subscribe(event_type=None, aggregate_id=None, since=None):
    """Query events from event store"""
    import subprocess
    import glob

    # Find all event files (current month .tsv plus compressed .tsv.gz)
    files = sorted(glob.glob(f'{EVENTS_DIR}/**/*.tsv*', recursive=True))

    # Skip monthly files older than `since` (paths look like YYYY/MM.tsv[.gz])
    if since:
        start_month = datetime.fromtimestamp(since).strftime('%Y/%m')
        files = [f for f in files
                 if os.path.relpath(f, EVENTS_DIR).replace(os.sep, '/')[:7] >= start_month]

    for file in files:
        # Use zgrep for .gz files, grep for .tsv
        cmd = ['zgrep' if file.endswith('.gz') else 'grep', '.', file]

        try:
            output = subprocess.check_output(cmd, text=True)
            for line in output.strip().split('\n'):
                if not line:
                    continue

                parts = line.split('\t')
                event = {
                    'event_id': int(parts[0]),
                    'type': parts[1],
                    'timestamp': int(parts[2]),
                    'aggregate_id': parts[3] if parts[3] else None,
                    'data': json.loads(parts[4]),
                    'version': int(parts[5])
                }

                # Apply filters ('user.*' style patterns match on type prefix)
                if event_type:
                    if event_type.endswith('.*'):
                        if not event['type'].startswith(event_type[:-1]):
                            continue
                    elif event['type'] != event_type:
                        continue
                if aggregate_id and event['aggregate_id'] != aggregate_id:
                    continue
                if since and event['timestamp'] < since:
                    continue

                yield event
        except subprocess.CalledProcessError:
            continue

def replay(aggregate_id, handlers):
    """Replay events to rebuild state"""
    state = {}

    for event in subscribe(aggregate_id=aggregate_id):
        handler = handlers.get(event['type'])
        if handler:
            state = handler(state, event)

    return state

def project(event_type, reducer, initial=None):
    """Aggregate events into projection"""
    state = initial

    for event in subscribe(event_type=event_type):
        state = reducer(state, event)

    return state

~80 lines for complete event sourcing.


Dependencies

dbbasic-tsv (the append helper used by the default TSV backend) and the Python standard library (os, time, json, glob, subprocess). No database, broker, or external service is required.

Usage Examples

Event Sourcing Pattern

from dbbasic_events import publish, subscribe, replay

# Publish events
publish('user.registered', {
    'user_id': 42,
    'email': 'alice@example.com',
    'name': 'Alice'
}, aggregate_id='user:42')

publish('user.email_changed', {
    'user_id': 42,
    'old_email': 'alice@example.com',
    'new_email': 'alice.smith@example.com'
}, aggregate_id='user:42')

publish('user.name_changed', {
    'user_id': 42,
    'old_name': 'Alice',
    'new_name': 'Alice Smith'
}, aggregate_id='user:42')

# Rebuild current state from events
def handle_registered(state, event):
    state['user_id'] = event['data']['user_id']
    state['email'] = event['data']['email']
    state['name'] = event['data']['name']
    return state

def handle_email_changed(state, event):
    state['email'] = event['data']['new_email']
    return state

def handle_name_changed(state, event):
    state['name'] = event['data']['new_name']
    return state

user = replay('user:42', {
    'user.registered': handle_registered,
    'user.email_changed': handle_email_changed,
    'user.name_changed': handle_name_changed
})

# user = {'user_id': 42, 'email': 'alice.smith@example.com', 'name': 'Alice Smith'}

Analytics & Reporting

import time
from datetime import datetime

from dbbasic_events import subscribe, project

# Total revenue
def sum_revenue(total, event):
    return total + event['data']['amount']

revenue = project('payment.received', sum_revenue, initial=0.0)
print(f"Total revenue: ${revenue}")

# Active users (last 24 hours)
yesterday = int(time.time()) - 86400
active_users = set()
for event in subscribe('page.viewed', since=yesterday):
    active_users.add(event['data']['user_id'])

print(f"Active users today: {len(active_users)}")

# Conversion funnel (month to date)
now = datetime.now()
start_of_month = int(datetime(now.year, now.month, 1).timestamp())
funnel = {'viewed': 0, 'clicked': 0, 'purchased': 0}
for event in subscribe(since=start_of_month):
    if event['type'] == 'product.viewed':
        funnel['viewed'] += 1
    elif event['type'] == 'product.clicked':
        funnel['clicked'] += 1
    elif event['type'] == 'payment.received':
        funnel['purchased'] += 1

print(f"Conversion: {funnel['purchased'] / funnel['viewed'] * 100}%")

Audit Trail

# Who changed what and when
for event in subscribe(aggregate_id='order:123'):
    print(f"{event['timestamp']}: {event['type']}")
    print(f"  Data: {event['data']}")

# Output:
# 1696886400: order.created
#   Data: {'customer': 42, 'total': 99.99}
# 1696886401: order.item_added
#   Data: {'sku': 'ABC', 'quantity': 2}
# 1696886402: order.paid
#   Data: {'amount': 99.99, 'method': 'card'}
# 1696886403: order.shipped
#   Data: {'tracking': 'ABC123'}

Event-Driven Architecture

from dbbasic_events import publish, subscribe
from dbbasic_queue import enqueue

# When user registers
def register_user(email, password):
    user = User.create(email, password)

    # Publish event
    publish('user.registered', {
        'user_id': user.id,
        'email': email,
        'registered_at': time.time()
    }, aggregate_id=f'user:{user.id}')

    return user

# Separate worker subscribes to events
def event_processor(last_processed_time):
    """Process events since the previous run and trigger actions"""
    for event in subscribe(since=last_processed_time):
        if event['type'] == 'user.registered':
            # Queue welcome email
            enqueue('send_email', {
                'to': event['data']['email'],
                'template': 'welcome'
            })

            # Update analytics
            enqueue('update_analytics', {
                'event': 'user_registered',
                'user_id': event['data']['user_id']
            })

# Cron runs event processor
# */5 * * * * python3 workers/event_processor.py
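
One way to carry last_processed_time between cron runs is a small checkpoint file; a sketch (the checkpoint path and helper names are illustrative, not part of the API):

import time

CHECKPOINT = 'data/events/.last_processed'  # illustrative location

def load_checkpoint():
    """Timestamp of the previous run (0 on the first run)."""
    try:
        with open(CHECKPOINT) as f:
            return int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return 0

def save_checkpoint(ts):
    with open(CHECKPOINT, 'w') as f:
        f.write(str(ts))

if __name__ == '__main__':
    run_started = int(time.time())
    event_processor(load_checkpoint())  # worker defined above
    save_checkpoint(run_started)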

Performance Characteristics

Storage

Events          Uncompressed   Compressed   Notes
1M/month        100MB          10MB         Typical mid-size app
12M/year        1.2GB          120MB        Annual storage
60M (5 years)   6GB            600MB        Long-term storage

Even at 1M events/month, storage is minimal.
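
The arithmetic behind these numbers, assuming an average row of roughly 100 bytes and the 10:1 gzip ratio used throughout this spec:

# Back-of-envelope storage estimate (~100 bytes/event, 10:1 gzip assumed)
events_per_month = 1_000_000
bytes_per_event = 100
uncompressed_mb = events_per_month * bytes_per_event / 1_000_000  # ~100 MB
compressed_mb = uncompressed_mb / 10                              # ~10 MB
print(f"{uncompressed_mb:.0f} MB raw, {compressed_mb:.0f} MB gzipped per month")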

Query Performance

Operation                      Time    Notes
Publish event                  0.1ms   Append to file
Subscribe (current month)      0.5s    grep 100MB file
Subscribe (specific type)      0.2s    grep filtered
Subscribe (aggregate)          0.1s    grep specific ID
Subscribe (compressed month)   2s      zgrep 10MB file
Replay aggregate               0.5s    Read + process events

Scaling

Events/Month   File Size   Query Time   Notes
100K           10MB        <0.1s        Small app
1M             100MB       ~0.5s        Medium app
10M            1GB         ~5s          Large app
100M           10GB        ~50s         Very large (consider upgrade)

When to upgrade: - Query time > 5s consistently - Events > 10M/month - Multiple servers need event access - Real-time event processing required


Rotation & Compression

Automatic Monthly Rotation

#!/bin/bash
# /etc/cron.daily/dbbasic-events-compress
# Compress last month's events

YEAR=$(date -d "last month" +%Y)
MONTH=$(date -d "last month" +%m)

EVENT_FILE="data/events/${YEAR}/${MONTH}.tsv"

if [ -f "$EVENT_FILE" ]; then
    gzip "$EVENT_FILE"
    echo "Compressed $EVENT_FILE"
fi

Run daily - when month changes, previous month gets compressed.
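
The same rotation can be driven from Python (for example in the compress.py script listed in the package structure below); a sketch, assuming last month's file only needs gzipping in place:

import gzip
import os
import shutil
from datetime import date, timedelta

EVENTS_DIR = os.getenv('EVENTS_DIR', 'data/events')

def compress_last_month():
    """gzip last month's event file if it is still uncompressed."""
    last_month = date.today().replace(day=1) - timedelta(days=1)
    path = f"{EVENTS_DIR}/{last_month.year}/{last_month.strftime('%m')}.tsv"
    if not os.path.exists(path):
        return  # already compressed, or no events that month
    with open(path, 'rb') as src, gzip.open(path + '.gz', 'wb') as dst:
        shutil.copyfileobj(src, dst)
    os.remove(path)

if __name__ == '__main__':
    compress_last_month()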

Archive Old Years

#!/bin/bash
# Archive events older than 2 years to cold storage

CUTOFF_YEAR=$(($(date +%Y) - 2))

for year in data/events/*/; do
    year_num=$(basename "$year")
    if [ "$year_num" -lt "$CUTOFF_YEAR" ]; then
        mkdir -p archive
        tar czf "archive/events-${year_num}.tar.gz" "data/events/${year_num}"
        rm -rf "data/events/${year_num}"
        echo "Archived events from ${year_num}"
    fi
done

Archive to: - S3 / object storage - Backup server - Tape (for compliance)
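
If the cold-storage target is S3-style object storage, the upload step might look like the following (the boto3 dependency and bucket name are assumptions, not part of dbbasic-events):

import boto3  # assumed extra dependency for S3 archival

def archive_year_to_s3(year, bucket='my-event-archive'):  # hypothetical bucket
    """Upload a yearly tarball produced by the archive script above."""
    s3 = boto3.client('s3')
    s3.upload_file(f'archive/events-{year}.tar.gz', bucket, f'events/{year}.tar.gz')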

Events are permanent, but cold storage is cheap.


Swappable Backend Design

Interface (Same Across All Backends)

# Public API (never changes)
from dbbasic_events import publish, subscribe, replay, project

Backend Implementations

Default: TSV

# dbbasic_events/__init__.py
from .tsv_backend import publish, subscribe, replay, project

PostgreSQL Backend:

# dbbasic_events_postgres/__init__.py
# Same functions, different implementation

def publish(event_type, data, aggregate_id=None):
    cursor.execute("""
        INSERT INTO events (type, timestamp, aggregate_id, data)
        VALUES (%s, %s, %s, %s)
        RETURNING event_id
    """, (event_type, int(time.time()), aggregate_id, json.dumps(data)))
    return cursor.fetchone()[0]

Kafka Backend:

# dbbasic_events_kafka/__init__.py
# Same functions, Kafka implementation

def publish(event_type, data, aggregate_id=None):
    # producer: a KafkaProducer configured with a JSON value_serializer
    producer.send('events', {
        'type': event_type,
        'data': data,
        'aggregate_id': aggregate_id
    })

Migration Path

Start:

# requirements.txt
dbbasic-events  # Uses TSV

Upgrade to PostgreSQL:

# requirements.txt
dbbasic-events-postgres  # Drop-in replacement

# config
export EVENTS_BACKEND=postgres
export DATABASE_URL=postgresql://...

Upgrade to Kafka:

# requirements.txt
dbbasic-events-kafka

# config
export EVENTS_BACKEND=kafka
export KAFKA_BROKERS=localhost:9092

Application code unchanged. Just swap package and config.
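
One way the package-level dispatch might look, keyed off the EVENTS_BACKEND variable shown above (a sketch, not the published packages' actual code):

# dbbasic_events/__init__.py (sketch)
import os

_BACKEND = os.getenv('EVENTS_BACKEND', 'tsv')

if _BACKEND == 'postgres':
    from dbbasic_events_postgres import publish, subscribe, replay, project
elif _BACKEND == 'kafka':
    from dbbasic_events_kafka import publish, subscribe, replay, project
else:
    from .tsv_backend import publish, subscribe, replay, project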


Why TSV Works at Scale

The Math

Modern hardware (2025): - NVMe SSD: 7GB/s read - 100MB event file: 0.014 seconds to read - grep: ~500MB/s throughput - 100MB file: 0.2 seconds to grep

Even with 10M events/month: - 1GB uncompressed - grep takes ~2 seconds - Compressed: ~100MB on disk, a few seconds to zgrep

This is acceptable for most queries.
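
Since the whole philosophy is "measure, then upgrade", it is worth timing real queries against your own event files; a minimal sketch:

import time
from dbbasic_events import subscribe

def time_query(**filters):
    """Measure how long a subscribe() query takes against real event files."""
    start = time.time()
    count = sum(1 for _ in subscribe(**filters))
    elapsed = time.time() - start
    print(f"{count} events in {elapsed:.2f}s for filters {filters}")
    return elapsed

# e.g. time_query(event_type='payment.received', since=int(time.time()) - 30 * 86400)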

Query Optimization

Don't need to scan entire history:

# Recent events (current month, uncompressed)
subscribe('user.registered', since=last_month)
# Fast: grep 100MB file

# Historical analysis (batch job)
subscribe('payment.received', since=start_of_year)
# Slower but acceptable: zgrep 12 compressed files

Real-time queries = current month (fast). Historical queries = compressed files (slower, but rare).

When TSV Becomes Slow

Measured thresholds: - Events > 100M/month (10GB files) - Query time > 10s consistently - Real-time processing required (< 1s) - Multiple servers need concurrent access

Then upgrade to PostgreSQL or Kafka.

But most apps never hit this.


Integration with Other Modules

dbbasic-queue Integration

# Publish event, queue async actions
from dbbasic_events import publish
from dbbasic_queue import enqueue

def create_post(title, author):
    post = Post.create(title, author)

    # Publish event (permanent record)
    publish('post.created', {
        'post_id': post.id,
        'title': title,
        'author': author
    }, aggregate_id=f'post:{post.id}')

    # Queue notifications (async work)
    enqueue('notify_followers', {'author_id': author, 'post_id': post.id})

    return post

dbbasic-logs Integration

# Log important events
from dbbasic_events import publish
from dbbasic_logs import log

def publish_with_logging(event_type, data, aggregate_id=None):
    """Publish event and log it"""
    event_id = publish(event_type, data, aggregate_id)
    log.info(f"Event published: {event_type}", event_id=event_id, aggregate_id=aggregate_id)
    return event_id

Building CQRS Pattern

Command side (writes):

def update_user_email(user_id, new_email):
    # Validate
    user = User.get(user_id)
    old_email = user.email

    # Publish event (source of truth)
    publish('user.email_changed', {
        'user_id': user_id,
        'old_email': old_email,
        'new_email': new_email
    }, aggregate_id=f'user:{user_id}')

    # Update read model (denormalized)
    User.update(user_id, email=new_email)

Query side (reads):

def get_user(user_id):
    # Read from denormalized model (fast)
    return User.get(user_id)

def get_user_history(user_id):
    # Replay events for audit trail
    return list(subscribe(aggregate_id=f'user:{user_id}'))

Benefits: - Fast reads (from denormalized model) - Complete history (from events) - Can rebuild read model anytime
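
Rebuilding the read model is just a replay per aggregate, reusing the handlers from the event sourcing example above; a sketch, where User.upsert stands in for whatever write path your read model uses:

def rebuild_user_read_model(user_id):
    """Throw away the denormalized row and rebuild it from events."""
    state = replay(f'user:{user_id}', {
        'user.registered': handle_registered,
        'user.email_changed': handle_email_changed,
        'user.name_changed': handle_name_changed,
    })
    User.upsert(user_id, **state)  # hypothetical write to the read model
    return state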


Testing Requirements

Unit Tests

from dbbasic_events import publish, subscribe, replay, project

def test_publish_event():
    event_id = publish('test.event', {'foo': 'bar'})
    assert event_id > 0

    events = list(subscribe('test.event'))
    assert len(events) > 0
    assert events[-1]['data']['foo'] == 'bar'

def test_aggregate_events():
    publish('user.registered', {'email': 'old@example.com'}, aggregate_id='user:42')
    publish('user.email_changed', {'new_email': 'new@example.com'}, aggregate_id='user:42')

    events = list(subscribe(aggregate_id='user:42'))
    assert len(events) == 2
    assert events[0]['version'] == 1
    assert events[1]['version'] == 2

def test_replay():
    publish('counter.incremented', {'amount': 5}, aggregate_id='counter:1')
    publish('counter.incremented', {'amount': 3}, aggregate_id='counter:1')

    def increment(state, event):
        state['value'] = state.get('value', 0) + event['data']['amount']
        return state

    result = replay('counter:1', {'counter.incremented': increment})
    assert result['value'] == 8

def test_project():
    publish('sale.completed', {'amount': 100})
    publish('sale.completed', {'amount': 200})

    total = project('sale.completed', lambda acc, e: acc + e['data']['amount'], initial=0)
    assert total == 300

Performance Tests

import time

def test_large_volume():
    # Publish 10,000 events
    start = time.time()
    for i in range(10000):
        publish('test.event', {'index': i})
    publish_time = time.time() - start
    print(f"Published 10K events in {publish_time:.2f}s")

    # Query them
    start = time.time()
    events = list(subscribe('test.event'))
    query_time = time.time() - start
    print(f"Queried 10K events in {query_time:.2f}s")

    assert len(events) == 10000
    assert publish_time < 5.0  # Should be fast
    assert query_time < 2.0    # Should be reasonable

Architectural Decisions

Why Events Are Different from Queue/Logs

Queue: - Jobs to execute - Deleted after completion - Temporary

Logs: - Debugging information - Rotated (30-90 days) - Observability

Events: - Permanent record - Never deleted - Source of truth

Events are data, not diagnostics.

Why TSV for Events?

Events need: - Append-only (TSV perfect for this) - Queryable (grep/dbbasic-tsv) - Permanent (disk storage) - Huge volume (compression helps)

TSV provides: - Simple appends (fast) - Plain text (debuggable) - Compressible (10:1 ratio) - Standard format (consistent with dbbasic)

Why Monthly Partitions?

Alternatives: - Daily files (too many files) - Single file (too large) - Yearly files (harder to manage)

Monthly wins: - ~1M events = 100MB (manageable size) - 12 files/year (not too many) - Current month uncompressed (fast) - Old months compressed (space efficient)

Why Not Kafka from the Start?

Kafka is for: - Multiple services consuming events in real-time - Distributed systems - High throughput (100K+ events/second) - Multiple servers

Most apps don't need this.

Kafka setup: - JVM (500MB+ RAM) - ZooKeeper (another service) - Kafka broker - Configuration complexity - Operational overhead

TSV setup: - Write to file - Compress old files - Done

Start simple. Upgrade when measured.

The Swap-Out Strategy

Why design for swappable backends:

  1. Start fast - TSV works today
  2. Learn - Understand event patterns in your app
  3. Measure - See actual volume/performance
  4. Upgrade informed - Swap when data demands it

Example progression:

Month 1: TSV (100K events)
  Query time: 0.1s ✓

Month 12: TSV (1M events)
  Query time: 0.5s ✓

Month 24: TSV (10M events)
  Query time: 5s ⚠️

Month 25: PostgreSQL (measured need)
  Query time: 0.05s ✓

Month 48: Kafka (multiple services)
  Real-time streaming ✓

Each upgrade based on measured need, not imagined scale.


Comparison to Alternatives

Feature         TSV           PostgreSQL       Kafka            AWS EventBridge
Setup           None          DB setup         Kafka+ZK         AWS account
Storage         Files         Database         Topics           Cloud
Query           grep          SQL              Stream API       CloudWatch
Cost            $0            $0 (self-host)   $0 (self-host)   $$$$
Compression     gzip (10:1)   Some             Snappy           Managed
Retention       Forever       Forever          Days-weeks       7-90 days
Real-time       No (batch)    No (polling)     Yes              Yes
Multi-server    NFS           Yes              Yes              Yes
Complexity      Very low      Medium           High             Medium
Lines of code   80            150              300              50 (SDK)

TSV: start here. PostgreSQL: upgrade at 10M+ events/month. Kafka: upgrade at 100M+ events/month or multi-server.


When to Upgrade

Metrics to Watch

Stay with TSV when: - Events < 10M/month - Query time < 5s - Batch processing acceptable - Single server

Upgrade to PostgreSQL when: - Events > 10M/month - Need faster queries (< 1s) - Complex filtering needed - Already running PostgreSQL

Upgrade to Kafka when: - Events > 100M/month - Real-time streaming required - Multiple services consuming events - Multi-server architecture

Migration Checklist

Before upgrading, ask: - [ ] Did we measure query performance? - [ ] Is it actually too slow for our use case? - [ ] Can we optimize TSV queries first? - [ ] Do we understand the added complexity? - [ ] Is our team ready to operate Kafka/PostgreSQL?

Don't upgrade because: - ❌ "We might need it someday" - ❌ "Everyone else uses Kafka" - ❌ "It sounds more professional"

Upgrade because: - ✅ Measured: "Queries taking 10s, users complaining" - ✅ Measured: "10M events/month, TSV struggling" - ✅ Requirement: "Multiple services need real-time events"


Common Questions

Q: Can I delete events?

A: Generally no - events are permanent record.

Exception: GDPR/privacy compliance

# Remove user's personal data from events
def anonymize_user_events(user_id):
    """Replace user data with anonymized version (GDPR)"""
    for month_file in glob.glob('data/events/**/*.tsv', recursive=True):
        # Filter and rewrite with anonymized data
        # (filter_file rewrites the file row by row; anonymize_row is sketched below)
        filter_file(month_file, lambda row:
            anonymize_row(row) if f'user:{user_id}' in row else row
        )
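
A sketch of what the row-level scrubber might look like, assuming the six-column TSV schema above (anonymize_row and the field list are illustrative):

import json

PII_FIELDS = {'email', 'name', 'old_email', 'new_email'}  # illustrative field list

def anonymize_row(row):
    """Blank personal fields in the JSON payload; keep the event itself."""
    event_id, etype, ts, aggregate_id, data, version = row.rstrip('\n').split('\t')
    payload = json.loads(data)
    for field in PII_FIELDS & payload.keys():
        payload[field] = '[redacted]'
    return '\t'.join([event_id, etype, ts, aggregate_id, json.dumps(payload), version])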

Q: How do I replay all events?

A: Read files in chronological order:

for event in subscribe():  # No filters = all events
    process_event(event)

With compression, reading 60M events (5 years) = ~600MB compressed = minutes.

Q: What about event versioning?

A: The version field tracks aggregate version:

# User 42 events:
event_id=1, version=1: user.registered
event_id=5, version=2: user.email_changed
event_id=9, version=3: user.name_changed

Rebuild user state = replay versions 1→2→3 in order.

Q: Can events trigger other events?

A: Yes (saga pattern):

# Event processor (cron job)
for event in subscribe('order.paid', since=last_run):
    # Trigger next events
    publish('inventory.reserved', {
        'order_id': event['data']['order_id'],
        'items': event['data']['items']
    })

    publish('shipping.requested', {
        'order_id': event['data']['order_id']
    })

Q: How do I handle schema changes?

A: Event data is JSON - additive changes work:

# Old events
{'user_id': 42, 'email': 'user@example.com'}

# New events (added field)
{'user_id': 42, 'email': 'user@example.com', 'referrer': 'google'}

# Handlers handle both
def handle_registered(state, event):
    state['email'] = event['data']['email']
    state['referrer'] = event['data'].get('referrer', 'direct')  # Default for old events
    return state

Never modify old events - they're historical facts.


Package Structure

dbbasic-events/
├── dbbasic_events/
│   ├── __init__.py          # Main implementation (80 lines)
│   ├── tsv_backend.py       # TSV storage (default)
│   └── compress.py          # Compression script
├── tests/
│   ├── test_events.py
│   ├── test_replay.py
│   └── test_performance.py
├── setup.py
├── README.md
├── LICENSE
└── CHANGELOG.md

Optional backends (separate packages):
dbbasic-events-postgres/     # PostgreSQL backend
dbbasic-events-kafka/        # Kafka backend
dbbasic-events-sqlite/       # SQLite backend

Success Criteria

This implementation is successful if:

  1. Simple: < 100 lines for TSV backend
  2. Append-Only: Immutable event log
  3. Queryable: grep/subscribe API
  4. Scalable: Handles 10M+ events with compression
  5. Swappable: Interface allows backend upgrades
  6. Permanent: Events never lost
  7. Debuggable: cat/zcat shows events
  8. Event Sourcing: Replay/project support

The Bigger Picture: Swappable Architecture

This is the key to dbbasic's 10x advantage:

Start Simple, Swap When Needed

┌─────────────────┐
│  Your App Code  │  (never changes)
└────────┬────────┘
         │ (interface)
    ┌────┴────┐
    │         │
┌───▼───┐ ┌──▼──────┐ ┌──▼─────┐
│  TSV  │ │ Postgres│ │ Kafka  │
│Backend│ │ Backend │ │Backend │
└───────┘ └─────────┘ └────────┘
  Day 1     Year 2      Year 5

Your application:

from dbbasic_events import publish, subscribe
# This never changes

Backend swaps:

# .env file
EVENTS_BACKEND=tsv        # Start
EVENTS_BACKEND=postgres   # Upgrade
EVENTS_BACKEND=kafka      # Scale

All modules work this way: - dbbasic-sessions: Signed cookies → Redis sessions - dbbasic-queue: TSV → Redis → Kafka - dbbasic-events: TSV → PostgreSQL → Kafka - dbbasic-logs: TSV → Elasticsearch

Build features on day 1. Swap infrastructure when measured.


Summary

dbbasic-events provides event sourcing without the complexity:

What you get: - Permanent event log (append-only) - Event sourcing (replay state) - CQRS support (command/query separation) - Analytics (aggregate events) - Audit trail (full history)

How it works: - TSV files (monthly partitions) - Compression (10:1 ratio) - grep/zgrep (search) - 80 lines of code

Why it scales: - Modern hardware is fast - 12M events/year = 120MB compressed - grep 100MB in 0.2s - Good enough for 90% of apps

When to upgrade: - Measured performance issues - Events > 10M/month - Real-time processing required - Multiple services need events

Upgrade path: - Same interface - Swap backend - Application code unchanged

Start simple. Measure. Swap when needed.

Not because you might need scale. Because you measured you need scale.


Next Steps: Implement TSV backend, test at volume, ship.

No Kafka. No ZooKeeper. No complexity.

Just append-only TSV files and Unix compression.

80 lines of code that grow with your business.