Skip to content

Commit dcb1b75

Browse files
hnnsgstfssnHannes Gustafssonlostluck
authored
Add OrderedListState support in Go SDK (#37629)
* Add OrderedListState support in Go SDK The Go SDK stateful DoFn API was missing OrderedListState, the only state type already supported in Java and Python but absent from the Go plumbing. This adds the missing pieces. - state.OrderedList[T] with Add, Read, ReadRange, Clear, ClearRange operating on int64 sort keys - Provider interface wired through exec, graphx translate, and the harness state manager to the Fn API beam:coder:ordered_list_state:v1 URN - Unit tests (state, userstate, sideinput) and integration tests against the prism runner Relates to the following issues. - #20510 tracking issue for Go SDK state & timer support (closed, but ordered list was not included) - #22736 the original "Implement State in Go" task (closed with Value/Bag/Combining only) - #18493 open, tracks full portable user state coverage including ordered list - #25894 open, composite state proposal that explicitly depends on ordered list as a primitive * Add OrderedListState documentation and examples for Go Documentation snippet in the programming guide, standalone example pipeline, and CHANGES.md entry. * Run go fmt * Drop accidentally added URN * Rename TimestampedValue to OrderedListValue OrderedListValue will make the connection to the state type clearer, since it is largely referencing the sort key as a generic sort key. * Update CHANGES to reference the PR Since there is no clear issue for this particular change, the PR is referenced instead. * Reformat the CHANGES entry Add back a ) that was dropped, prefix with (Go) following existing entries and put directly after existing entries. * pull -> issues * Filter TestOrderedListState for unsupported runners Runners that lack full state support (direct, portable, flink, samza, spark) may fail the new integration test. Add it to their filter lists so presubmit and postcommit skip it on those runners, matching the existing pattern for other state tests. 
--------- Co-authored-by: Hannes Gustafsson <hnngstfssn@gmail.com> Co-authored-by: Robert Burke <lostluck@users.noreply.github.com>
1 parent 97da2d6 commit dcb1b75

File tree

17 files changed

+833
-25
lines changed

17 files changed

+833
-25
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@
6969

7070
## New Features / Improvements
7171

72+
* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)).
73+
* (Python) Added a pipeline option `--experiments=pip_no_build_isolation` to disable build isolation when installing dependencies in the runtime environment ([#37331](https://github.com/apache/beam/issues/37331)).
74+
* (Go) Added OrderedListState support to the Go SDK stateful DoFn API ([#37629](https://github.com/apache/beam/issues/37629)).
7275
* Added support for large pipeline options via a file (Python) ([#37370](https://github.com/apache/beam/issues/37370)).
76+
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
7377
* Supported infer schema from dataclass (Python) ([#22085](https://github.com/apache/beam/issues/22085)). Default coder for typehint-ed (or set with_output_type) for non-frozen dataclasses changed to RowCoder. To preserve the old behavior (fast primitive coder), explicitly register the type with FastPrimitiveCoder.
7478
* Updates minimum Go version to 1.26.1 ([#37897](https://github.com/apache/beam/issues/37897)).
7579
* (Python) Added image embedding support in `apache_beam.ml.rag` package ([#37628](https://github.com/apache/beam/issues/37628)).
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
// ordered_list_state is a toy pipeline demonstrating the use of OrderedListState.
17+
// It creates keyed elements with timestamps, stores them in ordered list state,
18+
// and reads back sub-ranges to emit summaries per key.
19+
package main
20+
21+
import (
22+
"context"
23+
"flag"
24+
"fmt"
25+
26+
"github.com/apache/beam/sdks/v2/go/pkg/beam"
27+
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
28+
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
29+
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
30+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
31+
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
32+
)
33+
34+
// eventLogFn accumulates timestamped events per key using OrderedListState
35+
// and emits a summary of events seen so far.
36+
type eventLogFn struct {
37+
Events state.OrderedList[string]
38+
}
39+
40+
func (fn *eventLogFn) ProcessElement(p state.Provider, key string, ts int64, emit func(string)) error {
41+
// Store an event using the input value as the sort key.
42+
event := fmt.Sprintf("event@%d", ts)
43+
fn.Events.Add(p, ts, event)
44+
45+
// Read all events accumulated so far for this key.
46+
entries, ok, err := fn.Events.Read(p)
47+
if err != nil {
48+
return err
49+
}
50+
if ok {
51+
latest := entries[len(entries)-1]
52+
emit(fmt.Sprintf("key=%s count=%d latest=%s (sort_key=%d)", key, len(entries), latest.Value, latest.SortKey))
53+
}
54+
55+
return nil
56+
}
57+
58+
func init() {
59+
register.DoFn4x1[state.Provider, string, int64, func(string), error](&eventLogFn{})
60+
register.Emitter1[string]()
61+
register.Function1x2(toKeyed)
62+
}
63+
64+
// toKeyed maps an integer to a KV pair of (key, timestamp).
//
// The key is "user-<i mod 3>", so inputs are spread over three keys. The
// timestamp is i*1000; i is widened to int64 before multiplying so the product
// cannot overflow on platforms where int is 32 bits.
func toKeyed(i int) (string, int64) {
	return fmt.Sprintf("user-%d", i%3), int64(i) * 1000
}
68+
69+
func main() {
70+
flag.Parse()
71+
beam.Init()
72+
73+
ctx := context.Background()
74+
75+
p, s := beam.NewPipelineWithRoot()
76+
77+
// Create a small set of input elements.
78+
impulse := beam.CreateList(s, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
79+
80+
// Key and timestamp each element.
81+
keyed := beam.ParDo(s, toKeyed, impulse)
82+
83+
// Apply the stateful DoFn with OrderedListState.
84+
summaries := beam.ParDo(s, &eventLogFn{
85+
Events: state.MakeOrderedListState[string]("events"),
86+
}, keyed)
87+
88+
debug.Print(s, summaries)
89+
90+
if err := beamx.Run(ctx, p); err != nil {
91+
log.Exitf(ctx, "Failed to execute job: %v", err)
92+
}
93+
}

sdks/go/examples/snippets/04transforms.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,38 @@ func combineState(s beam.Scope, input beam.PCollection) beam.PCollection {
743743
return combined
744744
}
745745

746+
// [START ordered_list_state]
747+
748+
// orderedListStateFn tracks timestamped events per key and reads a sub-range.
749+
type orderedListStateFn struct {
750+
Events state.OrderedList[string]
751+
}
752+
753+
func (s *orderedListStateFn) ProcessElement(p state.Provider, key string, event string, emit func(string)) error {
754+
// Add the event with the current timestamp as the sort key.
755+
now := time.Now().UnixMilli()
756+
s.Events.Add(p, now, event)
757+
758+
// Read a sub-range of events (e.g. the last hour).
759+
oneHourAgo := now - 3600000
760+
entries, ok, err := s.Events.ReadRange(p, oneHourAgo, now+1)
761+
if err != nil {
762+
return err
763+
}
764+
if ok {
765+
for _, e := range entries {
766+
emit(fmt.Sprintf("%s@%d", e.Value, e.SortKey))
767+
}
768+
}
769+
770+
// Clear events older than one hour.
771+
s.Events.ClearRange(p, 0, oneHourAgo)
772+
773+
return nil
774+
}
775+
776+
// [END ordered_list_state]
777+
746778
// [START event_time_timer]
747779

748780
type eventTimerDoFn struct {

sdks/go/pkg/beam/core/graph/fn.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,10 +1368,10 @@ func validateState(fn *DoFn, numIn mainInputs) error {
13681368
"unique per DoFn", k, orig, s)
13691369
}
13701370
t := s.StateType()
1371-
if t != state.TypeValue && t != state.TypeBag && t != state.TypeCombining && t != state.TypeSet && t != state.TypeMap {
1371+
if t != state.TypeValue && t != state.TypeBag && t != state.TypeCombining && t != state.TypeSet && t != state.TypeMap && t != state.TypeOrderedList {
13721372
err := errors.Errorf("Unrecognized state type %v for state %v", t, s)
13731373
return errors.SetTopLevelMsgf(err, "Unrecognized state type %v for state %v. Currently the only supported state"+
1374-
"types are state.Value, state.Combining, state.Bag, state.Set, and state.Map", t, s)
1374+
"types are state.Value, state.Combining, state.Bag, state.Set, state.Map, and state.OrderedList", t, s)
13751375
}
13761376
stateKeys[k] = s
13771377
}

sdks/go/pkg/beam/core/runtime/exec/data.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ type StateReader interface {
8989
OpenMultimapKeysUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.ReadCloser, error)
9090
// OpenMultimapKeysUserStateClearer opens a byte stream for clearing all keys of user multimap state.
9191
OpenMultimapKeysUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
92+
// OpenOrderedListUserStateReader opens a byte stream for reading user ordered list state in the range [start, end).
93+
OpenOrderedListUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.ReadCloser, error)
94+
// OpenOrderedListUserStateAppender opens a byte stream for appending user ordered list state.
95+
OpenOrderedListUserStateAppender(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error)
96+
// OpenOrderedListUserStateClearer opens a byte stream for clearing user ordered list state in the range [start, end).
97+
OpenOrderedListUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.Writer, error)
9298
// GetSideInputCache returns the SideInputCache being used at the harness level.
9399
GetSideInputCache() SideCache
94100
}

sdks/go/pkg/beam/core/runtime/exec/sideinput_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,21 @@ func (t *testStateReader) OpenMultimapKeysUserStateClearer(ctx context.Context,
173173
return nil, nil
174174
}
175175

176+
// OpenOrderedListUserStateReader for the testStateReader is a no-op.
177+
func (t *testStateReader) OpenOrderedListUserStateReader(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.ReadCloser, error) {
178+
return nil, nil
179+
}
180+
181+
// OpenOrderedListUserStateAppender for the testStateReader is a no-op.
182+
func (t *testStateReader) OpenOrderedListUserStateAppender(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte) (io.Writer, error) {
183+
return nil, nil
184+
}
185+
186+
// OpenOrderedListUserStateClearer for the testStateReader is a no-op.
187+
func (t *testStateReader) OpenOrderedListUserStateClearer(ctx context.Context, id StreamID, userStateID string, key []byte, w []byte, start, end int64) (io.Writer, error) {
188+
return nil, nil
189+
}
190+
176191
// GetSideInputCache returns a fresh, empty testSideCache on every call.
func (t *testStateReader) GetSideInputCache() SideCache {
	return new(testSideCache)
}

sdks/go/pkg/beam/core/runtime/exec/translate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,8 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
563563
kcID = ms.KeyCoderId
564564
} else if ss := spec.GetSetSpec(); ss != nil {
565565
kcID = ss.ElementCoderId
566+
} else if ols := spec.GetOrderedListSpec(); ols != nil {
567+
cID = ols.ElementCoderId
566568
} else {
567569
return nil, errors.Errorf("Unrecognized state type %v", spec)
568570
}

sdks/go/pkg/beam/core/runtime/exec/userstate.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ import (
2020
"context"
2121
"fmt"
2222
"io"
23+
"math"
2324

2425
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
2526
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
2627
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
2728
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
2829
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
30+
"google.golang.org/protobuf/encoding/protowire"
2931
)
3032

3133
type stateProvider struct {
@@ -41,6 +43,7 @@ type stateProvider struct {
4143
blindBagWriteCountsByKey map[string]int // Tracks blind writes to bags before a read.
4244
initialMapValuesByKey map[string]map[string]any
4345
initialMapKeysByKey map[string][]any
46+
initialOrderedListByKey map[string][]any
4447
readersByKey map[string]io.ReadCloser
4548
appendersByKey map[string]io.Writer
4649
clearersByKey map[string]io.Writer
@@ -466,6 +469,152 @@ func (s *stateProvider) getMultiMapKeyReader(userStateID string) (io.ReadCloser,
466469
return s.readersByKey[userStateID], nil
467470
}
468471

472+
// ReadOrderedListState reads an ordered list state from the State API.
473+
// It fetches the full range on first access and caches the result.
474+
func (s *stateProvider) ReadOrderedListState(userStateID string) ([]any, []state.Transaction, error) {
475+
initialValue, ok := s.initialOrderedListByKey[userStateID]
476+
if !ok {
477+
initialValue = []any{}
478+
rw, err := s.getOrderedListReader(userStateID, math.MinInt64, math.MaxInt64)
479+
if err != nil {
480+
return nil, nil, err
481+
}
482+
for {
483+
entry, err := decodeOrderedListEntry(rw, s.codersByKey[userStateID])
484+
if err == io.EOF {
485+
break
486+
}
487+
if err != nil {
488+
return nil, nil, err
489+
}
490+
initialValue = append(initialValue, entry)
491+
}
492+
s.initialOrderedListByKey[userStateID] = initialValue
493+
}
494+
495+
transactions, ok := s.transactionsByKey[userStateID]
496+
if !ok {
497+
transactions = []state.Transaction{}
498+
}
499+
500+
return initialValue, transactions, nil
501+
}
502+
503+
// WriteOrderedListState writes a single entry to the ordered list state.
504+
// The wire format is: varint(sortKey) || coder_encoded(value).
505+
func (s *stateProvider) WriteOrderedListState(val state.Transaction) error {
506+
ap, err := s.getOrderedListAppender(val.Key)
507+
if err != nil {
508+
return err
509+
}
510+
511+
sortKey := val.MapKey.(int64)
512+
if err := encodeOrderedListEntry(sortKey, val.Val, ap, s.codersByKey[val.Key]); err != nil {
513+
return err
514+
}
515+
516+
if transactions, ok := s.transactionsByKey[val.Key]; ok {
517+
s.transactionsByKey[val.Key] = append(transactions, val)
518+
} else {
519+
s.transactionsByKey[val.Key] = []state.Transaction{val}
520+
}
521+
522+
return nil
523+
}
524+
525+
// ClearOrderedListState clears entries in a range from the ordered list state.
526+
func (s *stateProvider) ClearOrderedListState(val state.Transaction) error {
527+
r := val.MapKey.([2]int64)
528+
cl, err := s.getOrderedListClearer(val.Key, r[0], r[1])
529+
if err != nil {
530+
return err
531+
}
532+
_, err = cl.Write([]byte{})
533+
if err != nil {
534+
return err
535+
}
536+
537+
if transactions, ok := s.transactionsByKey[val.Key]; ok {
538+
s.transactionsByKey[val.Key] = append(transactions, val)
539+
} else {
540+
s.transactionsByKey[val.Key] = []state.Transaction{val}
541+
}
542+
543+
return nil
544+
}
545+
546+
func (s *stateProvider) getOrderedListReader(userStateID string, start, end int64) (io.ReadCloser, error) {
547+
r, err := s.sr.OpenOrderedListUserStateReader(s.ctx, s.SID, userStateID, s.elementKey, s.window, start, end)
548+
if err != nil {
549+
return nil, err
550+
}
551+
return r, nil
552+
}
553+
554+
func (s *stateProvider) getOrderedListAppender(userStateID string) (io.Writer, error) {
555+
w, err := s.sr.OpenOrderedListUserStateAppender(s.ctx, s.SID, userStateID, s.elementKey, s.window)
556+
if err != nil {
557+
return nil, err
558+
}
559+
return w, nil
560+
}
561+
562+
func (s *stateProvider) getOrderedListClearer(userStateID string, start, end int64) (io.Writer, error) {
563+
w, err := s.sr.OpenOrderedListUserStateClearer(s.ctx, s.SID, userStateID, s.elementKey, s.window, start, end)
564+
if err != nil {
565+
return nil, err
566+
}
567+
return w, nil
568+
}
569+
570+
// encodeOrderedListEntry writes varint(uint64(sortKey)) || coder_encoded(value) to w.
571+
// The entire entry is buffered before writing so that each w.Write call
572+
// delivers a complete entry (important when w is a stateKeyWriter that
573+
// sends each Write as a separate gRPC Append request).
574+
func encodeOrderedListEntry(sortKey int64, val any, w io.Writer, c *coder.Coder) error {
575+
var buf bytes.Buffer
576+
b := protowire.AppendVarint(nil, uint64(sortKey))
577+
buf.Write(b)
578+
fv := FullValue{Elm: val}
579+
enc := MakeElementEncoder(coder.SkipW(c))
580+
if err := enc.Encode(&fv, &buf); err != nil {
581+
return err
582+
}
583+
_, err := w.Write(buf.Bytes())
584+
return err
585+
}
586+
587+
// decodeOrderedListEntry reads varint(sortKey) || coder_encoded(value) from r.
588+
func decodeOrderedListEntry(r io.Reader, c *coder.Coder) (state.OrderedListEntry, error) {
589+
// Read varint byte-by-byte.
590+
var buf [10]byte // max varint size
591+
var n int
592+
for n = 0; n < len(buf); n++ {
593+
_, err := r.Read(buf[n : n+1])
594+
if err != nil {
595+
if n == 0 {
596+
return state.OrderedListEntry{}, err
597+
}
598+
return state.OrderedListEntry{}, fmt.Errorf("unexpected error reading varint: %w", err)
599+
}
600+
if buf[n]&0x80 == 0 {
601+
n++
602+
break
603+
}
604+
}
605+
sortKey, consumed := protowire.ConsumeVarint(buf[:n])
606+
if consumed < 0 {
607+
return state.OrderedListEntry{}, fmt.Errorf("invalid varint in ordered list entry")
608+
}
609+
610+
dec := MakeElementDecoder(coder.SkipW(c))
611+
fv, err := dec.Decode(r)
612+
if err != nil {
613+
return state.OrderedListEntry{}, err
614+
}
615+
return state.OrderedListEntry{SortKey: int64(sortKey), Value: fv.Elm}, nil
616+
}
617+
469618
func (s *stateProvider) encodeKey(userStateID string, key any) ([]byte, error) {
470619
fv := FullValue{Elm: key}
471620
enc := MakeElementEncoder(coder.SkipW(s.keyCodersByID[userStateID]))
@@ -533,6 +682,7 @@ func (s *userStateAdapter) NewStateProvider(ctx context.Context, reader StateRea
533682
blindBagWriteCountsByKey: make(map[string]int),
534683
initialMapValuesByKey: make(map[string]map[string]any),
535684
initialMapKeysByKey: make(map[string][]any),
685+
initialOrderedListByKey: make(map[string][]any),
536686
readersByKey: make(map[string]io.ReadCloser),
537687
appendersByKey: make(map[string]io.Writer),
538688
clearersByKey: make(map[string]io.Writer),

0 commit comments

Comments
 (0)