Add files via upload

This commit is contained in:
David Rotermund 2023-07-30 23:52:42 +02:00 committed by GitHub
parent 40b794c1e9
commit 61ec9ceb5c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 3402 additions and 0 deletions

137
analyse_raw_data.py Normal file
View file

@ -0,0 +1,137 @@
import torch
from functions.Anime import Anime
from functions.DataContainer import DataContainer
import matplotlib.pyplot as plt
import numpy as np
import argh
import os
def main(
path: str = "/data_1/hendrik/2023-07-17/M_Sert_Cre_41/raw",
experiment_id: int = 1,
trial_id: int = 1,
use_svd: bool = True, # i.e. use SVD
mask_threshold: float | None = 0.05, # Between 0 and 1.0
show_example_timeseries: bool = True,
example_position_x: int = 280,
example_position_y: int = 440,
movie_play: bool = False,
movie_vmin_scale: float = 0.5,
movie_vmax_scale: float = 0.5,
movie_enable_mask: bool = False,
export_results: bool = True,
export_path: str = "Export",
):
if use_svd:
print("SVD mode")
else:
print("Classic mode")
if export_results:
os.makedirs(export_path, exist_ok=True)
initital_mask_name: str | None = None
initital_mask_update: bool = True
initital_mask_roi: bool = False # default: True
start_position: int = 0
start_position_coefficients: int = 100
svd_iterations: int = 1 # SVD iterations: Do not touch! Keep at 1
bin_size: int = 4
display_logging_messages: bool = False
save_logging_messages: bool = False
# Post data processing modifiations
gaussian_blur_kernel_size: int | None = 3
gaussian_blur_sigma: float = 1.0
bin_size_post: int | None = None
# ------------------------
example_position_x = example_position_x // bin_size
example_position_y = example_position_y // bin_size
if bin_size_post is not None:
example_position_x = example_position_x // bin_size_post
example_position_y = example_position_y // bin_size_post
torch_device: torch.device = torch.device(
"cuda:0" if torch.cuda.is_available() else "cpu"
)
af = DataContainer(
path=path,
device=torch_device,
display_logging_messages=display_logging_messages,
save_logging_messages=save_logging_messages,
)
result, mask = af.automatic_load(
experiment_id=experiment_id,
trial_id=trial_id,
start_position=start_position,
remove_heartbeat=use_svd, # i.e. use SVD
iterations=svd_iterations,
bin_size=bin_size,
initital_mask_name=initital_mask_name,
initital_mask_update=initital_mask_update,
initital_mask_roi=initital_mask_roi,
start_position_coefficients=start_position_coefficients,
gaussian_blur_kernel_size=gaussian_blur_kernel_size,
gaussian_blur_sigma=gaussian_blur_sigma,
bin_size_post=bin_size_post,
threshold=mask_threshold,
)
if show_example_timeseries:
plt.plot(result[:, example_position_x, example_position_y].cpu())
plt.show()
if export_results:
if use_svd:
np.save(
os.path.join(export_path, f"SVD_{experiment_id}_{trial_id}_data.npy"),
result.cpu().numpy(),
)
if mask is not None:
np.save(
os.path.join(
export_path, f"SVD_{experiment_id}_{trial_id}_mask.npy"
),
result.cpu().numpy(),
)
else:
np.save(
os.path.join(
export_path, f"Classic_{experiment_id}_{trial_id}_data.npy"
),
result.cpu().numpy(),
)
if mask is not None:
np.save(
os.path.join(
export_path, f"Classic_{experiment_id}_{trial_id}_mask.npy"
),
result.cpu().numpy(),
)
if movie_play:
ani = Anime()
if movie_enable_mask:
ani.show(
result - 1.0,
mask=mask,
vmin_scale=movie_vmin_scale,
vmax_scale=movie_vmax_scale,
)
else:
ani.show(
result - 1.0, vmin_scale=movie_vmin_scale, vmax_scale=movie_vmax_scale
)
if __name__ == "__main__":
argh.dispatch_command(main)

285
analyse_x_correlation.py Normal file
View file

