Skip to content

Commit b5180f0

Browse files
Support graceful shutdown
Flush the buffer to clickhouse before stopping the api
1 parent 2e4839a commit b5180f0

File tree

2 files changed

+52
-2
lines changed

2 files changed

+52
-2
lines changed

code/collector-api/cmd/api/events.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@ type Events struct {
1818
ch chan TrackingWithGeo
1919
buf []TrackingWithGeo
2020
mu sync.Mutex
21+
done chan struct{} // Channel to signal shutdown
22+
wg sync.WaitGroup
2123
}
2224

2325
func NewEvents(store shared.DataStore) *Events {
2426
return &Events{
2527
store: store,
2628
ch: make(chan TrackingWithGeo, 2000),
29+
done: make(chan struct{}),
2730
}
2831
}
2932

@@ -49,6 +52,11 @@ func (e *Events) Run() {
4952

5053
case <-ticker.C:
5154
e.flush()
55+
56+
case <-e.done:
57+
fmt.Println("Shutting down Events processor, flushing remaining data...")
58+
e.flush()
59+
return
5260
}
5361
}
5462
}
@@ -83,3 +91,7 @@ func (e *Events) flush() {
8391
fmt.Println("ERROR inserting batch:", err)
8492
}
8593
}
94+
95+
func (e *Events) Stop() {
96+
close(e.done)
97+
}

code/collector-api/cmd/api/main.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package main
22

33
import (
4+
"context"
45
"encoding/json"
56
"flag"
67
"fmt"
78
"log"
89
"net/http"
10+
"os"
11+
"os/signal"
12+
"syscall"
13+
"time"
914

1015
"vortex/shared"
1116

@@ -59,8 +64,41 @@ func main() {
5964
// CORS support
6065
handler := cors.Default().Handler(mux)
6166

62-
log.Println("Listening on :9876")
63-
log.Fatal(http.ListenAndServe(":9876", handler))
67+
srv := &http.Server{
68+
Addr: ":9876",
69+
Handler: handler,
70+
}
71+
72+
// On shutdown
73+
stop := make(chan os.Signal, 1)
74+
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
75+
76+
// Execute server in goroutine to allow graceful shutdown
77+
go func() {
78+
log.Println("Listening on :9876")
79+
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
80+
log.Fatalf("Listen error: %s\n", err)
81+
}
82+
}()
83+
84+
// Wait for interrupt signal
85+
sig := <-stop
86+
log.Printf("Signal received (%v). Starting graceful shutdown...\n", sig)
87+
88+
// Shutdown the HTTP server first (stop accepting new requests)
89+
// Give the server a 5-second grace period to finish active requests
90+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
91+
defer cancel()
92+
93+
if err := srv.Shutdown(ctx); err != nil {
94+
log.Fatalf("HTTP server shutdown failed: %+v", err)
95+
}
96+
log.Println("HTTP server stopped.")
97+
98+
// Flush ClickHouse buffers
99+
events.Stop()
100+
101+
log.Println("Vortex shutdown successfully.")
64102
}
65103

66104
// Handlers

0 commit comments

Comments
 (0)