From 6f6b1c00f0ca6aca676b7c2a518c7de8bce646e4 Mon Sep 17 00:00:00 2001 From: David Rotermund <54365609+davrot@users.noreply.github.com> Date: Thu, 22 Feb 2024 14:40:35 +0100 Subject: [PATCH] Add files via upload --- .../binning_aligned_process.py | 119 ++++++++---------- .../binning_aligned_process_classic.py | 2 +- 2 files changed, 56 insertions(+), 65 deletions(-) diff --git a/reproduction_effort/binning_aligned_process.py b/reproduction_effort/binning_aligned_process.py index 8c88503..18b65ca 100644 --- a/reproduction_effort/binning_aligned_process.py +++ b/reproduction_effort/binning_aligned_process.py @@ -49,8 +49,6 @@ test_overwrite_with_old_aligned: bool = True filename_data_binning_replace: str = "bin_old/Exp001_Trial001_Part001.mat" filename_data_aligned_replace: str = "aligned_old/Exp001_Trial001_Part001.mat" -remove_heartbeat: bool = True - data = torch.tensor(np.load(filename_raw).astype(np.float32), dtype=dtype) with open(filename_raw_json, "r") as file_handle: @@ -121,76 +119,71 @@ if test_overwrite_with_old_aligned: # -> -if remove_heartbeat: - mask: torch.Tensor = make_mask( - filename_mask=filename_mask, - camera_sequence=camera_sequence, - device=device, - dtype=dtype, - ) - mask_flatten = mask.flatten(start_dim=0, end_dim=-1) +mask: torch.Tensor = make_mask( + filename_mask=filename_mask, + camera_sequence=camera_sequence, + device=device, + dtype=dtype, +) - interpolate_along_time(camera_sequence) +mask_flatten = mask.flatten(start_dim=0, end_dim=-1) - heartbeat_ts: torch.Tensor = bandpass( - data=camera_sequence[channels.index("volume")].clone(), - device=camera_sequence[channels.index("volume")].device, +interpolate_along_time(camera_sequence) + +heartbeat_ts: torch.Tensor = bandpass( + data=camera_sequence[channels.index("volume")].clone(), + device=camera_sequence[channels.index("volume")].device, + low_frequency=lower_freqency_bandpass, + high_frequency=upper_freqency_bandpass, + fs=sample_frequency, + filtfilt_chuck_size=10, +) + +heartbeat_ts_copy = heartbeat_ts.clone() + +heartbeat_ts = heartbeat_ts.flatten(start_dim=0, end_dim=-2) +heartbeat_ts = heartbeat_ts[mask_flatten, :] + +heartbeat_ts = heartbeat_ts.movedim(0, -1) +heartbeat_ts -= heartbeat_ts.mean(dim=0, keepdim=True) + +volume_heartbeat, _, _ = torch.linalg.svd(heartbeat_ts, full_matrices=False) +volume_heartbeat = volume_heartbeat[:, 0] +volume_heartbeat -= volume_heartbeat[first_none_ramp_frame:].mean() +volume_heartbeat = volume_heartbeat.unsqueeze(0).unsqueeze(0) + +heartbeat_coefficients: list[torch.Tensor] = [] +for i in range(0, len(camera_sequence)): + y = bandpass( + data=camera_sequence[i].clone(), + device=camera_sequence[i].device, low_frequency=lower_freqency_bandpass, high_frequency=upper_freqency_bandpass, fs=sample_frequency, filtfilt_chuck_size=10, - ) + )[..., first_none_ramp_frame:] + y -= y.mean(dim=-1, keepdim=True) - heartbeat_ts_copy = heartbeat_ts.clone() - - heartbeat_ts = heartbeat_ts.flatten(start_dim=0, end_dim=-2) - heartbeat_ts = heartbeat_ts[mask_flatten, :] - - heartbeat_ts = heartbeat_ts.movedim(0, -1) - heartbeat_ts -= heartbeat_ts.mean(dim=0, keepdim=True) - - volume_heartbeat, _, _ = torch.linalg.svd(heartbeat_ts, full_matrices=False) - volume_heartbeat = volume_heartbeat[:, 0] - volume_heartbeat -= volume_heartbeat[first_none_ramp_frame:].mean() - volume_heartbeat = volume_heartbeat.unsqueeze(0).unsqueeze(0) - - heartbeat_coefficients: list[torch.Tensor] = [] - for i in range(0, len(camera_sequence)): - y = bandpass( - data=camera_sequence[i].clone(), - device=camera_sequence[i].device, - low_frequency=lower_freqency_bandpass, - high_frequency=upper_freqency_bandpass, - fs=sample_frequency, - filtfilt_chuck_size=10, - )[..., first_none_ramp_frame:] - y -= y.mean(dim=-1, keepdim=True) - - heartbeat_coefficients.append( - ( - (volume_heartbeat[..., first_none_ramp_frame:] * y).sum( - dim=-1, keepdim=True - ) - / (volume_heartbeat[..., first_none_ramp_frame:] ** 2).sum( - dim=-1, keepdim=True - ) + heartbeat_coefficients.append( + ( + (volume_heartbeat[..., first_none_ramp_frame:] * y).sum( + dim=-1, keepdim=True + ) + / (volume_heartbeat[..., first_none_ramp_frame:] ** 2).sum( + dim=-1, keepdim=True ) - * mask.unsqueeze(-1) ) - del y + * mask.unsqueeze(-1) + ) +del y - donor_power_factor = heartbeat_coefficients[channels.index("donor")].clone() - acceptor_power_factor = heartbeat_coefficients[channels.index("acceptor")].clone() - power_factors: None | list[torch.Tensor] = [ - donor_power_factor, - acceptor_power_factor, - ] +donor_power_factor = heartbeat_coefficients[channels.index("donor")].clone() +acceptor_power_factor = heartbeat_coefficients[channels.index("acceptor")].clone() - for i in range(0, len(camera_sequence)): - camera_sequence[i] -= heartbeat_coefficients[i] * volume_heartbeat -else: - power_factors = None + +for i in range(0, len(camera_sequence)): + camera_sequence[i] -= heartbeat_coefficients[i] * volume_heartbeat # <- data_acceptor, data_donor, mask = preprocessing( @@ -203,10 +196,8 @@ data_acceptor, data_donor, mask = preprocessing( temporal_width=temporal_width, target_camera=target_camera, regressor_cameras=regressor_cameras, - lower_frequency_heartbeat=lower_frequency_heartbeat, - upper_frequency_heartbeat=upper_frequency_heartbeat, - sample_frequency=sample_frequency, - power_factors=power_factors, + donor_correction_factor=donor_power_factor, + acceptor_correction_factor=acceptor_power_factor, ) ratio_sequence: torch.Tensor = data_acceptor / data_donor diff --git a/reproduction_effort/binning_aligned_process_classic.py b/reproduction_effort/binning_aligned_process_classic.py index f4f68eb..0c9037a 100644 --- a/reproduction_effort/binning_aligned_process_classic.py +++ b/reproduction_effort/binning_aligned_process_classic.py @@ -8,7 +8,7 @@ import scipy.io as sio # type: ignore from functions.binning import binning from functions.align_cameras import align_cameras -from functions.preprocessing import preprocessing +from functions.preprocessing_classic import preprocessing from functions.bandpass import bandpass