Go implementation of Durable Streams.
Tested against conformance tests v0.1.8:
| Suite | Tests | Passed | Failed |
|---|---|---|---|
| Server (memorystorage) | 195 | 195 | 0 |
| Client | 177 | 177 | 0 |
Client features: batching, sse, longPoll, streaming, dynamicHeaders
This implementation uses the same offset format as the reference Node.js implementation:
<readSeq>_<byteOffset>
Both components are 16-digit zero-padded integers, e.g., 0000000000000000_0000000000000042.
The fixed-width zero padding makes offsets sort lexicographically in the same order as they sort numerically, as required by PROTOCOL.md Section 6.
| Package | Coverage |
|---|---|
| durablestream | 90.7% |
| durablestream/transport | 93.1% |
| durablestream/storage/memorystorage | 96.2% |
| durablestream/internal/protocol | 98.1% |
func ExampleHandler() {
storage := memorystorage.New()
handler := durablestream.NewHandler(storage, nil)
mux := http.NewServeMux()
mux.Handle("/v1/stream/", http.StripPrefix("/v1/stream/", handler))
log.Println("Listening on :4437")
log.Fatal(http.ListenAndServe(":4437", mux))
}func ExampleClient() {
ctx := context.Background()
client := durablestream.NewClient("http://localhost:4437/streams", nil)
_, err := client.Create(ctx, "events", &durablestream.CreateOptions{
ContentType: "application/json",
})
if err != nil {
log.Fatal(err)
}
// Write using Writer
writer, err := client.Writer(ctx, "events")
if err != nil {
log.Fatal(err)
}
event := map[string]any{"type": "user.created", "id": 123}
if err := writer.SendJSON(event, nil); err != nil {
log.Fatal(err)
}
fmt.Println("Appended at offset:", writer.Offset())
// Read using Reader
reader := client.Reader("events", durablestream.ZeroOffset)
defer reader.Close()
result, err := reader.Read(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println("Got data:", len(result.Data) > 0)
fmt.Println("Next offset:", result.NextOffset)
}func ExampleReader() {
ctx := context.Background()
client := durablestream.NewClient("http://localhost:4437/streams", nil)
// Create a reader starting from offset 0
reader := client.Reader("events", durablestream.ZeroOffset)
defer reader.Close()
for msg, err := range reader.Messages(ctx) {
if err != nil {
log.Fatal(err)
}
// Use msg.String() for text, msg.Bytes() for raw bytes,
// or msg.Decode(&v) for JSON
fmt.Println("Received:", msg.String())
}
}MIT - see LICENSE