kk_contour_net_shallow/RenderStimuli/render.py

270 lines
8.3 KiB
Python
Raw Permalink Normal View History

2023-07-28 15:42:20 +02:00
# %%
import torch
import time
import scipy
import os
import matplotlib.pyplot as plt
import numpy as np
import contours
import glob
USE_CEXT_FROM_DAVID = True
if USE_CEXT_FROM_DAVID:
# from CPPExtensions.PyTCopyCPU import TCopyCPU
from CPPExtensions.PyTCopyCPU import TCopyCPU as render_stimulus_CPP
def render_gaborfield(posori, params, verbose=False):
scale_factor = params["scale_factor"]
n_source = params["n_source"]
n_change = params["n_change"]
n_phase = params["n_phase"]
# convert sizes to pixel units
lambda_PIX = params["lambda_gabor"] * scale_factor
sigma_PIX = params["sigma_gabor"] * scale_factor
r_gab_PIX = int(params["d_gabor"] * scale_factor / 2)
d_gab_PIX = r_gab_PIX * 2 + 1
# make filterbank
gabors = torch.zeros(
[n_phase, n_source, n_change, d_gab_PIX, d_gab_PIX], dtype=torch.float32
)
for i_phase in range(n_phase):
phase = (torch.pi * 2 * i_phase) / n_phase
gabors[i_phase], dirs_source, dirs_change = contours.gaborner_filterbank(
r_gab=r_gab_PIX,
n_source=n_source,
n_change=n_change,
lambdah=lambda_PIX,
sigma=sigma_PIX,
phase=phase,
normalize=params["normalize_gabor"],
torch_device="cpu",
)
gabors = gabors.reshape([n_phase * n_source * n_change, d_gab_PIX, d_gab_PIX])
n_contours = posori.shape[0]
# discretize ALL stimuli
if verbose:
print("Discretizing START!!!")
t_dis0 = time.perf_counter()
(
index_srcchg,
index_x,
index_y,
x_canvas,
y_canvas,
) = contours.discretize_stimuli(
posori=posori,
x_range=params["x_range"],
y_range=params["y_range"],
scale_factor=scale_factor,
r_gab_PIX=r_gab_PIX,
n_source=n_source,
n_change=n_change,
n_phase=n_phase,
torch_device="cpu",
)
t_dis1 = time.perf_counter()
if verbose:
print(f"Discretizing END, took {t_dis1-t_dis0} seconds.!!!")
if verbose:
print("Generation START!!!")
t0 = time.perf_counter()
if not USE_CEXT_FROM_DAVID:
if verbose:
print(" (using NUMPY...)")
output = torch.zeros(
(
n_contours,
y_canvas - d_gab_PIX + 1,
x_canvas - d_gab_PIX + 1,
),
device="cpu",
dtype=torch.float32,
)
kernels_cpu = gabors.detach().cpu()
for i_con in range(n_contours):
output[i_con] = contours.render_stimulus(
kernels=kernels_cpu,
index_element=index_srcchg[i_con],
index_y=index_y[i_con],
index_x=index_x[i_con],
y_canvas=y_canvas,
x_canvas=x_canvas,
torch_device="cpu",
)
output = torch.clip(output, -1, +1)
else:
if verbose:
print(" (using C++...)")
copyier = render_stimulus_CPP()
number_of_cpu_processes = os.cpu_count()
output_dav_tmp = torch.zeros(
(
n_contours,
y_canvas + 2 * r_gab_PIX,
x_canvas + 2 * r_gab_PIX,
),
device="cpu",
dtype=torch.float32,
)
# Umsort!
n_elements_total = 0
for i_con in range(n_contours):
n_elements_total += len(index_x[i_con])
sparse_matrix = torch.zeros(
(n_elements_total, 4), device="cpu", dtype=torch.int64
)
i_elements_total = 0
for i_con in range(n_contours):
n_add = len(index_x[i_con])
sparse_matrix[i_elements_total : i_elements_total + n_add, 0] = i_con
sparse_matrix[
i_elements_total : i_elements_total + n_add, 1
] = index_srcchg[i_con]
sparse_matrix[i_elements_total : i_elements_total + n_add, 2] = index_y[
i_con
]
sparse_matrix[i_elements_total : i_elements_total + n_add, 3] = index_x[
i_con
]
i_elements_total += n_add
assert i_elements_total == n_elements_total, "UNBEHAGEN macht sich breit!"
# output_dav_tmp.fill_(0.0)
copyier.process(
sparse_matrix.data_ptr(),
int(sparse_matrix.shape[0]),
int(sparse_matrix.shape[1]),
gabors.data_ptr(),
int(gabors.shape[0]),
int(gabors.shape[1]),
int(gabors.shape[2]),
output_dav_tmp.data_ptr(),
int(output_dav_tmp.shape[0]),
int(output_dav_tmp.shape[1]),
int(output_dav_tmp.shape[2]),
int(number_of_cpu_processes),
)
output = torch.clip(
output_dav_tmp[
:,
d_gab_PIX - 1 : -(d_gab_PIX - 1),
d_gab_PIX - 1 : -(d_gab_PIX - 1),
],
-1,
+1,
)
t1 = time.perf_counter()
if verbose:
print(f"Generating END, took {t1-t0} seconds.!!!")
if verbose:
print("Showing first and last stimulus generated...")
plt.imshow(output[0], cmap="gray", vmin=-1, vmax=+1)
plt.show()
plt.imshow(output[-1], cmap="gray", vmin=-1, vmax=+1)
plt.show()
print(f"Processed {n_contours} stimuli in {t1-t_dis0} seconds!")
return output
def render_gaborfield_frommatfiles(files, params, varname, varname_dist, altpath=None, verbose=False):
n_total = 0
n_files = len(files)
print(f"Going through {n_files} contour files...")
for i_file in range(n_files):
# get path, basename, suffix...
full = files[i_file]
path, file = os.path.split(full)
base, suffix = os.path.splitext(file)
# load file
mat = scipy.io.loadmat(full)
# posori = mat[varname]
if "dist" in full:
posori = mat[varname_dist]
else:
posori = mat[varname]
n_contours = posori.shape[0]
n_total += n_contours
print(f" ...file {file} contains {n_contours} contours.")
# process...
gaborfield = render_gaborfield(posori, params=params, verbose=verbose)
# save
if altpath:
savepath = altpath
else:
savepath = path
savefull = savepath + os.sep + base + "_RENDERED.npz"
print(f" ...saving under {savefull}...")
gaborfield = (torch.clip(gaborfield, -1, 1) * 127 + 128).type(torch.uint8)
np.savez_compressed(savefull, gaborfield=gaborfield)
return n_total
if __name__ == "__main__":
TESTMODE = "files" # "files" or "posori"
# cutout for stimuli, and gabor parameters
params = {
"x_range": [140, 940],
"y_range": [140, 940],
"scale_factor": 0.25, # scale to convert coordinates to pixel values
"d_gabor": 40,
"lambda_gabor": 16,
"sigma_gabor": 8,
"n_phase": 4,
"normalize_gabor": True,
# number of directions for dictionary
"n_source": 32,
"n_change": 32,
}
if TESTMODE == "files":
num = int(9)
path = f"/data_1/kk/StimulusGeneration/Alicorn/Coignless/Base0{num}0_n100"
files = glob.glob(path + os.sep + "*.mat")
t0 = time.perf_counter()
n_total = render_gaborfield_frommatfiles(
files=files, params=params, varname=f"Table_base_crn0{num}0", varname_dist=f"Table_base_crn0{num}0_dist", altpath="./Output100/Coignless" #intr crn base
)
t1 = time.perf_counter()
dt = t1 - t0
print(
f"Rendered {n_total} contours in {dt} secs, yielding {n_total/dt} contours/sec."
)
if TESTMODE == "posori":
print("Sample stimulus generation:")
print("===========================")
# load contours, multiplex coordinates to simulate a larger set of contours
n_multiplex = 500
mat = scipy.io.loadmat("z.mat")
posori = np.tile(mat["z"], (n_multiplex, 1))
n_contours = posori.shape[0]
print(f"Processing {n_contours} contour stimuli")
output = render_gaborfield(posori, params=params, verbose=True)
# output8 = (torch.clip(output, -1, 1) * 127 + 128).type(torch.uint8)
# np.savez_compressed("output8_compressed.npz", output8=output8)
# %%