Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
308 changes: 134 additions & 174 deletions StatTools/analysis/dpcca.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,6 @@ def _covariation_single_signal(signal: np.ndarray):
return F


def _covariation(signal_1: np.ndarray, signal_2: np.ndarray = None):
"""
Implementation equation (4) from [1]

[1] Yuan, N., Fu, Z., Zhang, H. et al. Detrended Partial-Cross-Correlation Analysis: A New Method for Analyzing Correlations in Complex System. Sci Rep 5, 8143 (2015). https://doi.org/10.1038/srep08143
"""
if signal_2 is None:
return _covariation_single_signal(signal_1)
F = np.zeros((signal_1.shape[0], signal_1.shape[0]), dtype=float)
for n in range(signal_1.shape[0]):
for m in range(signal_2.shape[0]):
F[n][m] = np.mean(signal_1[n] * signal_2[m])
return F


def _correlation(F: np.ndarray):
"""
Implementation equation (6) from [1]
Expand All @@ -52,13 +37,15 @@ def _correlation(F: np.ndarray):
return R


def _cross_correlation(R: np.ndarray):
def _partial_correlation(R: np.ndarray):
"""
Implementation equation (9) from [1]
[1] Yuan, N., Fu, Z., Zhang, H. et al. Detrended Partial-Cross-Correlation Analysis: A New Method for Analyzing Correlations in Complex System. Sci Rep 5, 8143 (2015). https://doi.org/10.1038/srep08143
"""
P = np.zeros((R.shape[0], R.shape[0]), dtype=float)
Cinv = np.linalg.inv(R)
# justification: finding the inverse matrix with pinv (Moore-Penrose pseudo-inverse of a matrix)
# to avoid interaction with singular correlation matrix
Cinv = np.linalg.pinv(R)
for n in range(R.shape[0]):
for m in range(n + 1):
if Cinv[n][n] * Cinv[m][m] < 0:
Expand All @@ -81,13 +68,12 @@ def _detrend(current_signal: np.ndarray, pd: np.int32):
Returns:
y_detrended(np.ndarray): Detrended data array.
"""
current_signal_values = len(current_signal)
xw = np.arange(current_signal_values, dtype=np.int32)
p_fit = np.polyfit(xw, current_signal, deg=pd)
current_signal = np.asarray(current_signal, dtype=np.float64)
n = len(current_signal)
xw = np.arange(n, dtype=np.int32)
p_fit = np.polyfit(xw, current_signal, deg=pd, rcond=None)
z_fit = np.polyval(p_fit, xw)
y_detrended = np.zeros_like(current_signal, dtype=np.float64)
y_detrended[:] = current_signal - z_fit
return y_detrended
return current_signal - z_fit


