# Event Publisher Redesign

## Context
The `audit` package is being deprecated:

- Audit logging → `auditrecord` package (already implemented)
- Event publishing → `publisher` package (this design)
## Problem
The current architecture mixes concerns and has limitations:

```
audit.GetAuditor(ctx, orgID).Log(event, target)
        │
        ▼
audit.Service.Create()
        │
        ├──► audit DB (deprecated)
        ├──► webhookService.Publish() (HTTP POST)
        └──► channel (buffer=10) → ChanListener (1 worker) → switch → EnsureDefaultPlan()
```
Issues:

- Context-based DI hack
- Single worker, switch-based routing
- In-memory only (events are lost on crash)
- Only ONE internal consumer: `org.created` → billing setup
## Solution

### Usage
```go
// Current
audit.GetAuditor(ctx, orgID).LogWithAttrs(audit.OrgCreatedEvent, audit.OrgTarget(newOrg.ID), attrs)

// New - equally simple
s.publisher.Publish(ctx, publisher.Event{
    OrgID:  newOrg.ID,
    Type:   publisher.OrgCreatedEvent,
    Target: publisher.OrgTarget(newOrg.ID, newOrg.Name),
    Attrs:  map[string]string{"title": newOrg.Title},
})
```

### Event Struct
```go
type Target struct {
    ID, Type, Name string
}

type Event struct {
    OrgID  string
    Type   string // "app.organization.created"
    Target Target
    Attrs  map[string]string // optional
}
```

The publisher handles internally: event ID, actor extraction (from context), timestamp, and webhook payload format.
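A minimal sketch of that internal enrichment step, assuming the publisher wraps the caller's `Event` in an internal envelope before queuing; the `envelope` type and the `actorFromContext` helper are illustrative assumptions, not the final API:

```go
import (
    "context"
    "time"

    "github.com/google/uuid"
)

type envelope struct {
    ID        string    // generated event ID
    Actor     string    // extracted from context
    Timestamp time.Time // set at publish time
    Event     Event
}

func enrich(ctx context.Context, e Event) envelope {
    return envelope{
        ID:        uuid.New().String(),
        Actor:     actorFromContext(ctx), // hypothetical helper reading auth info from ctx
        Timestamp: time.Now().UTC(),
        Event:     e,
    }
}
```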
### Publisher Interface
```go
type Handler func(ctx context.Context, event Event) error

type Publisher interface {
    Publish(ctx context.Context, event Event) error
    Subscribe(eventTypes []string, handler Handler) // empty = all events
    Start(ctx context.Context) error
    Stop(ctx context.Context) error
}
```

## Queue Implementations
### 1. In-Memory
```go
publisher.NewMemoryPublisher(
    publisher.WithWorkers(4),
    publisher.WithBufferSize(100),
)
```

| Aspect | Detail |
|---|---|
| Delivery | At-most-once |
| Persistence | None - events lost on crash |
| Scaling | Single instance only |
| Use case | Development, low-criticality events |
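A minimal sketch of the in-memory dispatch, assuming a buffered channel feeding a fixed worker pool; subscription filtering and error logging are omitted, and all names are illustrative:

```go
type memoryPublisher struct {
    ch       chan Event
    handlers []Handler
}

func (p *memoryPublisher) Publish(ctx context.Context, e Event) error {
    select {
    case p.ch <- e: // buffered; at-most-once, lost on crash
        return nil
    case <-ctx.Done(): // buffer full and caller gave up
        return ctx.Err()
    }
}

func (p *memoryPublisher) Start(ctx context.Context) error {
    for i := 0; i < 4; i++ { // worker count from WithWorkers(4)
        go func() {
            for e := range p.ch {
                for _, h := range p.handlers {
                    _ = h(ctx, e) // a real implementation would log handler errors
                }
            }
        }()
    }
    return nil
}
```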
### 2. PostgreSQL
How it works:

```
Publish()           event_queue table            Workers
    │                      │                        │
    │ INSERT ─────────────►│◄───────── SELECT      │
    │ + NOTIFY             │           FOR UPDATE  │
    │                      │           SKIP LOCKED │
    │                      │                        │
```

- `Publish()` writes the event to a table (+ optional NOTIFY)
- Workers read using `SELECT ... FOR UPDATE SKIP LOCKED` (prevents double-processing)
- After processing, the worker deletes the row (sketched below)
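A minimal sketch of the claim/process/delete cycle using `database/sql`, assuming an `event_queue` table with `(id, payload)` columns; the schema and function names are illustrative:

```go
import (
    "context"
    "database/sql"
)

func claimOne(ctx context.Context, db *sql.DB, handle func([]byte) error) error {
    tx, err := db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // releases the row lock if anything below fails

    var id int64
    var payload []byte
    // SKIP LOCKED lets concurrent workers claim different rows without blocking.
    err = tx.QueryRowContext(ctx, `
        SELECT id, payload FROM event_queue
        ORDER BY id LIMIT 1
        FOR UPDATE SKIP LOCKED`).Scan(&id, &payload)
    if err == sql.ErrNoRows {
        return nil // queue is empty
    }
    if err != nil {
        return err
    }
    if err := handle(payload); err != nil {
        return err // rollback: the event stays queued and is retried later
    }
    // Delete only after successful processing → at-least-once delivery.
    if _, err := tx.ExecContext(ctx, `DELETE FROM event_queue WHERE id = $1`, id); err != nil {
        return err
    }
    return tx.Commit()
}
```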
Worker strategies:
- Polling - Query every N ms. Trade-off: latency = poll interval, constant DB load
- LISTEN/NOTIFY - DB notifies workers on insert. Trade-off: lower latency, but notifications can be missed
- Hybrid - NOTIFY + fallback polling. Trade-off: best of both, more complex
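A minimal sketch of the hybrid strategy, assuming `notifyCh` is fed by a LISTEN connection (e.g. lib/pq's `Listener`); the ticker is the polling fallback that catches missed notifications, and the names are illustrative:

```go
import (
    "context"
    "time"
)

func workerLoop(ctx context.Context, notifyCh <-chan struct{}, drain func(context.Context)) {
    ticker := time.NewTicker(500 * time.Millisecond) // fallback poll interval
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-notifyCh: // low-latency path: NOTIFY fired on insert
        case <-ticker.C: // safety net: poll even without a notification
        }
        drain(ctx) // claim and process rows until the queue is empty
    }
}
```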
Implementation options:

- **pgmq** - a PostgreSQL extension that handles queue mechanics (visibility timeout, indexing, partitioning). Simple API: `pgmq.send()`, `pgmq.read()`, `pgmq.delete()`. Note: pgmq is a community extension and is not available as an AWS RDS extension; we would need workarounds such as Trusted Language Extensions.
- **DIY with SKIP LOCKED** - a custom table plus a `SELECT ... FOR UPDATE SKIP LOCKED` query. Works on any PostgreSQL. More code to maintain, but no extension dependency.
```go
publisher.NewPGPublisher(
    publisher.WithDB(db),
    publisher.WithTable("event_queue"),
)
```

| Aspect | Detail |
|---|---|
| Delivery | At-least-once |
| Persistence | ACID guaranteed |
| Scaling | SKIP LOCKED handles multiple workers (some overhead at high worker count) |
| Retry | Visibility timeout (pgmq) or custom retry logic (DIY) |
| Use case | Production, no additional infrastructure |
Trade-offs vs Redis:
- Higher latency than Redis
- Lock contention overhead with many workers competing for jobs
- Polling adds latency; LISTEN/NOTIFY adds complexity
- Additional load on the database (queries, writes, index maintenance)
- Benefit: no new infrastructure, transactional consistency with app data
### 3. Redis Streams
```go
publisher.NewRedisPublisher(
    publisher.WithRedisClient(redisClient),
    publisher.WithStream("frontier:events"),
    publisher.WithConsumerGroup("frontier"),
)
```

| Aspect | Detail |
|---|---|
| Delivery | At-least-once |
| Persistence | Events persisted in Redis |
| Scaling | Consumer groups for horizontal scaling |
| Retry | Built-in with XPENDING |
| Use case | Production, high throughput |
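A minimal sketch of the flow with `github.com/redis/go-redis/v9`: XADD to publish, XREADGROUP to consume, XACK after handling. Retry of stuck entries via XPENDING/XCLAIM is omitted, and the consumer name is illustrative:

```go
import (
    "context"
    "time"

    "github.com/redis/go-redis/v9"
)

func publishToStream(ctx context.Context, rdb *redis.Client, e Event) error {
    return rdb.XAdd(ctx, &redis.XAddArgs{
        Stream: "frontier:events",
        Values: map[string]interface{}{"type": e.Type, "org_id": e.OrgID},
    }).Err()
}

func consumeOnce(ctx context.Context, rdb *redis.Client, handle func(map[string]interface{}) error) error {
    streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    "frontier",
        Consumer: "worker-1",
        Streams:  []string{"frontier:events", ">"}, // ">" = only new messages
        Count:    10,
        Block:    5 * time.Second,
    }).Result()
    if err == redis.Nil {
        return nil // block timeout, nothing to do
    }
    if err != nil {
        return err
    }
    for _, s := range streams {
        for _, msg := range s.Messages {
            if err := handle(msg.Values); err != nil {
                continue // left unacked; retried via the pending entries list
            }
            rdb.XAck(ctx, "frontier:events", "frontier", msg.ID) // acknowledge
        }
    }
    return nil
}
```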
## Broker Comparison
| Factor | In-Memory | PostgreSQL | Redis Streams | Kafka |
|---|---|---|---|---|
| Persistence | None | Excellent | Good | Excellent |
| Throughput | High | Moderate | High | Very high |
| Infrastructure | None | Already have | Additional | Heavy |
| Operations | Simple | Simple | Simple | Complex |
| Scaling | Single instance | SKIP LOCKED workers | Consumer groups | Partitions |
Verdict: Kafka is overkill - we have ONE internal event consumer (`org.created` → billing). PostgreSQL is a good middle ground (no extra infrastructure); Redis Streams covers higher-throughput needs.
## Idempotency Requirement
At-least-once delivery (PostgreSQL, Redis Streams) means handlers may receive the same event multiple times (e.g., worker crashes after processing but before acknowledging). Handlers must be idempotent:
```go
// Example: the billing handler should check whether the org was already
// processed. Defined as a method so it can reach the billing service.
func (s *Service) billingHandler(ctx context.Context, event publisher.Event) error {
    // Idempotency check - skip if already processed
    if s.billingService.HasDefaultPlan(ctx, event.OrgID) {
        return nil // already done, no-op
    }
    return s.billingService.EnsureDefaultPlan(ctx, event.OrgID)
}
```

This is not required for in-memory (at-most-once) but becomes critical for persistent queues.
## Feasibility: Handler → Service
Moving publish calls from handlers to services appears feasible:

- Services return created/updated entities (they have all the data needed for events)
- Actor is extracted from context (available in the service layer)
- Publisher lives in `pkg/`, which avoids circular dependencies with `core/` services

Note: may need validation during implementation for edge cases or circular dependency issues.
### Example: Organization Creation
Current (Handler):

```go
// internal/api/v1beta1connect/organization.go
func (h *Handler) CreateOrganization(ctx context.Context, req *connect.Request[...]) (*connect.Response[...], error) {
    newOrg, err := h.orgService.Create(ctx, organization.Organization{...})
    if err != nil {
        return nil, err
    }
    // Event publishing in handler
    audit.GetAuditor(ctx, newOrg.ID).LogWithAttrs(audit.OrgCreatedEvent, audit.OrgTarget(newOrg.ID), map[string]string{
        "title": newOrg.Title,
        "name":  newOrg.Name,
    })
    return connect.NewResponse(&frontierv1beta1.CreateOrganizationResponse{...}), nil
}
```

New (Service):
```go
// core/organization/service.go
func (s *Service) Create(ctx context.Context, org Organization) (Organization, error) {
    newOrg, err := s.repository.Create(ctx, Organization{...})
    if err != nil {
        return Organization{}, err
    }
    // ... existing logic (add member, attach to platform) ...

    // Event publishing in service
    if s.publisher != nil {
        s.publisher.Publish(ctx, publisher.Event{
            OrgID:  newOrg.ID,
            Type:   publisher.OrgCreatedEvent,
            Target: publisher.OrgTarget(newOrg.ID, newOrg.Name),
            Attrs:  map[string]string{"title": newOrg.Title},
        })
    }
    return newOrg, nil
}

// internal/api/v1beta1connect/organization.go
func (h *Handler) CreateOrganization(ctx context.Context, req *connect.Request[...]) (*connect.Response[...], error) {
    newOrg, err := h.orgService.Create(ctx, organization.Organization{...})
    if err != nil {
        return nil, err
    }
    // No event publishing - handled by the service
    return connect.NewResponse(&frontierv1beta1.CreateOrganizationResponse{...}), nil
}
```

## Package Changes
| Package | Action |
|---|---|
| `pkg/publisher/` | ADD - Event, Publisher interface, Memory/Redis implementations |
| `core/audit/` | DELETE - entire package |
| `core/event/publisher.go`, `listener.go` | DELETE |
| `core/event/service.go` | KEEP - billing logic |
| `core/webhook/` | ADD - handler bridging publisher → `webhook.Publish()` |
| Handlers | MODIFY - remove `audit.GetAuditor()` calls (28 calls in 12 files) |
## Services Requiring Publisher Injection
| Service | Events Published |
|---|---|
| `organization` | org.created, org.updated, org.member.created/deleted |
| `project` | project.created, project.updated, project.deleted |
| `group` | group.created, group.updated, group.deleted, group.members.removed |
| `user` | user.created, user.updated, user.deleted |
| `serviceuser` | serviceuser.created, serviceuser.deleted |
| `role` | role.created, role.updated, role.deleted |
| `resource` | resource.created, resource.updated, resource.deleted |
| `policy` | policy.created, policy.deleted |
The publisher is injected via the functional options pattern - only where needed, so existing tests are unaffected. A sketch of the option follows.
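A minimal sketch of that option, assuming a `Service` struct in `core/organization`; the field and constructor shapes are illustrative, not the final code:

```go
// Option mutates a Service during construction.
type Option func(*Service)

// WithPublisher wires an event publisher into the service; services that
// never publish simply omit the option and s.publisher stays nil.
func WithPublisher(p publisher.Publisher) Option {
    return func(s *Service) { s.publisher = p }
}

func NewService(repo Repository, opts ...Option) *Service {
    s := &Service{repository: repo}
    for _, o := range opts {
        o(s)
    }
    return s
}
```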
## Wiring
```go
// Create publisher
pub := publisher.NewMemoryPublisher(publisher.WithWorkers(4))

// Subscribe handlers
pub.Subscribe([]string{}, webhookHandler)                          // all → webhooks
pub.Subscribe([]string{publisher.OrgCreatedEvent}, billingHandler) // org.created → billing

// Inject into services that publish events (functional options pattern)
orgService := organization.NewService(repo, ..., organization.WithPublisher(pub))
groupService := group.NewService(repo, ..., group.WithPublisher(pub))
// ... other services that need the publisher

// Services without events - no change
metaSchemaService := metaschema.NewService(repo, ...) // no WithPublisher needed

// Start
go pub.Start(ctx)
```

## Migration
- Add `pkg/publisher` (interface + memory implementation)
- Add webhook and billing handlers
- Inject the publisher into services, add `Publish()` calls
- Remove `audit.GetAuditor()` from handlers
- Delete `core/audit/`, `core/event/publisher.go`, `core/event/listener.go`
## Future Work
- PostgreSQL queue implementation
- Redis Streams implementation
- Retry with exponential backoff
- Dead letter queue
- Move billing logic from `core/event/` to `billing/`
## Config
In-Memory:

```yaml
event:
  driver: memory
  workers: 4
  buffer_size: 100
```

PostgreSQL:

```yaml
event:
  driver: postgres
  workers: 4
  postgres:
    queue: "events"
    poll_interval: 100ms
```

Redis Streams:

```yaml
event:
  driver: redis
  workers: 4
  redis:
    stream: "frontier:events"
    consumer_group: "frontier"
```
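A minimal sketch of a struct the YAML above could decode into; field names and tags are assumptions, not the final config surface:

```go
import "time"

type EventConfig struct {
    Driver     string `yaml:"driver"`      // memory | postgres | redis
    Workers    int    `yaml:"workers"`     // consumer worker count
    BufferSize int    `yaml:"buffer_size"` // memory driver only
    Postgres   struct {
        Queue        string        `yaml:"queue"`
        PollInterval time.Duration `yaml:"poll_interval"` // "100ms" needs a duration-aware decoder (e.g. viper's mapstructure hook)
    } `yaml:"postgres"`
    Redis struct {
        Stream        string `yaml:"stream"`
        ConsumerGroup string `yaml:"consumer_group"`
    } `yaml:"redis"`
}
```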