
Redesign event publishing system to replace audit package #1339

@AmanGIT07

Event Publisher Redesign

Context

The audit package is being deprecated:

  • Audit logging → auditrecord package (already implemented)
  • Event publishing → publisher package (this design)

Problem

Current architecture mixes concerns and has limitations:

audit.GetAuditor(ctx, orgID).Log(event, target)
       │
       ▼
audit.Service.Create()
       │
       ├──► audit DB (deprecated)
       ├──► webhookService.Publish() (HTTP POST)
       └──► channel (buffer=10) → ChanListener (1 worker) → switch → EnsureDefaultPlan()

Issues:

  • Context-based DI hack
  • Single worker, switch-based routing
  • In-memory only (events lost on crash)
  • Only ONE internal consumer: org.created → billing setup

Solution

Usage

// Current
audit.GetAuditor(ctx, orgID).LogWithAttrs(audit.OrgCreatedEvent, audit.OrgTarget(newOrg.ID), attrs)

// New - equally simple
s.publisher.Publish(ctx, publisher.Event{
    OrgID:  newOrg.ID,
    Type:   publisher.OrgCreatedEvent,
    Target: publisher.OrgTarget(newOrg.ID, newOrg.Name),
    Attrs:  map[string]string{"title": newOrg.Title},
})

Event Struct

type Target struct {
    ID, Type, Name string
}

type Event struct {
    OrgID  string
    Type   string             // "app.organization.created"
    Target Target
    Attrs  map[string]string  // optional
}

The publisher handles the rest internally: event ID generation, actor extraction (from context), timestamping, and the webhook payload format.
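For illustration, the internal envelope the publisher builds before delivery might look like this (the shape is an assumption, not part of the public API; callers never construct it):

// Built inside Publish(); sketch only.
type deliveredEvent struct {
    Event
    ID        string    // generated per event, e.g. a UUID
    Actor     string    // extracted from ctx by the publisher
    CreatedAt time.Time // stamped at publish time
}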

Publisher Interface

type Handler func(ctx context.Context, event Event) error

type Publisher interface {
    Publish(ctx context.Context, event Event) error
    Subscribe(eventTypes []string, handler Handler)  // empty = all events
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
}

Queue Implementations

1. In-Memory

publisher.NewMemoryPublisher(
    publisher.WithWorkers(4),
    publisher.WithBufferSize(100),
)
| Aspect      | Detail                               |
|-------------|--------------------------------------|
| Delivery    | At-most-once                         |
| Persistence | None - events lost on crash          |
| Scaling     | Single instance only                 |
| Use case    | Development, low-criticality events  |
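
For illustration, a minimal sketch of how the in-memory implementation could dispatch events - a buffered channel fanned out to N workers, with handlers looked up by event type (memoryPublisher and subs are assumed names, not final API):

// pkg/publisher - sketch only
type memoryPublisher struct {
    ch   chan Event
    subs map[string][]Handler // event type → handlers; "" holds catch-all subscribers
}

func (p *memoryPublisher) Publish(ctx context.Context, e Event) error {
    select {
    case p.ch <- e: // hand off without blocking the request path
        return nil
    default:
        return errors.New("event buffer full") // at-most-once: may drop under load
    }
}

// Each of the N workers runs this loop.
func (p *memoryPublisher) worker(ctx context.Context) {
    for {
        select {
        case e := <-p.ch:
            for _, h := range p.subs[""] { // catch-all subscribers (e.g. webhooks)
                _ = h(ctx, e) // errors logged, not retried
            }
            for _, h := range p.subs[e.Type] { // type-specific (e.g. billing)
                _ = h(ctx, e)
            }
        case <-ctx.Done():
            return
        }
    }
}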

2. PostgreSQL

How it works:

Publish()          event_queue table           Workers
    │                    │                        │
    │  INSERT ──────────►│◄─────── SELECT         │
    │  + NOTIFY          │         FOR UPDATE     │
    │                    │         SKIP LOCKED    │
    │                    │                        │

  1. Publish() writes the event to the table (+ optional NOTIFY)
  2. Workers claim events with SELECT FOR UPDATE SKIP LOCKED (prevents double-processing)
  3. After processing, the worker deletes the row

Worker strategies:

  • Polling - Query every N ms. Trade-off: latency = poll interval, constant DB load
  • LISTEN/NOTIFY - DB notifies workers on insert. Trade-off: lower latency, but notifications can be missed
  • Hybrid - NOTIFY + fallback polling. Trade-off: best of both, more complex (see the sketch after this list)
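
A minimal sketch of the hybrid wake-up loop - notifyCh, pollInterval, and processAvailable are hypothetical names, assuming notifyCh is fed by a LISTEN connection:

// Wake on NOTIFY when possible, but never sleep longer than pollInterval,
// so a missed notification only adds bounded latency.
func (w *worker) run(ctx context.Context, notifyCh <-chan struct{}, pollInterval time.Duration) {
    ticker := time.NewTicker(pollInterval)
    defer ticker.Stop()
    for {
        select {
        case <-notifyCh: // an event was just inserted
        case <-ticker.C: // fallback poll in case a NOTIFY was missed
        case <-ctx.Done():
            return
        }
        w.processAvailable(ctx) // claim and handle rows via SKIP LOCKED
    }
}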

Implementation options:

  1. pgmq - PostgreSQL extension that handles queue mechanics (visibility timeout, indexing, partitioning). Simple API: pgmq.send(), pgmq.read(), pgmq.delete()

    Note: pgmq is a community extension and is not available as a native AWS RDS extension; we would need a workaround such as Trusted Language Extensions.

  2. DIY with SKIP LOCKED - Custom table + SELECT FOR UPDATE SKIP LOCKED query (sketched below). Works on any PostgreSQL. More code to maintain, but no extension dependency.
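
A minimal sketch of the DIY claim step, assuming a hypothetical event_queue(id, payload) table, database/sql with a PostgreSQL driver, and a hypothetical handle function:

func claimOne(ctx context.Context, db *sql.DB) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op after a successful Commit

    var id int64
    var payload []byte
    // SKIP LOCKED lets concurrent workers claim different rows
    // instead of blocking on each other.
    err = tx.QueryRowContext(ctx,
        `SELECT id, payload FROM event_queue ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED`,
    ).Scan(&id, &payload)
    if err != nil {
        return err // sql.ErrNoRows means the queue is empty
    }

    if err := handle(ctx, payload); err != nil {
        return err // rollback unlocks the row; another worker retries it
    }

    // Delete in the same transaction: the row disappears only if handling
    // succeeded, which is what gives at-least-once semantics.
    if _, err := tx.ExecContext(ctx, `DELETE FROM event_queue WHERE id = $1`, id); err != nil {
        return err
    }
    return tx.Commit()
}

Constructing the publisher itself, per the proposed options API: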

publisher.NewPGPublisher(
    publisher.WithDB(db),
    publisher.WithTable("event_queue"),
)
| Aspect      | Detail                                                                    |
|-------------|---------------------------------------------------------------------------|
| Delivery    | At-least-once                                                             |
| Persistence | ACID guaranteed                                                           |
| Scaling     | SKIP LOCKED handles multiple workers (some overhead at high worker count) |
| Retry       | Visibility timeout (pgmq) or custom retry logic (DIY)                     |
| Use case    | Production, no additional infrastructure                                  |

Trade-offs vs Redis:

  • Higher latency than Redis
  • Lock contention overhead with many workers competing for jobs
  • Polling adds latency; LISTEN/NOTIFY adds complexity
  • Additional load on the database (queries, writes, index maintenance)
  • Benefit: no new infrastructure, transactional consistency with app data

3. Redis Streams

publisher.NewRedisPublisher(
    publisher.WithRedisClient(redisClient),
    publisher.WithStream("frontier:events"),
    publisher.WithConsumerGroup("frontier"),
)
| Aspect      | Detail                                  |
|-------------|-----------------------------------------|
| Delivery    | At-least-once                           |
| Persistence | Events persisted in Redis               |
| Scaling     | Consumer groups for horizontal scaling  |
| Retry       | Built-in with XPENDING                  |
| Use case    | Production, high throughput             |
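
For illustration, a minimal consumer-group read loop using github.com/redis/go-redis/v9, with the stream and group names from the constructor above (handleMessage and the consumer name are hypothetical):

func consume(ctx context.Context, rdb *redis.Client) error {
    for {
        // Block up to 5s waiting for new entries assigned to this consumer.
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    "frontier",
            Consumer: "worker-1",
            Streams:  []string{"frontier:events", ">"}, // ">" = only new messages
            Count:    10,
            Block:    5 * time.Second,
        }).Result()
        if err == redis.Nil {
            continue // block timed out, try again
        }
        if err != nil {
            return err
        }
        for _, s := range streams {
            for _, msg := range s.Messages {
                if err := handleMessage(ctx, msg.Values); err != nil {
                    continue // unacked: stays in XPENDING for retry
                }
                rdb.XAck(ctx, "frontier:events", "frontier", msg.ID) // ack on success
            }
        }
    }
}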

Broker Comparison

| Factor         | In-Memory       | PostgreSQL   | Redis Streams   | Kafka      |
|----------------|-----------------|--------------|-----------------|------------|
| Persistence    | None            | Excellent    | Good            | Excellent  |
| Throughput     | High            | Moderate     | High            | Very high  |
| Infrastructure | None            | Already have | Additional      | Heavy      |
| Operations     | Simple          | Simple       | Simple          | Complex    |
| Scaling        | Single instance | SKIP LOCKED  | Consumer groups | Partitions |

Verdict: Kafka is overkill - we have ONE internal event consumer (org.created → billing). PostgreSQL is a good middle ground (no extra infrastructure), with Redis Streams available for higher-throughput needs.

Idempotency Requirement

At-least-once delivery (PostgreSQL, Redis Streams) means handlers may receive the same event multiple times (e.g., worker crashes after processing but before acknowledging). Handlers must be idempotent:

// Example: billing handler should check if already processed.
// billingService is captured from the surrounding scope at wiring time.
billingHandler := func(ctx context.Context, event publisher.Event) error {
    // Idempotency check - skip if already processed
    if billingService.HasDefaultPlan(ctx, event.OrgID) {
        return nil // already done, no-op
    }
    return billingService.EnsureDefaultPlan(ctx, event.OrgID)
}

This is not required for in-memory (at-most-once) but becomes critical for persistent queues.

Feasibility: Handler → Service

Moving publish calls from handlers to services appears feasible:

  • Services return created/updated entities (have all data needed for events)
  • Actor is extracted from context (available in service layer)
  • Publisher in pkg/ avoids circular dependencies with core/ services

Note: May need validation during implementation for edge cases or circular dependency issues.

Example: Organization Creation

Current (Handler):

// internal/api/v1beta1connect/organization.go
func (h *Handler) CreateOrganization(ctx context.Context, req *connect.Request[...]) (*connect.Response[...], error) {
    newOrg, err := h.orgService.Create(ctx, organization.Organization{...})
    if err != nil {
        return nil, err
    }

    // Event publishing in handler
    audit.GetAuditor(ctx, newOrg.ID).LogWithAttrs(audit.OrgCreatedEvent, audit.OrgTarget(newOrg.ID), map[string]string{
        "title": newOrg.Title,
        "name":  newOrg.Name,
    })

    return connect.NewResponse(&frontierv1beta1.CreateOrganizationResponse{...}), nil
}

New (Service):

// core/organization/service.go
func (s *Service) Create(ctx context.Context, org Organization) (Organization, error) {
    newOrg, err := s.repository.Create(ctx, Organization{...})
    if err != nil {
        return Organization{}, err
    }

    // ... existing logic (add member, attach to platform) ...

    // Event publishing in service
    if s.publisher != nil {
        s.publisher.Publish(ctx, publisher.Event{
            OrgID:  newOrg.ID,
            Type:   publisher.OrgCreatedEvent,
            Target: publisher.OrgTarget(newOrg.ID, newOrg.Name),
            Attrs:  map[string]string{"title": newOrg.Title},
        })
    }

    return newOrg, nil
}

// internal/api/v1beta1connect/organization.go
func (h *Handler) CreateOrganization(ctx context.Context, req *connect.Request[...]) (*connect.Response[...], error) {
    newOrg, err := h.orgService.Create(ctx, organization.Organization{...})
    if err != nil {
        return nil, err
    }
    // No event publishing - handled by service
    return connect.NewResponse(&frontierv1beta1.CreateOrganizationResponse{...}), nil
}

Package Changes

| Package                              | Action                                                          |
|--------------------------------------|-----------------------------------------------------------------|
| pkg/publisher/                       | ADD - Event, Publisher interface, Memory/Redis implementations  |
| core/audit/                          | DELETE - entire package                                         |
| core/event/publisher.go, listener.go | DELETE                                                          |
| core/event/service.go                | KEEP - billing logic                                            |
| core/webhook/                        | ADD - handler bridging publisher → webhook.Publish()            |
| Handlers                             | MODIFY - remove audit.GetAuditor() calls (28 calls in 12 files) |

Services Requiring Publisher Injection

| Service      | Events Published                                                   |
|--------------|--------------------------------------------------------------------|
| organization | org.created, org.updated, org.member.created/deleted               |
| project      | project.created, project.updated, project.deleted                  |
| group        | group.created, group.updated, group.deleted, group.members.removed |
| user         | user.created, user.updated, user.deleted                           |
| serviceuser  | serviceuser.created, serviceuser.deleted                           |
| role         | role.created, role.updated, role.deleted                           |
| resource     | resource.created, resource.updated, resource.deleted               |
| policy       | policy.created, policy.deleted                                     |

The publisher is injected via the functional options pattern - only where needed, so existing tests are unaffected (sketched below).
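
A minimal sketch of what the option could look like inside a service package (assumed shape, matching the WithPublisher calls in the wiring below):

// core/organization/service.go - sketch only
type Option func(*Service)

// WithPublisher wires an event publisher into the service. Services
// constructed without it keep s.publisher == nil and skip publishing.
func WithPublisher(p publisher.Publisher) Option {
    return func(s *Service) { s.publisher = p }
}

func NewService(repo Repository, opts ...Option) *Service {
    s := &Service{repository: repo}
    for _, opt := range opts {
        opt(s)
    }
    return s
}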

Wiring

// Create publisher
pub := publisher.NewMemoryPublisher(publisher.WithWorkers(4))

// Subscribe handlers
pub.Subscribe([]string{}, webhookHandler)                          // all → webhooks
pub.Subscribe([]string{publisher.OrgCreatedEvent}, billingHandler) // org.created → billing

// Inject into services that publish events (functional options pattern)
orgService := organization.NewService(repo, ..., organization.WithPublisher(pub))
groupService := group.NewService(repo, ..., group.WithPublisher(pub))
// ... other services that need publisher

// Services without events - no change
metaSchemaService := metaschema.NewService(repo, ...)  // no WithPublisher needed

// Start
go pub.Start(ctx)

Migration

  1. Add pkg/publisher (interface + memory implementation)
  2. Add webhook and billing handlers
  3. Inject publisher into services, add Publish() calls
  4. Remove audit.GetAuditor() from handlers
  5. Delete core/audit/, core/event/publisher.go, core/event/listener.go

Future Work

  • PostgreSQL queue implementation
  • Redis Streams implementation
  • Retry with exponential backoff
  • Dead letter queue
  • Move billing logic from core/event/ to billing/

Config

In-Memory:

event:
  driver: memory
  workers: 4
  buffer_size: 100

PostgreSQL:

event:
  driver: postgres
  workers: 4
  postgres:
    queue: "events"
    poll_interval: 100ms

Redis Streams:

event:
  driver: redis
  workers: 4
  redis:
    stream: "frontier:events"
    consumer_group: "frontier"
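
For illustration, these sections could decode into a struct along these lines (field names and yaml tags are assumptions):

type EventConfig struct {
    Driver     string        `yaml:"driver"`      // "memory", "postgres", or "redis"
    Workers    int           `yaml:"workers"`
    BufferSize int           `yaml:"buffer_size"` // memory driver only
    Postgres   PostgresQueue `yaml:"postgres"`
    Redis      RedisQueue    `yaml:"redis"`
}

type PostgresQueue struct {
    Queue        string        `yaml:"queue"`
    PollInterval time.Duration `yaml:"poll_interval"` // e.g. 100ms
}

type RedisQueue struct {
    Stream        string `yaml:"stream"`
    ConsumerGroup string `yaml:"consumer_group"`
}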
