NATS Bus

The NATS Bus provides distributed messaging over NATS Core: fire-and-forget publishing with Emit and request/reply with EmitSync.

Setup

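import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
    eventjson "github.com/isaquesb/go-event-bus/json"
    eventnats "github.com/isaquesb/go-event-bus/nats"
)

// Connect to the default local NATS server (nats://127.0.0.1:4222).
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
    log.Fatalf("connect to NATS: %v", err)
}

registry := eventjson.NewRegistry()
// ... register events

// chain and errorHandler are defined elsewhere; both are optional.
bus := eventnats.NewNatsBus(nc, registry, eventnats.BusOptions{
    RequestTimeout: 5 * time.Second, // default timeout for EmitSync
    Invoker:        chain,           // optional invoker chain
    OnErr:          errorHandler,    // error callback
})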

Options

Field           Type           Default  Description
RequestTimeout  time.Duration  5s       Default timeout for EmitSync
Invoker         event.Invoker  nil      Optional invoker chain
OnErr           func(...)      nil      Error callback

Publishing

Fire-and-Forget (Emit)

// Subject defaults to evt.Name()
err := bus.Emit(ctx, myEvent)

// Override subject
err = bus.Emit(ctx, myEvent, event.WithSubject("custom.subject"))

Encodes the event via the registry and publishes to evt.Name() by default. Use WithSubject to override.
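The default-then-override behavior described above fits Go's functional options pattern. The following is a minimal, self-contained sketch of that idea and does not use the library: emitConfig, EmitOption, resolveSubject, and this local WithSubject are hypothetical stand-ins, not the library's actual internals.

```go
package main

import "fmt"

// emitConfig is a hypothetical holder for per-call publish settings.
type emitConfig struct {
	subject string
}

// EmitOption mutates the per-call configuration (hypothetical type).
type EmitOption func(*emitConfig)

// WithSubject is a local stand-in that mirrors the idea of event.WithSubject.
func WithSubject(subject string) EmitOption {
	return func(c *emitConfig) { c.subject = subject }
}

// resolveSubject starts from the event name and applies overrides in order.
func resolveSubject(eventName string, opts ...EmitOption) string {
	cfg := emitConfig{subject: eventName}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg.subject
}

func main() {
	// Without options, the event name is used as the subject.
	fmt.Println(resolveSubject("user.created"))
	// With an override, the custom subject wins.
	fmt.Println(resolveSubject("user.created", WithSubject("custom.subject")))
}
```

Keeping the override in an option rather than a separate method lets the common case stay a one-argument call while still allowing per-call routing.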

Request/Reply (EmitSync)

response, err := bus.EmitSync(ctx, myEvent)

Sends a NATS request and waits for a response, which is decoded back into an event. The timeout comes from the context deadline when one is set; otherwise RequestTimeout applies.

Subscribing

bus.Subscribe(ctx, "user.created", handler, event.WithHandlerName("email-sender"))

For request/reply handlers:

bus.SubscribeSync(ctx, "user.created", handler, event.WithHandlerName("responder"))

SubscribeSync decodes the incoming message, runs the handler through the invoker chain, then encodes and responds with the event.
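The decode, handle, encode sequence can be illustrated without a NATS connection. This is a simplified sketch under stated assumptions: it uses encoding/json directly rather than the registry, omits the invoker chain, and the UserCreated type, handler type, and respond function are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// UserCreated is a hypothetical event payload.
type UserCreated struct {
	ID    string `json:"id"`
	Email string `json:"email"`
	Sent  bool   `json:"sent"`
}

// handler is a hypothetical request/reply handler; it may mutate the event
// before the event is sent back.
type handler func(evt *UserCreated) error

// respond mimics the three steps: decode the request, run the handler,
// then encode the (possibly modified) event as the reply payload.
func respond(request []byte, h handler) ([]byte, error) {
	var evt UserCreated
	if err := json.Unmarshal(request, &evt); err != nil {
		return nil, fmt.Errorf("decode: %w", err)
	}
	if err := h(&evt); err != nil {
		return nil, fmt.Errorf("handle: %w", err)
	}
	reply, err := json.Marshal(&evt)
	if err != nil {
		return nil, fmt.Errorf("encode: %w", err)
	}
	return reply, nil
}

func main() {
	req := []byte(`{"id":"42","email":"a@example.com"}`)
	reply, err := respond(req, func(evt *UserCreated) error {
		evt.Sent = true // the handler records its result on the event
		return nil
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(reply))
}
```

Because the reply is the event itself, the requester decodes it with the same type it sent, which is why the handler communicates its result by mutating the event.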

RegisterSubscribers

RegisterSubscribers registers several subscribers in one call through the Subscriber interface, consistent with LocalBus and JetStreamBus:

bus.RegisterSubscribers(ctx,
    NewEmailSubscriber(),
    NewAuditSubscriber(),
)

Subscribers implementing SubscribeOptionsProvider will have their options merged automatically.
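Automatic option merging of this kind is commonly built on an optional interface checked with a type assertion. The sketch below shows that general technique only. The Subscriber and OptionsProvider interfaces here are hypothetical stand-ins with invented methods, and options are modeled as plain strings; the library's real interfaces may look different.

```go
package main

import "fmt"

// Subscriber is a hypothetical stand-in; the library's real interface may differ.
type Subscriber interface {
	Subject() string
}

// OptionsProvider is a hypothetical optional interface, playing the role
// that SubscribeOptionsProvider plays in the text.
type OptionsProvider interface {
	Options() []string
}

// emailSubscriber supplies its own options.
type emailSubscriber struct{}

func (emailSubscriber) Subject() string { return "user.created" }

func (emailSubscriber) Options() []string { return []string{"handler-name=email-sender"} }

// auditSubscriber does not implement OptionsProvider.
type auditSubscriber struct{}

func (auditSubscriber) Subject() string { return "user.created" }

// collectOptions returns the base options plus any the subscriber supplies.
func collectOptions(s Subscriber, base ...string) []string {
	merged := append([]string{}, base...)
	if p, ok := s.(OptionsProvider); ok {
		merged = append(merged, p.Options()...)
	}
	return merged
}

func main() {
	for _, s := range []Subscriber{emailSubscriber{}, auditSubscriber{}} {
		fmt.Println(s.Subject(), collectOptions(s))
	}
}
```

The optional interface keeps simple subscribers free of boilerplate: only those that need custom options implement the extra method.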

Tracing

The NATS Bus creates OpenTelemetry spans for both Emit and EmitSync operations with attributes:

  • messaging.system: "nats"
  • messaging.destination: subject name
  • event.name: event name

Close

Close() closes the underlying NATS connection.



Copyright © 2025 Isaque de Souza Barbosa. Distributed under the MIT License.