Add files via upload
This commit is contained in:
parent
82984f2210
commit
bf0daa364c
5 changed files with 197 additions and 66 deletions
87
reproduction_effort/functions/adjust_factor.py
Normal file
87
reproduction_effort/functions/adjust_factor.py
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
import torch
|
||||||
|
import math
|
||||||
|
|
||||||
|
|
||||||
|
def adjust_factor(
    input_acceptor: torch.Tensor,
    input_donor: torch.Tensor,
    lower_frequency_heartbeat: float,
    upper_frequency_heartbeat: float,
    sample_frequency: float,
    mask: torch.Tensor,
) -> tuple[float, float]:
    """Estimate donor/acceptor correction factors from heartbeat-band power.

    Averages both sequences over the masked pixels, computes a
    Blackman-windowed one-sided power spectrum of each averaged time
    course, restricts it to the heartbeat frequency band, and derives a
    pair of coupled correction factors from the ratio of the spectral
    power ranges scaled by the mean (DC) offsets of the two signals.

    Args:
        input_acceptor: acceptor sequence; assumed (height, width, time)
            given the ``mask.unsqueeze(-1)`` broadcast — TODO confirm.
        input_donor: donor sequence, same layout as ``input_acceptor``.
        lower_frequency_heartbeat: lower band edge in Hz (inclusive).
        upper_frequency_heartbeat: upper band edge in Hz (inclusive).
        sample_frequency: sampling rate in Hz.
        mask: (height, width) boolean mask of valid pixels; must select at
            least one pixel, otherwise the spatial averages divide by zero.

    Returns:
        ``(donor_correction_factor, acceptor_correction_factor)`` as floats.
    """
    number_of_active_pixel: torch.Tensor = mask.type(dtype=torch.float32).sum()

    # Spatially average the masked pixels -> one time course per channel.
    signal_acceptor: torch.Tensor = (
        (input_acceptor * mask.unsqueeze(-1)).sum(dim=0).sum(dim=0)
        / number_of_active_pixel
    )
    signal_donor: torch.Tensor = (
        (input_donor * mask.unsqueeze(-1)).sum(dim=0).sum(dim=0)
        / number_of_active_pixel
    )

    # Remove the DC offset, but keep it: the offsets enter the final ratio.
    signal_acceptor_offset = signal_acceptor.mean()
    signal_donor_offset = signal_donor.mean()
    signal_acceptor = signal_acceptor - signal_acceptor_offset
    signal_donor = signal_donor - signal_donor_offset

    # Blackman window to reduce spectral leakage.
    blackman_window = torch.blackman_window(
        window_length=signal_acceptor.shape[0],
        periodic=True,
        dtype=signal_acceptor.dtype,
        device=signal_acceptor.device,
    )
    signal_acceptor *= blackman_window
    signal_donor *= blackman_window

    # Zero-pad the FFT to the next power of two, but at least 256 bins.
    nfft: int = int(2 ** math.ceil(math.log2(signal_donor.shape[0])))
    nfft = max(256, nfft)

    signal_acceptor_fft: torch.Tensor = torch.fft.rfft(signal_acceptor, n=nfft)
    signal_donor_fft: torch.Tensor = torch.fft.rfft(signal_donor, n=nfft)

    frequency_axis: torch.Tensor = (
        torch.fft.rfftfreq(nfft, device=signal_acceptor_fft.device) * sample_frequency
    )

    # One-sided power spectrum: double every bin that has a mirrored
    # negative-frequency counterpart (all but DC and, if present, Nyquist).
    signal_acceptor_power: torch.Tensor = torch.abs(signal_acceptor_fft) ** 2
    signal_acceptor_power[1:-1] *= 2

    signal_donor_power: torch.Tensor = torch.abs(signal_donor_fft) ** 2
    signal_donor_power[1:-1] *= 2

    if frequency_axis[-1] != (sample_frequency / 2.0):
        # The last bin is not the Nyquist bin, so it too has a mirror image.
        signal_acceptor_power[-1] *= 2
        signal_donor_power[-1] *= 2

    # Compensate for the window's coherent gain.
    signal_acceptor_power /= blackman_window.sum() ** 2
    signal_donor_power /= blackman_window.sum() ** 2

    # Keep only the heartbeat band (explicit boolean AND instead of `*`).
    idx = torch.where(
        (frequency_axis >= lower_frequency_heartbeat)
        & (frequency_axis <= upper_frequency_heartbeat)
    )[0]

    frequency_axis = frequency_axis[idx]
    signal_acceptor_power = signal_acceptor_power[idx]
    signal_donor_power = signal_donor_power[idx]

    acceptor_range = signal_acceptor_power.max() - signal_acceptor_power.min()
    donor_range = signal_donor_power.max() - signal_donor_power.min()

    acceptor_correction_factor: float = float(
        0.5
        * (
            1
            + (signal_acceptor_offset * torch.sqrt(donor_range))
            / (signal_donor_offset * torch.sqrt(acceptor_range))
        )
    )

    # The donor factor is coupled to the acceptor factor; for identical
    # inputs both evaluate to exactly 1.0.
    donor_correction_factor: float = float(
        acceptor_correction_factor / (2 * acceptor_correction_factor - 1)
    )

    return donor_correction_factor, acceptor_correction_factor
