JetStream Bus
Durable, distributed event processing with NATS JetStream, including replay and DLQ.
Source: examples/04_jetstream_bus.go
Stream Setup
nc, _ := nats.Connect(nats.DefaultURL)
js, _ := natsjs.New(nc)
// Create stream
js.CreateOrUpdateStream(ctx, natsjs.StreamConfig{
Name: "CHAT_EVENTS",
Subjects: []string{"chat.>"},
MaxAge: 7 * 24 * time.Hour,
MaxBytes: 1 << 30,
Storage: natsjs.FileStorage,
})
Bus Setup
registry := eventjson.NewRegistry()
registry.Register("chat.message", func() event.Event { return &ChatMessage{} }, 1)
chain := invoker.NewChain(
invoker.NewMetrics(nil),
invoker.NewIdempotency(invoker.NewMemoryIdempotencyStore(),
invoker.IdempotencyConfig{ProcessingTTL: 5 * time.Minute}, nil),
invoker.NewRetry(invoker.RetryPolicy{
MaxAttempts: 3, BaseDelay: 100 * time.Millisecond, MaxDelay: 5 * time.Second,
}, nil),
)
bus := jetstream.NewBus(js, registry, jetstream.BusOptions{
Invoker: chain,
})
Subscribe and Publish
bus.Subscribe(ctx, "chat.message.*", handler,
event.WithStream("CHAT_EVENTS"),
event.WithHandlerName("persist-message"),
event.WithConsumer("chat-processor"),
)
// Subject defaults to evt.Name() ("chat.message")
bus.Emit(ctx, &ChatMessage{MessageID: "msg-1", RoomID: "room-general", Content: "Hello world"})
// Override subject for hierarchical routing
bus.Emit(ctx, &ChatMessage{MessageID: "msg-2", RoomID: "room-general", Content: "Hello again"},
event.WithSubject("chat.message.room-general"),
)
Replay from DLQ
replayer := jetstream.NewReplayer(js)
replayer.Replay(ctx, jetstream.ReplayOptions{
FromStream: "CHAT_EVENTS_DLQ",
ToSubject: "chat.message",
Limit: 100,
})
DLQ Publisher
dlq := jetstream.NewDQL(js, nil, "")
chain := invoker.NewChain(
invoker.NewRetry(policy, nil),
invoker.NewDLQ(dlq, nil),
)