Skip to content

Commit ab551e5

Browse files
committed
feat(proxy): add agent discovery with health monitoring
- Add new AgentDiscovery service for discovering and tracking other agents in the network - Implement periodic agent discovery from Core API with 60-second update interval - Add AgentHealthChecker for monitoring agent health and latency metrics - Implement health check loop running every 30 seconds with configurable thresholds - Add support for geographic location tracking (manual and IP-based) - Add intelligent agent selection based on health score and latency - Add distance calculation using Haversine formula for geo-aware routing - Initialize agent discovery on startup with automatic background updates - Add comprehensive logging for discovery and health check operations - Enable anycast routing to leverage discovered agents for improved performance
1 parent 1d39b79 commit ab551e5

File tree

5 files changed

+514
-26
lines changed

5 files changed

+514
-26
lines changed

main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ func main() {
7474
// Initialize HTTP client provider for health checks
7575
proxy.InitHTTPClientProvider()
7676

77+
// Initialize agent discovery for anycast routing
78+
log.Println("Starting Agent Discovery...")
79+
agentDiscovery := proxy.GetAgentDiscovery(coreURL, agentKey)
80+
_ = agentDiscovery // Discovery starts automatically
81+
7782
// настраиваем статистику коллектор
7883
statsCollector := stats.GetCollector()
7984
statsCollector.SetConfig(coreURL, agentID, agentKey)

proxy/agentdiscovery.go

Lines changed: 371 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,371 @@
1+
package proxy
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"log"
8+
"math"
9+
"net/http"
10+
"sync"
11+
"time"
12+
)
13+
14+
// AgentDiscovery manages discovery and health monitoring of other agents
15+
type AgentDiscovery struct {
16+
mu sync.RWMutex
17+
coreURL string
18+
agentKey string
19+
agents map[string]*DiscoveredAgent // key: agentId
20+
lastUpdate time.Time
21+
updateTicker *time.Ticker
22+
healthChecker *AgentHealthChecker
23+
}
24+
25+
// DiscoveredAgent represents a discovered agent in the network
26+
type DiscoveredAgent struct {
27+
AgentID string `json:"agentId"`
28+
Endpoint string `json:"endpoint"` // https://ip:443
29+
IPAddress string `json:"ipAddress"`
30+
Location *Location `json:"location"`
31+
IsActive bool `json:"isActive"`
32+
LastSeen time.Time `json:"lastSeen"`
33+
// Health metrics
34+
Latency time.Duration `json:"-"`
35+
HealthScore float64 `json:"-"` // 0.0 to 1.0
36+
LastHealthCheck time.Time `json:"-"`
37+
}
38+
39+
// Location represents geographic location of an agent
40+
type Location struct {
41+
Country string `json:"country"`
42+
CountryCode string `json:"countryCode"`
43+
City string `json:"city"`
44+
Region string `json:"region"`
45+
Latitude float64 `json:"latitude"`
46+
Longitude float64 `json:"longitude"`
47+
}
48+
49+
// AgentHealthChecker monitors health of discovered agents
50+
type AgentHealthChecker struct {
51+
mu sync.RWMutex
52+
client *http.Client
53+
results map[string]*HealthCheckResult
54+
}
55+
56+
// HealthCheckResult stores health check results for an agent
57+
type HealthCheckResult struct {
58+
IsHealthy bool
59+
Latency time.Duration
60+
LastCheck time.Time
61+
FailureCount int
62+
}
63+
64+
var (
65+
globalAgentDiscovery *AgentDiscovery
66+
globalAgentDiscoveryOnce sync.Once
67+
)
68+
69+
// GetAgentDiscovery returns the global agent discovery instance
70+
func GetAgentDiscovery(coreURL, agentKey string) *AgentDiscovery {
71+
globalAgentDiscoveryOnce.Do(func() {
72+
globalAgentDiscovery = &AgentDiscovery{
73+
coreURL: coreURL,
74+
agentKey: agentKey,
75+
agents: make(map[string]*DiscoveredAgent),
76+
healthChecker: NewAgentHealthChecker(),
77+
}
78+
// Start periodic updates
79+
globalAgentDiscovery.Start()
80+
})
81+
return globalAgentDiscovery
82+
}
83+
84+
// NewAgentHealthChecker creates a new health checker
85+
func NewAgentHealthChecker() *AgentHealthChecker {
86+
return &AgentHealthChecker{
87+
client: &http.Client{
88+
Timeout: 5 * time.Second,
89+
},
90+
results: make(map[string]*HealthCheckResult),
91+
}
92+
}
93+
94+
// Start begins periodic agent discovery and health checks
95+
func (ad *AgentDiscovery) Start() {
96+
// Initial discovery
97+
ad.DiscoverAgents()
98+
99+
// Start periodic discovery (every 60 seconds)
100+
ad.updateTicker = time.NewTicker(60 * time.Second)
101+
go func() {
102+
for range ad.updateTicker.C {
103+
ad.DiscoverAgents()
104+
}
105+
}()
106+
107+
// Start health checking (every 30 seconds)
108+
go ad.healthCheckLoop()
109+
}
110+
111+
// Stop stops the discovery service
112+
func (ad *AgentDiscovery) Stop() {
113+
if ad.updateTicker != nil {
114+
ad.updateTicker.Stop()
115+
}
116+
}
117+
118+
// DiscoverAgents fetches list of active agents from Core API
119+
func (ad *AgentDiscovery) DiscoverAgents() error {
120+
req, err := http.NewRequest("GET", ad.coreURL+"/api/agent/list", nil)
121+
if err != nil {
122+
log.Printf("[AgentDiscovery] Error creating request: %v", err)
123+
return err
124+
}
125+
126+
req.Header.Set("Authorization", "Bearer "+ad.agentKey)
127+
128+
client := &http.Client{Timeout: 10 * time.Second}
129+
resp, err := client.Do(req)
130+
if err != nil {
131+
log.Printf("[AgentDiscovery] Error fetching agents: %v", err)
132+
return err
133+
}
134+
defer resp.Body.Close()
135+
136+
if resp.StatusCode != http.StatusOK {
137+
log.Printf("[AgentDiscovery] Error response: %d", resp.StatusCode)
138+
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
139+
}
140+
141+
body, err := io.ReadAll(resp.Body)
142+
if err != nil {
143+
log.Printf("[AgentDiscovery] Error reading response: %v", err)
144+
return err
145+
}
146+
147+
var response struct {
148+
Agents []struct {
149+
AgentID string `json:"agentId"`
150+
IPAddress string `json:"ipAddress"`
151+
IsActive bool `json:"isActive"`
152+
LastSeen string `json:"lastSeen"`
153+
IPInfo *struct {
154+
Country string `json:"country"`
155+
CountryCode string `json:"countryCode"`
156+
City string `json:"city"`
157+
Region string `json:"region"`
158+
Lat float64 `json:"lat"`
159+
Lon float64 `json:"lon"`
160+
} `json:"ipInfo"`
161+
ManualLocation *struct {
162+
Country string `json:"country"`
163+
City string `json:"city"`
164+
Region string `json:"region"`
165+
} `json:"manualLocation"`
166+
} `json:"agents"`
167+
}
168+
169+
if err := json.Unmarshal(body, &response); err != nil {
170+
log.Printf("[AgentDiscovery] Error parsing response: %v", err)
171+
return err
172+
}
173+
174+
ad.mu.Lock()
175+
defer ad.mu.Unlock()
176+
177+
// Update agents map
178+
newAgents := make(map[string]*DiscoveredAgent)
179+
for _, agentData := range response.Agents {
180+
if !agentData.IsActive || agentData.IPAddress == "" {
181+
continue
182+
}
183+
184+
// Build endpoint URL
185+
endpoint := fmt.Sprintf("https://%s", agentData.IPAddress)
186+
187+
// Use manual location if set, otherwise use ipInfo
188+
var location *Location
189+
if agentData.ManualLocation != nil && agentData.ManualLocation.Country != "" {
190+
location = &Location{
191+
Country: agentData.ManualLocation.Country,
192+
City: agentData.ManualLocation.City,
193+
Region: agentData.ManualLocation.Region,
194+
}
195+
} else if agentData.IPInfo != nil {
196+
location = &Location{
197+
Country: agentData.IPInfo.Country,
198+
CountryCode: agentData.IPInfo.CountryCode,
199+
City: agentData.IPInfo.City,
200+
Region: agentData.IPInfo.Region,
201+
Latitude: agentData.IPInfo.Lat,
202+
Longitude: agentData.IPInfo.Lon,
203+
}
204+
}
205+
206+
lastSeen, _ := time.Parse(time.RFC3339, agentData.LastSeen)
207+
208+
agent := &DiscoveredAgent{
209+
AgentID: agentData.AgentID,
210+
Endpoint: endpoint,
211+
IPAddress: agentData.IPAddress,
212+
Location: location,
213+
IsActive: agentData.IsActive,
214+
LastSeen: lastSeen,
215+
HealthScore: 1.0, // Default to healthy
216+
}
217+
218+
// Preserve existing health metrics if agent was already known
219+
if existing, ok := ad.agents[agentData.AgentID]; ok {
220+
agent.Latency = existing.Latency
221+
agent.HealthScore = existing.HealthScore
222+
agent.LastHealthCheck = existing.LastHealthCheck
223+
}
224+
225+
newAgents[agentData.AgentID] = agent
226+
}
227+
228+
ad.agents = newAgents
229+
ad.lastUpdate = time.Now()
230+
231+
log.Printf("[AgentDiscovery] Discovered %d active agents", len(newAgents))
232+
return nil
233+
}
234+
235+
// healthCheckLoop periodically checks health of all discovered agents
236+
func (ad *AgentDiscovery) healthCheckLoop() {
237+
ticker := time.NewTicker(30 * time.Second)
238+
defer ticker.Stop()
239+
240+
for range ticker.C {
241+
ad.mu.RLock()
242+
agents := make([]*DiscoveredAgent, 0, len(ad.agents))
243+
for _, agent := range ad.agents {
244+
agents = append(agents, agent)
245+
}
246+
ad.mu.RUnlock()
247+
248+
// Check health of each agent
249+
for _, agent := range agents {
250+
go ad.checkAgentHealth(agent)
251+
}
252+
}
253+
}
254+
255+
// checkAgentHealth performs a health check on a single agent
256+
func (ad *AgentDiscovery) checkAgentHealth(agent *DiscoveredAgent) {
257+
startTime := time.Now()
258+
259+
// Try to reach agent's health endpoint
260+
healthURL := agent.Endpoint + "/health"
261+
req, err := http.NewRequest("GET", healthURL, nil)
262+
if err != nil {
263+
ad.updateHealthScore(agent.AgentID, false, 0)
264+
return
265+
}
266+
267+
resp, err := ad.healthChecker.client.Do(req)
268+
latency := time.Since(startTime)
269+
270+
if err != nil {
271+
ad.updateHealthScore(agent.AgentID, false, latency)
272+
return
273+
}
274+
defer resp.Body.Close()
275+
276+
isHealthy := resp.StatusCode == http.StatusOK
277+
ad.updateHealthScore(agent.AgentID, isHealthy, latency)
278+
}
279+
280+
// updateHealthScore updates health metrics for an agent
281+
func (ad *AgentDiscovery) updateHealthScore(agentID string, isHealthy bool, latency time.Duration) {
282+
ad.mu.Lock()
283+
defer ad.mu.Unlock()
284+
285+
agent, ok := ad.agents[agentID]
286+
if !ok {
287+
return
288+
}
289+
290+
agent.Latency = latency
291+
agent.LastHealthCheck = time.Now()
292+
293+
// Calculate health score based on:
294+
// - Is the agent responding? (0.5 weight)
295+
// - Latency (0.5 weight)
296+
healthScore := 0.0
297+
if isHealthy {
298+
healthScore += 0.5
299+
300+
// Latency score: 0-100ms = 0.5, 100-500ms = 0.25-0.5, >500ms = 0-0.25
301+
if latency < 100*time.Millisecond {
302+
healthScore += 0.5
303+
} else if latency < 500*time.Millisecond {
304+
healthScore += 0.25 + (0.25 * (1.0 - float64(latency-100*time.Millisecond)/float64(400*time.Millisecond)))
305+
} else {
306+
healthScore += 0.25 * (1.0 - math.Min(1.0, float64(latency-500*time.Millisecond)/float64(2000*time.Millisecond)))
307+
}
308+
}
309+
310+
agent.HealthScore = healthScore
311+
}
312+
313+
// GetHealthyAgents returns list of healthy agents sorted by health score
314+
func (ad *AgentDiscovery) GetHealthyAgents() []*DiscoveredAgent {
315+
ad.mu.RLock()
316+
defer ad.mu.RUnlock()
317+
318+
healthy := make([]*DiscoveredAgent, 0)
319+
for _, agent := range ad.agents {
320+
if agent.HealthScore > 0.3 { // Threshold for "healthy"
321+
healthy = append(healthy, agent)
322+
}
323+
}
324+
325+
return healthy
326+
}
327+
328+
// GetNearestAgent returns the nearest healthy agent to a given location
329+
func (ad *AgentDiscovery) GetNearestAgent(clientLat, clientLon float64) *DiscoveredAgent {
330+
agents := ad.GetHealthyAgents()
331+
if len(agents) == 0 {
332+
return nil
333+
}
334+
335+
var nearest *DiscoveredAgent
336+
minDistance := math.MaxFloat64
337+
338+
for _, agent := range agents {
339+
if agent.Location == nil {
340+
continue
341+
}
342+
343+
distance := haversineDistance(clientLat, clientLon, agent.Location.Latitude, agent.Location.Longitude)
344+
345+
// Weight distance by health score (prefer healthier agents)
346+
weightedDistance := distance / agent.HealthScore
347+
348+
if weightedDistance < minDistance {
349+
minDistance = weightedDistance
350+
nearest = agent
351+
}
352+
}
353+
354+
return nearest
355+
}
356+
357+
// haversineDistance calculates distance between two points on Earth in kilometers
358+
func haversineDistance(lat1, lon1, lat2, lon2 float64) float64 {
359+
const earthRadius = 6371.0 // km
360+
361+
dLat := (lat2 - lat1) * math.Pi / 180.0
362+
dLon := (lon2 - lon1) * math.Pi / 180.0
363+
364+
a := math.Sin(dLat/2)*math.Sin(dLat/2) +
365+
math.Cos(lat1*math.Pi/180.0)*math.Cos(lat2*math.Pi/180.0)*
366+
math.Sin(dLon/2)*math.Sin(dLon/2)
367+
368+
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
369+
370+
return earthRadius * c
371+
}

0 commit comments

Comments
 (0)