|
|
@ -1,8 +1,5 @@
|
||||||
import torch
|
import torch
|
||||||
import torchvision as tv # type: ignore
|
import torchvision as tv # type: ignore
|
||||||
import numpy as np
|
|
||||||
import json
|
|
||||||
import scipy.io as sio # type: ignore
|
|
||||||
|
|
||||||
from functions.align_refref import align_refref
|
from functions.align_refref import align_refref
|
||||||
from functions.perform_donor_volume_rotation import perform_donor_volume_rotation
|
from functions.perform_donor_volume_rotation import perform_donor_volume_rotation
|
||||||
|
@ -12,8 +9,9 @@ from functions.ImageAlignment import ImageAlignment
|
||||||
|
|
||||||
@torch.no_grad()
|
@torch.no_grad()
|
||||||
def align_cameras(
|
def align_cameras(
|
||||||
filename_raw_json: str,
|
channels: list[str],
|
||||||
filename_bin_mat: str,
|
data: torch.Tensor,
|
||||||
|
ref_image: torch.Tensor,
|
||||||
device: torch.device,
|
device: torch.device,
|
||||||
dtype: torch.dtype,
|
dtype: torch.dtype,
|
||||||
batch_size: int,
|
batch_size: int,
|
||||||
|
@ -30,18 +28,6 @@ def align_cameras(
|
||||||
]:
|
]:
|
||||||
image_alignment = ImageAlignment(default_dtype=dtype, device=device)
|
image_alignment = ImageAlignment(default_dtype=dtype, device=device)
|
||||||
|
|
||||||
# --- Load data ---
|
|
||||||
with open(filename_raw_json, "r") as file_handle:
|
|
||||||
metadata: dict = json.load(file_handle)
|
|
||||||
channels: list[str] = metadata["channelKey"]
|
|
||||||
|
|
||||||
data = torch.tensor(
|
|
||||||
sio.loadmat(filename_bin_mat)["nparray"].astype(np.float32),
|
|
||||||
device=device,
|
|
||||||
dtype=dtype,
|
|
||||||
)
|
|
||||||
# --==-- DONE --==--
|
|
||||||
|
|
||||||
# --- Get reference image ---
|
# --- Get reference image ---
|
||||||
acceptor_index: int = channels.index("acceptor")
|
acceptor_index: int = channels.index("acceptor")
|
||||||
donor_index: int = channels.index("donor")
|
donor_index: int = channels.index("donor")
|
||||||
|
@ -55,32 +41,25 @@ def align_cameras(
|
||||||
donor = data[..., donor_index].moveaxis(-1, 0).clone()
|
donor = data[..., donor_index].moveaxis(-1, 0).clone()
|
||||||
oxygenation = data[..., oxygenation_index].moveaxis(-1, 0).clone()
|
oxygenation = data[..., oxygenation_index].moveaxis(-1, 0).clone()
|
||||||
volume = data[..., volume_index].moveaxis(-1, 0).clone()
|
volume = data[..., volume_index].moveaxis(-1, 0).clone()
|
||||||
|
|
||||||
|
ref_image_acceptor = ref_image[..., acceptor_index].clone()
|
||||||
|
ref_image_donor = ref_image[..., donor_index].clone()
|
||||||
|
ref_image_oxygenation = ref_image[..., oxygenation_index].clone()
|
||||||
|
ref_image_volume = ref_image[..., volume_index].clone()
|
||||||
del data
|
del data
|
||||||
# --==-- DONE --==--
|
# --==-- DONE --==--
|
||||||
|
|
||||||
# --- Calculate translation and rotation between the reference images ---
|
# --- Calculate translation and rotation between the reference images ---
|
||||||
angle_refref, tvec_refref, ref_image_acceptor, ref_image_donor = align_refref(
|
angle_refref, tvec_refref, ref_image_acceptor, ref_image_donor = align_refref(
|
||||||
ref_image_acceptor=acceptor[
|
ref_image_acceptor=ref_image_acceptor,
|
||||||
acceptor.shape[0] // 2,
|
ref_image_donor=ref_image_donor,
|
||||||
:,
|
|
||||||
:,
|
|
||||||
],
|
|
||||||
ref_image_donor=donor[
|
|
||||||
donor.shape[0] // 2,
|
|
||||||
:,
|
|
||||||
:,
|
|
||||||
],
|
|
||||||
image_alignment=image_alignment,
|
image_alignment=image_alignment,
|
||||||
batch_size=batch_size,
|
batch_size=batch_size,
|
||||||
fill_value=fill_value,
|
fill_value=fill_value,
|
||||||
)
|
)
|
||||||
|
|
||||||
ref_image_oxygenation = tv.transforms.functional.affine(
|
ref_image_oxygenation = tv.transforms.functional.affine(
|
||||||
img=oxygenation[
|
img=ref_image_oxygenation.unsqueeze(0),
|
||||||
oxygenation.shape[0] // 2,
|
|
||||||
:,
|
|
||||||
:,
|
|
||||||
].unsqueeze(0),
|
|
||||||
angle=-float(angle_refref),
|
angle=-float(angle_refref),
|
||||||
translate=[0, 0],
|
translate=[0, 0],
|
||||||
scale=1.0,
|
scale=1.0,
|
||||||
|
@ -97,15 +76,7 @@ def align_cameras(
|
||||||
shear=0,
|
shear=0,
|
||||||
interpolation=tv.transforms.InterpolationMode.BILINEAR,
|
interpolation=tv.transforms.InterpolationMode.BILINEAR,
|
||||||
fill=fill_value,
|
fill=fill_value,
|
||||||
)
|
).squeeze(0)
|
||||||
|
|
||||||
ref_image_oxygenation = ref_image_oxygenation.squeeze(0)
|
|
||||||
|
|
||||||
ref_image_volume = volume[
|
|
||||||
volume.shape[0] // 2,
|
|
||||||
:,
|
|
||||||
:,
|
|
||||||
].clone()
|
|
||||||
|
|
||||||
# --==-- DONE --==--
|
# --==-- DONE --==--
|
||||||
|
|
||||||
|
|
49
reproduction_effort/functions/heart_beat_frequency.py
Normal file
49
reproduction_effort/functions/heart_beat_frequency.py
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
import torch
|
||||||
|
|
||||||
|
|
||||||
|
def heart_beat_frequency(
    input: torch.Tensor,
    lower_frequency_heartbeat: float,
    upper_frequency_heartbeat: float,
    sample_frequency: float,
    mask: torch.Tensor,
) -> float:
    """Return the dominant frequency (in Hz) inside the heartbeat band.

    Averages the sequence over the masked pixels, computes a
    Hamming-windowed one-sided power spectrum of the averaged time course,
    and returns the frequency of the strongest bin strictly inside
    ``(lower_frequency_heartbeat, upper_frequency_heartbeat)``.

    Args:
        input: pixel sequence; assumed (height, width, time) given the
            ``mask.unsqueeze(-1)`` broadcast — TODO confirm. (The name
            shadows the builtin, but callers pass it by keyword, so it is
            kept for interface compatibility.)
        lower_frequency_heartbeat: lower band edge in Hz (exclusive).
        upper_frequency_heartbeat: upper band edge in Hz (exclusive).
        sample_frequency: sampling rate in Hz.
        mask: (height, width) boolean mask of valid pixels; must select at
            least one pixel, otherwise the spatial average divides by zero.

    Returns:
        The peak frequency in Hz as a plain float.
    """
    number_of_active_pixel: torch.Tensor = mask.type(dtype=torch.float32).sum()

    # Spatially average the masked pixels and remove the DC offset.
    signal: torch.Tensor = (
        (input * mask.unsqueeze(-1)).sum(dim=0).sum(dim=0) / number_of_active_pixel
    )
    signal = signal - signal.mean()

    # Hamming window (classic 0.54/0.46 coefficients) against leakage.
    hamming_window = torch.hamming_window(
        window_length=signal.shape[0],
        periodic=True,
        alpha=0.54,
        beta=0.46,
        dtype=signal.dtype,
        device=signal.device,
    )

    signal *= hamming_window

    signal_fft: torch.Tensor = torch.fft.rfft(signal)
    frequency_axis: torch.Tensor = (
        torch.fft.rfftfreq(signal.shape[0], device=signal.device) * sample_frequency
    )

    # One-sided power spectrum: double every bin with a mirrored
    # negative-frequency counterpart (all but DC and, if present, Nyquist).
    signal_power: torch.Tensor = torch.abs(signal_fft) ** 2
    signal_power[1:-1] *= 2

    if frequency_axis[-1] != (sample_frequency / 2.0):
        # The last bin is not the Nyquist bin, so it too has a mirror image.
        signal_power[-1] *= 2
    # Compensate for the window's coherent gain.
    signal_power /= hamming_window.sum() ** 2

    # Keep only the heartbeat band (explicit boolean AND instead of `*`).
    idx = torch.where(
        (frequency_axis > lower_frequency_heartbeat)
        & (frequency_axis < upper_frequency_heartbeat)
    )[0]
    frequency_axis = frequency_axis[idx]
    signal_power = signal_power[idx]

    heart_rate = float(frequency_axis[torch.argmax(signal_power)])

    return heart_rate
|
|
@ -5,7 +5,10 @@ import torch
|
||||||
|
|
||||||
@torch.no_grad()
|
@torch.no_grad()
|
||||||
def make_mask(
|
def make_mask(
|
||||||
filename_mask: str, data: torch.Tensor, device: torch.device, dtype: torch.dtype
|
filename_mask: str,
|
||||||
|
camera_sequence: list[torch.Tensor],
|
||||||
|
device: torch.device,
|
||||||
|
dtype: torch.dtype,
|
||||||
) -> torch.Tensor:
|
) -> torch.Tensor:
|
||||||
mask: torch.Tensor = torch.tensor(
|
mask: torch.Tensor = torch.tensor(
|
||||||
sio.loadmat(filename_mask)["maskInfo"]["maskIdx2D"][0][0],
|
sio.loadmat(filename_mask)["maskInfo"]["maskIdx2D"][0][0],
|
||||||
|
@ -20,7 +23,10 @@ def make_mask(
|
||||||
dtype=dtype,
|
dtype=dtype,
|
||||||
)
|
)
|
||||||
|
|
||||||
if torch.any(data.flatten() >= limit):
|
for id in range(0, len(camera_sequence)):
|
||||||
mask = mask & ~(torch.any(torch.any(data >= limit, dim=-1), dim=-1))
|
if torch.any(camera_sequence[id].flatten() >= limit):
|
||||||
|
mask = mask & ~(torch.any(camera_sequence[id] >= limit, dim=-1))
|
||||||
|
if torch.any(camera_sequence[id].flatten() < 0):
|
||||||
|
mask = mask & ~(torch.any(camera_sequence[id] < 0, dim=-1))
|
||||||
|
|
||||||
return mask
|
return mask
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
import scipy.io as sio # type: ignore
|
|
||||||
import torch
|
import torch
|
||||||
import numpy as np
|
|
||||||
import json
|
|
||||||
|
|
||||||
from functions.make_mask import make_mask
|
from functions.make_mask import make_mask
|
||||||
from functions.convert_camera_sequenc_to_list import convert_camera_sequenc_to_list
|
from functions.heart_beat_frequency import heart_beat_frequency
|
||||||
|
from functions.adjust_factor import adjust_factor
|
||||||
from functions.preprocess_camera_sequence import preprocess_camera_sequence
|
from functions.preprocess_camera_sequence import preprocess_camera_sequence
|
||||||
from functions.interpolate_along_time import interpolate_along_time
|
from functions.interpolate_along_time import interpolate_along_time
|
||||||
from functions.gauss_smear import gauss_smear
|
from functions.gauss_smear import gauss_smear
|
||||||
|
@ -13,8 +12,8 @@ from functions.regression import regression
|
||||||
|
|
||||||
@torch.no_grad()
|
@torch.no_grad()
|
||||||
def preprocessing(
|
def preprocessing(
|
||||||
filename_metadata: str,
|
cameras: list[str],
|
||||||
filename_data: str,
|
camera_sequence: list[torch.Tensor],
|
||||||
filename_mask: str,
|
filename_mask: str,
|
||||||
device: torch.device,
|
device: torch.device,
|
||||||
first_none_ramp_frame: int,
|
first_none_ramp_frame: int,
|
||||||
|
@ -22,29 +21,19 @@ def preprocessing(
|
||||||
temporal_width: float,
|
temporal_width: float,
|
||||||
target_camera: list[str],
|
target_camera: list[str],
|
||||||
regressor_cameras: list[str],
|
regressor_cameras: list[str],
|
||||||
|
lower_frequency_heartbeat: float,
|
||||||
|
upper_frequency_heartbeat: float,
|
||||||
|
sample_frequency: float,
|
||||||
dtype: torch.dtype = torch.float32,
|
dtype: torch.dtype = torch.float32,
|
||||||
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
|
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
|
||||||
|
|
||||||
data: torch.Tensor = torch.tensor(
|
mask: torch.Tensor = make_mask(
|
||||||
sio.loadmat(filename_data)["data"].astype(np.float32),
|
filename_mask=filename_mask,
|
||||||
|
camera_sequence=camera_sequence,
|
||||||
device=device,
|
device=device,
|
||||||
dtype=dtype,
|
dtype=dtype,
|
||||||
)
|
)
|
||||||
|
|
||||||
with open(filename_metadata, "r") as file_handle:
|
|
||||||
metadata: dict = json.load(file_handle)
|
|
||||||
cameras: list[str] = metadata["channelKey"]
|
|
||||||
|
|
||||||
required_order: list[str] = ["acceptor", "donor", "oxygenation", "volume"]
|
|
||||||
|
|
||||||
mask: torch.Tensor = make_mask(
|
|
||||||
filename_mask=filename_mask, data=data, device=device, dtype=dtype
|
|
||||||
)
|
|
||||||
|
|
||||||
camera_sequence: list[torch.Tensor] = convert_camera_sequenc_to_list(
|
|
||||||
data=data, required_order=required_order, cameras=cameras
|
|
||||||
)
|
|
||||||
|
|
||||||
for num_cams in range(len(camera_sequence)):
|
for num_cams in range(len(camera_sequence)):
|
||||||
camera_sequence[num_cams], mask = preprocess_camera_sequence(
|
camera_sequence[num_cams], mask = preprocess_camera_sequence(
|
||||||
camera_sequence=camera_sequence[num_cams],
|
camera_sequence=camera_sequence[num_cams],
|
||||||
|
@ -61,6 +50,15 @@ def preprocessing(
|
||||||
for id in range(0, len(camera_sequence)):
|
for id in range(0, len(camera_sequence)):
|
||||||
camera_sequence_filtered.append(camera_sequence[id].clone())
|
camera_sequence_filtered.append(camera_sequence[id].clone())
|
||||||
|
|
||||||
|
idx_volume: int = cameras.index("volume")
|
||||||
|
heart_rate: float = heart_beat_frequency(
|
||||||
|
input=camera_sequence_filtered[idx_volume],
|
||||||
|
lower_frequency_heartbeat=lower_frequency_heartbeat,
|
||||||
|
upper_frequency_heartbeat=upper_frequency_heartbeat,
|
||||||
|
sample_frequency=sample_frequency,
|
||||||
|
mask=mask,
|
||||||
|
)
|
||||||
|
|
||||||
camera_sequence_filtered = gauss_smear(
|
camera_sequence_filtered = gauss_smear(
|
||||||
camera_sequence_filtered,
|
camera_sequence_filtered,
|
||||||
mask.type(dtype=dtype),
|
mask.type(dtype=dtype),
|
||||||
|
@ -90,4 +88,24 @@ def preprocessing(
|
||||||
)
|
)
|
||||||
results.append(output)
|
results.append(output)
|
||||||
|
|
||||||
|
lower_frequency_heartbeat_selection: float = heart_rate - 3
|
||||||
|
upper_frequency_heartbeat_selection: float = heart_rate + 3
|
||||||
|
|
||||||
|
donor_correction_factor, acceptor_correction_factor = adjust_factor(
|
||||||
|
input_acceptor=results[0],
|
||||||
|
input_donor=results[1],
|
||||||
|
lower_frequency_heartbeat=lower_frequency_heartbeat_selection,
|
||||||
|
upper_frequency_heartbeat=upper_frequency_heartbeat_selection,
|
||||||
|
sample_frequency=sample_frequency,
|
||||||
|
mask=mask,
|
||||||
|
)
|
||||||
|
|
||||||
|
results[0] = acceptor_correction_factor * (
|
||||||
|
results[0] - results[0].mean(dim=-1, keepdim=True)
|
||||||
|
) + results[0].mean(dim=-1, keepdim=True)
|
||||||
|
|
||||||
|
results[1] = donor_correction_factor * (
|
||||||
|
results[1] - results[1].mean(dim=-1, keepdim=True)
|
||||||
|
) + results[1].mean(dim=-1, keepdim=True)
|
||||||
|
|
||||||
return results[0], results[1], mask
|
return results[0], results[1], mask
|
||||||
|
|
Loading…
Reference in a new issue