events is a typed event bus library for local dispatch and distributed pub/sub.
events is a typed event bus for Go and handles event publication and fan-out. Durable background work such as retries and worker queues belongs in queue.
It lets applications publish and subscribe to events using normal Go types, with delivery handled either in-process or through distributed backends like NATS, Redis, Kafka, or Google Pub/Sub.
go get github.com/goforj/events
package main
import (
"context"
"fmt"
"github.com/goforj/events"
)
type UserCreated struct {
ID string `json:"id"`
}
func main() {
bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
fmt.Println("received", event.ID, ctx != nil)
return nil
})
_ = bus.Publish(UserCreated{ID: "123"})
}
type UserCreated struct {
ID string `json:"id"`
}
func (UserCreated) Topic() string { return "users.created" }
Optional distributed backends are separate modules. Keep dependencies lean and install only what you use:
go get github.com/goforj/events/driver/natsevents
go get github.com/goforj/events/driver/redisevents
go get github.com/goforj/events/driver/kafkaevents
go get github.com/goforj/events/driver/natsjetstreamevents
go get github.com/goforj/events/driver/gcppubsubevents
go get github.com/goforj/events/driver/snsevents
Use root constructors for local backends, and driver-module constructors for distributed backends. Driver backends live in separate modules so applications only import/link the optional dependencies they actually use.
package main
import (
"context"
"github.com/goforj/events"
"github.com/goforj/events/driver/gcppubsubevents"
"github.com/goforj/events/driver/kafkaevents"
"github.com/goforj/events/driver/natsjetstreamevents"
"github.com/goforj/events/driver/natsevents"
"github.com/goforj/events/driver/redisevents"
"github.com/goforj/events/driver/snsevents"
)
func main() {
ctx := context.Background()
events.NewSync()
events.NewNull()
natsevents.New(natsevents.Config{URL: "nats://127.0.0.1:4222"})
natsjetstreamevents.New(natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"})
redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
kafkaevents.New(kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}})
gcppubsubevents.New(ctx, gcppubsubevents.Config{
ProjectID: "events-project",
URI: "127.0.0.1:8085",
})
snsevents.New(snsevents.Config{
Region: "us-east-1",
Endpoint: "http://127.0.0.1:4566",
})
}
To refresh the live benchmark snapshot and regenerate the charts:
sh scripts/refresh-bench-snapshot.sh
These charts compare one publish-plus-delivery round trip for sync and each enabled distributed driver fixture.
Note: sns and gcppubsub run through local emulators in this repo, so read those results as development approximations rather than direct managed-service latency comparisons.
These checks are for obvious regression detection, not for noisy micro-optimization or hard CI performance gates.
| Group | Functions |
|---|---|
| Bus | Driver Ready ReadyContext |
| Config | Config gcppubsubevents.Config kafkaevents.Config natsevents.Config natsjetstreamevents.Config redisevents.Config snsevents.Config |
| Construction | New NewNull NewSync |
| Driver Constructors | gcppubsubevents.New kafkaevents.New natsevents.New natsjetstreamevents.New redisevents.New snsevents.New |
| Lifecycle | Close |
| Options | Option WithCodec |
| Publish | Publish PublishContext TopicEvent |
| Subscribe | Subscribe SubscribeContext Subscription |
| Testing | Fake Fake.Bus Fake.Count Fake.Records Fake.Reset NewFake Record |
Driver reports the active backend.
bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync
Ready reports whether the bus is ready.
bus, _ := events.NewSync()
fmt.Println(bus.Ready() == nil)
// Output: true
ReadyContext reports whether the bus is ready.
bus, _ := events.NewSync()
fmt.Println(bus.ReadyContext(context.Background()) == nil)
// Output: true
Config configures root bus construction.
Example: define bus construction config
cfg := events.Config{Driver: eventscore.DriverSync}
Example: define bus construction config with all fields
cfg := events.Config{
Driver: eventscore.DriverSync, // default: "sync" when empty and no Transport is provided
Codec: nil, // default: nil uses the built-in JSON codec
Transport: nil, // default: nil keeps dispatch in-process
}
Config configures Google Pub/Sub transport construction.
Example: define Google Pub/Sub driver config
cfg := gcppubsubevents.Config{
ProjectID: "events-project",
URI: "127.0.0.1:8085",
}
Example: define Google Pub/Sub driver config with all fields
cfg := gcppubsubevents.Config{
ProjectID: "events-project",
URI: "127.0.0.1:8085", // default: "" is invalid unless Client is provided
Client: nil, // default: nil creates a client from ProjectID and URI
}
Config configures Kafka transport construction.
Example: define Kafka driver config
cfg := kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}}
Example: define Kafka driver config with all fields
cfg := kafkaevents.Config{
Brokers: []string{"127.0.0.1:9092"},
Dialer: nil, // default: nil uses a zero-value kafka.Dialer
Writer: nil, // default: nil builds a writer with single-message, auto-topic defaults
}
Config configures NATS transport construction.
Example: define NATS driver config
cfg := natsevents.Config{URL: "nats://127.0.0.1:4222"}
Example: define NATS driver config with all fields
cfg := natsevents.Config{
URL: "nats://127.0.0.1:4222",
Conn: nil, // default: nil dials URL instead of reusing an existing connection
}
Config configures NATS JetStream transport construction.
Example: define NATS JetStream driver config
cfg := natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"}
Example: define NATS JetStream driver config with all fields
cfg := natsjetstreamevents.Config{
URL: "nats://127.0.0.1:4222",
Conn: nil, // default: nil dials URL instead of reusing an existing connection
SubjectPrefix: "events.", // default: "events."
StreamNamePrefix: "EVENTS_", // default: "EVENTS_"
InactiveThreshold: 30 * time.Second, // default: 30s
AckWait: 30 * time.Second, // default: 30s
FetchMaxWait: 250 * time.Millisecond, // default: 250ms
Storage: jetstream.MemoryStorage, // default: MemoryStorage
}
Config configures Redis transport construction.
Example: define Redis driver config
cfg := redisevents.Config{Addr: "127.0.0.1:6379"}
Example: define Redis driver config with all fields
cfg := redisevents.Config{
Addr: "127.0.0.1:6379",
Client: nil, // default: nil constructs a client from Addr
}
Config configures SNS transport construction.
Example: define SNS driver config
cfg := snsevents.Config{
Region: "us-east-1",
Endpoint: "http://127.0.0.1:4566",
}
Example: define SNS driver config with all fields
cfg := snsevents.Config{
Region: "us-east-1",
Endpoint: "http://127.0.0.1:4566", // default: "" uses normal AWS resolution
SNSClient: nil, // default: nil creates a client from Region and Endpoint
SQSClient: nil, // default: nil creates a client from Region and Endpoint
TopicNamePrefix: "events-", // default: ""
QueueNamePrefix: "events-", // default: ""
WaitTimeSeconds: 1, // default: 1
VisibilityTimeout: 30, // default: 30
}
New constructs a root bus for the requested driver.
bus, _ := events.New(events.Config{Driver: "sync"})
fmt.Println(bus.Driver())
// Output: sync
NewNull constructs the root null bus.
bus, _ := events.NewNull()
fmt.Println(bus.Driver())
// Output: null
NewSync constructs the root sync bus.
bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync
New constructs a Google Pub/Sub-backed driver.
driver, _ := gcppubsubevents.New(context.Background(), gcppubsubevents.Config{
ProjectID: "events-project",
URI: "127.0.0.1:8085",
})
New constructs a Kafka-backed driver.
driver, _ := kafkaevents.New(kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}})
New connects a NATS-backed driver from config.
driver, _ := natsevents.New(natsevents.Config{URL: "nats://127.0.0.1:4222"})
New connects a NATS JetStream-backed driver from config.
driver, _ := natsjetstreamevents.New(natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"})
New constructs a Redis pub/sub-backed driver.
driver, _ := redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
New constructs an SNS-backed driver.
driver, _ := snsevents.New(snsevents.Config{
Region: "us-east-1",
Endpoint: "http://127.0.0.1:4566",
})
Close closes the underlying Pub/Sub client.
driver, _ := redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
Option configures root bus behavior.
WithCodec overrides the default event codec.
bus, _ := events.NewSync(events.WithCodec(nil))
fmt.Println(bus.Driver())
// Output: sync
Publish publishes an event using the background context.
type UserCreated struct {
ID string `json:"id"`
}
bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(event UserCreated) {
fmt.Println(event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123
PublishContext publishes an event using the configured codec and dispatch flow.
type UserCreated struct {
ID string `json:"id"`
}
bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
fmt.Println(event.ID, ctx != nil)
return nil
})
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true
TopicEvent overrides the derived topic for an event.
Subscribe registers a handler using the background context.
type UserCreated struct {
ID string `json:"id"`
}
bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(ctx context.Context, event UserCreated) error {
fmt.Println(event.ID)
return nil
})
defer sub.Close()
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123
SubscribeContext registers a typed handler.
type UserCreated struct {
ID string `json:"id"`
}
bus, _ := events.NewSync()
sub, _ := bus.SubscribeContext(context.Background(), func(ctx context.Context, event UserCreated) error {
fmt.Println(event.ID, ctx != nil)
return nil
})
defer sub.Close()
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true
Subscription releases a subscription when closed.
type UserCreated struct {
ID string `json:"id"`
}
bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(event UserCreated) {
fmt.Println("received", event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
_ = sub.Close()
_ = bus.Publish(UserCreated{ID: "456"})
// Output: received 123
Fake provides a root-package testing helper that records published events.
fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0
Bus returns the wrapped API to inject into code under test.
fake := events.NewFake()
bus := fake.Bus()
fmt.Println(bus.Ready() == nil)
// Output: true
Count returns the total number of recorded publishes.
type UserCreated struct {
ID string `json:"id"`
}
fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(fake.Count())
// Output: 1
Records returns a copy of recorded publishes.
type UserCreated struct {
ID string `json:"id"`
}
fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(len(fake.Records()))
// Output: 1
Reset clears recorded publishes.
type UserCreated struct {
ID string `json:"id"`
}
fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fake.Reset()
fmt.Println(fake.Count())
// Output: 0
NewFake creates a new fake event harness backed by the root sync bus.
fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0
Record captures one published event observed by a Fake bus.
type UserCreated struct {
ID string `json:"id"`
}
record := events.Record{Event: UserCreated{ID: "123"}}
fmt.Printf("%T\n", record.Event)
// Output: main.UserCreated
The repository includes lightweight docs tooling under docs/.
Run the watcher to auto-regenerate docs on file changes:
sh docs/watcher.sh