-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathhttp_distributed.go
More file actions
209 lines (176 loc) · 6.25 KB
/
http_distributed.go
File metadata and controls
209 lines (176 loc) · 6.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
)
// serveDistributedWorker handles HTTP requests for distributed benchmark execution.
// It accepts POST requests with HttpbenchParameters and returns CollectResult.
func serveDistributedWorker(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for cross-origin requests
setCORSHeaders(w)
// Handle preflight OPTIONS request
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusOK)
return
}
// Only accept POST requests
if r.Method != http.MethodPost {
logWarn(0, "invalid method %s, only POST is allowed", r.Method)
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// Check Authorization header if worker API auth key is set
if len(httpWorkerApiAuthKey) > 0 {
authHeader := r.Header.Get("Authorization")
if authHeader != fmt.Sprintf("Bearer %s", httpWorkerApiAuthKey) {
logWarn(0, "invalid Authorization header %s", authHeader)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
}
// Parse request parameters
var params HttpbenchParameters
if err := json.NewDecoder(r.Body).Decode(¶ms); err != nil {
logError(0, "failed to decode request body: %v", err)
http.Error(w, fmt.Sprintf("Invalid request body: %v", err), http.StatusBadRequest)
return
}
var seqId = params.SequenceId
logDebug(seqId, "received benchmark request: %s", params.String())
// Execute benchmark
worker := NewWorker(seqId)
result, err := handleStartup(worker, params)
if err != nil {
logError(seqId, "benchmark execution failed: %v", err)
http.Error(w, fmt.Sprintf("Benchmark failed: %v", err), http.StatusInternalServerError)
return
}
if result == nil {
logError(seqId, "benchmark returned nil result")
http.Error(w, "Internal error: nil result", http.StatusInternalServerError)
return
}
// Send JSON response
w.Header().Set("Content-Type", httpContentTypeJSON)
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(result); err != nil {
logError(seqId, "failed to encode response: %v", err)
}
}
// setCORSHeaders sets Cross-Origin Resource Sharing headers
func setCORSHeaders(w http.ResponseWriter) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
}
// postDistributedWorker sends a benchmark request to a distributed worker node.
// It uses a 5-minute timeout to allow for long-running benchmarks.
func postDistributedWorker(uri string, body []byte) (*CollectResult, error) {
logDebug(0, "sending request to worker %s, body size: %d bytes", uri, len(body))
// Create HTTP client with timeout
client := &http.Client{
Timeout: 0, // Infinite timeout for distributed communication
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 0, // No idle timeout
},
}
req, err := http.NewRequestWithContext(context.Background(),
http.MethodPost, uri, bytes.NewBuffer(body))
if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", httpContentTypeJSON)
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", httpWorkerApiAuthKey))
// Send request
resp, err := client.Do(req)
if err != nil {
logError(0, "failed to send request to worker %s: %v", uri, err)
return nil, fmt.Errorf("worker request failed: %w", err)
}
defer resp.Body.Close()
// Check response status
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
logError(0, "worker %s returned status %d: %s", uri, resp.StatusCode, string(body))
return nil, fmt.Errorf("worker %s returned status %d: %s", uri, resp.StatusCode, string(body))
}
// Parse response
var result CollectResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
logDebug(0, "received result from worker %s: %d requests completed", uri, result.LatsTotal)
return &result, nil
}
// postAllDistributedWorkers sends benchmark requests to all distributed worker nodes concurrently.
// It collects results from all workers and merges them into a single result.
// Workers that fail are logged but don't cause the entire operation to fail.
func postAllDistributedWorkers(workerAddrs flagSlice, jsonParams []byte) (*CollectResult, error) {
if len(workerAddrs) == 0 {
return nil, fmt.Errorf("no worker addresses provided")
}
logInfo(0, "distributing benchmark to %d worker(s)", len(workerAddrs))
var (
wg sync.WaitGroup
mu sync.Mutex
resultList []*CollectResult
failedCnt int
)
// Send requests to all workers concurrently
for _, addr := range workerAddrs {
wg.Add(1)
workerURL := buildWorkerURL(addr)
logDebug(0, "dispatching to worker: %s", workerURL)
go func(url string) {
defer wg.Done()
result, err := postDistributedWorker(url, jsonParams)
if err != nil {
logWarn(0, "worker %s failed: %v", url, err)
mu.Lock()
failedCnt++
mu.Unlock()
return
}
if result != nil {
mu.Lock()
resultList = append(resultList, result)
mu.Unlock()
logDebug(0, "worker %s completed successfully", url)
}
}(workerURL)
}
// Wait for all workers to complete
wg.Wait()
// Check if any workers succeeded
if len(resultList) == 0 {
return nil, fmt.Errorf("all %d worker(s) failed", len(workerAddrs))
}
logInfo(0, "collected results from %d worker(s), failedCnt: %d",
len(resultList), failedCnt)
// Merge all results
mergedResult := mergeCollectResult(nil, resultList...)
return mergedResult, nil
}
// buildWorkerURL constructs the full worker API URL from an address.
// It adds the http:// scheme if not present and appends the API path.
func buildWorkerURL(workerAddr string) string {
// Trim whitespace
workerAddr = strings.TrimSpace(workerAddr)
// Check if scheme is already present
if strings.HasPrefix(workerAddr, "http://") ||
strings.HasPrefix(workerAddr, "https://") {
// Remove trailing slash if present
workerAddr = strings.TrimSuffix(workerAddr, "/")
return fmt.Sprintf("%s%s", workerAddr, httpWorkerApiURL)
}
// Add default http:// scheme
return fmt.Sprintf("http://%s%s", workerAddr, httpWorkerApiURL)
}