@ -0,0 +1,285 @@
from functions.DataContainer import DataContainer
import torch
import matplotlib.pyplot as plt
import argh
import os
import numpy as np
@torch.no_grad()
def _calculate_cross_corelation(
a: torch.Tensor | None, b: torch.Tensor | None, data_shape: torch.Size
) -> torch.Tensor:
assert a is not None
assert b is not None
assert a.ndim == 3
assert b.ndim == 3
assert a.shape[0] == b.shape[0]
assert a.shape[1] == b.shape[1]
assert a.shape[2] == b.shape[2]
output = (
(
torch.fft.fftshift(
torch.fft.irfft(
a * b.conj(),
dim=0,
),
dim=0,
)
/ int(data_shape[0])
)
.mean(-1)
.mean(-1)
)
output = output[data_shape[0] // 2 : -data_shape[0] // 2]
return output
@torch.no_grad()
def _prepare_data(input: torch.Tensor) -> torch.Tensor:
input -= input.mean(dim=0, keepdim=True)
input /= input.std(dim=0, keepdim=True) + 1e-20
input = torch.cat(
(torch.zeros_like(input), input, torch.zeros_like(input)),
dim=0,
)
input = torch.fft.rfft(
input,
dim=0,
)
return input
@torch.no_grad()
def process_combinations(
path: str,
torch_device: torch.device,
remove_heartbeat: bool = True,
experiment_id: int = 1,
trial_id: int = 1,
remove_linear: bool = False,
) -> tuple[
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
torch.Tensor,
]:
af = DataContainer(
path=path,
device=torch_device,
display_logging_messages=False,
save_logging_messages=False,
)
af.cleaned_load_data(
experiment_id=experiment_id,
trial_id=trial_id,
align=True,
iterations=1,
lowrank_method=True,
lowrank_q=6,
remove_heartbeat=remove_heartbeat,
remove_mean=False,
remove_linear=remove_linear,
remove_heartbeat_mean=False,
remove_heartbeat_linear=False,
bin_size=4,
do_frame_shift=True,
enable_secondary_data=True,
mmap_mode=True,
initital_mask=None,
start_position_coefficients=0,
)
assert af.acceptor is not None
assert af.donor is not None
assert af.oxygenation is not None
assert af.volume is not None
data_shape = af.acceptor.shape
af.acceptor = _prepare_data(af.acceptor)
af.donor = _prepare_data(af.donor)
af.oxygenation = _prepare_data(af.oxygenation)
af.volume = _prepare_data(af.volume)
x_aa = _calculate_cross_corelation(
a=af.acceptor, b=af.acceptor, data_shape=data_shape
)
time_axis = (
torch.arange(0, x_aa.shape[0], device=x_aa.device, dtype=x_aa.dtype)
- float(torch.argmax(x_aa))
) / 100.0
x_dd = _calculate_cross_corelation(a=af.donor, b=af.donor, data_shape=data_shape)
x_oo = _calculate_cross_corelation(
a=af.oxygenation, b=af.oxygenation, data_shape=data_shape
)
x_vv = _calculate_cross_corelation(a=af.volume, b=af.volume, data_shape=data_shape)
x_ad = _calculate_cross_corelation(a=af.acceptor, b=af.donor, data_shape=data_shape)
x_ao = _calculate_cross_corelation(
a=af.acceptor, b=af.oxygenation, data_shape=data_shape
)
x_av = _calculate_cross_corelation(
a=af.acceptor, b=af.volume, data_shape=data_shape
)
x_da = _calculate_cross_corelation(a=af.donor, b=af.acceptor, data_shape=data_shape)
x_do = _calculate_cross_corelation(
a=af.donor, b=af.oxygenation, data_shape=data_shape
)
x_dv = _calculate_cross_corelation(a=af.donor, b=af.volume, data_shape=data_shape)
x_vo = _calculate_cross_corelation(
a=af.volume, b=af.oxygenation, data_shape=data_shape
)
return (x_aa, time_axis, x_dd, x_oo, x_vv, x_ad, x_ao, x_av, x_da, x_do, x_dv, x_vo)
def make_a_x_correlation_plot(
x_aa: torch.Tensor,
time_axis: torch.Tensor,
x_dd: torch.Tensor,
x_oo: torch.Tensor,
x_vv: torch.Tensor,
x_ad: torch.Tensor,
x_ao: torch.Tensor,
x_av: torch.Tensor,
x_da: torch.Tensor,
x_do: torch.Tensor,
x_dv: torch.Tensor,
x_vo: torch.Tensor,
) -> None:
plt.subplot(2, 2, 1)
plt.plot(time_axis.cpu(), x_aa.cpu(), label="acceptor")
plt.plot(time_axis.cpu(), x_dd.cpu(), label="donor")
plt.plot(time_axis.cpu(), x_oo.cpu(), label="oxygenation")
plt.plot(time_axis.cpu(), x_vv.cpu(), label="volume")
plt.legend()
plt.ylabel("Auto-Correlation")
plt.xlabel("Tau [sec]")
plt.subplot(2, 2, 2)
plt.plot(time_axis.cpu(), x_ad.cpu(), label="donor")
plt.plot(time_axis.cpu(), x_ao.cpu(), label="oxygenation")
plt.plot(time_axis.cpu(), x_av.cpu(), label="volume")
plt.legend()
plt.ylabel("X-Correlation with acceptor")
plt.xlabel("Tau [sec]")
plt.subplot(2, 2, 3)
plt.plot(time_axis.cpu(), x_da.cpu(), label="acceptor")
plt.plot(time_axis.cpu(), x_do.cpu(), label="oxygenation")
plt.plot(time_axis.cpu(), x_dv.cpu(), label="volume")
plt.legend()
plt.ylabel("X-Correlation with donor")
plt.xlabel("Tau [sec]")
plt.subplot(2, 2, 4)
plt.plot(time_axis.cpu(), x_vo.cpu(), label="volume -> oxygenation")
plt.legend()
plt.ylabel("X-Correlation")
plt.xlabel("Tau [sec]")
plt.show()
@torch.no_grad()
def main(
path: str = "/data_1/hendrik/2023-07-17/M_Sert_Cre_41/raw",
use_svd: bool = True,
remove_linear_trend: bool = False,
experiment_id: int = 1,
trial_id: int = 1,
plot_results: bool = True,
export_results: bool = True,
export_path: str = "Export_Correlation",
) -> None:
if use_svd:
print("SVD mode")
else:
print("Classic mode")
if export_results:
os.makedirs(export_path, exist_ok=True)
torch_device: torch.device = torch.device(
"cuda:0" if torch.cuda.is_available() else "cpu"
)
(
x_aa,
time_axis,
x_dd,
x_oo,
x_vv,
x_ad,
x_ao,
x_av,
x_da,
x_do,
x_dv,
x_vo,
) = process_combinations(
path=path,
torch_device=torch_device,
experiment_id=experiment_id,
trial_id=trial_id,
remove_heartbeat=use_svd,
remove_linear=remove_linear_trend,
)
if export_results:
if use_svd:
np.savez(
os.path.join(export_path, f"SVD_{experiment_id}_{trial_id}_data.npz"),
time_axis=time_axis.cpu().numpy(),
x_aa=x_aa.cpu().numpy(),
x_dd=x_dd.cpu().numpy(),
x_oo=x_oo.cpu().numpy(),
x_vv=x_vv.cpu().numpy(),
x_ad=x_ad.cpu().numpy(),
x_ao=x_ao.cpu().numpy(),
x_av=x_av.cpu().numpy(),
x_da=x_da.cpu().numpy(),
x_do=x_do.cpu().numpy(),
x_dv=x_dv.cpu().numpy(),
x_vo=x_vo.cpu().numpy(),
)
else:
np.savez(
os.path.join(
export_path, f"Classic_{experiment_id}_{trial_id}_data.npz"
),
time_axis=time_axis.cpu().numpy(),
x_aa=x_aa.cpu().numpy(),
x_dd=x_dd.cpu().numpy(),
x_oo=x_oo.cpu().numpy(),
x_vv=x_vv.cpu().numpy(),
x_ad=x_ad.cpu().numpy(),
x_ao=x_ao.cpu().numpy(),
x_av=x_av.cpu().numpy(),
x_da=x_da.cpu().numpy(),
x_do=x_do.cpu().numpy(),
x_dv=x_dv.cpu().numpy(),
x_vo=x_vo.cpu().numpy(),
)
if plot_results:
make_a_x_correlation_plot(
x_aa, time_axis, x_dd, x_oo, x_vv, x_ad, x_ao, x_av, x_da, x_do, x_dv, x_vo
)
if __name__ == "__main__":
argh.dispatch_command(main)

View file

@ -0,0 +1,31 @@
import numpy as np
import matplotlib.pyplot as plt
import os
export_path: str = "Export"
experiment_id: int = 1
trial_id: int = 1
example_position_x: int = 280
example_position_y: int = 440
bin_size: int = 4
svd_1 = np.load(os.path.join(export_path, f"SVD_{experiment_id}_{trial_id}_data.npy"))
classic = np.load(
os.path.join(export_path, f"Classic_{experiment_id}_{trial_id}_data.npy")
)
example_position_x = example_position_x // bin_size
example_position_y = example_position_y // bin_size
dim = 2
plt.subplot(dim, 1, 1)
plt.plot(svd_1[:, example_position_x, example_position_y] - 1.0)
plt.title("SVD - 1.0")
plt.subplot(dim, 1, 2)
plt.plot(classic[:, example_position_x, example_position_y] - 1.0)
plt.title("Classic - 1.0")
plt.show()

90
functions/Anime.py Normal file
View file

@ -0,0 +1,90 @@
import numpy as np
import torch
import matplotlib.pyplot as plt
import matplotlib.animation
class Anime:
def __init__(self) -> None:
super().__init__()
def show(
self,
input: torch.Tensor | np.ndarray,
mask: torch.Tensor | np.ndarray | None = None,
vmin: float | None = None,
vmax: float | None = None,
cmap: str = "hot",
axis_off: bool = True,
show_frame_count: bool = True,
interval: int = 100,
repeat: bool = False,
colorbar: bool = True,
vmin_scale: float | None = None,
vmax_scale: float | None = None,
) -> None:
assert input.ndim == 3
if isinstance(input, torch.Tensor):
input_np: np.ndarray = input.cpu().numpy()
if mask is not None:
mask_np: np.ndarray | None = (mask == 0).cpu().numpy()
else:
mask_np = None
else:
input_np = input
if mask is not None:
mask_np = mask == 0 # type: ignore
else:
mask_np = None
if vmin is None:
vmin = float(np.where(np.isfinite(input_np), input_np, 0.0).min())
if vmax is None:
vmax = float(np.where(np.isfinite(input_np), input_np, 0.0).max())
if vmin_scale is not None:
vmin *= vmin_scale
if vmax_scale is not None:
vmax *= vmax_scale
fig = plt.figure()
image = np.nan_to_num(input_np[0, ...], copy=True, nan=0.0)
if mask_np is not None:
image[mask_np] = float("NaN")
image_handle = plt.imshow(
image,
cmap=cmap,
vmin=vmin,
vmax=vmax,
)
if colorbar:
plt.colorbar()
if axis_off:
plt.axis("off")
def next_frame(i: int) -> None:
image = np.nan_to_num(input_np[i, ...], copy=True, nan=0.0)
if mask_np is not None:
image[mask_np] = float("NaN")
image_handle.set_data(image)
if show_frame_count:
bar_length: int = 10
filled_length = int(round(bar_length * i / input_np.shape[0]))
bar = "\u25A0" * filled_length + "\u25A1" * (bar_length - filled_length)
plt.title(f"{bar} {i} of {int(input_np.shape[0]-1)}", loc="left")
return
_ = matplotlib.animation.FuncAnimation(
fig,
next_frame,
frames=int(input.shape[0]),
interval=interval,
repeat=repeat,
)
plt.show()

1849
functions/DataContainer.py Normal file

File diff suppressed because it is too large Load diff

1010
functions/ImageAlignment.py Normal file

File diff suppressed because it is too large Load diff