-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpso.py
More file actions
348 lines (300 loc) · 12.8 KB
/
pso.py
File metadata and controls
348 lines (300 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
"""
PSO + Method-B refinement: narrow bounds around PSO best and re-run stronger PSO.
- Code comments in English.
- Replace RUN_PSO=True to False if you already have a PSO best and want to run only the refined stage.
- Dependencies: numpy, scipy, matplotlib, pyswarms
pip install numpy scipy matplotlib pyswarms
"""
import numpy as np
import matplotlib.pyplot as plt
import pyswarms as ps
import time
# ---------------------------
# Basic settings (tweak as needed)
# ---------------------------
SAMPLE_FREQ = 512.0
N_SAMPLE = 512
DELTA_T = 1.0 / SAMPLE_FREQ
TARGET_SNR = 10.0
# Initial (coarse) PSO settings — used to get a coarse best estimate
RUN_PSO = True # run initial (coarse) PSO; set False to skip
COARSE_N_RUNS = 8
COARSE_N_ITERS = 1000
COARSE_N_PARTICLES = 30
COARSE_OPTIONS = {'c1': 0.5, 'c2': 0.3, 'w': 0.9}
# Global parameter bounds
BOUNDS_LO = np.array([1.0, 1.0, 1.0])
BOUNDS_HI = np.array([180.0, 10.0, 10.0])
GLOBAL_BOUNDS = (BOUNDS_LO, BOUNDS_HI)
# Refined PSO settings (Method B): stronger search within narrowed bounds
REFINED_RUNS = 10
REFINED_ITERS = 1500
REFINED_PARTICLES = 60
REFINED_OPTIONS = {'c1': 0.5, 'c2': 0.3, 'w': 0.8}
# Delta window around coarse best to define refined bounds (tune these)
# Example: a1 +/- 12, a2 +/- 1.5, a3 +/- 1.5
DELTAS = np.array([24.0, 4.0, 3.0])
# True parameters for synthetic demonstration (you can remove / replace)
TRUE_PARAMS = np.array([10.0, 3.0, 3.0])
# Toggles
DO_PLOT = True
# ---------------------------
# Helper functions (adapted)
# ---------------------------
def crcbgenqcsig(dataX, snr, qcCoefs):
"""Generate quadratic chirp, Euclidean-normalize to `snr`."""
a1, a2, a3 = qcCoefs
phaseVec = a1 * dataX + a2 * (dataX ** 2) + a3 * (dataX ** 3)
sigVec = np.sin(2.0 * np.pi * phaseVec)
sigVec = snr * sigVec / np.linalg.norm(sigVec)
return sigVec
def innerprodpsd(x_vec, y_vec, samp_freq, psd_vals):
"""
Calculate the inner product of vectors x_vec and y_vec
for Gaussian stationary noise with specified PSD.
Parameters
----------
x_vec : array_like
First time series vector.
y_vec : array_like
Second time series vector.
samp_freq : float
Sampling frequency (Hz).
psd_vals : array_like
PSD values at positive DFT frequencies.
Returns
-------
inn_prod : float
Inner product value.
"""
x_vec = np.asarray(x_vec)
y_vec = np.asarray(y_vec)
n_samples = len(x_vec)
if len(y_vec) != n_samples:
raise ValueError("Vectors must be of the same length")
k_nyq = n_samples // 2 + 1
if len(psd_vals) != k_nyq:
raise ValueError("PSD values must be specified at positive DFT frequencies")
fft_x = np.fft.fft(x_vec)
fft_y = np.fft.fft(y_vec)
# Handle even/odd number of samples when replicating PSD for negative freqs
neg_f_strt = 1 - (n_samples % 2)
psd_vec4norm = np.concatenate([
psd_vals,
psd_vals[(k_nyq - neg_f_strt) - 1 : 0 : -1]
])
data_len = samp_freq * n_samples
inn_prod = (1 / data_len) * np.dot(fft_x / psd_vec4norm, np.conj(fft_y))
return np.real(inn_prod)
def normsig4psd(sig_vec, samp_freq, psd_vec, snr):
"""
Normalize a signal to have a specified SNR in given noise PSD.
Parameters
----------
sig_vec : array_like
Signal vector to normalize.
samp_freq : float
Sampling frequency (Hz).
psd_vec : array_like
PSD values at positive DFT frequencies.
snr : float
Desired signal-to-noise ratio.
innerprod_func : callable
Function to compute the inner product with PSD weighting.
Must have signature innerprod_func(x_vec, y_vec, samp_freq, psd_vals).
Returns
-------
norm_sig_vec : ndarray
Normalized signal vector.
norm_fac : float
Normalization factor applied to sig_vec.
"""
sig_vec = np.asarray(sig_vec)
n_samples = len(sig_vec)
k_nyq = n_samples // 2 + 1
if len(psd_vec) != k_nyq:
raise ValueError("Length of PSD is not correct")
# Norm of signal squared (inner product with itself)
norm_sig_sqrd = innerprodpsd(sig_vec, sig_vec, samp_freq, psd_vec)
# Normalization factor
norm_fac = snr / np.sqrt(norm_sig_sqrd)
# Apply normalization
norm_sig_vec = norm_fac * sig_vec
return norm_sig_vec, norm_fac
def compute_template(a1, a2, a3, timeVec, sampFreq, psdPosFreq):
"""Return PSD-unit-norm template for (a1,a2,a3)."""
base = crcbgenqcsig(timeVec, 1.0, [a1, a2, a3])
tmpl, _ = normsig4psd(base, sampFreq, psdPosFreq, 1.0)
return tmpl
def compute_glrt_from_template(dataVec, templateVec, sampFreq, psdPosFreq):
"""Compute GLRT = (<data|template>)^2."""
llr = innerprodpsd(dataVec, templateVec, sampFreq, psdPosFreq)
return float(llr ** 2)
# ---------------------------
# Toy PSD / data generation (demo)
# ---------------------------
def noisePSD(f):
"""Toy PSD: bump between 50 and 100 Hz, else 1."""
if 50.0 <= f <= 100.0:
return (f - 50.0) * (100.0 - f) / 625.0 + 1.0
else:
return 1.0
def analytic_signal_function(t, A, a1, a2, a3):
"""Quadratic chirp raw signal (not PSD-normalized)."""
return A * np.sin(2.0 * np.pi * (a1 * t + a2 * t**2 + a3 * t**3))
def generate_noise_from_psd(psd_pos, N, Fs, rng):
"""Synthesize real noise with specified one-sided PSD (simple freq-domain method)."""
kNyq = N // 2 + 1
if len(psd_pos) != kNyq:
raise ValueError("psd_pos length mismatch")
X = np.zeros(N, dtype=complex)
# DC
X[0] = np.sqrt(Fs * N * psd_pos[0]) * rng.normal()
is_even = (N % 2 == 0)
if is_even:
nyq_ind = N // 2
X[nyq_ind] = np.sqrt(Fs * N * psd_pos[nyq_ind]) * rng.normal()
upper_k = kNyq - 1
if is_even:
upper_k = kNyq - 2
for k in range(1, upper_k + 1):
S_k = psd_pos[k]
sigma = np.sqrt(0.5 * Fs * N * S_k)
a = rng.normal()
b = rng.normal()
X[k] = sigma * (a + 1j * b)
X[-k] = np.conj(X[k])
return np.fft.ifft(X).real
# build time and PSD vectors
time_vec = np.arange(0, N_SAMPLE) * DELTA_T
kNyq = N_SAMPLE // 2 + 1
delta_f = SAMPLE_FREQ / N_SAMPLE
freqs_pos = np.arange(0, kNyq) * delta_f
psdPosFreq = np.array([noisePSD(f) for f in freqs_pos])
# synthesize observed data (signal scaled to TARGET_SNR + noise)
rng = np.random.default_rng(12345)
s_clean = analytic_signal_function(time_vec, 1.0, TRUE_PARAMS[0], TRUE_PARAMS[1], TRUE_PARAMS[2])
sig_scaled, _ = normsig4psd(s_clean, SAMPLE_FREQ, psdPosFreq, TARGET_SNR)
noise_t = generate_noise_from_psd(psdPosFreq, N_SAMPLE, SAMPLE_FREQ, rng)
dataVec = sig_scaled + noise_t
# ---------------------------
# PSO objective
# ---------------------------
def pso_objective(x_particles):
"""Return array of costs = -GLRT for each particle (minimize)."""
n = x_particles.shape[0]
costs = np.zeros(n)
for i in range(n):
a1, a2, a3 = x_particles[i]
try:
tmpl = compute_template(a1, a2, a3, time_vec, SAMPLE_FREQ, psdPosFreq)
glrt = compute_glrt_from_template(dataVec, tmpl, SAMPLE_FREQ, psdPosFreq)
costs[i] = -glrt
except Exception:
costs[i] = 1e12
return costs
# ---------------------------
# Coarse PSO (optional)
# ---------------------------
start_time = time.time()
coarse_best = None
if RUN_PSO:
print("Running coarse PSO...")
best_overall = {"cost": np.inf, "params": None, "history": None}
for run in range(COARSE_N_RUNS):
opt = ps.single.GlobalBestPSO(n_particles=COARSE_N_PARTICLES,
dimensions=3,
options=COARSE_OPTIONS,
bounds=GLOBAL_BOUNDS)
best_cost, best_pos = opt.optimize(pso_objective, iters=COARSE_N_ITERS, verbose=False)
print(f"Coarse run {run+1}/{COARSE_N_RUNS}: best_cost={best_cost:.6e}, best_pos={best_pos}")
if best_cost < best_overall["cost"]:
best_overall["cost"] = best_cost
best_overall["params"] = best_pos
best_overall["history"] = getattr(opt, "cost_history", None)
coarse_best = best_overall
print("Coarse PSO best params:", coarse_best["params"], "GLRT:", -coarse_best["cost"])
else:
# If skipping coarse PSO, user must supply a coarse best. Here we use TRUE_PARAMS as placeholder.
coarse_best = {"params": TRUE_PARAMS.copy(), "cost": -compute_glrt_from_template(dataVec,
compute_template(*TRUE_PARAMS, time_vec, SAMPLE_FREQ, psdPosFreq),
SAMPLE_FREQ, psdPosFreq),
"history": None}
print("RUN_PSO=False; using placeholder coarse best = TRUE_PARAMS.")
# ---------------------------
# Method B: narrow bounds around coarse best and re-run stronger PSO
# ---------------------------
coarse_params = np.asarray(coarse_best["params"], dtype=float).reshape(3,)
# compute new bounds as center +/- DELTAS, clipped to global bounds
new_lo = np.maximum(BOUNDS_LO, coarse_params - DELTAS)
new_hi = np.minimum(BOUNDS_HI, coarse_params + DELTAS)
refined_bounds = (new_lo, new_hi)
print("\nRefined search bounds centered on coarse best:")
for i, name in enumerate(['a1','a2','a3']):
print(f" {name}: [{refined_bounds[0][i]:.6g}, {refined_bounds[1][i]:.6g}] (center={coarse_params[i]:.6g})")
# run several refined PSO runs within narrowed bounds
print("\nRunning refined PSO within narrowed bounds...")
refined_best_overall = {"cost": np.inf, "params": None, "history": None}
for run in range(REFINED_RUNS):
opt_ref = ps.single.GlobalBestPSO(n_particles=REFINED_PARTICLES,
dimensions=3,
options=REFINED_OPTIONS,
bounds=refined_bounds)
best_cost_ref, best_pos_ref = opt_ref.optimize(pso_objective, iters=REFINED_ITERS, verbose=False)
print(f"Refined run {run+1}/{REFINED_RUNS}: best_cost={best_cost_ref:.6e}, best_pos={best_pos_ref}")
if best_cost_ref < refined_best_overall["cost"]:
refined_best_overall["cost"] = best_cost_ref
refined_best_overall["params"] = best_pos_ref
refined_best_overall["history"] = getattr(opt_ref, "cost_history", None)
refined_params = refined_best_overall["params"]
refined_glrt = -refined_best_overall["cost"]
print("\nRefined best params:", refined_params)
print("Refined best GLRT:", refined_glrt)
# ---------------------------
# Compare refined result to truth and coarse best
# ---------------------------
# generate fitted waveform and compare (scale both to same PSD-weighted SNR used in generation)
sig_ref_raw = analytic_signal_function(time_vec, 1.0, refined_params[0], refined_params[1], refined_params[2])
sig_ref_scaled, _ = normsig4psd(sig_ref_raw, SAMPLE_FREQ, psdPosFreq, TARGET_SNR)
sig_coarse_raw = analytic_signal_function(time_vec, 1.0, coarse_params[0], coarse_params[1], coarse_params[2])
sig_coarse_scaled, _ = normsig4psd(sig_coarse_raw, SAMPLE_FREQ, psdPosFreq, TARGET_SNR)
sig_true_scaled = sig_scaled # from data generation above
# metrics
mse_ref_true = np.mean((sig_ref_scaled - sig_true_scaled)**2)
corr_ref_true = np.corrcoef(sig_ref_scaled, sig_true_scaled)[0,1]
mse_coarse_true = np.mean((sig_coarse_scaled - sig_true_scaled)**2)
corr_coarse_true = np.corrcoef(sig_coarse_scaled, sig_true_scaled)[0,1]
print("\nComparison metrics:")
print(f" Coarse params: {coarse_params}")
print(f" MSE (coarse vs true) = {mse_coarse_true:.6e}, corr = {corr_coarse_true:.6f}")
print(f" Refined params: {refined_params}")
print(f" MSE (refined vs true) = {mse_ref_true:.6e}, corr = {corr_ref_true:.6f}")
# ---------------------------
# Plotting (optional)
# ---------------------------
if DO_PLOT:
zoom_n = min(200, N_SAMPLE)
plt.figure(figsize=(10,4))
plt.plot(time_vec[:zoom_n], dataVec[:zoom_n], label='observed (data)', alpha=0.7)
plt.plot(time_vec[:zoom_n], sig_true_scaled[:zoom_n], label='true (scaled)')
plt.plot(time_vec[:zoom_n], sig_coarse_scaled[:zoom_n], label='coarse fit (scaled)', linestyle='--')
plt.plot(time_vec[:zoom_n], sig_ref_scaled[:zoom_n], label='refined fit (scaled)', linewidth=2)
plt.xlabel('Time [s]')
plt.ylabel('Amplitude')
plt.title('Observed vs True vs Coarse vs Refined (zoomed)')
plt.legend()
plt.tight_layout()
plt.show()
# plot refined convergence if available
if refined_best_overall["history"] is not None:
plt.figure(figsize=(8,4))
glrt_hist = -np.array(refined_best_overall["history"])
plt.plot(np.arange(1, len(glrt_hist)+1), glrt_hist)
plt.xlabel('Iteration')
plt.ylabel('GLRT (best-so-far)')
plt.title('Refined PSO convergence (best GLRT per iter)')
plt.tight_layout()
plt.show()
end_time = time.time()
print("\nMethod-B refinement complete.")
print(f"程序运行时间: {end_time - start_time:.2f} 秒")