# @profile()
Expand Down Expand Up @@ -151,9 +137,9 @@ def dpcca_worker(

Y = np.array([np.concatenate(Y[i]) for i in range(Y.shape[0])])

F[s_i] = _covariation(Y)
F[s_i] = _covariation_single_signal(Y)
R[s_i] = _correlation(F[s_i])
P[s_i] = _cross_correlation(R[s_i])
P[s_i] = _partial_correlation(R[s_i])

return P, R, F

Expand All @@ -170,9 +156,9 @@ def tds_dpcca_worker(
) -> Union[tuple, None]:
"""
Core of DPCAA algorithm with time lags. Takes bunch of S-values and returns 3 4d-matrices: first index
represents length of time lags array. There is global data and indices: data in all input array and
local data: data and indices in current window. Comparison signal_1 and signal_2 with time delays by indicies:
find correlation and etc of x[i] and y[i+tau] and x[i] and y[i-tau] where tau is value of time lag.
represents the length of the time-lags array. Global data and indices refer to the whole input array; the
signals are compared with time delays by shifting the analysis windows (of size s_val): for each lag tau,
the correlation (and related quantities) of x[i] with y[i+tau] and of x[i] with y[i-tau] is computed.

Args:
s (Union[int, Iterable]): points where fluctuation function F(s) is calculated.
Expand Down Expand Up @@ -202,120 +188,115 @@ def tds_dpcca_worker(

"""

if max_time_delay is None:
return dpcca_worker(s, arr, step, pd, gc_params, n_integral=n_integral)

s_list = [s] if isinstance(s, int) else list(s)

if time_delays is not None:
time_delay_list = np.arange(-time_delays, time_delays + 1, dtype=int)
time_delay_list = time_delays
elif max_time_delay is not None:
time_delay_list = np.arange(-max_time_delay, max_time_delay + 1, dtype=int)
else:
raise ValueError("Use lags")

n_lags = len(time_delay_list) # length of input time lags array
n_lags = len(time_delay_list)
n_signals, n = arr.shape

cumsum_arr = arr
for _ in range(n_integral):
cumsum_arr = np.cumsum(cumsum_arr, axis=1) # integral sum

f = np.zeros(
(n_lags, len(s_list), n_signals, n_signals), dtype=np.float64
) # covariation
r = np.zeros(
(n_lags, len(s_list), n_signals, n_signals), dtype=np.float64
) # levels of cross correlation
p = np.zeros(
(n_lags, len(s_list), n_signals, n_signals), dtype=np.float64
) # partial cross correlation levels
cumsum_arr = np.cumsum(cumsum_arr, axis=1)

min_lag = min(time_delay_list)
max_lag = max(time_delay_list)
start_arr = max(0, -min_lag) # value of points of data should be trimmed
end_arr = max(0, max_lag) # value of points of data should be trimmed
if start_arr + end_arr >= n:
raise ValueError("need less time delay")
valid_arr_lag = n - start_arr - end_arr # new size of array with time lags

f = np.zeros((n_lags, len(s_list), n_signals, n_signals), dtype=np.float64)
r = np.zeros((n_lags, len(s_list), n_signals, n_signals), dtype=np.float64)
p = np.zeros((n_lags, len(s_list), n_signals, n_signals), dtype=np.float64)

for s_i, s_val in enumerate(s_list):

if s_val > n:
if s_val > valid_arr_lag:
raise ValueError("Time window couldnt be larger then input data array")
step_s = int(step * s_val)
n_windows_arr = []
for lag in time_delay_list:
len_lagged = valid_arr_lag - abs(lag)
if len_lagged > 0:
cur_window = (len_lagged - s_val) // step_s + 1
n_windows_arr.append(
cur_window
) # calculation value of windows for each lag
else:
n_windows_arr.append(0)

start = np.arange(
0, n - s_val + 1
) # all indices of beginning of the windows in input data array
start_window = start[
:: int(step * s_val)
    ]  # beginning of all the windows with step
n_windows = len(start_window) # value of windows

signal_view = np.lib.stride_tricks.sliding_window_view(
cumsum_arr, window_shape=s_val, axis=1
) # sliding window
signal_view = signal_view[
:, :: int(step * s_val), :
] # (signals,all windows, len of window)

# We have global data and indices: data in all input array
# local data: data and indices in current window
# compare signal_1 and signal_2 with time delays by indices: find correlation and etc of
# x[i] and y[i+tau] and x[i] and y[i-tau] where tau is value of time lag. Also we have limits:
# if time lag>0: global start index-value of start position of current window, global_end: min(start_pos+s_val,n-lag)
# where s_val is length of current window also index of current value must be less then (n-lag) where n is length of input array
# local end index must be less then (length of current window-time lag). Cross points is value of points for analyse.
# if time lag<0: global start: max(value of start position of current window, value of start position of current window-time lag)
# global end is value of start position of current window+ length of current window.
# Also we have shift_sig: index of current value in current time window of signal without lag: x[i] and x[i+tau]
# and shift_sig_lag: index of current value in current time window of signal with lag
# index of current value in current time window of signal with lag: x[i-tau] and x[i].
n_windows = min(n_windows_arr) if n_windows_arr else 0
if n_windows <= 0:
continue
all_windows = np.zeros((n_lags, n_signals, n_windows, s_val))
for lag_index, lag in enumerate(time_delay_list):
start_base_signal = start_arr
len_base_signal = valid_arr_lag

start_lagged_signal = start_arr + lag # start position of shifted signal
len_lagged_signal = valid_arr_lag - abs(
lag
) # valid length of analyzed data with lag
if start_lagged_signal < 0 or len_lagged_signal > n:
continue
min_len = min(
len_base_signal, len_lagged_signal
) # valid length of analyzed data for each signal
if min_len < s_val:
continue
shifted_arr = np.zeros(
(n_signals, min_len)
) # array for all signals with valid length
shifted_arr[0, :] = cumsum_arr[
0, start_base_signal : start_base_signal + min_len
] # signal without lag
for sig_idx in range(1, n_signals):
shifted_arr[sig_idx, :] = cumsum_arr[
sig_idx, start_lagged_signal : start_lagged_signal + min_len
] # signals with lag in input array

windows = np.lib.stride_tricks.sliding_window_view(
shifted_arr, window_shape=s_val, axis=1
) # window of shifted data with step with lag
windows = windows[:, ::step_s, :]
useful_windows = min(
windows.shape[1], n_windows
) # find min between actual length and calculated for lag

if useful_windows > 0:
all_windows[lag_index, :, :useful_windows, :] = windows[
:, :useful_windows, :
] # copy of windows with current lag to general array with data
if np.all(all_windows == 0):
continue # if size of all windows<0: skip current s
common_windows = all_windows.reshape(n_lags * n_signals, n_windows, s_val)
detrended = np.zeros((n_lags * n_signals, n_windows * s_val))
for i in range(n_lags * n_signals):
position_idx = 0
for w in range(n_windows):
start_pos = start_window[w]
if lag >= 0: # value of start position of current window
global_end = min(start_pos + s_val, n - lag)
if (
start_pos >= global_end
): # start position cant be larger then global index lag
continue
local_end = s_val - lag
if local_end <= 0:
continue

cross_points = min(global_end - start_pos, local_end)
if cross_points <= 0:
continue

shift_sig = 0
shift_sig_lag = lag
else:

global_start = max(start_pos, start_pos - lag)
global_end = start_pos + s_val
if global_start >= global_end:
continue

cross_points = global_end - global_start
if cross_points <= 0:
continue
shift_sig = -lag
shift_sig_lag = 0

signal_windows = np.zeros((n_signals, cross_points), dtype=float)
signal_lag_windows = np.zeros((n_signals, cross_points), dtype=float)
for sig_idx in range(n_signals):

data_true = signal_view[
sig_idx, w, shift_sig : shift_sig + cross_points
] # detrended array with selected data
data_lag_true = signal_view[
sig_idx, w, shift_sig_lag : cross_points + shift_sig_lag
] # detrended array with selected data with time lags

signal_windows[sig_idx] = _detrend(data_true, pd)
signal_lag_windows[sig_idx] = _detrend(data_lag_true, pd)
covariation = _covariation(signal_windows, signal_lag_windows)
correlation = _correlation(covariation)
cross_correlation = _cross_correlation(correlation)

f[lag_index, s_i] = covariation
r[lag_index, s_i] = correlation
p[lag_index, s_i] = cross_correlation

detrend_data = _detrend(common_windows[i, w, :], pd)
detrended[i, position_idx : position_idx + s_val] = detrend_data
position_idx += s_val # detrend in each window
covariation = _covariation_single_signal(detrended)
correlation = _correlation(covariation)
partial_correlation = _partial_correlation(correlation)

covariation_lag = covariation.reshape(n_lags, n_signals, n_lags, n_signals)
correlation_lag = correlation.reshape(n_lags, n_signals, n_lags, n_signals)
partial_correlation_lag = partial_correlation.reshape(
n_lags, n_signals, n_lags, n_signals
)
for D in range(n_lags):
f[D, s_i, :, :] = covariation_lag[D, :, D, :]
r[D, s_i, :, :] = correlation_lag[D, :, D, :]
p[D, s_i, :, :] = partial_correlation_lag[D, :, D, :]
return p, r, f


Expand All @@ -332,6 +313,7 @@ def dpcca(
step: float,
s: Union[int, Iterable],
max_lag=None,
time_delays=None,
buffer=None,
gc_params: tuple = None,
short_vectors: bool = False,
Expand Down Expand Up @@ -384,57 +366,6 @@ def dpcca(
stacklevel=2,
)

if max_lag is not None:
concatenate_all = False # concatenate if 1d array , no need to use 3d P, R, F
if arr.ndim == 1:
arr = np.array([arr])
concatenate_all = True

if isinstance(s, Iterable):
init_s_len = len(s)

s = list(filter(lambda x: x <= arr.shape[1] / 4, s))
if len(s) < 1:
raise ValueError(
"All input S values are larger than vector shape / 4 !"
)

if len(s) != init_s_len:
print(f"\tDPCAA warning: only following S values are in use: {s}")

elif isinstance(s, (float, int)):
if s > arr.shape[1] / 4:
raise ValueError("Cannot use S > L / 4")
s = (s,)

if (processes == 1 or len(s) == 1) and max_lag is not None:
p, r, f = tds_dpcca_worker(
s,
arr,
step,
pd,
time_delays=None,
max_time_delay=max_lag,
gc_params=gc_params,
n_integral=n_integral,
)

if concatenate_all:
return concatenate_3d_matrices(p, r, f) + (s,)
else:
return p, r, f, s

if short_vectors:
return dpcca_worker(
s,
arr,
step,
pd,
gc_params=gc_params,
short_vectors=True,
n_integral=n_integral,
) + (s,)

concatenate_all = False # concatenate if 1d array , no need to use 3d P, R, F
if arr.ndim == 1:
arr = np.array([arr])
Expand Down Expand Up @@ -475,6 +406,35 @@ def dpcca(
)
s = (s,)

if max_lag is not None or (time_delays is not None):

p, r, f = tds_dpcca_worker(
s,
arr,
step,
pd,
time_delays=time_delays,
max_time_delay=max_lag,
gc_params=gc_params,
n_integral=n_integral,
)

if concatenate_all:
return concatenate_3d_matrices(p, r, f) + (s,)
else:
return p, r, f, s

if short_vectors:
return dpcca_worker(
s,
arr,
step,
pd,
gc_params=gc_params,
short_vectors=True,
n_integral=n_integral,
) + (s,)

if processes == 1 or len(s) == 1:
p, r, f = dpcca_worker(
s, arr, step, pd, gc_params=gc_params, n_integral=n_integral
Expand Down
Loading
Loading