goforj/events


events is a typed event bus library for local dispatch and distributed pub/sub.

What events is

events is a typed event bus for Go that handles event publication and fan-out. Durable background work, such as retries and worker queues, belongs in queue.

It lets applications publish and subscribe to events using normal Go types, with delivery handled either in-process or through distributed backends like NATS, Redis, Kafka, or Google Pub/Sub.

Installation

go get github.com/goforj/events

Quick Start

package main

import (
	"context"
	"fmt"

	"github.com/goforj/events"
)

type UserCreated struct {
	ID string `json:"id"`
}

func main() {
	bus, _ := events.NewSync()
	_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
		fmt.Println("received", event.ID, ctx != nil)
		return nil
	})
	_ = bus.Publish(UserCreated{ID: "123"})
}

Topic Override

type UserCreated struct {
	ID string `json:"id"`
}

func (UserCreated) Topic() string { return "users.created" }

Drivers

Optional distributed backends are separate modules. Keep dependencies lean and install only what you use:

go get github.com/goforj/events/driver/natsevents
go get github.com/goforj/events/driver/redisevents
go get github.com/goforj/events/driver/kafkaevents
go get github.com/goforj/events/driver/natsjetstreamevents
go get github.com/goforj/events/driver/gcppubsubevents
go get github.com/goforj/events/driver/snsevents

Driver / Backend Mode Fan-out Durable Queue Semantics Notes
Sync In-process x x Root-backed synchronous dispatch in the caller path.
Null Drop-only x x x Root-backed no-op transport for disabled eventing and tests.
NATS Distributed pub/sub x x Subject-based transport with live integration coverage.
NATS JetStream Distributed stream Partial x Ephemeral JetStream consumers preserve subscribe/close semantics while adding durable stream storage.
Redis Distributed pub/sub x x Redis pub/sub transport; Streams are intentionally deferred.
Kafka Distributed topic/log Partial x Current driver validates topic-based fan-out compatibility, not full consumer-group semantics.
SNS Distributed topic plus queue Partial x SNS fan-out with per-subscription SQS queues to preserve bus-style delivery semantics.
Google Pub/Sub Distributed topic/subscription Partial x Emulator-backed Google Pub/Sub integration with per-subscription fan-out mapping.
SQS Queue target Planned Deferred until a separate async capability surface is intentionally introduced.

Driver Constructor Quick Examples

Use root constructors for local backends, and driver-module constructors for distributed backends. Driver backends live in separate modules so applications only import/link the optional dependencies they actually use.

package main

import (
	"context"

	"github.com/goforj/events"
	"github.com/goforj/events/driver/gcppubsubevents"
	"github.com/goforj/events/driver/kafkaevents"
	"github.com/goforj/events/driver/natsjetstreamevents"
	"github.com/goforj/events/driver/natsevents"
	"github.com/goforj/events/driver/redisevents"
	"github.com/goforj/events/driver/snsevents"
)

func main() {
	ctx := context.Background()

	events.NewSync()
	events.NewNull()

	natsevents.New(natsevents.Config{URL: "nats://127.0.0.1:4222"})
	natsjetstreamevents.New(natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"})
	redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
	kafkaevents.New(kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}})
	gcppubsubevents.New(ctx, gcppubsubevents.Config{
		ProjectID: "events-project",
		URI:       "127.0.0.1:8085",
	})
	snsevents.New(snsevents.Config{
		Region:   "us-east-1",
		Endpoint: "http://127.0.0.1:4566",
	})
}

Benchmarks

To refresh the live benchmark snapshot and regenerate the charts:

sh scripts/refresh-bench-snapshot.sh

These charts compare one publish-plus-delivery round trip for sync and each enabled distributed driver fixture.

Note: sns and gcppubsub run through local emulators in this repo, so read those results as development approximations rather than direct managed-service latency comparisons.

Events backend latency chart

Events backend throughput chart

Events backend bytes chart

Events backend allocations chart

These checks are for obvious regression detection, not for noisy micro-optimization or hard CI performance gates.

API Index

| Group | Functions |
| --- | --- |
| Bus | Driver, Ready, ReadyContext |
| Config | Config, gcppubsubevents.Config, kafkaevents.Config, natsevents.Config, natsjetstreamevents.Config, redisevents.Config, snsevents.Config |
| Construction | New, NewNull, NewSync |
| Driver Constructors | gcppubsubevents.New, kafkaevents.New, natsevents.New, natsjetstreamevents.New, redisevents.New, snsevents.New |
| Lifecycle | Close |
| Options | Option, WithCodec |
| Publish | Publish, PublishContext, TopicEvent |
| Subscribe | Subscribe, SubscribeContext, Subscription |
| Testing | Fake, Fake.Bus, Fake.Count, Fake.Records, Fake.Reset, NewFake, Record |

Bus

Driver

Driver reports the active backend.

bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync

Ready

Ready reports whether the bus is ready.

bus, _ := events.NewSync()
fmt.Println(bus.Ready() == nil)
// Output: true

ReadyContext

ReadyContext reports whether the bus is ready, using the provided context.

bus, _ := events.NewSync()
fmt.Println(bus.ReadyContext(context.Background()) == nil)
// Output: true

Config

Config

Config configures root bus construction.

Example: define bus construction config

cfg := events.Config{Driver: eventscore.DriverSync}

Example: define bus construction config with all fields

cfg := events.Config{
	Driver:    eventscore.DriverSync, // default: "sync" when empty and no Transport is provided
	Codec:     nil,                   // default: nil uses the built-in JSON codec
	Transport: nil,                   // default: nil keeps dispatch in-process
}

gcppubsubevents.Config

Config configures Google Pub/Sub transport construction.

Example: define Google Pub/Sub driver config

cfg := gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085",
}

Example: define Google Pub/Sub driver config with all fields

cfg := gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085", // default: "" is invalid unless Client is provided
	Client:    nil,              // default: nil creates a client from ProjectID and URI
}

kafkaevents.Config

Config configures Kafka transport construction.

Example: define Kafka driver config

cfg := kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}}

Example: define Kafka driver config with all fields

cfg := kafkaevents.Config{
	Brokers: []string{"127.0.0.1:9092"},
	Dialer:  nil, // default: nil uses a zero-value kafka.Dialer
	Writer:  nil, // default: nil builds a writer with single-message, auto-topic defaults
}

natsevents.Config

Config configures NATS transport construction.

Example: define NATS driver config

cfg := natsevents.Config{URL: "nats://127.0.0.1:4222"}

Example: define NATS driver config with all fields

cfg := natsevents.Config{
	URL:  "nats://127.0.0.1:4222",
	Conn: nil, // default: nil dials URL instead of reusing an existing connection
}

natsjetstreamevents.Config

Config configures NATS JetStream transport construction.

Example: define NATS JetStream driver config

cfg := natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"}

Example: define NATS JetStream driver config with all fields

cfg := natsjetstreamevents.Config{
	URL:               "nats://127.0.0.1:4222",
	Conn:              nil,                     // default: nil dials URL instead of reusing an existing connection
	SubjectPrefix:     "events.",               // default: "events."
	StreamNamePrefix:  "EVENTS_",               // default: "EVENTS_"
	InactiveThreshold: 30 * time.Second,        // default: 30s
	AckWait:           30 * time.Second,        // default: 30s
	FetchMaxWait:      250 * time.Millisecond,  // default: 250ms
	Storage:           jetstream.MemoryStorage, // default: MemoryStorage
}

redisevents.Config

Config configures Redis transport construction.

Example: define Redis driver config

cfg := redisevents.Config{Addr: "127.0.0.1:6379"}

Example: define Redis driver config with all fields

cfg := redisevents.Config{
	Addr:   "127.0.0.1:6379",
	Client: nil, // default: nil constructs a client from Addr
}

snsevents.Config

Config configures SNS transport construction.

Example: define SNS driver config

cfg := snsevents.Config{
	Region:   "us-east-1",
	Endpoint: "http://127.0.0.1:4566",
}

Example: define SNS driver config with all fields

cfg := snsevents.Config{
	Region:            "us-east-1",
	Endpoint:          "http://127.0.0.1:4566", // default: "" uses normal AWS resolution
	SNSClient:         nil,                      // default: nil creates a client from Region and Endpoint
	SQSClient:         nil,                      // default: nil creates a client from Region and Endpoint
	TopicNamePrefix:   "events-",                // default: ""
	QueueNamePrefix:   "events-",                // default: ""
	WaitTimeSeconds:   1,                        // default: 1
	VisibilityTimeout: 30,                       // default: 30
}

Construction

New

New constructs a root bus for the requested driver.

bus, _ := events.New(events.Config{Driver: "sync"})
fmt.Println(bus.Driver())
// Output: sync

NewNull

NewNull constructs the root null bus.

bus, _ := events.NewNull()
fmt.Println(bus.Driver())
// Output: null

NewSync

NewSync constructs the root sync bus.

bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync

Driver Constructors

gcppubsubevents.New

New constructs a Google Pub/Sub-backed driver.

driver, _ := gcppubsubevents.New(context.Background(), gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085",
})

kafkaevents.New

New constructs a Kafka-backed driver.

driver, _ := kafkaevents.New(kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}})

natsevents.New

New connects a NATS-backed driver from config.

driver, _ := natsevents.New(natsevents.Config{URL: "nats://127.0.0.1:4222"})

natsjetstreamevents.New

New connects a NATS JetStream-backed driver from config.

driver, _ := natsjetstreamevents.New(natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"})

redisevents.New

New constructs a Redis pub/sub-backed driver.

driver, _ := redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})

snsevents.New

New constructs an SNS-backed driver.

driver, _ := snsevents.New(snsevents.Config{
	Region:   "us-east-1",
	Endpoint: "http://127.0.0.1:4566",
})

Lifecycle

Close

Close closes the underlying Pub/Sub client.

driver, _ := redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
defer driver.Close() // release the underlying client when done

Options

Option

Option configures root bus behavior.

WithCodec

WithCodec overrides the default event codec.

bus, _ := events.NewSync(events.WithCodec(nil))
fmt.Println(bus.Driver())
// Output: sync
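
The Config example above notes that a nil codec falls back to the built-in JSON codec. As a rough illustration of what a JSON round trip does to an event, here is a standard-library-only sketch. The `jsonCodec` type, its `Marshal`/`Unmarshal` method set, and the `roundTrip` helper are hypothetical names chosen for this example; they are not taken from the library's Codec interface.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type UserCreated struct {
	ID string `json:"id"`
}

// jsonCodec is a hypothetical codec shape used only for illustration.
// It is NOT the library's Codec interface.
type jsonCodec struct{}

func (jsonCodec) Marshal(v any) ([]byte, error)   { return json.Marshal(v) }
func (jsonCodec) Unmarshal(b []byte, v any) error { return json.Unmarshal(b, v) }

// roundTrip encodes an event to its wire payload and decodes it back,
// which is roughly what any transport-backed delivery has to do.
func roundTrip(in UserCreated) (string, UserCreated, error) {
	var c jsonCodec
	payload, err := c.Marshal(in)
	if err != nil {
		return "", UserCreated{}, err
	}
	var out UserCreated
	if err := c.Unmarshal(payload, &out); err != nil {
		return "", UserCreated{}, err
	}
	return string(payload), out, nil
}

func main() {
	payload, out, err := roundTrip(UserCreated{ID: "123"})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println(payload, out.ID)
	// Output: {"id":"123"} 123
}
```

The `json:"id"` struct tag is what determines the field name in the payload, so renaming a tag changes the wire format seen by other subscribers.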

Publish

Publish

Publish publishes an event using the background context.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(event UserCreated) {
	fmt.Println(event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123

PublishContext

PublishContext publishes an event using the configured codec and dispatch flow.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID, ctx != nil)
	return nil
})
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true

TopicEvent

TopicEvent overrides the derived topic for an event.
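
To make the override concrete, the sketch below shows one way a bus could choose a topic: use the event's `Topic()` method when it has one, otherwise fall back to a derived name. It uses only the standard library. The `topicEvent` interface and `topicFor` helper are hypothetical, and the fallback to the bare Go type name is an assumption made for illustration; the library's actual derivation rule is not documented here.

```go
package main

import (
	"fmt"
	"reflect"
)

// topicEvent is a hypothetical stand-in for an interface like TopicEvent.
type topicEvent interface {
	Topic() string
}

type UserCreated struct {
	ID string `json:"id"`
}

// Topic overrides the derived topic, as in the Topic Override example.
func (UserCreated) Topic() string { return "users.created" }

// OrderPlaced has no Topic method, so it gets a derived topic.
type OrderPlaced struct {
	ID string `json:"id"`
}

// topicFor returns the explicit topic when the event provides one.
// Otherwise it falls back to the Go type name (an assumed rule).
func topicFor(event any) string {
	if te, ok := event.(topicEvent); ok {
		return te.Topic()
	}
	return reflect.TypeOf(event).Name()
}

func main() {
	fmt.Println(topicFor(UserCreated{ID: "123"}))
	fmt.Println(topicFor(OrderPlaced{ID: "456"}))
	// Output:
	// users.created
	// OrderPlaced
}
```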

Subscribe

Subscribe

Subscribe registers a handler using the background context.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID)
	return nil
})
defer sub.Close()
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123

SubscribeContext

SubscribeContext registers a typed handler.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.SubscribeContext(context.Background(), func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID, ctx != nil)
	return nil
})
defer sub.Close()
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true

Subscription

Subscription releases a subscription when closed.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(event UserCreated) {
	fmt.Println("received", event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
_ = sub.Close()
_ = bus.Publish(UserCreated{ID: "456"})
// Output: received 123
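
The behavior above (no delivery after Close) can be pictured with a small in-process bus. This is a minimal sketch using only the standard library, not the library's implementation: `miniBus`, `newMiniBus`, and `subscription` are hypothetical names, and the bus handles a single event type for brevity.

```go
package main

import (
	"fmt"
	"sync"
)

type UserCreated struct {
	ID string
}

// miniBus is a hypothetical single-type, in-process bus for illustration.
type miniBus[T any] struct {
	mu       sync.Mutex
	nextID   int
	handlers map[int]func(T)
}

func newMiniBus[T any]() *miniBus[T] {
	return &miniBus[T]{handlers: make(map[int]func(T))}
}

// subscription removes its handler from the bus when closed.
type subscription struct {
	cancel func()
}

func (s *subscription) Close() error {
	s.cancel()
	return nil
}

// Subscribe registers a handler and returns a handle that unregisters it.
func (b *miniBus[T]) Subscribe(h func(T)) *subscription {
	b.mu.Lock()
	defer b.mu.Unlock()
	id := b.nextID
	b.nextID++
	b.handlers[id] = h
	return &subscription{cancel: func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		delete(b.handlers, id) // deleting a missing key is a no-op
	}}
}

// Publish copies the handler set under the lock, then calls each handler
// synchronously in the caller's goroutine.
func (b *miniBus[T]) Publish(event T) {
	b.mu.Lock()
	hs := make([]func(T), 0, len(b.handlers))
	for _, h := range b.handlers {
		hs = append(hs, h)
	}
	b.mu.Unlock()
	for _, h := range hs {
		h(event)
	}
}

func main() {
	bus := newMiniBus[UserCreated]()
	sub := bus.Subscribe(func(e UserCreated) {
		fmt.Println("received", e.ID)
	})
	bus.Publish(UserCreated{ID: "123"})
	_ = sub.Close()
	bus.Publish(UserCreated{ID: "456"}) // no handler left, nothing printed
	// Output: received 123
}
```

Copying the handlers before invoking them lets a handler close its own subscription without deadlocking on the bus mutex.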

Testing

Fake

Fake provides a root-package testing helper that records published events.

fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0

Fake.Bus

Bus returns the wrapped API to inject into code under test.

fake := events.NewFake()
bus := fake.Bus()
fmt.Println(bus.Ready() == nil)
// Output: true

Fake.Count

Count returns the total number of recorded publishes.

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(fake.Count())
// Output: 1

Fake.Records

Records returns a copy of recorded publishes.

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(len(fake.Records()))
// Output: 1

Fake.Reset

Reset clears recorded publishes.

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fake.Reset()
fmt.Println(fake.Count())
// Output: 0

NewFake

NewFake creates a new fake event harness backed by the root sync bus.

fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0

Record

Record captures one published event observed by a Fake bus.

type UserCreated struct {
	ID string `json:"id"`
}

record := events.Record{Event: UserCreated{ID: "123"}}
fmt.Printf("%T\n", record.Event)
// Output: main.UserCreated
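
The testing helpers above are meant to be injected into code under test. The sketch below shows that pattern in isolation, using only the standard library so it runs without the module. Every name in it (`publisher`, `recordingFake`, `registerUser`) is hypothetical, and the `Publish(event any) error` signature is an assumption inferred from the examples above rather than a confirmed API.

```go
package main

import "fmt"

type UserCreated struct {
	ID string `json:"id"`
}

// publisher is a hypothetical narrow interface owned by the code under test.
// The signature is assumed from the Publish examples in this README.
type publisher interface {
	Publish(event any) error
}

// recordingFake is an illustrative stand-in for a fake bus: it records
// every published event and delivers nothing.
type recordingFake struct {
	records []any
}

func (f *recordingFake) Publish(event any) error {
	f.records = append(f.records, event)
	return nil
}

func (f *recordingFake) Count() int { return len(f.records) }

// registerUser is hypothetical application code that emits an event.
func registerUser(p publisher, id string) error {
	return p.Publish(UserCreated{ID: id})
}

func main() {
	fake := &recordingFake{}
	if err := registerUser(fake, "123"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println(fake.Count(), fake.records[0].(UserCreated).ID)
	// Output: 1 123
}
```

In a real test, a value such as `fake.Bus()` would take the place of `recordingFake`, with assertions made against `fake.Count()` and `fake.Records()`.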

Docs Tooling

The repository includes lightweight docs tooling under docs/.

Run the watcher to auto-regenerate docs on file changes:

sh docs/watcher.sh
