Subscriber Pattern

Organize handlers into modular subscribers for clean code organization.

Source: examples/05_subscriber_pattern.go

The Subscriber Interface

type Subscriber interface {
    Name() string
    Events() map[string]HandleFn
}

Example: Email Subscriber

type EmailSubscriber struct {
    smtpHost string
    from     string
}

func (s *EmailSubscriber) Name() string { return "email-subscriber" }

func (s *EmailSubscriber) Events() map[string]event.HandleFn {
    return map[string]event.HandleFn{
        "user.signed_up": s.handleUserSignedUp,
        "user.upgraded":  s.handleUserUpgraded,
        "user.deleted":   s.handleUserDeleted,
    }
}

func (s *EmailSubscriber) handleUserSignedUp(ctx context.Context, evt event.Event) error {
    e := evt.(UserSignedUp)
    slog.Info("sending welcome email", "email", e.Email)
    return nil
}

Bulk Registration

bus.RegisterSubscribers(ctx,
    NewEmailSubscriber("smtp.example.com", "noreply@example.com"),
    NewAnalyticsSubscriber(),
    NewBillingSubscriber("sk_test_xxx"),
)

Each subscriber’s Name() is used as the handler name for metrics and logging.

JetStream Subscribers with SubscribeOptionsProvider

For JetStream buses, subscribers must provide stream configuration. Implement SubscribeOptionsProvider to carry these options:

type ChatSubscriber struct{}

func (s *ChatSubscriber) Name() string { return "chat-subscriber" }

func (s *ChatSubscriber) Events() map[string]event.HandleFn {
    return map[string]event.HandleFn{
        "chat.message.*": s.handleMessage,
    }
}

// SubscribeOptions provides JetStream-specific options for all handlers.
func (s *ChatSubscriber) SubscribeOptions() []event.SubscribeOption {
    return []event.SubscribeOption{
        event.WithStream("CHAT_EVENTS"),
        event.WithConsumer("chat-processor"),
        event.WithMaxDeliver(5),
    }
}

func (s *ChatSubscriber) handleMessage(ctx context.Context, evt event.Event) error {
    msg := evt.(*ChatMessage)
    slog.Info("processing message", "id", msg.MessageID, "room", msg.RoomID)
    return nil
}

Registration is identical across all buses:

jsBus.RegisterSubscribers(ctx, &ChatSubscriber{})

RegisterSubscribers automatically detects SubscribeOptionsProvider and merges the provided options with the handler name.

OnErr Interface

Events can implement OnErr for event-level error notification:

type CriticalEvent struct {
    ID      string
    Payload string
}

func (e *CriticalEvent) Name() string { return "critical.event" }

func (e *CriticalEvent) OnErr(ctx context.Context, err error, listenerName string) {
    slog.Error("critical event failed", "event_id", e.ID, "handler", listenerName, "error", err)
    // Alert, notify, etc.
}

OnErr is called by the bus after the global OnErr callback, giving the event itself a chance to react to failures.


Back to top

Copyright © 2025 Isaque de Souza Barbosa. Distributed under the MIT License.