Skip to content

Commit c558124

Browse files
authored
"Add mutex to avoid race condition (#90)
Signed-off-by: Aneesh Puttur <aputtur@redhat.com>
1 parent b0e5ff0 commit c558124

File tree

2 files changed

+80
-22
lines changed

2 files changed

+80
-22
lines changed

v2/server.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ var once sync.Once
6161
var ServerInstance *Server
6262
var healthCheckPause = 2 * time.Second
6363

64-
type serverStatus int
64+
type ServerStatus int
6565

6666
const (
6767
API_VERSION = "2.0"
@@ -88,8 +88,9 @@ type Server struct {
8888
httpServer *http.Server
8989
pubSubAPI *pubsubv1.API
9090
subscriberAPI *subscriberApi.API
91-
status serverStatus
91+
status ServerStatus
9292
statusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error
93+
statusLock sync.RWMutex
9394
}
9495

9596
// SubscriptionInfo
@@ -279,21 +280,37 @@ func (s *Server) Port() int {
279280

280281
// Ready gives the status of the server
281282
func (s *Server) Ready() bool {
283+
s.statusLock.RLock()
284+
defer s.statusLock.RUnlock()
282285
return s.status == started
283286
}
284287

288+
// SetStatus safely updates the server status
289+
func (s *Server) SetStatus(newStatus ServerStatus) {
290+
s.statusLock.Lock()
291+
defer s.statusLock.Unlock()
292+
s.status = newStatus
293+
}
294+
295+
func (s *Server) GetStatus() ServerStatus {
296+
s.statusLock.RLock()
297+
defer s.statusLock.RUnlock()
298+
return s.status
299+
}
300+
285301
// GetHostPath returns hostpath
286302
func (s *Server) GetHostPath() *types.URI {
287303
return types.ParseURI(fmt.Sprintf("http://localhost:%d%s", s.port, s.apiPath))
288304
}
289305

290306
// Start will start res routes service
291307
func (s *Server) Start() {
292-
if s.status == started || s.status == starting {
308+
currentStatus := s.GetStatus()
309+
if currentStatus == started || currentStatus == starting {
293310
log.Infof("Server is already running at port %d", s.port)
294311
return
295312
}
296-
s.status = starting
313+
s.SetStatus(starting)
297314
r := mux.NewRouter()
298315

299316
api := r.PathPrefix(s.apiPath).Subrouter()
@@ -482,7 +499,7 @@ func (s *Server) Start() {
482499

483500
log.Infof("starting v2 rest api server at port %d, endpoint %s", s.port, s.apiPath)
484501
go wait.Until(func() {
485-
s.status = started
502+
s.SetStatus(started)
486503
s.httpServer = &http.Server{
487504
ReadHeaderTimeout: HTTPReadHeaderTimeout,
488505
Addr: fmt.Sprintf(":%d", s.port),
@@ -491,7 +508,7 @@ func (s *Server) Start() {
491508
err := s.httpServer.ListenAndServe()
492509
if err != nil {
493510
log.Errorf("restarting due to error with api server %s\n", err.Error())
494-
s.status = failed
511+
s.SetStatus(failed)
495512
}
496513
}, 1*time.Second, s.closeCh)
497514
}

v2/server_test.go

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,22 @@ import (
4646
var (
4747
server *restapi.Server
4848

49-
eventOutCh chan *channel.DataChan
50-
closeCh chan struct{}
51-
wg sync.WaitGroup
52-
port = 8990
53-
apHost = "localhost"
54-
apPath = "/api/ocloudNotifications/v2/"
55-
resource = "/east-edge-10/Node3/sync/sync-status/sync-state"
56-
resourceInvalid = "/east-edge-10/Node3/invalid"
57-
storePath = "."
58-
ObjSub pubsub.PubSub
59-
ObjPub pubsub.PubSub
60-
testSource = "/sync/sync-status/sync-state"
61-
testType = "event.synchronization-state-change"
62-
endpoint = "http://localhost:8990//api/ocloudNotifications/v2/dummy"
49+
eventOutCh chan *channel.DataChan
50+
closeCh chan struct{}
51+
wg sync.WaitGroup
52+
port = 8990
53+
apHost = "localhost"
54+
apPath = "/api/ocloudNotifications/v2/"
55+
resource = "/east-edge-10/Node3/sync/sync-status/sync-state"
56+
resourceInvalid = "/east-edge-10/Node3/invalid"
57+
storePath = "."
58+
ObjSub pubsub.PubSub
59+
ObjPub pubsub.PubSub
60+
testSource = "/sync/sync-status/sync-state"
61+
testType = "event.synchronization-state-change"
62+
endpoint = "http://localhost:8990//api/ocloudNotifications/v2/dummy"
63+
onceCloseEvent sync.Once
64+
onceCloseCloseCh sync.Once
6365
)
6466

6567
func onReceiveOverrideFn(e cloudevents.Event, d *channel.DataChan) error {
@@ -772,8 +774,15 @@ func TestServer_End(*testing.T) {
772774
}
773775
os.Remove("pub.json")
774776
os.Remove("sub.json")
775-
close(eventOutCh)
776-
close(closeCh)
777+
// hanlding go test -race ./...
778+
// by closing channel only once
779+
onceCloseEvent.Do(func() {
780+
close(eventOutCh)
781+
})
782+
783+
onceCloseCloseCh.Do(func() {
784+
close(closeCh)
785+
})
777786
}
778787

779788
// Rest client to make http request
@@ -829,3 +838,35 @@ func (r *Rest) PostEvent(url *types.URI, e event.Event) error {
829838
}
830839
return nil
831840
}
841+
842+
func TestServerStatusConcurrency(*testing.T) {
843+
s := &restapi.Server{} // Adjust import as needed if your package name differs
844+
statuses := []restapi.ServerStatus{
845+
0,
846+
1,
847+
2,
848+
3,
849+
}
850+
var wg sync.WaitGroup
851+
wg.Add(2)
852+
853+
// Writer goroutine: updates status multiple times
854+
go func() {
855+
defer wg.Done()
856+
for i := 0; i < 1000; i++ {
857+
s.SetStatus(statuses[i%len(statuses)])
858+
time.Sleep(1 * time.Millisecond)
859+
}
860+
}()
861+
862+
// Reader goroutine: reads Ready() status
863+
go func() {
864+
defer wg.Done()
865+
for i := 0; i < 1000; i++ {
866+
_ = s.Ready()
867+
time.Sleep(1 * time.Millisecond)
868+
}
869+
}()
870+
871+
wg.Wait()
872+
}

0 commit comments

Comments
 (0)