JetStream Bus
The JetStream Bus provides durable, distributed event processing with at-least-once delivery.
Setup
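import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
	natsjs "github.com/nats-io/nats.go/jetstream"

	"github.com/isaquesb/go-event-bus/jetstream"
	eventjson "github.com/isaquesb/go-event-bus/json"
)

// Connect to NATS and obtain a JetStream handle, checking errors instead of discarding them.
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
	log.Fatal(err)
}
js, err := natsjs.New(nc)
if err != nil {
	log.Fatal(err)
}

registry := eventjson.NewRegistry()
// ... register events

// chain and errorHandler are assumed to be defined elsewhere.
bus := jetstream.NewBus(js, registry, jetstream.BusOptions{
	Invoker:          chain,
	OnErr:            errorHandler,
	CircuitOpenDelay: 30 * time.Second,
	RateLimitDelay:   5 * time.Second,
})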
BusOptions
| Field | Type | Default | Description |
|---|---|---|---|
| `Invoker` | `event.Invoker` | required | Invoker chain |
| `OnErr` | `func(...)` | `nil` | Error callback |
| `CircuitOpenDelay` | `time.Duration` | 30s (warning) | NAK delay when circuit is open |
| `RateLimitDelay` | `time.Duration` | 5s (warning) | NAK delay when rate limited |
Publishing
// Subject defaults to evt.Name()
err := bus.Emit(ctx, chatMessage)
// Override subject (e.g. for hierarchical NATS subjects)
err = bus.Emit(ctx, chatMessage, event.WithSubject("chat.message.room-general"))
The subject defaults to `evt.Name()`. Pass `event.WithSubject` to override it, for example to publish into a hierarchical subject such as `chat.message.room-general`.
Subscribing
bus.Subscribe(ctx, "chat.message.*", handler,
	event.WithStream("CHAT_EVENTS"),
	event.WithHandlerName("persist-message"),
	event.WithConsumer("chat-processor"),
	event.WithMaxDeliver(5),
	event.WithBackOff([]time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}),
)
`WithStream` is required. If `MaxDeliver` or `BackOff` is not set, a default is used and a warning is logged.
ACK/NAK Strategy
The bus classifies invoker errors to decide message acknowledgment:
| Error | Action |
|---|---|
| `nil` | `msg.Ack()` |
| `ErrDuplicate` | `msg.Ack()` (already processed) |
| `ErrCircuitOpen` | `msg.NakWithDelay(CircuitOpenDelay)` |
| `ErrRateLimited` | `msg.NakWithDelay(RateLimitDelay)` |
| Other errors | `msg.Nak()` (immediate retry, bounded by MaxDeliver/BackOff) |
| Decode failure | `msg.Term()` (malformed, never retried) |
Stream Configuration
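Streams must be created before subscribing. Example:
// CreateOrUpdateStream returns the stream handle and an error; check the error.
_, err := js.CreateOrUpdateStream(ctx, natsjs.StreamConfig{
	Name:      "CHAT_EVENTS",
	Subjects:  []string{"chat.>"}, // every subject below "chat."
	Retention: natsjs.LimitsPolicy,
	MaxAge:    7 * 24 * time.Hour, // keep messages for 7 days
	MaxBytes:  1 << 30,            // 1 GiB
	Replicas:  3,                  // needs a cluster of at least 3 servers
	Storage:   natsjs.FileStorage,
})
if err != nil {
	log.Fatal(err)
}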
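The stream above captures `chat.>`, while the subscription example filters on `chat.message.*`. In NATS subjects, `*` matches exactly one token and `>` matches one or more trailing tokens. The sketch below implements that rule with the standard library only. `subjectMatches` is a hypothetical helper for illustration. It is not part of nats.go or this package.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether a NATS subject matches a pattern, where
// "*" matches exactly one token and ">" matches one or more trailing tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last pattern token and needs at least one token left.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false // subject is shorter than the pattern
		}
		if tok != "*" && tok != s[i] {
			return false // literal token mismatch
		}
	}
	return len(p) == len(s)
}

func main() {
	for _, pattern := range []string{"chat.>", "chat.message.*"} {
		for _, subject := range []string{"chat.message.room-general", "chat.message"} {
			fmt.Printf("%-16s %-26s %v\n", pattern, subject, subjectMatches(pattern, subject))
		}
	}
}
```

Note that a two-token subject such as `chat.message` is captured by `chat.>` but does not match the `chat.message.*` filter, which expects a third token.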
Replay
The Replayer republishes events stored in a stream to a subject:
replayer := jetstream.NewReplayer(js)
err := replayer.Replay(ctx, jetstream.ReplayOptions{
	FromStream: "CHAT_EVENTS_DLQ",
	ToSubject:  "chat.message",
	Limit:      100,
})
DLQ Publisher
dlq := jetstream.NewDLQ(js, registry, "dlq")
// Use with: invoker.NewDLQ(dlq, metrics)
RegisterSubscribers
Bulk registration using the Subscriber interface:
bus.RegisterSubscribers(ctx,
	NewChatSubscriber(),
	NewAuditSubscriber(),
)
For JetStream, subscribers must provide stream options via the SubscribeOptionsProvider interface:
type ChatSubscriber struct{}

func (s *ChatSubscriber) Name() string { return "chat-subscriber" }

func (s *ChatSubscriber) Events() map[string]event.HandleFn {
	return map[string]event.HandleFn{
		"chat.message.*": s.handleMessage,
	}
}

// SubscribeOptions provides JetStream-specific options for all handlers in this subscriber.
func (s *ChatSubscriber) SubscribeOptions() []event.SubscribeOption {
	return []event.SubscribeOption{
		event.WithStream("CHAT_EVENTS"),
		event.WithConsumer("chat-processor"),
	}
}
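One plausible way for bulk registration to pick up these options is a type assertion against the optional interface. The sketch below is an assumption about the mechanism, not the library's code: every type in it is a simplified, hypothetical stand-in (for example, `SubscribeOption` is a plain string here), and `plan` only lists what would be subscribed.

```go
package main

import "fmt"

// Simplified, hypothetical stand-ins for the library's types.
type HandleFn func(payload string) error
type SubscribeOption string

type Subscriber interface {
	Name() string
	Events() map[string]HandleFn
}

// SubscribeOptionsProvider is optional; subscribers may or may not implement it.
type SubscribeOptionsProvider interface {
	SubscribeOptions() []SubscribeOption
}

type registration struct {
	Subscriber string
	Subject    string
	Options    []SubscribeOption
}

// plan expands subscribers into one registration per subject, attaching the
// subscriber-wide options when the optional interface is implemented.
func plan(subs ...Subscriber) []registration {
	var out []registration
	for _, s := range subs {
		var opts []SubscribeOption
		if p, ok := s.(SubscribeOptionsProvider); ok {
			opts = p.SubscribeOptions()
		}
		for subject := range s.Events() {
			out = append(out, registration{Subscriber: s.Name(), Subject: subject, Options: opts})
		}
	}
	return out
}

type chatSubscriber struct{}

func (c *chatSubscriber) Name() string { return "chat-subscriber" }
func (c *chatSubscriber) Events() map[string]HandleFn {
	return map[string]HandleFn{"chat.message.*": func(string) error { return nil }}
}
func (c *chatSubscriber) SubscribeOptions() []SubscribeOption {
	return []SubscribeOption{"stream=CHAT_EVENTS", "consumer=chat-processor"}
}

func main() {
	for _, r := range plan(&chatSubscriber{}) {
		fmt.Println(r.Subscriber, r.Subject, r.Options)
	}
}
```

Because the options are returned once per subscriber, every handler in `Events()` shares the same stream and consumer settings.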
Close
Close() drains all consumers and waits for in-flight messages to complete.