From bf0daa364c234793d327395cafd9438155002d58 Mon Sep 17 00:00:00 2001 From: David Rotermund <54365609+davrot@users.noreply.github.com> Date: Wed, 14 Feb 2024 22:15:53 +0100 Subject: [PATCH] Add files via upload --- .../functions/adjust_factor.py | 87 +++++++++++++++++++ .../functions/align_cameras.py | 53 +++-------- .../functions/heart_beat_frequency.py | 49 +++++++++++ reproduction_effort/functions/make_mask.py | 12 ++- .../functions/preprocessing.py | 62 ++++++++----- 5 files changed, 197 insertions(+), 66 deletions(-) create mode 100644 reproduction_effort/functions/adjust_factor.py create mode 100644 reproduction_effort/functions/heart_beat_frequency.py diff --git a/reproduction_effort/functions/adjust_factor.py b/reproduction_effort/functions/adjust_factor.py new file mode 100644 index 0000000..52c902f --- /dev/null +++ b/reproduction_effort/functions/adjust_factor.py @@ -0,0 +1,87 @@ +import torch +import math + + +def adjust_factor( + input_acceptor: torch.Tensor, + input_donor: torch.Tensor, + lower_frequency_heartbeat: float, + upper_frequency_heartbeat: float, + sample_frequency: float, + mask: torch.Tensor, +) -> tuple[float, float]: + + number_of_active_pixel: torch.Tensor = mask.type(dtype=torch.float32).sum() + signal_acceptor: torch.Tensor = (input_acceptor * mask.unsqueeze(-1)).sum( + dim=0 + ).sum(dim=0) / number_of_active_pixel + + signal_donor: torch.Tensor = (input_donor * mask.unsqueeze(-1)).sum(dim=0).sum( + dim=0 + ) / number_of_active_pixel + + signal_acceptor_offset = signal_acceptor.mean() + signal_donor_offset = signal_donor.mean() + + signal_acceptor = signal_acceptor - signal_acceptor_offset + signal_donor = signal_donor - signal_donor_offset + + blackman_window = torch.blackman_window( + window_length=signal_acceptor.shape[0], + periodic=True, + dtype=signal_acceptor.dtype, + device=signal_acceptor.device, + ) + + signal_acceptor *= blackman_window + signal_donor *= blackman_window + nfft: int = int(2 ** math.ceil(math.log2(signal_donor.shape[0]))) + nfft = max([256, nfft]) + + signal_acceptor_fft: torch.Tensor = torch.fft.rfft(signal_acceptor, n=nfft) + signal_donor_fft: torch.Tensor = torch.fft.rfft(signal_donor, n=nfft) + + frequency_axis: torch.Tensor = ( + torch.fft.rfftfreq(nfft, device=signal_acceptor_fft.device) * sample_frequency + ) + + signal_acceptor_power: torch.Tensor = torch.abs(signal_acceptor_fft) ** 2 + signal_acceptor_power[1:-1] *= 2 + + signal_donor_power: torch.Tensor = torch.abs(signal_donor_fft) ** 2 + signal_donor_power[1:-1] *= 2 + + if frequency_axis[-1] != (sample_frequency / 2.0): + signal_acceptor_power[-1] *= 2 + signal_donor_power[-1] *= 2 + + signal_acceptor_power /= blackman_window.sum() ** 2 + signal_donor_power /= blackman_window.sum() ** 2 + + idx = torch.where( + (frequency_axis >= lower_frequency_heartbeat) + * (frequency_axis <= upper_frequency_heartbeat) + )[0] + + frequency_axis = frequency_axis[idx] + signal_acceptor_power = signal_acceptor_power[idx] + signal_donor_power = signal_donor_power[idx] + + acceptor_range = signal_acceptor_power.max() - signal_acceptor_power.min() + + donor_range = signal_donor_power.max() - signal_donor_power.min() + + acceptor_correction_factor: float = float( + 0.5 + * ( + 1 + + (signal_acceptor_offset * torch.sqrt(donor_range)) + / (signal_donor_offset * torch.sqrt(acceptor_range)) + ) + ) + + donor_correction_factor: float = float( + acceptor_correction_factor / (2 * acceptor_correction_factor - 1) + ) + + return donor_correction_factor, acceptor_correction_factor diff --git a/reproduction_effort/functions/align_cameras.py b/reproduction_effort/functions/align_cameras.py index be9b696..97d52bb 100644 --- a/reproduction_effort/functions/align_cameras.py +++ b/reproduction_effort/functions/align_cameras.py @@ -1,8 +1,5 @@ import torch import torchvision as tv # type: ignore -import numpy as np -import json -import scipy.io as sio # type: ignore from functions.align_refref import align_refref from functions.perform_donor_volume_rotation import perform_donor_volume_rotation @@ -12,8 +9,9 @@ from functions.ImageAlignment import ImageAlignment @torch.no_grad() def align_cameras( - filename_raw_json: str, - filename_bin_mat: str, + channels: list[str], + data: torch.Tensor, + ref_image: torch.Tensor, device: torch.device, dtype: torch.dtype, batch_size: int, @@ -30,18 +28,6 @@ def align_cameras( ]: image_alignment = ImageAlignment(default_dtype=dtype, device=device) - # --- Load data --- - with open(filename_raw_json, "r") as file_handle: - metadata: dict = json.load(file_handle) - channels: list[str] = metadata["channelKey"] - - data = torch.tensor( - sio.loadmat(filename_bin_mat)["nparray"].astype(np.float32), - device=device, - dtype=dtype, - ) - # --==-- DONE --==-- - # --- Get reference image --- acceptor_index: int = channels.index("acceptor") donor_index: int = channels.index("donor") @@ -55,32 +41,25 @@ def align_cameras( donor = data[..., donor_index].moveaxis(-1, 0).clone() oxygenation = data[..., oxygenation_index].moveaxis(-1, 0).clone() volume = data[..., volume_index].moveaxis(-1, 0).clone() + + ref_image_acceptor = ref_image[..., acceptor_index].clone() + ref_image_donor = ref_image[..., donor_index].clone() + ref_image_oxygenation = ref_image[..., oxygenation_index].clone() + ref_image_volume = ref_image[..., volume_index].clone() del data # --==-- DONE --==-- # --- Calculate translation and rotation between the reference images --- angle_refref, tvec_refref, ref_image_acceptor, ref_image_donor = align_refref( - ref_image_acceptor=acceptor[ - acceptor.shape[0] // 2, - :, - :, - ], - ref_image_donor=donor[ - donor.shape[0] // 2, - :, - :, - ], + ref_image_acceptor=ref_image_acceptor, + ref_image_donor=ref_image_donor, image_alignment=image_alignment, batch_size=batch_size, fill_value=fill_value, ) ref_image_oxygenation = tv.transforms.functional.affine( - img=oxygenation[ - oxygenation.shape[0] // 2, - :, - :, - ].unsqueeze(0), + img=ref_image_oxygenation.unsqueeze(0), angle=-float(angle_refref), translate=[0, 0], scale=1.0, @@ -97,15 +76,7 @@ def align_cameras( shear=0, interpolation=tv.transforms.InterpolationMode.BILINEAR, fill=fill_value, - ) - - ref_image_oxygenation = ref_image_oxygenation.squeeze(0) - - ref_image_volume = volume[ - volume.shape[0] // 2, - :, - :, - ].clone() + ).squeeze(0) # --==-- DONE --==-- diff --git a/reproduction_effort/functions/heart_beat_frequency.py b/reproduction_effort/functions/heart_beat_frequency.py new file mode 100644 index 0000000..99b2985 --- /dev/null +++ b/reproduction_effort/functions/heart_beat_frequency.py @@ -0,0 +1,49 @@ +import torch + + +def heart_beat_frequency( + input: torch.Tensor, + lower_frequency_heartbeat: float, + upper_frequency_heartbeat: float, + sample_frequency: float, + mask: torch.Tensor, +) -> float: + + number_of_active_pixel: torch.Tensor = mask.type(dtype=torch.float32).sum() + signal: torch.Tensor = (input * mask.unsqueeze(-1)).sum(dim=0).sum( + dim=0 + ) / number_of_active_pixel + signal = signal - signal.mean() + + hamming_window = torch.hamming_window( + window_length=signal.shape[0], + periodic=True, + alpha=0.54, + beta=0.46, + dtype=signal.dtype, + device=signal.device, + ) + + signal *= hamming_window + + signal_fft: torch.Tensor = torch.fft.rfft(signal) + frequency_axis: torch.Tensor = ( + torch.fft.rfftfreq(signal.shape[0], device=input.device) * sample_frequency + ) + signal_power: torch.Tensor = torch.abs(signal_fft) ** 2 + signal_power[1:-1] *= 2 + + if frequency_axis[-1] != (sample_frequency / 2.0): + signal_power[-1] *= 2 + signal_power /= hamming_window.sum() ** 2 + + idx = torch.where( + (frequency_axis > lower_frequency_heartbeat) + * (frequency_axis < upper_frequency_heartbeat) + )[0] + frequency_axis = frequency_axis[idx] + signal_power = signal_power[idx] + + heart_rate = float(frequency_axis[torch.argmax(signal_power)]) + + return heart_rate diff --git a/reproduction_effort/functions/make_mask.py b/reproduction_effort/functions/make_mask.py index 0e6bdf4..3a57fac 100644 --- a/reproduction_effort/functions/make_mask.py +++ b/reproduction_effort/functions/make_mask.py @@ -5,7 +5,10 @@ import torch @torch.no_grad() def make_mask( - filename_mask: str, data: torch.Tensor, device: torch.device, dtype: torch.dtype + filename_mask: str, + camera_sequence: list[torch.Tensor], + device: torch.device, + dtype: torch.dtype, ) -> torch.Tensor: mask: torch.Tensor = torch.tensor( sio.loadmat(filename_mask)["maskInfo"]["maskIdx2D"][0][0], @@ -20,7 +23,10 @@ def make_mask( dtype=dtype, ) - if torch.any(data.flatten() >= limit): - mask = mask & ~(torch.any(torch.any(data >= limit, dim=-1), dim=-1)) + for id in range(0, len(camera_sequence)): + if torch.any(camera_sequence[id].flatten() >= limit): + mask = mask & ~(torch.any(camera_sequence[id] >= limit, dim=-1)) + if torch.any(camera_sequence[id].flatten() < 0): + mask = mask & ~(torch.any(camera_sequence[id] < 0, dim=-1)) return mask diff --git a/reproduction_effort/functions/preprocessing.py b/reproduction_effort/functions/preprocessing.py index 2fd12b4..7d953fc 100644 --- a/reproduction_effort/functions/preprocessing.py +++ b/reproduction_effort/functions/preprocessing.py @@ -1,10 +1,9 @@ -import scipy.io as sio # type: ignore import torch -import numpy as np -import json + from functions.make_mask import make_mask -from functions.convert_camera_sequenc_to_list import convert_camera_sequenc_to_list +from functions.heart_beat_frequency import heart_beat_frequency +from functions.adjust_factor import adjust_factor from functions.preprocess_camera_sequence import preprocess_camera_sequence from functions.interpolate_along_time import interpolate_along_time from functions.gauss_smear import gauss_smear @@ -13,8 +12,8 @@ from functions.regression import regression @torch.no_grad() def preprocessing( - filename_metadata: str, - filename_data: str, + cameras: list[str], + camera_sequence: list[torch.Tensor], filename_mask: str, device: torch.device, first_none_ramp_frame: int, @@ -22,29 +21,19 @@ def preprocessing( temporal_width: float, target_camera: list[str], regressor_cameras: list[str], + lower_frequency_heartbeat: float, + upper_frequency_heartbeat: float, + sample_frequency: float, dtype: torch.dtype = torch.float32, ) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]: - data: torch.Tensor = torch.tensor( - sio.loadmat(filename_data)["data"].astype(np.float32), + mask: torch.Tensor = make_mask( + filename_mask=filename_mask, + camera_sequence=camera_sequence, device=device, dtype=dtype, ) - with open(filename_metadata, "r") as file_handle: - metadata: dict = json.load(file_handle) - cameras: list[str] = metadata["channelKey"] - - required_order: list[str] = ["acceptor", "donor", "oxygenation", "volume"] - - mask: torch.Tensor = make_mask( - filename_mask=filename_mask, data=data, device=device, dtype=dtype - ) - - camera_sequence: list[torch.Tensor] = convert_camera_sequenc_to_list( - data=data, required_order=required_order, cameras=cameras - ) - for num_cams in range(len(camera_sequence)): camera_sequence[num_cams], mask = preprocess_camera_sequence( camera_sequence=camera_sequence[num_cams], @@ -61,6 +50,15 @@ def preprocessing( for id in range(0, len(camera_sequence)): camera_sequence_filtered.append(camera_sequence[id].clone()) + idx_volume: int = cameras.index("volume") + heart_rate: float = heart_beat_frequency( + input=camera_sequence_filtered[idx_volume], + lower_frequency_heartbeat=lower_frequency_heartbeat, + upper_frequency_heartbeat=upper_frequency_heartbeat, + sample_frequency=sample_frequency, + mask=mask, + ) + camera_sequence_filtered = gauss_smear( camera_sequence_filtered, mask.type(dtype=dtype), @@ -90,4 +88,24 @@ def preprocessing( ) results.append(output) + lower_frequency_heartbeat_selection: float = heart_rate - 3 + upper_frequency_heartbeat_selection: float = heart_rate + 3 + + donor_correction_factor, acceptor_correction_factor = adjust_factor( + input_acceptor=results[0], + input_donor=results[1], + lower_frequency_heartbeat=lower_frequency_heartbeat_selection, + upper_frequency_heartbeat=upper_frequency_heartbeat_selection, + sample_frequency=sample_frequency, + mask=mask, + ) + + results[0] = acceptor_correction_factor * ( + results[0] - results[0].mean(dim=-1, keepdim=True) + ) + results[0].mean(dim=-1, keepdim=True) + + results[1] = donor_correction_factor * ( + results[1] - results[1].mean(dim=-1, keepdim=True) + ) + results[1].mean(dim=-1, keepdim=True) + return results[0], results[1], mask