Upload files to "/"

This commit is contained in:
David Rotermund 2025-04-08 15:20:17 +02:00
commit a7566b6820
22 changed files with 3709 additions and 0 deletions

13
L1NormLayer.py Normal file
View file

@ -0,0 +1,13 @@
import torch
class L1NormLayer(torch.nn.Module):
    """Scale a tensor so that its entries sum to one along one dimension.

    The input is divided by its sum along ``dim`` (kept as a singleton
    dimension so it broadcasts) plus ``epsilon``.  No absolute value is
    taken, so this is an L1 normalisation only for non-negative input.

    Args:
        epsilon: Constant added to the sum to avoid a division by zero.
        dim: Dimension along which the sum is taken.  Defaults to ``1``,
            the previously hard-coded value.
    """

    epsilon: float
    dim: int

    def __init__(self, epsilon: float = 10e-20, dim: int = 1) -> None:
        super().__init__()
        self.epsilon = epsilon
        self.dim = dim

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Return ``input / (input.sum(dim, keepdim=True) + epsilon)``."""
        return input / (input.sum(dim=self.dim, keepdim=True) + self.epsilon)

237
NNMF2d.py Normal file
View file

@ -0,0 +1,237 @@
import torch
class NNMF2d(torch.nn.Module):
in_channels: int
out_channels: int
weight: torch.Tensor
iterations: int
epsilon: float | None
init_min: float
init_max: float
local_learning: bool
local_learning_kl: bool
def __init__(
self,
in_channels: int,
out_channels: int,
device=None,
dtype=None,
iterations: int = 20,
epsilon: float | None = None,
init_min: float = 0.0,
init_max: float = 1.0,
local_learning: bool = False,
local_learning_kl: bool = False,
) -> None:
factory_kwargs = {"device": device, "dtype": dtype}
super().__init__()
self.init_min = init_min
self.init_max = init_max
self.in_channels = in_channels
self.out_channels = out_channels
self.iterations = iterations
self.local_learning = local_learning
self.local_learning_kl = local_learning_kl
self.weight = torch.nn.parameter.Parameter(
torch.empty((out_channels, in_channels), **factory_kwargs)
)
self.reset_parameters()
self.functional_nnmf2d = FunctionalNNMF2d.apply
self.epsilon = epsilon
def extra_repr(self) -> str:
s: str = f"{self.in_channels}, {self.out_channels}"
if self.epsilon is not None:
s += f", epsilon={self.epsilon}"
s += f", local_learning={self.local_learning}"
if self.local_learning:
s += f", local_learning_kl={self.local_learning_kl}"
return s
def reset_parameters(self) -> None:
torch.nn.init.uniform_(self.weight, a=self.init_min, b=self.init_max)
def forward(self, input: torch.Tensor) -> torch.Tensor:
positive_weights = torch.abs(self.weight)
positive_weights = positive_weights / (
positive_weights.sum(dim=1, keepdim=True) + 10e-20
)
h_dyn = self.functional_nnmf2d(
input,
positive_weights,
self.out_channels,
self.iterations,
self.epsilon,
self.local_learning,
self.local_learning_kl,
)
return h_dyn
class FunctionalNNMF2d(torch.autograd.Function):
@staticmethod
def forward( # type: ignore
ctx,
input: torch.Tensor,
weight: torch.Tensor,
out_channels: int,
iterations: int,
epsilon: float | None,
local_learning: bool,
local_learning_kl: bool,
) -> torch.Tensor:
# Prepare h
h = torch.full(
(input.shape[0], out_channels, input.shape[-2], input.shape[-1]),
1.0 / float(out_channels),
device=input.device,
dtype=input.dtype,
)
h = h.movedim(1, -1)
input = input.movedim(1, -1)
for _ in range(0, iterations):
reconstruction = torch.nn.functional.linear(h, weight.T)
reconstruction += 1e-20
if epsilon is None:
h *= torch.nn.functional.linear((input / reconstruction), weight)
else:
h *= 1 + epsilon * torch.nn.functional.linear(
(input / reconstruction), weight
)
h /= h.sum(-1, keepdim=True) + 10e-20
h = h.movedim(-1, 1)
input = input.movedim(-1, 1)
# ###########################################################
# Save the necessary data for the backward pass
# ###########################################################
ctx.save_for_backward(input, weight, h)
ctx.local_learning = local_learning
ctx.local_learning_kl = local_learning_kl
assert torch.isfinite(h).all()
return h
@staticmethod
@torch.autograd.function.once_differentiable
def backward(ctx, grad_output: torch.Tensor) -> tuple[ # type: ignore
torch.Tensor,
torch.Tensor | None,
None,
None,
None,
None,
None,
]:
# ##############################################
# Default values
# ##############################################
grad_weight: torch.Tensor | None = None
# ##############################################
# Get the variables back
# ##############################################
(input, weight, h) = ctx.saved_tensors
# The back prop gradient
h = h.movedim(1, -1)
grad_output = grad_output.movedim(1, -1)
input = input.movedim(1, -1)
big_r = torch.nn.functional.linear(h, weight.T)
big_r_div = 1.0 / (big_r + 1e-20)
factor_x_div_r = input * big_r_div
grad_input: torch.Tensor = (
torch.nn.functional.linear(h * grad_output, weight.T) * big_r_div
)
del big_r_div
# The weight gradient
if ctx.local_learning is False:
del big_r
grad_weight = -torch.nn.functional.linear(
h.reshape(
grad_input.shape[0] * grad_input.shape[1] * grad_input.shape[2],
h.shape[3],
).T,
(factor_x_div_r * grad_input)
.reshape(
grad_input.shape[0] * grad_input.shape[1] * grad_input.shape[2],
grad_input.shape[3],
)
.T,
)
grad_weight += torch.nn.functional.linear(
(h * grad_output)
.reshape(
grad_input.shape[0] * grad_input.shape[1] * grad_input.shape[2],
h.shape[3],
)
.T,
factor_x_div_r.reshape(
grad_input.shape[0] * grad_input.shape[1] * grad_input.shape[2],
grad_input.shape[3],
).T,
)
else:
if ctx.local_learning_kl:
grad_weight = -torch.nn.functional.linear(
h.reshape(
grad_input.shape[0] * grad_input.shape[1] * grad_input.shape[2],
h.shape[3],
).T,
factor_x_div_r.reshape(
grad_input.shape[0] * grad_input.shape[1] * grad_input.shape[2],
grad_input.shape[3],
).T,
)
else:
grad_weight = -torch.nn.functional.linear(
h.reshape(
grad_input.shape[0] * grad_input.shape[1] * grad_input.shape[2],
h.shape[3],
).T,
(2 * (input - big_r))
.reshape(
grad_input.shape[0] * grad_input.shape[1] * grad_input.shape[2],
grad_input.shape[3],
)
.T,
)
grad_input = grad_input.movedim(-1, 1)
assert torch.isfinite(grad_input).all()
assert torch.isfinite(grad_weight).all()
return (
grad_input,
grad_weight,
None,
None,
None,
None,
None,
)

510
___HDynamicLayer.py Normal file
View file

@ -0,0 +1,510 @@
import torch
from network.PyHDynamicCNNCPU import HDynamicCNNCPU
from network.PyHDynamicCNNGPU import HDynamicCNNGPU
# Module-level registries shared by every HDynamicLayer in the process.
# Each HDynamicLayer appends one entry to each list in its constructor and
# remembers the index; FunctionalSbS looks the entries up by that index.
#
# Cached GPU occupancy profile per layer (a one-element tensor until filled).
global_sbs_gpu_setting: list[torch.Tensor] = []
# Output shape for which the cached profile of the same index was computed.
global_sbs_size: list[torch.Tensor] = []
# Compiled CPU or GPU backend object per layer.
global_sbs_hdynamic_cpp: list[HDynamicCNNCPU | HDynamicCNNGPU] = []
class HDynamicLayer(torch.nn.Module):
    """Module front-end for ``FunctionalSbS``.

    The constructor registers a compiled backend object and two cache slots
    in the module-level ``global_sbs_*`` lists.  ``forward`` packs the layer
    settings into an int64 tensor and delegates to ``FunctionalSbS``.
    """

    _sbs_gpu_setting_position: int
    _sbs_hdynamic_cpp_position: int
    _gpu_tuning_factor: int
    _number_of_cpu_processes: int
    _output_size: list[int]
    _w_trainable: bool
    _output_layer: bool
    _local_learning: bool
    device: torch.device
    default_dtype: torch.dtype
    _force_forward_h_dynamic_on_cpu: bool

    def __init__(
        self,
        output_size: list[int],
        output_layer: bool = False,
        local_learning: bool = False,
        number_of_cpu_processes: int = 1,
        w_trainable: bool = False,
        skip_gradient_calculation: bool = False,
        device: torch.device | None = None,
        default_dtype: torch.dtype | None = None,
        gpu_tuning_factor: int = 5,
        force_forward_h_dynamic_on_cpu: bool = False,
    ) -> None:
        super().__init__()

        assert device is not None

        self.device = device
        self.default_dtype = default_dtype

        self._output_size = output_size
        self._output_layer = bool(output_layer)
        self._local_learning = bool(local_learning)
        self._w_trainable = bool(w_trainable)
        self._skip_gradient_calculation = bool(skip_gradient_calculation)
        self._number_of_cpu_processes = int(number_of_cpu_processes)
        self._gpu_tuning_factor = int(gpu_tuning_factor)
        self._force_forward_h_dynamic_on_cpu = force_forward_h_dynamic_on_cpu

        # Reserve the cache slots for this layer: an empty occupancy profile
        # and the output shape it belongs to.
        global_sbs_gpu_setting.append(torch.tensor([0]))
        global_sbs_size.append(torch.tensor([0, 0, 0, 0]))

        # Choose the compiled backend once, at construction time.
        use_cpu_backend = (device == torch.device("cpu")) or (
            force_forward_h_dynamic_on_cpu is True
        )
        backend = HDynamicCNNCPU() if use_cpu_backend else HDynamicCNNGPU()
        global_sbs_hdynamic_cpp.append(backend)

        self._sbs_gpu_setting_position = len(global_sbs_gpu_setting) - 1
        self._sbs_hdynamic_cpp_position = len(global_sbs_hdynamic_cpp) - 1

        self.functional_sbs = FunctionalSbS.apply

    ####################################################################
    # Forward                                                          #
    ####################################################################
    def forward(
        self,
        input: torch.Tensor,
        spike: torch.Tensor,
        epsilon_xy: torch.Tensor,
        epsilon_t_0: torch.Tensor,
        weights: torch.Tensor,
        h_initial: torch.Tensor,
        last_grad_scale: torch.Tensor,
        labels: torch.Tensor | None = None,
        keep_last_grad_scale: bool = False,
        disable_scale_grade: bool = True,
        forgetting_offset: float = -1.0,
    ) -> torch.Tensor:
        """Pack the settings and run ``FunctionalSbS`` on the given tensors."""
        # FunctionalSbS always receives a labels tensor; a missing label set
        # is represented by an empty int64 tensor.
        labels_copy: torch.Tensor = (
            torch.tensor([], dtype=torch.int64, device=self.device)
            if labels is None
            else labels.detach().clone().type(dtype=torch.int64).to(device=self.device)
        )

        # The tuning factor is only forwarded when the spatial extent of the
        # spike tensor exceeds it; otherwise 0 is passed.
        spatial_extent = spike.shape[-2] * spike.shape[-1]
        gpu_tuning_factor = (
            self._gpu_tuning_factor
            if spatial_extent > self._gpu_tuning_factor
            else 0
        )

        # Position in this list is the contract with FunctionalSbS.
        settings = [
            int(self._number_of_cpu_processes),  # 0
            int(self._output_size[0]),  # 1
            int(self._output_size[1]),  # 2
            int(gpu_tuning_factor),  # 3
            int(self._sbs_gpu_setting_position),  # 4
            int(self._sbs_hdynamic_cpp_position),  # 5
            int(self._w_trainable),  # 6
            int(disable_scale_grade),  # 7
            int(keep_last_grad_scale),  # 8
            int(self._skip_gradient_calculation),  # 9
            int(self._output_layer),  # 10
            int(self._local_learning),  # 11
        ]
        parameter_list = torch.tensor(settings, dtype=torch.int64)

        forgetting_offset_tensor = torch.tensor(
            forgetting_offset, device=self.device, dtype=self.default_dtype
        )

        # SbS forward functional
        return self.functional_sbs(
            input,
            spike,
            epsilon_xy,
            epsilon_t_0,
            weights,
            h_initial,
            parameter_list,
            last_grad_scale,
            forgetting_offset_tensor,
            labels_copy,
        )
class FunctionalSbS(torch.autograd.Function):
    """Autograd function around the compiled H-dynamic backend.

    ``forward`` passes raw data pointers and shapes to the backend object
    registered in ``global_sbs_hdynamic_cpp``; ``backward`` computes the
    gradients for ``input`` and ``weights`` in PyTorch.

    ``parameter_list`` is read by position: 0 CPU processes, 1/2 output
    size, 3 GPU tuning factor, 4 cache slot, 5 backend slot, 6 weights
    trainable, 7 disable gradient scaling, 8 keep last gradient scale,
    9 skip gradient calculation, 10 output layer, 11 local learning.
    """

    @staticmethod
    def forward(  # type: ignore
        ctx,
        input: torch.Tensor,
        spikes: torch.Tensor,
        epsilon_xy: torch.Tensor | None,
        epsilon_t_0: torch.Tensor,
        weights: torch.Tensor,
        h_initial: torch.Tensor,
        parameter_list: torch.Tensor,
        grad_output_scale: torch.Tensor,
        forgetting_offset: torch.Tensor,
        labels: torch.Tensor,
    ) -> torch.Tensor:
        """Run the backend ``update`` and return its output tensor.

        The output has shape ``(input.shape[0], weights.shape[1],
        parameter_list[1], parameter_list[2])`` and lives on ``input.device``.
        ``epsilon_t_0`` must be 1-D with at least ``spikes.shape[1]`` entries,
        ``weights`` 2-D, ``h_initial`` 1-D and ``epsilon_xy`` (if given) 3-D;
        all of them and ``spikes`` must be contiguous because only their data
        pointers are handed to the backend.
        """
        number_of_spikes: int = int(spikes.shape[1])

        output_size_0: int = int(parameter_list[1])
        output_size_1: int = int(parameter_list[2])
        gpu_tuning_factor: int = int(parameter_list[3])

        sbs_gpu_setting_position = int(parameter_list[4])
        sbs_hdynamic_cpp_position = int(parameter_list[5])

        # The registered backend decides where the work happens: the CPU
        # backend always works on CPU tensors, the GPU backend on the device
        # of the input.
        if (
            isinstance(
                global_sbs_hdynamic_cpp[sbs_hdynamic_cpp_position], HDynamicCNNCPU
            )
            is True
        ):
            are_we_on_a_cpu: bool = True
            work_device: torch.device = torch.device("cpu")
        else:
            are_we_on_a_cpu = False
            work_device = input.device

        # Tensors are copied to work_device only if the input lives elsewhere.
        target_device: torch.device = input.device

        if target_device == work_device:
            data_is_on_the_same_device: bool = True
        else:
            data_is_on_the_same_device = False

        # The process count is only passed to the CPU backend; the GPU
        # backend receives -1.
        if are_we_on_a_cpu is True:
            hdyn_number_of_cpu_processes: int = int(parameter_list[0])
        else:
            hdyn_number_of_cpu_processes = -1

        # ###########################################################
        # H dynamic
        # ###########################################################

        assert epsilon_t_0.ndim == 1
        assert epsilon_t_0.shape[0] >= number_of_spikes

        # ############################################
        # Make space for the results
        # ############################################

        output_work: torch.Tensor = torch.empty(
            (
                int(input.shape[0]),
                int(weights.shape[1]),
                output_size_0,
                output_size_1,
            ),
            dtype=input.dtype,
            device=work_device,
        )

        assert output_work.is_contiguous() is True
        if epsilon_xy is not None:
            assert epsilon_xy.is_contiguous() is True
            assert epsilon_xy.ndim == 3
            if data_is_on_the_same_device is False:
                epsilon_xy_work = epsilon_xy.to(work_device)
            else:
                epsilon_xy_work = epsilon_xy
        else:
            epsilon_xy_work = None

        assert epsilon_t_0.is_contiguous() is True
        if data_is_on_the_same_device is False:
            epsilon_t_0_work = epsilon_t_0.to(work_device)
        else:
            epsilon_t_0_work = epsilon_t_0

        assert weights.is_contiguous() is True
        if data_is_on_the_same_device is False:
            weights_work = weights.to(work_device)
        else:
            weights_work = weights

        assert spikes.is_contiguous() is True
        if data_is_on_the_same_device is False:
            spikes_work = spikes.to(work_device)
        else:
            spikes_work = spikes

        assert h_initial.is_contiguous() is True
        if data_is_on_the_same_device is False:
            h_initial_work = h_initial.to(work_device)
        else:
            h_initial_work = h_initial

        assert weights.ndim == 2
        assert h_initial.ndim == 1

        # Work on copies of the cached profile and of the shape it was made for.
        sbs_profile = global_sbs_gpu_setting[sbs_gpu_setting_position].clone()
        sbs_size = global_sbs_size[sbs_gpu_setting_position].clone()

        if are_we_on_a_cpu is False:
            # Ask the GPU backend for a new profile when none is cached yet
            # (placeholder with a single element) or when the output shape
            # changed; otherwise push the cached profile back into the backend.
            # NOTE(review): the meaning of the (14, 7) profile table is
            # defined by the compiled backend and not visible here.
            if (
                (sbs_profile.numel() == 1)
                or (sbs_size[0] != int(output_work.shape[0]))
                or (sbs_size[1] != int(output_work.shape[1]))
                or (sbs_size[2] != int(output_work.shape[2]))
                or (sbs_size[3] != int(output_work.shape[3]))
            ):
                sbs_profile = torch.zeros(
                    (14, 7), dtype=torch.int64, device=torch.device("cpu")
                )

                global_sbs_hdynamic_cpp[sbs_hdynamic_cpp_position].gpu_occupancy_export(
                    int(output_work.shape[2]),
                    int(output_work.shape[3]),
                    int(output_work.shape[0]),
                    int(output_work.shape[1]),
                    sbs_profile.data_ptr(),
                    int(sbs_profile.shape[0]),
                    int(sbs_profile.shape[1]),
                )
                global_sbs_gpu_setting[sbs_gpu_setting_position] = sbs_profile.clone()
                sbs_size[0] = int(output_work.shape[0])
                sbs_size[1] = int(output_work.shape[1])
                sbs_size[2] = int(output_work.shape[2])
                sbs_size[3] = int(output_work.shape[3])
                global_sbs_size[sbs_gpu_setting_position] = sbs_size.clone()

            else:
                global_sbs_hdynamic_cpp[sbs_hdynamic_cpp_position].gpu_occupancy_import(
                    sbs_profile.data_ptr(),
                    int(sbs_profile.shape[0]),
                    int(sbs_profile.shape[1]),
                )

        # The backend writes its result into output_work through the pointer.
        # A missing epsilon_xy is passed as a null pointer with zero sizes.
        global_sbs_hdynamic_cpp[sbs_hdynamic_cpp_position].update(
            output_work.data_ptr(),
            int(output_work.shape[0]),
            int(output_work.shape[1]),
            int(output_work.shape[2]),
            int(output_work.shape[3]),
            epsilon_xy_work.data_ptr() if epsilon_xy_work is not None else int(0),
            int(epsilon_xy_work.shape[0]) if epsilon_xy_work is not None else int(0),
            int(epsilon_xy_work.shape[1]) if epsilon_xy_work is not None else int(0),
            int(epsilon_xy_work.shape[2]) if epsilon_xy_work is not None else int(0),
            epsilon_t_0_work.data_ptr(),
            int(epsilon_t_0_work.shape[0]),
            weights_work.data_ptr(),
            int(weights_work.shape[0]),
            int(weights_work.shape[1]),
            spikes_work.data_ptr(),
            int(spikes_work.shape[0]),
            int(spikes_work.shape[1]),
            int(spikes_work.shape[2]),
            int(spikes_work.shape[3]),
            h_initial_work.data_ptr(),
            int(h_initial_work.shape[0]),
            hdyn_number_of_cpu_processes,
            float(forgetting_offset.cpu().item()),
            int(gpu_tuning_factor),
        )

        # Hand the result back on the device the input came from.
        if data_is_on_the_same_device is False:
            output = output_work.to(target_device)
        else:
            output = output_work

        # ###########################################################
        # Save the necessary data for the backward pass
        # ###########################################################

        ctx.save_for_backward(
            input,
            weights,
            output,
            parameter_list,
            grad_output_scale,
            labels,
        )

        return output

    @staticmethod
    def backward(ctx, grad_output):
        """Return one gradient slot per ``forward`` argument.

        Only ``input`` and ``weights`` can receive a gradient.  The slot of
        the ``grad_output_scale`` argument is used to return the (possibly
        updated) gradient scale; every other slot is ``None``.
        """
        # ##############################################
        # Get the variables back
        # ##############################################
        (
            input,
            weights,
            output,
            parameter_list,
            last_grad_scale,
            labels,
        ) = ctx.saved_tensors

        # NOTE(review): this check is unconditional, so a backward pass needs
        # labels to have been given in forward even on the branches that never
        # read them - confirm that this is intended.
        assert labels.numel() > 0

        # ##############################################
        # Default output
        # ##############################################
        grad_input = None
        grad_spikes = None
        grad_eps_xy = None
        grad_epsilon_t_0 = None
        grad_weights = None
        grad_h_initial = None
        grad_parameter_list = None
        grad_forgetting_offset = None
        grad_labels = None

        # ##############################################
        # Parameters
        # ##############################################
        parameter_w_trainable: bool = bool(parameter_list[6])
        parameter_disable_scale_grade: bool = bool(parameter_list[7])
        parameter_keep_last_grad_scale: bool = bool(parameter_list[8])
        parameter_skip_gradient_calculation: bool = bool(parameter_list[9])
        parameter_output_layer: bool = bool(parameter_list[10])
        parameter_local_learning: bool = bool(parameter_list[11])

        # ##############################################
        # Dealing with overall scale of the gradient
        # ##############################################
        # NOTE(review): the nesting of this section was reconstructed;
        # grad_output_scale is returned on every path below and therefore has
        # to be assigned unconditionally.
        if parameter_disable_scale_grade is False:
            if parameter_keep_last_grad_scale is True:
                last_grad_scale = torch.tensor(
                    [torch.abs(grad_output).max(), last_grad_scale]
                ).max()
            # NOTE(review): in-place edit of the incoming gradient tensor.
            grad_output /= last_grad_scale
        grad_output_scale = last_grad_scale.clone()

        # NOTE(review): in-place normalisation over dim 1 of the tensor that
        # was saved in forward; this appears to modify the caller's input
        # tensor as well - confirm that this is intended.
        input /= input.sum(dim=1, keepdim=True, dtype=weights.dtype) + 1e-20

        # #################################################
        # User doesn't want us to calculate the gradients
        # #################################################

        if parameter_skip_gradient_calculation is True:
            return (
                grad_input,
                grad_spikes,
                grad_eps_xy,
                grad_epsilon_t_0,
                grad_weights,
                grad_h_initial,
                grad_parameter_list,
                grad_output_scale,
                grad_forgetting_offset,
                grad_labels,
            )

        # #################################################
        # Calculate backprop error (grad_input)
        # #################################################

        # backprop_r[b, i, j, y, x] = weights[i, j] * output[b, j, y, x];
        # summing over j (dim 2) gives backprop_bigr.
        backprop_r: torch.Tensor = weights.unsqueeze(0).unsqueeze(-1).unsqueeze(
            -1
        ) * output.unsqueeze(1)

        backprop_bigr: torch.Tensor = backprop_r.sum(dim=2)

        backprop_z: torch.Tensor = backprop_r * (
            1.0 / (backprop_bigr + 1e-20)
        ).unsqueeze(2)
        grad_input: torch.Tensor = (backprop_z * grad_output.unsqueeze(1)).sum(2)
        del backprop_z

        # #################################################
        # Calculate weight gradient (grad_weights)
        # #################################################

        if parameter_w_trainable is False:
            # #################################################
            # We don't train this weight
            # #################################################
            grad_weights = None

        elif (parameter_output_layer is False) and (parameter_local_learning is True):
            # #################################################
            # Local learning
            # #################################################
            grad_weights = (
                (-2 * (input - backprop_bigr).unsqueeze(2) * output.unsqueeze(1))
                .sum(0)
                .sum(-1)
                .sum(-1)
            )

        elif (parameter_output_layer is True) and (parameter_local_learning is True):
            # Local learning for the output layer: the layer output is replaced
            # by a one-hot encoding of the labels.
            target_one_hot: torch.Tensor = torch.zeros(
                (
                    labels.shape[0],
                    output.shape[1],
                ),
                device=input.device,
                dtype=input.dtype,
            )

            target_one_hot.scatter_(
                1,
                labels.to(input.device).unsqueeze(1),
                torch.ones(
                    (labels.shape[0], 1),
                    device=input.device,
                    dtype=input.dtype,
                ),
            )
            target_one_hot = target_one_hot.unsqueeze(-1).unsqueeze(-1)

            grad_weights = (
                (
                    -2
                    * (input - backprop_bigr).unsqueeze(2)
                    * target_one_hot.unsqueeze(1)
                )
                .sum(0)
                .sum(-1)
                .sum(-1)
            )

        else:
            # #################################################
            # Backprop
            # #################################################
            backprop_f: torch.Tensor = output.unsqueeze(1) * (
                input / (backprop_bigr**2 + 1e-20)
            ).unsqueeze(2)

            result_omega: torch.Tensor = backprop_bigr.unsqueeze(
                2
            ) * grad_output.unsqueeze(1)
            result_omega -= (backprop_r * grad_output.unsqueeze(1)).sum(2).unsqueeze(2)
            result_omega *= backprop_f
            del backprop_f
            grad_weights = result_omega.sum(0).sum(-1).sum(-1)
            del result_omega

        del backprop_bigr
        del backprop_r

        return (
            grad_input,
            grad_spikes,
            grad_eps_xy,
            grad_epsilon_t_0,
            grad_weights,
            grad_h_initial,
            grad_parameter_list,
            grad_output_scale,
            grad_forgetting_offset,
            grad_labels,
        )

252
___SpikeLayer.py Normal file
View file

@ -0,0 +1,252 @@
import torch
from network.PySpikeGenerationCPU import SpikeGenerationCPU
from network.PySpikeGenerationGPU import SpikeGenerationGPU
# Module-level registries shared by every SpikeLayer in the process.
# Each SpikeLayer appends one entry to each list in its constructor and
# remembers the index; FunctionalSpikeGeneration looks them up by that index.
#
# Cached GPU occupancy profile per layer (a one-element tensor until filled).
global_spike_generation_gpu_setting: list[torch.Tensor] = []
# Spike tensor shape for which the cached profile of the same index was made.
global_spike_size: list[torch.Tensor] = []
# Compiled CPU or GPU spike generator per layer.
global_spike_generation_cpp: list[SpikeGenerationCPU | SpikeGenerationGPU] = []
class SpikeLayer(torch.nn.Module):
    """Module front-end for ``FunctionalSpikeGeneration``.

    The constructor registers a compiled spike generator and two cache slots
    in the module-level ``global_spike_*`` lists.  ``forward`` packs the
    layer settings into an int64 tensor and delegates to
    ``FunctionalSpikeGeneration``.
    """

    _spike_generation_cpp_position: int
    _spike_generation_gpu_setting_position: int
    _number_of_cpu_processes: int
    _number_of_spikes: int
    device: torch.device
    _force_forward_spike_on_cpu: bool
    _force_forward_spike_output_on_cpu: bool

    def __init__(
        self,
        number_of_spikes: int = -1,
        number_of_cpu_processes: int = 1,
        device: torch.device | None = None,
        force_forward_spike_on_cpu: bool = False,
        force_forward_spike_output_on_cpu: bool = False,
    ) -> None:
        super().__init__()

        assert device is not None

        self.device = device
        self._number_of_spikes = number_of_spikes
        self._number_of_cpu_processes = number_of_cpu_processes
        self._force_forward_spike_on_cpu = force_forward_spike_on_cpu
        self._force_forward_spike_output_on_cpu = force_forward_spike_output_on_cpu

        # Reserve the cache slots for this layer: an empty occupancy profile
        # and the spike tensor shape it belongs to.
        global_spike_generation_gpu_setting.append(torch.tensor([0]))
        global_spike_size.append(torch.tensor([0, 0, 0, 0]))

        # Choose the compiled generator once, at construction time.
        use_cpu_generator = (device == torch.device("cpu")) or (
            force_forward_spike_on_cpu is True
        )
        spike_generator = (
            SpikeGenerationCPU() if use_cpu_generator else SpikeGenerationGPU()
        )
        global_spike_generation_cpp.append(spike_generator)

        self._spike_generation_cpp_position = len(global_spike_generation_cpp) - 1
        self._spike_generation_gpu_setting_position = (
            len(global_spike_generation_gpu_setting) - 1
        )

        self.functional_spike_generation = FunctionalSpikeGeneration.apply

    ####################################################################
    # Forward                                                          #
    ####################################################################
    def forward(
        self,
        input: torch.Tensor,
        number_of_spikes: int | None = None,
    ) -> torch.Tensor:
        """Generate spikes for ``input``.

        ``number_of_spikes`` overrides the value given to the constructor;
        the effective value must be positive.
        """
        spike_count = (
            self._number_of_spikes if number_of_spikes is None else number_of_spikes
        )
        assert spike_count > 0

        # Position in this list is the contract with FunctionalSpikeGeneration.
        settings = [
            int(self._number_of_cpu_processes),  # 0
            int(self._spike_generation_cpp_position),  # 1
            int(self._spike_generation_gpu_setting_position),  # 2
            int(spike_count),  # 3
            int(self._force_forward_spike_output_on_cpu),  # 4
        ]
        parameter_list = torch.tensor(settings, dtype=torch.int64)

        return self.functional_spike_generation(input, parameter_list)
class FunctionalSpikeGeneration(torch.autograd.Function):
    """Autograd function around the compiled spike generator.

    ``parameter_list`` is read by position: 0 CPU processes, 1 generator
    slot, 2 cache slot, 3 number of spikes, 4 keep the output on the CPU.
    """

    @staticmethod
    def forward(  # type: ignore
        ctx,
        input: torch.Tensor,
        parameter_list: torch.Tensor,
    ) -> torch.Tensor:
        """Return an int64 spike tensor for a 4-D ``input``.

        The result has shape ``(input.shape[0], number_of_spikes,
        input.shape[2], input.shape[3])``.  The generator receives the
        cumulative sum of ``input`` over dim 1, normalised by its last
        entry, together with uniform random numbers.
        NOTE(review): presumably every spike is the dim-1 index selected by
        one random number on that cumulative distribution - confirm against
        the compiled generator.
        """
        assert input.dim() == 4

        spike_generation_cpp_position = int(parameter_list[1])
        spike_generation_gpu_setting_position = int(parameter_list[2])
        number_of_spikes: int = int(parameter_list[3])
        force_forward_spike_output_on_cpu: bool = bool(parameter_list[4])

        # The registered generator decides where the work happens: the CPU
        # generator always works on CPU tensors, the GPU generator on the
        # device of the input.
        if (
            isinstance(
                global_spike_generation_cpp[spike_generation_cpp_position],
                SpikeGenerationCPU,
            )
            is True
        ):
            are_we_on_a_cpu: bool = True
            work_device: torch.device = torch.device("cpu")
        else:
            are_we_on_a_cpu = False
            work_device = input.device

        target_device: torch.device = input.device

        if target_device == work_device:
            data_is_on_the_same_device: bool = True
        else:
            data_is_on_the_same_device = False

        # The process count is only passed to the CPU generator; the GPU
        # generator receives -1.
        if are_we_on_a_cpu is True:
            spike_number_of_cpu_processes: int = int(parameter_list[0])
        else:
            spike_number_of_cpu_processes = -1

        # ###########################################################
        # Spike generation
        # ###########################################################

        # ############################################
        # Normalized cumsum
        # (beware of the pytorch bug! Thus .clone()!)
        # ############################################
        if data_is_on_the_same_device is False:
            input_work = input.to(work_device)
        else:
            input_work = input

        input_cumsum: torch.Tensor = torch.cumsum(input_work, dim=1, dtype=input.dtype)
        # The clone decouples the divisor from input_cumsum, which is divided
        # in place on the next line.
        input_cumsum_last: torch.Tensor = input_cumsum[:, -1, :, :].unsqueeze(1).clone()
        # NOTE(review): positions whose sum over dim 1 is zero divide by zero
        # here - confirm that callers never pass such input.
        input_cumsum /= input_cumsum_last

        # ############################################
        # Get the required random numbers
        # ############################################
        random_values = torch.rand(
            size=[
                input_cumsum.shape[0],
                number_of_spikes,
                input_cumsum.shape[2],
                input_cumsum.shape[3],
            ],
            dtype=input.dtype,
            device=work_device,
        )

        # ############################################
        # Make space for the results
        # ############################################
        spikes_work = torch.empty_like(
            random_values, dtype=torch.int64, device=work_device
        )

        # Only data pointers are handed to the generator, so the memory
        # layout has to be contiguous.
        assert input_cumsum.is_contiguous() is True
        assert random_values.is_contiguous() is True
        assert spikes_work.is_contiguous() is True

        # Work on copies of the cached profile and of the shape it was made for.
        spike_generation_profile = global_spike_generation_gpu_setting[
            spike_generation_gpu_setting_position
        ].clone()

        spike_generation_size = global_spike_size[
            spike_generation_gpu_setting_position
        ].clone()

        if are_we_on_a_cpu is False:
            # Ask the GPU generator for a new profile when none is cached yet
            # (placeholder with a single element) or when the spike tensor
            # shape changed; otherwise push the cached profile back.
            # NOTE(review): the meaning of the (1, 7) profile table is defined
            # by the compiled generator and not visible here.
            if (
                (spike_generation_profile.numel() == 1)
                or (spike_generation_size[0] != int(spikes_work.shape[0]))
                or (spike_generation_size[1] != int(spikes_work.shape[1]))
                or (spike_generation_size[2] != int(spikes_work.shape[2]))
                or (spike_generation_size[3] != int(spikes_work.shape[3]))
            ):
                spike_generation_profile = torch.zeros(
                    (1, 7), dtype=torch.int64, device=torch.device("cpu")
                )

                global_spike_generation_cpp[
                    spike_generation_cpp_position
                ].gpu_occupancy_export(
                    int(spikes_work.shape[2]),
                    int(spikes_work.shape[3]),
                    int(spikes_work.shape[0]),
                    int(spikes_work.shape[1]),
                    spike_generation_profile.data_ptr(),
                    int(spike_generation_profile.shape[0]),
                    int(spike_generation_profile.shape[1]),
                )
                global_spike_generation_gpu_setting[
                    spike_generation_gpu_setting_position
                ] = spike_generation_profile.clone()

                spike_generation_size[0] = int(spikes_work.shape[0])
                spike_generation_size[1] = int(spikes_work.shape[1])
                spike_generation_size[2] = int(spikes_work.shape[2])
                spike_generation_size[3] = int(spikes_work.shape[3])
                global_spike_size[
                    spike_generation_gpu_setting_position
                ] = spike_generation_size.clone()

            else:
                global_spike_generation_cpp[
                    spike_generation_cpp_position
                ].gpu_occupancy_import(
                    spike_generation_profile.data_ptr(),
                    int(spike_generation_profile.shape[0]),
                    int(spike_generation_profile.shape[1]),
                )

        # The generator writes its result into spikes_work through the pointer.
        global_spike_generation_cpp[spike_generation_cpp_position].spike_generation(
            input_cumsum.data_ptr(),
            int(input_cumsum.shape[0]),
            int(input_cumsum.shape[1]),
            int(input_cumsum.shape[2]),
            int(input_cumsum.shape[3]),
            random_values.data_ptr(),
            int(random_values.shape[0]),
            int(random_values.shape[1]),
            int(random_values.shape[2]),
            int(random_values.shape[3]),
            spikes_work.data_ptr(),
            int(spikes_work.shape[0]),
            int(spikes_work.shape[1]),
            int(spikes_work.shape[2]),
            int(spikes_work.shape[3]),
            int(spike_number_of_cpu_processes),
        )

        # Spikes go back to the device of the input, unless they were made on
        # the CPU and the caller asked to keep them there.
        if (force_forward_spike_output_on_cpu is True) and (are_we_on_a_cpu is True):
            spikes = spikes_work
        elif data_is_on_the_same_device is False:
            spikes = spikes_work.to(target_device)
        else:
            spikes = spikes_work

        return spikes

    @staticmethod
    def backward(ctx, grad_output):
        """Pass the incoming gradient through unchanged; no gradient for the settings.

        NOTE(review): the spike tensor has ``number_of_spikes`` entries in
        dim 1 while ``input`` has its own channel count, so the returned
        gradient only matches ``input`` when both agree - confirm that this
        path is never differentiated.
        """
        grad_input = grad_output
        grad_parameter_list = None
        return (grad_input, grad_parameter_list)

292
append_block.py Normal file
View file

@ -0,0 +1,292 @@
import torch
from tools.L1NormLayer import L1NormLayer
from tools.NNMF2d import NNMF2d
from tools.append_parameter import append_parameter
def append_block(
    network: torch.nn.Sequential,
    number_of_neurons_a: int,
    number_of_neurons_b: int,
    test_image: torch.Tensor,
    parameter_neuron_a: list[torch.nn.parameter.Parameter],
    parameter_neuron_b: list[torch.nn.parameter.Parameter],
    parameter_batchnorm2d: list[torch.nn.parameter.Parameter],
    device: torch.device,
    dilation: tuple[int, int] | int = 1,
    padding: tuple[int, int] | int = 0,
    stride: tuple[int, int] | int = 1,
    kernel_size: tuple[int, int] = (5, 5),
    epsilon: float | None = None,
    iterations: int = 20,
    local_learning: bool = False,
    local_learning_kl: bool = False,
    momentum: float = 0.1,
    track_running_stats: bool = False,
    type_of_neuron_a: int = 0,
    type_of_neuron_b: int = 0,
    batch_norm_neuron_a: bool = True,
    batch_norm_neuron_b: bool = True,
    bias_norm_neuron_a: bool = False,
    bias_norm_neuron_b: bool = True,
) -> torch.Tensor:
    """Append one processing block to ``network`` and push ``test_image`` through it.

    Layers appended, in order: ReLU, Unfold, Fold (1x1 kernel, which turns
    the unfolded patches back into a 4-D tensor), L1NormLayer, neuron layer
    "a", optional BatchNorm2d, and - if ``type_of_neuron_b`` is not 0 - ReLU,
    L1NormLayer, neuron layer "b" and another optional BatchNorm2d.  Both
    BatchNorm2d layers are skipped when the feature map is 1x1.

    Args:
        network: Sequential container that is extended in place.
        number_of_neurons_a: Output channels of layer "a"; a value <= 0
            copies ``number_of_neurons_b``.
        number_of_neurons_b: Output channels of layer "b"; a value <= 0
            copies ``number_of_neurons_a``.
        test_image: Probe tensor used to determine the channel counts and
            spatial sizes of the new layers.
        parameter_neuron_a: List extended with the parameters of layer "a".
        parameter_neuron_b: List extended with the parameters of layer "b".
        parameter_batchnorm2d: List extended with the BatchNorm2d parameters.
        device: Device on which the new trainable layers are created.
        dilation, padding, stride: Settings of the Unfold layer.
        kernel_size: Patch size of the Unfold layer; an entry < 1 selects
            the full extent of ``test_image`` in that dimension.
        epsilon, iterations, local_learning, local_learning_kl: Forwarded to
            NNMF2d.
        momentum, track_running_stats: Forwarded to BatchNorm2d.
        type_of_neuron_a: 1 for NNMF2d, 2 for a 1x1 Conv2d.
        type_of_neuron_b: 0 for none, 1 for NNMF2d, 2 for a 1x1 Conv2d,
            3 for a 1x1 Conv2d whose initial weights are made non-negative.
        batch_norm_neuron_a, batch_norm_neuron_b: Append BatchNorm2d after
            the respective neuron layer.
        bias_norm_neuron_a, bias_norm_neuron_b: ``bias`` flag of the
            respective Conv2d layer.

    Returns:
        ``test_image`` after it passed through all appended layers.

    Raises:
        AssertionError: If ``type_of_neuron_a`` is not 1 or 2, or
            ``type_of_neuron_b`` is not one of 0..3.
    """
    assert (type_of_neuron_a > 0) or (type_of_neuron_b > 0)

    if number_of_neurons_b <= 0:
        number_of_neurons_b = number_of_neurons_a
    if number_of_neurons_a <= 0:
        number_of_neurons_a = number_of_neurons_b

    assert (type_of_neuron_a == 1) or (type_of_neuron_a == 2)
    assert type_of_neuron_b in (0, 1, 2, 3)

    kernel_size_internal: list[int] = [kernel_size[-2], kernel_size[-1]]
    if kernel_size[0] < 1:
        kernel_size_internal[0] = test_image.shape[-2]
    if kernel_size[1] < 1:
        kernel_size_internal[1] = test_image.shape[-1]

    network.append(torch.nn.ReLU())
    test_image = network[-1](test_image)

    # A dummy convolution with the Unfold settings yields the spatial output
    # size that the Fold layer needs.
    mock_output = (
        torch.nn.functional.conv2d(
            torch.zeros(
                1,
                1,
                test_image.shape[2],
                test_image.shape[3],
            ),
            torch.zeros((1, 1, kernel_size_internal[0], kernel_size_internal[1])),
            stride=stride,
            padding=padding,
            dilation=dilation,
        )
        .squeeze(0)
        .squeeze(0)
    )

    network.append(
        torch.nn.Unfold(
            kernel_size=(kernel_size_internal[-2], kernel_size_internal[-1]),
            dilation=dilation,
            padding=padding,
            stride=stride,
        )
    )
    test_image = network[-1](test_image)

    network.append(
        torch.nn.Fold(
            output_size=mock_output.shape,
            kernel_size=(1, 1),
            dilation=1,
            padding=0,
            stride=1,
        )
    )
    test_image = network[-1](test_image)

    network.append(L1NormLayer())
    test_image = network[-1](test_image)

    # ------------------------------------------------------------------
    # Neuron layer "a"
    # ------------------------------------------------------------------
    if type_of_neuron_a == 1:
        network.append(
            NNMF2d(
                in_channels=test_image.shape[1],
                out_channels=number_of_neurons_a,
                epsilon=epsilon,
                iterations=iterations,
                local_learning=local_learning,
                local_learning_kl=local_learning_kl,
            ).to(device)
        )
        test_image = network[-1](test_image)
        append_parameter(module=network[-1], parameter_list=parameter_neuron_a)
    elif type_of_neuron_a == 2:
        network.append(
            torch.nn.Conv2d(
                in_channels=test_image.shape[1],
                out_channels=number_of_neurons_a,
                kernel_size=(1, 1),
                bias=bias_norm_neuron_a,
            ).to(device)
        )
        test_image = network[-1](test_image)
        append_parameter(module=network[-1], parameter_list=parameter_neuron_a)

    if batch_norm_neuron_a:
        if (test_image.shape[-1] > 1) or (test_image.shape[-2] > 1):
            network.append(
                torch.nn.BatchNorm2d(
                    num_features=test_image.shape[1],
                    momentum=momentum,
                    track_running_stats=track_running_stats,
                    device=device,
                )
            )
            test_image = network[-1](test_image)
            append_parameter(module=network[-1], parameter_list=parameter_batchnorm2d)

    # ------------------------------------------------------------------
    # Neuron layer "b"
    # ------------------------------------------------------------------
    if type_of_neuron_b == 0:
        pass
    elif type_of_neuron_b == 1:
        network.append(torch.nn.ReLU())
        test_image = network[-1](test_image)

        network.append(L1NormLayer())
        test_image = network[-1](test_image)

        network.append(
            NNMF2d(
                in_channels=test_image.shape[1],
                out_channels=number_of_neurons_b,
                epsilon=epsilon,
                iterations=iterations,
                local_learning=local_learning,
                local_learning_kl=local_learning_kl,
            ).to(device)
        )

        # Start close to an identity mapping; the small offset keeps every
        # weight strictly positive.
        # NOTE(review): nesting reconstructed - the offset is assumed to
        # belong to the identity initialisation.
        for name, param in network[-1].named_parameters():
            with torch.no_grad():
                if name == "weight":
                    if number_of_neurons_a >= param.shape[0]:
                        param.data[: param.shape[0], : param.shape[0]] = torch.eye(
                            param.shape[0], dtype=param.dtype, device=param.device
                        )
                        param.data[param.shape[0] :, :] = 0
                        param.data[:, param.shape[0] :] = 0
                        param.data += 1.0 / 10000.0

        test_image = network[-1](test_image)
        append_parameter(module=network[-1], parameter_list=parameter_neuron_b)
    elif type_of_neuron_b in (2, 3):
        # Type 2: 1x1 Conv2d.  Type 3: same layer, initial weights made
        # non-negative ("W positive").
        network.append(torch.nn.ReLU())
        test_image = network[-1](test_image)

        network.append(L1NormLayer())
        test_image = network[-1](test_image)

        network.append(
            torch.nn.Conv2d(
                in_channels=test_image.shape[1],
                out_channels=number_of_neurons_b,
                kernel_size=(1, 1),
                stride=(1, 1),
                padding=(0, 0),
                bias=bias_norm_neuron_b,
                device=device,
            )
        )

        # Start close to an identity mapping with a little noise.
        for name, param in network[-1].named_parameters():
            with torch.no_grad():
                if name == "bias":
                    param.data *= 0
                    param.data += (torch.rand_like(param) - 0.5) / 10000.0
                if name == "weight":
                    # The identity block is (out, out) and only fits if the
                    # layer has at least as many input as output channels.
                    if param.shape[1] >= param.shape[0]:
                        assert param.shape[-2] == 1
                        assert param.shape[-1] == 1
                        param.data[: param.shape[0], : param.shape[0], 0, 0] = (
                            torch.eye(
                                param.shape[0], dtype=param.dtype, device=param.device
                            )
                        )
                        param.data[param.shape[0] :, :, 0, 0] = 0
                        param.data[:, param.shape[0] :, 0, 0] = 0
                        param.data += (torch.rand_like(param) - 0.5) / 10000.0
                    if type_of_neuron_b == 3:
                        # NOTE(review): nesting reconstructed - the absolute
                        # value is assumed to apply to the weight only.
                        param.data = torch.nn.Parameter(torch.abs(param.data))

        test_image = network[-1](test_image)
        append_parameter(module=network[-1], parameter_list=parameter_neuron_b)
    else:
        raise ValueError("Unknown type of neuron")

    if (test_image.shape[-1] > 1) or (test_image.shape[-2] > 1):
        if (batch_norm_neuron_b) and (type_of_neuron_b > 0):
            network.append(
                torch.nn.BatchNorm2d(
                    num_features=test_image.shape[1],
                    device=device,
                    momentum=momentum,
                    track_running_stats=track_running_stats,
                )
            )
            test_image = network[-1](test_image)
            append_parameter(module=network[-1], parameter_list=parameter_batchnorm2d)

    return test_image

8
append_parameter.py Normal file
View file

@ -0,0 +1,8 @@
import torch
def append_parameter(
    module: torch.nn.Module, parameter_list: list[torch.nn.parameter.Parameter]
):
    """Append every parameter of ``module`` to ``parameter_list`` in place.

    The parameters are taken from ``module.parameters()`` in its iteration
    order. Nothing is returned; the caller's list is extended.
    """
    parameter_list.extend(module.parameters())

31
data_loader.py Normal file
View file

@ -0,0 +1,31 @@
import torch
def data_loader(
    pattern: torch.Tensor,
    labels: torch.Tensor,
    worker_init_fn,
    generator,
    batch_size: int = 128,
    shuffle: bool = True,
    torch_device: torch.device = torch.device("cpu"),
) -> torch.utils.data.dataloader.DataLoader:
    """Wrap pattern/label tensors into a ``DataLoader`` over a ``TensorDataset``.

    Args:
        pattern: Input patterns with at least three dimensions. A tensor with
            exactly three dimensions gets a dimension of size one inserted at
            position 1.
        labels: Labels; stored as ``int64`` on ``torch_device``.
        worker_init_fn: Passed through to ``DataLoader``.
        generator: Passed through to ``DataLoader``.
        batch_size: Batch size of the loader.
        shuffle: Whether the loader shuffles the data.
        torch_device: Device on which patterns and labels are stored.

    Returns:
        A ``DataLoader`` whose patterns are ``float32`` and divided by their
        global maximum.
    """
    assert pattern.ndim >= 3

    pattern_storage: torch.Tensor = pattern.to(torch_device).type(torch.float32)
    if pattern_storage.ndim == 3:
        pattern_storage = pattern_storage.unsqueeze(1)

    # ``.to()``, ``.type()`` and ``unsqueeze`` may all return the caller's own
    # storage (float32 input already on ``torch_device``), so the scaling must
    # be out of place or it silently rescales the caller's tensor.
    max_value = pattern_storage.max()
    # An all-zero input would turn into NaN (0 / 0); leave it unscaled instead.
    if max_value != 0:
        pattern_storage = pattern_storage / max_value

    label_storage: torch.Tensor = labels.to(torch_device).type(torch.int64)

    dataloader = torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(pattern_storage, label_storage),
        batch_size=batch_size,
        shuffle=shuffle,
        worker_init_fn=worker_init_fn,
        generator=generator,
    )
    return dataloader

163
get_the_data.py Normal file
View file

@ -0,0 +1,163 @@
import torch
import torchvision # type: ignore
from tools.data_loader import data_loader
from torchvision.transforms import v2 # type: ignore
import numpy as np
def get_the_data(
    dataset: str,
    batch_size_train: int,
    batch_size_test: int,
    torch_device: torch.device,
    input_dim_x: int,
    input_dim_y: int,
    flip_p: float = 0.5,
    jitter_brightness: float = 0.5,
    jitter_contrast: float = 0.1,
    jitter_saturation: float = 0.1,
    jitter_hue: float = 0.15,
    da_auto_mode: bool = False,
    disable_da: bool = False,
) -> tuple[
    torch.utils.data.dataloader.DataLoader,
    torch.utils.data.dataloader.DataLoader,
    torchvision.transforms.Compose,
    torchvision.transforms.Compose,
]:
    """Create train/test data loaders plus the matching processing chains.

    The torchvision dataset is fetched into ``data/`` (``download=True``) and
    its tensors are handed to ``data_loader``.

    Args:
        dataset: ``"MNIST"``, ``"FashionMNIST"`` or ``"CIFAR10"``.
        batch_size_train: Batch size of the (shuffled) training loader.
        batch_size_test: Batch size of the (unshuffled) test loader.
        torch_device: Device passed on to ``data_loader``.
        input_dim_x: First entry of the crop size.
        input_dim_y: Second entry of the crop size.
        flip_p: Probability for ``RandomHorizontalFlip`` (CIFAR10 only).
        jitter_brightness: ``ColorJitter`` brightness (CIFAR10 only).
        jitter_contrast: ``ColorJitter`` contrast (CIFAR10 only).
        jitter_saturation: ``ColorJitter`` saturation (CIFAR10 only).
        jitter_hue: ``ColorJitter`` hue (CIFAR10 only).
        da_auto_mode: CIFAR10 only; use ``AutoAugment`` followed by a center
            crop instead of the manual crop/flip/jitter chain.
        disable_da: Replace the training augmentation by a plain center crop.

    Returns:
        ``(train_dataloader, test_dataloader, train_processing_chain,
        test_processing_chain)``.

    Raises:
        NotImplementedError: If ``dataset`` is not one of the supported names.
    """
    if dataset == "MNIST":
        dataset_class = torchvision.datasets.MNIST
    elif dataset == "FashionMNIST":
        dataset_class = torchvision.datasets.FashionMNIST
    elif dataset == "CIFAR10":
        dataset_class = torchvision.datasets.CIFAR10
    else:
        raise NotImplementedError("This dataset is not implemented.")

    tv_dataset_train = dataset_class(root="data", train=True, download=True)
    tv_dataset_test = dataset_class(root="data", train=False, download=True)

    def seed_worker(worker_id):
        # Local import: only needed inside data-loading worker processes.
        import random

        worker_seed = torch.initial_seed() % 2**32
        np.random.seed(worker_seed)
        # torch.random.seed() accepts no argument (it draws a fresh
        # non-deterministic seed), so passing worker_seed raised a TypeError
        # as soon as a worker was started. Seed Python's RNG instead, as in
        # the PyTorch reproducibility recipe.
        random.seed(worker_seed)

    g = torch.Generator()
    g.manual_seed(0)

    is_grayscale = dataset == "MNIST" or dataset == "FashionMNIST"

    if is_grayscale:
        # .data / .targets are handed to data_loader unchanged.
        train_pattern = tv_dataset_train.data
        train_labels = tv_dataset_train.targets
    else:
        # Convert to tensors and move the last axis of .data to position 1.
        train_pattern = torch.tensor(tv_dataset_train.data).movedim(-1, 1)
        train_labels = torch.tensor(tv_dataset_train.targets)

    train_dataloader = data_loader(
        torch_device=torch_device,
        batch_size=batch_size_train,
        pattern=train_pattern,
        labels=train_labels,
        shuffle=True,
        worker_init_fn=seed_worker,
        generator=g,
    )

    if is_grayscale:
        test_pattern = tv_dataset_test.data
        test_labels = tv_dataset_test.targets
    else:
        test_pattern = torch.tensor(tv_dataset_test.data).movedim(-1, 1)
        test_labels = torch.tensor(tv_dataset_test.targets)

    test_dataloader = data_loader(
        torch_device=torch_device,
        batch_size=batch_size_test,
        pattern=test_pattern,
        labels=test_labels,
        shuffle=False,
        worker_init_fn=seed_worker,
        generator=g,
    )

    # Data augmentation filter
    test_processing_chain = torchvision.transforms.Compose(
        transforms=[torchvision.transforms.CenterCrop((input_dim_x, input_dim_y))],
    )

    if disable_da:
        train_processing_chain = torchvision.transforms.Compose(
            transforms=[
                torchvision.transforms.CenterCrop((input_dim_x, input_dim_y))
            ],
        )
    elif is_grayscale:
        train_processing_chain = torchvision.transforms.Compose(
            transforms=[
                torchvision.transforms.RandomCrop((input_dim_x, input_dim_y))
            ],
        )
    elif da_auto_mode:
        train_processing_chain = torchvision.transforms.Compose(
            transforms=[
                v2.AutoAugment(
                    policy=torchvision.transforms.AutoAugmentPolicy(
                        v2.AutoAugmentPolicy.CIFAR10
                    )
                ),
                torchvision.transforms.CenterCrop((input_dim_x, input_dim_y)),
            ],
        )
    else:
        train_processing_chain = torchvision.transforms.Compose(
            transforms=[
                torchvision.transforms.RandomCrop((input_dim_x, input_dim_y)),
                torchvision.transforms.RandomHorizontalFlip(p=flip_p),
                torchvision.transforms.ColorJitter(
                    brightness=jitter_brightness,
                    contrast=jitter_contrast,
                    saturation=jitter_saturation,
                    hue=jitter_hue,
                ),
            ],
        )

    return (
        train_dataloader,
        test_dataloader,
        train_processing_chain,
        test_processing_chain,
    )

View file

@ -0,0 +1,356 @@
#include "HDynamicCNNCPU.h"
#include <omp.h>
#include <stdio.h>
#include <string.h>
#include <chrono>
#include <algorithm>
#include <cassert>
#include <iostream>
// #define DEBUGSHOWTIMEGLOBAL
// The class declares no data members, so construction has nothing to set up.
HDynamicCNNCPU::HDynamicCNNCPU() = default;

// Likewise nothing is owned by the object, so destruction is trivial.
HDynamicCNNCPU::~HDynamicCNNCPU() = default;
// Entry point called from Python (the pybind11 module binds it as "update").
//
// Every tensor is passed as a raw memory address plus its dimensions:
//   h            float output, 4 dimensions
//   epsilon_xy   float, 3 dimensions, optional (address 0 switches it off)
//   epsilon_t    float, 1 dimension
//   weights      float, 2 dimensions
//   input        int64 spike indices, [pattern, spike, x, y]
//   init_vector  float, initial h values, length init_vector_dim_0
//
// The patterns (input dimension 0) are distributed over number_of_processes
// OpenMP threads; each pattern is handled by update().
void HDynamicCNNCPU::entrypoint(
    int64_t h_pointer_addr,
    int64_t h_dim_0,
    int64_t h_dim_1,
    int64_t h_dim_2,
    int64_t h_dim_3,
    int64_t epsilon_xy_pointer_addr,
    int64_t epsilon_xy_dim_0,
    int64_t epsilon_xy_dim_1,
    int64_t epsilon_xy_dim_2,
    int64_t epsilon_t_pointer_addr,
    int64_t epsilon_t_dim_0,
    int64_t weights_pointer_addr,
    int64_t weights_dim_0,
    int64_t weights_dim_1,
    int64_t input_pointer_addr,
    int64_t input_dim_0,
    int64_t input_dim_1,
    int64_t input_dim_2,
    int64_t input_dim_3,
    int64_t init_vector_pointer_addr,
    int64_t init_vector_dim_0,
    int64_t number_of_processes,
    float forgetting_offset,
    int64_t gpu_tuning_factor)
{
    // Not referenced anywhere in this CPU implementation.
    (void)gpu_tuning_factor;

    // --- initial h vector -------------------------------------------------
    float* h_init_ptr = reinterpret_cast<float*>(init_vector_pointer_addr);
    size_t h_dim = init_vector_dim_0;
    assert((h_init_ptr != nullptr));
    assert((h_dim > 0));

    // --- output h and its element strides for dimensions 0, 1, 2 ----------
    float* h_pointer = reinterpret_cast<float*>(h_pointer_addr);
    assert((h_pointer != nullptr));
    assert((h_dim_0 > 0));
    assert((h_dim_1 > 0));
    assert((h_dim_2 > 0));
    assert((h_dim_3 > 0));

    size_t h_dim_c0 = h_dim_1 * h_dim_2 * h_dim_3;
    size_t h_dim_c1 = h_dim_2 * h_dim_3;
    size_t h_dim_c2 = h_dim_3;

    // --- optional epsilon_xy; both strides stay 0 when it is absent -------
    float* epsilon_xy_pointer = nullptr;
    size_t epsilon_xy_dim_c0 = 0;
    size_t epsilon_xy_dim_c1 = 0;
    if (epsilon_xy_pointer_addr != 0)
    {
        epsilon_xy_pointer = reinterpret_cast<float*>(epsilon_xy_pointer_addr);
        assert((epsilon_xy_pointer != nullptr));
        assert((epsilon_xy_dim_0 > 0));
        assert((epsilon_xy_dim_1 > 0));
        assert((epsilon_xy_dim_2 > 0));

        epsilon_xy_dim_c0 = epsilon_xy_dim_2 * epsilon_xy_dim_1;
        epsilon_xy_dim_c1 = epsilon_xy_dim_2;
    }

    // --- epsilon_t --------------------------------------------------------
    float* epsilon_t_pointer = reinterpret_cast<float*>(epsilon_t_pointer_addr);
    assert((epsilon_t_pointer != nullptr));
    assert((epsilon_t_dim_0 > 0));

    // --- weights: one row of weights_dim_1 values per spike index ---------
    float* weights_pointer = reinterpret_cast<float*>(weights_pointer_addr);
    assert((weights_pointer != nullptr));
    assert((weights_dim_0 > 0));
    assert((weights_dim_1 > 0));

    size_t weights_dim_c0 = weights_dim_1;

    // --- input spikes and their element strides ---------------------------
    int64_t* input_pointer = reinterpret_cast<int64_t*>(input_pointer_addr);
    assert((input_pointer != nullptr));
    assert((input_dim_0 > 0));
    assert((input_dim_1 > 0));
    assert((input_dim_2 > 0));
    assert((input_dim_3 > 0));

    size_t input_dim_c0 = input_dim_1 * input_dim_2 * input_dim_3;
    size_t input_dim_c1 = input_dim_2 * input_dim_3;
    size_t input_dim_c2 = input_dim_3;

    // The initial h vector must have one entry per weight column.
    assert((h_dim == weights_dim_1));

    size_t number_of_pattern = input_dim_0;
    size_t number_of_spikes = input_dim_1;
    size_t dim_x = input_dim_2;
    size_t dim_y = input_dim_3;

    // Forgetting offset divided by the number of h entries.
    float forgetting_offset_local =
        forgetting_offset / static_cast<float>(h_dim);

    // --------------------
    assert((number_of_processes > 0));

    omp_set_num_threads(number_of_processes);

#ifdef DEBUGSHOWTIMEGLOBAL
    using TIME_resolution = std::chrono::nanoseconds;
    auto TIME_start = std::chrono::high_resolution_clock::now();
#endif

#pragma omp parallel for
    for (size_t pattern_id = 0; pattern_id < number_of_pattern; pattern_id++)
    {
        update(
            h_init_ptr,
            h_pointer,
            h_dim_c0,
            h_dim_c1,
            h_dim_c2,
            h_dim,
            epsilon_xy_pointer,
            epsilon_xy_dim_c0,
            epsilon_xy_dim_c1,
            epsilon_t_pointer,
            weights_pointer,
            weights_dim_c0,
            input_pointer,
            input_dim_c0,
            input_dim_c1,
            input_dim_c2,
            number_of_spikes,
            dim_x,
            dim_y,
            forgetting_offset,
            forgetting_offset_local,
            pattern_id);
    }

#ifdef DEBUGSHOWTIMEGLOBAL
    auto TIME_end = std::chrono::high_resolution_clock::now();
    float TIME_measured = TIME_resolution(TIME_end - TIME_start).count();
    std::cout << "Time used : " << TIME_measured/(1000.0*1000.0) << "ms" << std::endl;
#endif

    return;
}
// Processes one pattern: visits every (x, y) position and lets
// update_one_ip() compute the h vector for that position.
//
// The *_dim_c0 / _c1 / _c2 arguments are element strides of the respective
// tensors; pattern_id selects the pattern inside h and input.
void HDynamicCNNCPU::update(
    float* h_init_ptr,
    float* h_pointer,
    size_t h_dim_c0,
    size_t h_dim_c1,
    size_t h_dim_c2,
    size_t h_dim,
    float* epsilon_xy_pointer,
    size_t epsilon_xy_dim_c0,
    size_t epsilon_xy_dim_c1,
    float* epsilon_t_pointer,
    float* weights_pointer,
    size_t weights_dim_c0,
    int64_t* input_pointer,
    size_t input_dim_c0,
    size_t input_dim_c1,
    size_t input_dim_c2,
    size_t number_of_spikes,
    size_t dim_x,
    size_t dim_y,
    float forgetting_offset,
    float forgetting_offset_local,
    size_t pattern_id)
{
    // A zero stride means that no epsilon_xy tensor was supplied.
    const bool use_epsilon_xy = (epsilon_xy_dim_c1 != 0);

    // Start of this pattern inside the output and the input tensor.
    float* const h_pattern = h_pointer + pattern_id * h_dim_c0;
    int64_t* const input_pattern = input_pointer + pattern_id * input_dim_c0;

    for (size_t counter_x = 0; counter_x < dim_x; counter_x++)
    {
        for (size_t counter_y = 0; counter_y < dim_y; counter_y++)
        {
            float* epsilon_xy_ptr = nullptr;
            if (use_epsilon_xy)
            {
                epsilon_xy_ptr = epsilon_xy_pointer +
                    counter_x * epsilon_xy_dim_c1 + counter_y;
            }

            float* h_ptr = h_pattern + counter_x * h_dim_c2 + counter_y;
            int64_t* input_ptr =
                input_pattern + counter_x * input_dim_c2 + counter_y;

            update_one_ip(
                h_init_ptr,
                h_ptr,
                h_dim_c1,
                h_dim,
                weights_pointer,
                weights_dim_c0,
                input_ptr,
                input_dim_c1,
                epsilon_xy_ptr,
                epsilon_xy_dim_c0,
                epsilon_t_pointer,
                number_of_spikes,
                forgetting_offset,
                forgetting_offset_local);
        }
    }

    return;
}
// Computes the h vector for a single (pattern, x, y) position by processing
// its spikes one after another, then writes the result to h_pointer.
//
//   h_init_ptr        initial h values (h_dim floats), copied before use
//   h_pointer         output; entry i is written to h_pointer[i * h_dim_c1]
//   weights_pointer   row for spike s starts at weights_pointer + s * weights_dim_c0
//   input_pointer     spike k is read from input_pointer[k * input_dim_c1]
//   epsilon_xy_*      optional factor per spike index; ignored when
//                     epsilon_xy_dim_c0 == 0
//   epsilon_t_pointer factor per spike position k
//
// h is kept un-normalised while the spikes are processed: epsilon_scale
// accumulates the factor that is divided out when the result is written.
void HDynamicCNNCPU::update_one_ip(
float* h_init_ptr,
float* h_pointer,
size_t h_dim_c1,
size_t h_dim,
float* weights_pointer,
size_t weights_dim_c0,
int64_t* input_pointer,
size_t input_dim_c1,
float* epsilon_xy_pointer,
size_t epsilon_xy_dim_c0,
float* epsilon_t_pointer,
size_t number_of_spikes,
float forgetting_offset,
float forgetting_offset_local)
{
// Scratch buffers, released at the end of this function.
float* h_temp = new float[h_dim];
float* h_subsegment = new float[h_dim];
// Work on a private copy of the initial h values.
memcpy(h_subsegment, h_init_ptr, sizeof(float) * h_dim);
float h_temp_sum;
float temp_value;
float epsilon_subsegment;
float epsilon_scale = 1.0;
int64_t* spike;
float* w_ptr;
for (size_t counter_spike = 0; counter_spike < number_of_spikes; counter_spike++)
{
// Fold a large accumulated scale back into h_subsegment and restart at 1
// (presumably to keep the values inside the float range).
if (epsilon_scale > 1E10)
{
temp_value = 1.0 / epsilon_scale;
#pragma omp simd
for (size_t counter = 0; counter < h_dim; counter++)
{
h_subsegment[counter] *= temp_value;
}
epsilon_scale = 1.0;
}
spike = input_pointer + counter_spike * input_dim_c1;
// A negative spike index ends the processing for this position.
if (*spike < 0)
{
break;
}
// Epsilon for this spike: epsilon_t alone, or multiplied by the
// epsilon_xy entry selected by the spike index.
if (epsilon_xy_dim_c0 != 0)
{
epsilon_subsegment =
epsilon_xy_pointer[*spike * epsilon_xy_dim_c0] * epsilon_t_pointer[counter_spike];
}
else
{
epsilon_subsegment = epsilon_t_pointer[counter_spike];
}
// Weight row that belongs to the spike index.
// NOTE(review): *spike is not checked against the number of weight rows here.
w_ptr = weights_pointer + *spike * weights_dim_c0;
// h_temp = h_subsegment * w (element-wise)
memcpy(h_temp, h_subsegment, sizeof(float) * h_dim);
#pragma omp simd
for (size_t counter = 0; counter < h_dim; counter++)
{
h_temp[counter] *= w_ptr[counter];
}
h_temp_sum = 0.0;
#pragma omp simd reduction(+ : h_temp_sum)
for (size_t counter = 0; counter < h_dim; counter++)
{
h_temp_sum += h_temp[counter];
}
// Spikes whose weighted sum is (nearly) zero are skipped; this also avoids
// the division by h_temp_sum below.
if (h_temp_sum > 1E-10)
{
// Scale h_temp so that it sums to epsilon_scale * epsilon_subsegment,
// then add it to the running h.
temp_value = epsilon_scale * epsilon_subsegment / h_temp_sum;
#pragma omp simd
for (size_t counter = 0; counter < h_dim; counter++)
{
h_temp[counter] *= temp_value;
}
#pragma omp simd
for (size_t counter = 0; counter < h_dim; counter++)
{
h_subsegment[counter] += h_temp[counter];
}
if (forgetting_offset_local > 0.0)
{
// Add the same offset to every entry; the scale grows by the additional
// epsilon_subsegment * forgetting_offset.
temp_value =
epsilon_scale * epsilon_subsegment * forgetting_offset_local;
#pragma omp simd
for (size_t counter = 0; counter < h_dim; counter++)
{
h_subsegment[counter] += temp_value;
}
epsilon_scale *=
1.0 + epsilon_subsegment * (1.0 + forgetting_offset);
}
else
{
epsilon_scale *= 1.0 + epsilon_subsegment;
}
}
}
// Divide out the accumulated scale while writing the strided output.
temp_value = 1.0 / epsilon_scale;
#pragma omp simd
for (size_t counter = 0; counter < h_dim; counter++)
{
h_pointer[counter * h_dim_c1] =
h_subsegment[counter] * temp_value;
}
delete[] h_temp;
delete[] h_subsegment;
return;
};

View file

@ -0,0 +1,85 @@
#ifndef HDYNAMICCNNCPU
#define HDYNAMICCNNCPU
#include <unistd.h>
#include <cctype>
#include <iostream>
// CPU implementation of the h-dynamic update. The class declares no data
// members; entrypoint() is the only public operation.
class HDynamicCNNCPU
{
public:
HDynamicCNNCPU();
~HDynamicCNNCPU();
// Runs the update for all patterns. Every tensor is passed as a raw memory
// address (*_pointer_addr) together with its dimensions (*_dim_N).
// epsilon_xy is optional: an address of 0 disables it.
// number_of_processes is handed to omp_set_num_threads().
// gpu_tuning_factor is accepted but not used by the CPU implementation.
void entrypoint(
int64_t h_pointer_addr,
int64_t h_dim_0,
int64_t h_dim_1,
int64_t h_dim_2,
int64_t h_dim_3,
int64_t epsilon_xy_pointer_addr,
int64_t epsilon_xy_dim_0,
int64_t epsilon_xy_dim_1,
int64_t epsilon_xy_dim_2,
int64_t epsilon_t_pointer_addr,
int64_t epsilon_t_dim_0,
int64_t weights_pointer_addr,
int64_t weights_dim_0,
int64_t weights_dim_1,
int64_t input_pointer_addr,
int64_t input_dim_0,
int64_t input_dim_1,
int64_t input_dim_2,
int64_t input_dim_3,
int64_t init_vector_pointer_addr,
int64_t init_vector_dim_0,
int64_t number_of_processes,
float forgetting_offset,
int64_t gpu_tuning_factor);
private:
// Handles one pattern (pattern_id): loops over all x/y positions and calls
// update_one_ip() for each. The *_dim_cN arguments are element strides.
void update(
float* h_init_ptr,
float* h_pointer,
size_t h_dim_c0,
size_t h_dim_c1,
size_t h_dim_c2,
size_t h_dim,
float* epsilon_xy_pointer,
size_t epsilon_xy_dim_c0,
size_t epsilon_xy_dim_c1,
float* epsilon_t_pointer,
float* weights_pointer,
size_t weights_dim_c0,
int64_t* input_pointer,
size_t input_dim_c0,
size_t input_dim_c1,
size_t input_dim_c2,
size_t number_of_spikes,
size_t dim_x,
size_t dim_y,
float forgetting_offset,
float forgetting_offset_local,
size_t pattern_id);
// Computes the h vector of a single position from its spikes and writes it
// to h_pointer with stride h_dim_c1.
void update_one_ip(
float* h_init_ptr,
float* h_pointer,
size_t h_dim_c1,
size_t h_dim,
float* weights_pointer,
size_t weights_dim_c0,
int64_t* input_pointer,
size_t input_dim_c1,
float* epsilon_xy_pointer,
size_t epsilon_xy_dim_c0,
float* epsilon_t_pointer,
size_t number_of_spikes,
float forgetting_offset,
float forgetting_offset_local);
};
#endif /* HDYNAMICCNNCPU */

View file

@ -0,0 +1,33 @@
# Builds the pybind11 extension module Py$(name)$(type) one directory up.
# CC, O_DIRS, PYBIN, PARAMETERS_O_CPU and PARAMETERS_Linker_CPU are not
# defined in this file; they are expected to come from ../.env.
include ../.env
export

name = HDynamicCNN
type = CPU

# Extension suffix and include flags as reported by the Python installation.
PYPOSTFIX := $(shell $(PYBIN)python3-config --extension-suffix)
PYBIND11INCLUDE := $(shell $(PYBIN)python3 -m pybind11 --includes)
PARAMETERS_O = $(PARAMETERS_O_CPU) $(PYBIND11INCLUDE)
PARAMETERS_Linker = $(PARAMETERS_Linker_CPU)

so_file = Py$(name)$(type)$(PYPOSTFIX)
pyi_file = Py$(name)$(type).pyi

# 'all' and 'clean' are commands, not files: without .PHONY a file with one
# of these names would stop the target from running.
.PHONY: all clean

all: ../$(so_file)

$(O_DIRS)$(name)$(type).o: $(name)$(type).h $(name)$(type).cpp
	mkdir -p $(O_DIRS)
	$(CC) $(PARAMETERS_O) -c $(name)$(type).cpp -o $(O_DIRS)$(name)$(type).o

$(O_DIRS)Py$(name)$(type).o: $(name)$(type).h Py$(name)$(type).cpp
	mkdir -p $(O_DIRS)
	$(CC) $(PARAMETERS_O) -c Py$(name)$(type).cpp -o $(O_DIRS)Py$(name)$(type).o

../$(so_file): $(O_DIRS)$(name)$(type).o $(O_DIRS)Py$(name)$(type).o
	$(CC) $(PARAMETERS_Linker) -o ../$(so_file) $(O_DIRS)$(name)$(type).o $(O_DIRS)Py$(name)$(type).o

#######################
clean:
	rm -rf $(O_DIRS)
	rm -f ../$(so_file)
	rm -f ../$(pyi_file)

View file

@ -0,0 +1,14 @@
#include <pybind11/pybind11.h>
#include "HDynamicCNNCPU.h"
namespace py = pybind11;
// Python module PyHDynamicCNNCPU: exposes the class HDynamicCNNCPU with a
// default constructor and one method, "update", which calls entrypoint().
PYBIND11_MODULE(PyHDynamicCNNCPU, m)
{
    m.doc() = "HDynamicCNNCPU Module";

    py::class_<HDynamicCNNCPU> binding(m, "HDynamicCNNCPU");
    binding.def(py::init<>());
    binding.def("update", &HDynamicCNNCPU::entrypoint);
}

64
loss_function.py Normal file
View file

@ -0,0 +1,64 @@
import torch
# loss_mode == 0: "normal" SbS loss function mixture
# loss_mode == 1: cross_entropy
def loss_function(
h: torch.Tensor,
labels: torch.Tensor,
loss_mode: int = 0,
number_of_output_neurons: int = 10,
loss_coeffs_mse: float = 0.0,
loss_coeffs_kldiv: float = 0.0,
) -> torch.Tensor | None:
assert loss_mode >= 0
assert loss_mode <= 1
assert h.ndim == 2
if loss_mode == 0:
# Convert label into one hot
target_one_hot: torch.Tensor = torch.zeros(
(
labels.shape[0],
number_of_output_neurons,
),
device=h.device,
dtype=h.dtype,
)
target_one_hot.scatter_(
1,
labels.to(h.device).unsqueeze(1),
torch.ones(
(labels.shape[0], 1),
device=h.device,
dtype=h.dtype,
),
)
my_loss: torch.Tensor = ((h - target_one_hot) ** 2).sum(dim=0).mean(
dim=0
) * loss_coeffs_mse
my_loss = (
my_loss
+ (
(target_one_hot * torch.log((target_one_hot + 1e-20) / (h + 1e-20)))
.sum(dim=0)
.mean(dim=0)
)
* loss_coeffs_kldiv
)
my_loss = my_loss / (abs(loss_coeffs_kldiv) + abs(loss_coeffs_mse))
return my_loss
elif loss_mode == 1:
my_loss = torch.nn.functional.cross_entropy(h, labels.to(h.device))
return my_loss
else:
return None

531
make_network.py Normal file
View file

@ -0,0 +1,531 @@
import torch
from tools.append_block import append_block
from tools.L1NormLayer import L1NormLayer
from tools.NNMF2d import NNMF2d
from tools.append_parameter import append_parameter
import json
from jsmin import jsmin
def _block_entry(config_network: dict, key: str, block_id: int):
    """Return the entry for block ``block_id`` of the per-block list ``key``."""
    return list(config_network[key])[block_id]


def _block_int_pair(config_network: dict, key: str, block_id: int) -> tuple[int, int]:
    """Return the first two values of a per-block entry as a pair of ints."""
    entry = list(_block_entry(config_network, key, block_id))
    return (int(entry[0]), int(entry[1]))


def make_network(
    input_dim_x: int,
    input_dim_y: int,
    input_number_of_channel: int,
    device: torch.device,
    config_network_filename: str = "config_network.json",
) -> tuple[
    torch.nn.Sequential,
    list[list[torch.nn.parameter.Parameter]],
    list[str],
]:
    """Build the network described by a JSON configuration file.

    The file is passed through ``jsmin`` before it is parsed. For every block
    ``append_block`` adds the neuron layers; a pooling stage follows when both
    entries of ``kernel_size_pool`` are positive. The network ends with
    ``Softmax(dim=1)`` and ``Flatten``. A dummy image of ones is pushed
    through every added layer so that each layer sees the input shape it will
    receive.

    Pooling types (``type_of_pooling``): 0 none, 1 ``AvgPool2d``,
    2 ``MaxPool2d``, 3 unfold/fold + 1x1 ``Conv2d``, 4 unfold/fold + ``NNMF2d``.

    Args:
        input_dim_x: Size of dimension 2 of the dummy input image.
        input_dim_y: Size of dimension 3 of the dummy input image.
        input_number_of_channel: Number of channels of the dummy input image.
        device: Device on which the layers are created.
        config_network_filename: Path of the JSON configuration.

    Returns:
        ``(network, parameters, name_list)`` where ``parameters`` holds the
        parameter groups named by ``name_list``
        (neuron a, neuron b, batchnorm2d, neuron pool).
    """
    with open(config_network_filename, "r") as file:
        minified = jsmin(file.read())
        config_network = json.loads(minified)

    # Every per-block setting must provide one entry per block.
    number_of_blocks = len(list(config_network["number_of_neurons_a"]))
    for key in (
        "number_of_neurons_b",
        "kernel_size_conv",
        "stride_conv",
        "padding_conv",
        "dilation_conv",
        "kernel_size_pool",
        "stride_pool",
        "padding_pool",
        "dilation_pool",
        "type_of_pooling",
        "local_learning_pooling",
        "local_learning_use_kl_pooling",
        "type_of_neuron_a",
        "type_of_neuron_b",
        "batch_norm_neuron_a",
        "batch_norm_neuron_b",
        "bias_norm_neuron_a",
        "bias_norm_neuron_b",
    ):
        assert number_of_blocks == len(list(config_network[key])), (
            f"'{key}' needs as many entries as 'number_of_neurons_a'"
        )

    parameter_neuron_b: list[torch.nn.parameter.Parameter] = []
    parameter_neuron_a: list[torch.nn.parameter.Parameter] = []
    parameter_batchnorm2d: list[torch.nn.parameter.Parameter] = []
    parameter_neuron_pool: list[torch.nn.parameter.Parameter] = []

    test_image = torch.ones(
        (1, input_number_of_channel, input_dim_x, input_dim_y), device=device
    )

    network = torch.nn.Sequential()
    network = network.to(device)

    # NOTE(review): only a JSON float is accepted; an integer epsilon is
    # treated like "no epsilon" - confirm that this is intended.
    epsilon: float | None = None
    if isinstance(config_network["epsilon"], float):
        epsilon = float(config_network["epsilon"])

    for block_id in range(0, number_of_blocks):
        test_image = append_block(
            network=network,
            number_of_neurons_a=int(
                _block_entry(config_network, "number_of_neurons_a", block_id)
            ),
            number_of_neurons_b=int(
                _block_entry(config_network, "number_of_neurons_b", block_id)
            ),
            test_image=test_image,
            dilation=list(_block_entry(config_network, "dilation_conv", block_id)),
            padding=list(_block_entry(config_network, "padding_conv", block_id)),
            stride=list(_block_entry(config_network, "stride_conv", block_id)),
            kernel_size=list(
                _block_entry(config_network, "kernel_size_conv", block_id)
            ),
            epsilon=epsilon,
            local_learning=bool(
                _block_entry(config_network, "local_learning", block_id)
            ),
            local_learning_kl=bool(
                _block_entry(config_network, "local_learning_kl", block_id)
            ),
            iterations=int(config_network["iterations"]),
            device=device,
            parameter_neuron_a=parameter_neuron_a,
            parameter_neuron_b=parameter_neuron_b,
            parameter_batchnorm2d=parameter_batchnorm2d,
            type_of_neuron_a=int(
                _block_entry(config_network, "type_of_neuron_a", block_id)
            ),
            type_of_neuron_b=int(
                _block_entry(config_network, "type_of_neuron_b", block_id)
            ),
            batch_norm_neuron_a=bool(
                _block_entry(config_network, "batch_norm_neuron_a", block_id)
            ),
            batch_norm_neuron_b=bool(
                _block_entry(config_network, "batch_norm_neuron_b", block_id)
            ),
            bias_norm_neuron_a=bool(
                _block_entry(config_network, "bias_norm_neuron_a", block_id)
            ),
            bias_norm_neuron_b=bool(
                _block_entry(config_network, "bias_norm_neuron_b", block_id)
            ),
        )

        # No pooling stage unless both kernel sizes are positive. The second
        # entry is only looked at when the first one is positive.
        kernel_entry = list(_block_entry(config_network, "kernel_size_pool", block_id))
        if not ((int(kernel_entry[0]) > 0) and (int(kernel_entry[1]) > 0)):
            continue

        pooling_type = int(_block_entry(config_network, "type_of_pooling", block_id))

        if pooling_type == 0:
            continue

        if (pooling_type == 1) or (pooling_type == 2):
            kernel_size_pool = (int(kernel_entry[0]), int(kernel_entry[1]))
            stride_pool = _block_int_pair(config_network, "stride_pool", block_id)
            padding_pool = _block_int_pair(config_network, "padding_pool", block_id)

            if pooling_type == 1:
                network.append(
                    torch.nn.AvgPool2d(
                        kernel_size=kernel_size_pool,
                        stride=stride_pool,
                        padding=padding_pool,
                    )
                )
            else:
                network.append(
                    torch.nn.MaxPool2d(
                        kernel_size=kernel_size_pool,
                        stride=stride_pool,
                        padding=padding_pool,
                    )
                )
            test_image = network[-1](test_image)

        elif (pooling_type == 3) or (pooling_type == 4):
            network.append(torch.nn.ReLU())
            test_image = network[-1](test_image)

            kernel_size_pool = (int(kernel_entry[0]), int(kernel_entry[1]))
            stride_pool = _block_int_pair(config_network, "stride_pool", block_id)
            padding_pool = _block_int_pair(config_network, "padding_pool", block_id)
            dilation_pool = _block_int_pair(config_network, "dilation_pool", block_id)

            # A dummy convolution with the pooling geometry yields the spatial
            # output size needed by Fold below.
            mock_output = (
                torch.nn.functional.conv2d(
                    torch.zeros(
                        1,
                        1,
                        test_image.shape[2],
                        test_image.shape[3],
                    ),
                    torch.zeros((1, 1, kernel_size_pool[0], kernel_size_pool[1])),
                    stride=stride_pool,
                    padding=padding_pool,
                    dilation=dilation_pool,
                )
                .squeeze(0)
                .squeeze(0)
            )

            # Unfold extracts the pooling windows; Fold with a 1x1 kernel puts
            # the window dimension back onto the spatial grid.
            network.append(
                torch.nn.Unfold(
                    kernel_size=kernel_size_pool,
                    stride=stride_pool,
                    padding=padding_pool,
                    dilation=dilation_pool,
                )
            )
            test_image = network[-1](test_image)

            network.append(
                torch.nn.Fold(
                    output_size=mock_output.shape,
                    kernel_size=(1, 1),
                    dilation=1,
                    padding=0,
                    stride=1,
                )
            )
            test_image = network[-1](test_image)

            network.append(L1NormLayer())
            test_image = network[-1](test_image)

            # The pooling layer reduces the channels by the window size.
            pooled_channels = test_image.shape[1] // (
                kernel_size_pool[0] * kernel_size_pool[1]
            )

            if pooling_type == 3:
                network.append(
                    torch.nn.Conv2d(
                        in_channels=test_image.shape[1],
                        out_channels=pooled_channels,
                        kernel_size=(1, 1),
                        bias=False,
                    ).to(device)
                )
            else:
                network.append(
                    NNMF2d(
                        in_channels=test_image.shape[1],
                        out_channels=pooled_channels,
                        epsilon=epsilon,
                        local_learning=bool(
                            _block_entry(
                                config_network, "local_learning_pooling", block_id
                            )
                        ),
                        local_learning_kl=bool(
                            _block_entry(
                                config_network,
                                "local_learning_use_kl_pooling",
                                block_id,
                            )
                        ),
                    ).to(device)
                )

            test_image = network[-1](test_image)
            append_parameter(
                module=network[-1], parameter_list=parameter_neuron_pool
            )

            network.append(
                torch.nn.BatchNorm2d(
                    num_features=test_image.shape[1],
                    device=device,
                    momentum=0.1,
                    track_running_stats=False,
                )
            )
            test_image = network[-1](test_image)
            append_parameter(
                module=network[-1], parameter_list=parameter_batchnorm2d
            )

        else:
            # NOTE(review): values above 4 pass silently without adding a
            # pooling layer; only negative values fail here.
            assert pooling_type > 4

    network.append(torch.nn.Softmax(dim=1))
    test_image = network[-1](test_image)

    network.append(torch.nn.Flatten())
    test_image = network[-1](test_image)

    parameters: list[list[torch.nn.parameter.Parameter]] = [
        parameter_neuron_a,
        parameter_neuron_b,
        parameter_batchnorm2d,
        parameter_neuron_pool,
    ]

    name_list: list[str] = ["neuron a", "neuron b", "batchnorm2d", "neuron pool"]

    return (
        network,
        parameters,
        name_list,
    )

32
make_optimize.py Normal file
View file

@ -0,0 +1,32 @@
import torch
def make_optimize(
parameters: list[list[torch.nn.parameter.Parameter]],
lr_initial: list[float],
eps=1e-10,
) -> tuple[
list[torch.optim.Adam | None],
list[torch.optim.lr_scheduler.ReduceLROnPlateau | None],
]:
list_optimizer: list[torch.optim.Adam | None] = []
list_lr_scheduler: list[torch.optim.lr_scheduler.ReduceLROnPlateau | None] = []
assert len(parameters) == len(lr_initial)
for i in range(0, len(parameters)):
if len(parameters[i]) > 0:
list_optimizer.append(torch.optim.Adam(parameters[i], lr=lr_initial[i]))
else:
list_optimizer.append(None)
for i in range(0, len(list_optimizer)):
if list_optimizer[i] is not None:
pass
list_lr_scheduler.append(
torch.optim.lr_scheduler.ReduceLROnPlateau(list_optimizer[i], eps=eps) # type: ignore
)
else:
list_lr_scheduler.append(None)
return (list_optimizer, list_lr_scheduler)

380
pybind11_auto_pyi.py Normal file
View file

@ -0,0 +1,380 @@
# Based on
# https://github.com/sizmailov/pybind11-stubgen/blob/master/pybind11_stubgen/__init__.py
from __future__ import annotations
import importlib
import logging
import re
from argparse import ArgumentParser, Namespace
from pathlib import Path
import glob
from pybind11_stubgen.parser.interface import IParser
from pybind11_stubgen.parser.mixins.error_handlers import (
IgnoreAllErrors,
IgnoreInvalidExpressionErrors,
IgnoreInvalidIdentifierErrors,
IgnoreUnresolvedNameErrors,
LogErrors,
LoggerData,
SuggestCxxSignatureFix,
TerminateOnFatalErrors,
)
from pybind11_stubgen.parser.mixins.filter import (
FilterClassMembers,
FilterInvalidIdentifiers,
FilterPybind11ViewClasses,
FilterPybindInternals,
FilterTypingModuleAttributes,
)
from pybind11_stubgen.parser.mixins.fix import (
FixBuiltinTypes,
FixCurrentModulePrefixInTypeNames,
FixMissing__all__Attribute,
FixMissing__future__AnnotationsImport,
FixMissingEnumMembersAnnotation,
FixMissingFixedSizeImport,
FixMissingImports,
FixMissingNoneHashFieldAnnotation,
FixNumpyArrayDimAnnotation,
FixNumpyArrayDimTypeVar,
FixNumpyArrayFlags,
FixNumpyArrayRemoveParameters,
FixNumpyDtype,
FixPEP585CollectionNames,
FixPybind11EnumStrDoc,
FixRedundantBuiltinsAnnotation,
FixRedundantMethodsFromBuiltinObject,
FixScipyTypeArguments,
FixTypingTypeNames,
FixValueReprRandomAddress,
OverridePrintSafeValues,
RemoveSelfAnnotation,
ReplaceReadWritePropertyWithField,
RewritePybind11EnumValueRepr,
)
from pybind11_stubgen.parser.mixins.parse import (
BaseParser,
ExtractSignaturesFromPybind11Docstrings,
ParserDispatchMixin,
)
from pybind11_stubgen.printer import Printer
from pybind11_stubgen.structs import QualifiedName
from pybind11_stubgen.writer import Writer
class CLIArgs(Namespace):
    """Typed view of the parsed command-line arguments.

    The class only declares attribute types for static checkers; it adds no
    behaviour to ``argparse.Namespace``.
    """

    # Output
    output_dir: str
    root_suffix: str
    stub_extension: str
    dry_run: bool

    # Error handling
    ignore_invalid_expressions: re.Pattern | None
    ignore_invalid_identifiers: re.Pattern | None
    ignore_unresolved_names: re.Pattern | None
    ignore_all_errors: bool
    exit_code: bool

    # Enum and numpy annotation handling
    enum_class_locations: list[tuple[re.Pattern, str]]
    numpy_array_wrap_with_annotated: bool
    numpy_array_use_type_var: bool
    numpy_array_remove_parameters: bool

    # Printing
    print_invalid_expressions_as_is: bool
    print_safe_value_reprs: re.Pattern | None

    # Module to process
    module_name: str
def arg_parser() -> ArgumentParser:
    """Build the command-line parser of the stub generator.

    Only options are registered here; no positional argument is defined.

    Returns:
        The configured ``ArgumentParser``.
    """

    def regex(pattern_str: str) -> re.Pattern:
        """Compile ``pattern_str``; an invalid pattern raises ``ValueError``."""
        try:
            return re.compile(pattern_str)
        except re.error as e:
            # Keep the original regex error attached as the cause.
            raise ValueError(f"Invalid REGEX pattern: {e}") from e

    def regex_colon_path(regex_path: str) -> tuple[re.Pattern, str]:
        """Split ``REGEX:PATH`` at the last colon and validate both parts."""
        pattern_str, path = regex_path.rsplit(":", maxsplit=1)
        if any(not part.isidentifier() for part in path.split(".")):
            raise ValueError(f"Invalid PATH: {path}")
        return regex(pattern_str), path

    parser = ArgumentParser(
        prog="pybind11-stubgen", description="Generates stubs for specified modules"
    )
    parser.add_argument(
        "-o",
        "--output-dir",
        help="The root directory for output stubs",
        default=".",
    )
    parser.add_argument(
        "--root-suffix",
        type=str,
        default=None,
        dest="root_suffix",
        help="Top-level module directory suffix",
    )

    parser.add_argument(
        "--ignore-invalid-expressions",
        metavar="REGEX",
        default=None,
        type=regex,
        help="Ignore invalid expressions matching REGEX",
    )
    parser.add_argument(
        "--ignore-invalid-identifiers",
        metavar="REGEX",
        default=None,
        type=regex,
        help="Ignore invalid identifiers matching REGEX",
    )

    parser.add_argument(
        "--ignore-unresolved-names",
        metavar="REGEX",
        default=None,
        type=regex,
        help="Ignore unresolved names matching REGEX",
    )

    parser.add_argument(
        "--ignore-all-errors",
        default=False,
        action="store_true",
        help="Ignore all errors during module parsing",
    )

    parser.add_argument(
        "--enum-class-locations",
        dest="enum_class_locations",
        metavar="REGEX:LOC",
        action="append",
        default=[],
        type=regex_colon_path,
        help="Locations of enum classes in "
        "<enum-class-name-regex>:<path-to-class> format. "
        "Example: `MyEnum:foo.bar.Baz`",
    )

    # At most one of the numpy array rewrites may be selected.
    numpy_array_fix = parser.add_mutually_exclusive_group()
    numpy_array_fix.add_argument(
        "--numpy-array-wrap-with-annotated",
        default=False,
        action="store_true",
        help="Replace numpy/scipy arrays of "
        "'ARRAY_T[TYPE, [*DIMS], *FLAGS]' format with "
        "'Annotated[ARRAY_T, TYPE, FixedSize|DynamicSize(*DIMS), *FLAGS]'",
    )

    numpy_array_fix.add_argument(
        "--numpy-array-use-type-var",
        default=False,
        action="store_true",
        help="Replace 'numpy.ndarray[numpy.float32[m, 1]]' with "
        "'numpy.ndarray[tuple[M, typing.Literal[1]], numpy.dtype[numpy.float32]]'",
    )

    numpy_array_fix.add_argument(
        "--numpy-array-remove-parameters",
        default=False,
        action="store_true",
        help="Replace 'numpy.ndarray[...]' with 'numpy.ndarray'",
    )

    parser.add_argument(
        "--print-invalid-expressions-as-is",
        default=False,
        action="store_true",
        # The trailing space was missing, which rendered "expressionsfound".
        help="Suppress the replacement with '...' of invalid expressions "
        "found in annotations",
    )

    parser.add_argument(
        "--print-safe-value-reprs",
        metavar="REGEX",
        default=None,
        type=regex,
        help="Override the print-safe check for values matching REGEX",
    )

    parser.add_argument(
        "--exit-code",
        action="store_true",
        dest="exit_code",
        help="On error exits with 1 and skips stub generation",
    )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        dest="dry_run",
        help="Don't write stubs. Parses module and report errors",
    )

    parser.add_argument(
        "--stub-extension",
        type=str,
        default="pyi",
        metavar="EXT",
        choices=["pyi", "py"],
        help="The file extension of the generated stubs. "
        "Must be 'pyi' (default) or 'py'",
    )

    return parser
def stub_parser_from_args(args: CLIArgs) -> IParser:
    """Compose a parser class from mixins and configure it from CLI options.

    The concrete parser is a class assembled on the fly. Optional mixins are
    added only when the matching command-line option is set, and the order of
    the base classes is kept exactly as listed because it defines the method
    resolution order of the resulting class.

    Args:
        args: Parsed command-line options.

    Returns:
        A configured instance of the dynamically composed ``Parser`` class.
    """
    # Error handlers that have to come first in the MRO.
    leading_handlers: list[type] = [LoggerData]
    if args.ignore_all_errors:
        leading_handlers.append(IgnoreAllErrors)
    if args.ignore_invalid_identifiers:
        leading_handlers.append(IgnoreInvalidIdentifierErrors)
    if args.ignore_invalid_expressions:
        leading_handlers.append(IgnoreInvalidExpressionErrors)
    if args.ignore_unresolved_names:
        leading_handlers.append(IgnoreUnresolvedNameErrors)

    # Error handlers that have to come last in the MRO.
    trailing_handlers: list[type] = [LogErrors, SuggestCxxSignatureFix]
    if args.exit_code:
        trailing_handlers.append(TerminateOnFatalErrors)

    # The numpy array options are declared mutually exclusive on the CLI,
    # so at most one of these is expected to be selected.
    numpy_array_mixins: list[type] = []
    if args.numpy_array_wrap_with_annotated:
        numpy_array_mixins.append(FixNumpyArrayDimAnnotation)
    if args.numpy_array_use_type_var:
        numpy_array_mixins.append(FixNumpyArrayDimTypeVar)
    if args.numpy_array_remove_parameters:
        numpy_array_mixins.append(FixNumpyArrayRemoveParameters)

    class Parser(
        *leading_handlers,  # type: ignore[misc]
        FixMissing__future__AnnotationsImport,
        FixMissing__all__Attribute,
        FixMissingNoneHashFieldAnnotation,
        FixMissingImports,
        FilterTypingModuleAttributes,
        FixPEP585CollectionNames,
        FixTypingTypeNames,
        FixScipyTypeArguments,
        FixMissingFixedSizeImport,
        FixMissingEnumMembersAnnotation,
        OverridePrintSafeValues,
        *numpy_array_mixins,  # type: ignore[misc]
        FixNumpyDtype,
        FixNumpyArrayFlags,
        FixCurrentModulePrefixInTypeNames,
        FixBuiltinTypes,
        RewritePybind11EnumValueRepr,
        FilterClassMembers,
        ReplaceReadWritePropertyWithField,
        FilterInvalidIdentifiers,
        FixValueReprRandomAddress,
        FixRedundantBuiltinsAnnotation,
        FilterPybindInternals,
        FilterPybind11ViewClasses,
        FixRedundantMethodsFromBuiltinObject,
        RemoveSelfAnnotation,
        FixPybind11EnumStrDoc,
        ExtractSignaturesFromPybind11Docstrings,
        ParserDispatchMixin,
        BaseParser,
        *trailing_handlers,  # type: ignore[misc]
    ):
        pass

    stub_parser = Parser()

    # Forward the optional settings; each setter is only touched when its
    # option was actually supplied.
    if args.enum_class_locations:
        stub_parser.set_pybind11_enum_locations(dict(args.enum_class_locations))
    if args.ignore_invalid_identifiers is not None:
        stub_parser.set_ignored_invalid_identifiers(args.ignore_invalid_identifiers)
    if args.ignore_invalid_expressions is not None:
        stub_parser.set_ignored_invalid_expressions(args.ignore_invalid_expressions)
    if args.ignore_unresolved_names is not None:
        stub_parser.set_ignored_unresolved_names(args.ignore_unresolved_names)
    if args.print_safe_value_reprs is not None:
        stub_parser.set_print_safe_value_pattern(args.print_safe_value_reprs)

    return stub_parser
def main() -> None:
    """Generate stubs for every ``*.so`` file found in the current directory.

    For each shared object the module name is taken as the part of the file
    name in front of the first dot. The command-line options are parsed and a
    parser, printer and writer are built for each module before the stub
    generation is run.
    """
    for shared_object in glob.glob("*.so"):
        # A name matched by "*.so" always contains a dot, so this yields the
        # text before the first dot (e.g. "PyFoo" for "PyFoo.cpython-312.so").
        module_name: str = shared_object.partition(".")[0]
        print(f"Processing: {module_name}")

        logging.basicConfig(
            level=logging.INFO,
            format="%(name)s - [%(levelname)7s] %(message)s",
        )

        args = arg_parser().parse_args(namespace=CLIArgs())
        parser = stub_parser_from_args(args)
        printer = Printer(
            invalid_expr_as_ellipses=not args.print_invalid_expressions_as_is
        )

        out_dir, sub_dir = to_output_and_subdir(
            output_dir=args.output_dir,
            module_name=module_name,
            root_suffix=args.root_suffix,
        )

        writer = Writer(stub_ext=args.stub_extension)
        run(
            parser,
            printer,
            module_name,
            out_dir,
            sub_dir=sub_dir,
            dry_run=args.dry_run,
            writer=writer,
        )
def to_output_and_subdir(
    output_dir: str, module_name: str, root_suffix: str | None
) -> tuple[Path, Path | None]:
    """Derive the output directory and optional sub-directory for a module.

    Args:
        output_dir: Root directory for the generated stubs.
        module_name: Dotted module name, e.g. ``"pkg.sub.mod"``.
        root_suffix: Suffix appended to the top-level package name, or
            ``None`` to leave the name untouched.

    Returns:
        A pair ``(out_dir, sub_dir)``. ``out_dir`` is ``output_dir`` joined
        with every module path component except the last one. ``sub_dir`` is
        only set when a suffix is given and the module has no parent package;
        it is then the suffixed module name. Otherwise it is ``None``.
    """
    components = module_name.split(".")
    sub_dir: Path | None = None

    if root_suffix is not None:
        # Only the top-level component receives the suffix.
        components[0] = f"{components[0]}{root_suffix}"
        if len(components) == 1:
            sub_dir = Path(components[0])

    return Path(output_dir).joinpath(*components[:-1]), sub_dir
def run(
    parser: IParser,
    printer: Printer,
    module_name: str,
    out_dir: Path,
    sub_dir: Path | None,
    dry_run: bool,
    writer: Writer,
):
    """Import a module, parse it and write its stubs.

    Args:
        parser: Parser used to process the imported module.
        printer: Printer handed to the writer.
        module_name: Dotted name of the module to import and parse.
        out_dir: Directory the stubs are written to; created if missing.
        sub_dir: Optional sub-directory forwarded to the writer.
        dry_run: When true, the module is parsed but nothing is written.
        writer: Writer that emits the stub files.

    Raises:
        RuntimeError: If the parser returns ``None`` for the module.
    """
    qualified_name = QualifiedName.from_str(module_name)
    imported_module = importlib.import_module(module_name)

    parsed_module = parser.handle_module(qualified_name, imported_module)
    # finalize() runs before the result is inspected, as in a dry run.
    parser.finalize()

    if parsed_module is None:
        raise RuntimeError(f"Can't parse {module_name}")

    if not dry_run:
        out_dir.mkdir(parents=True, exist_ok=True)
        writer.write_module(parsed_module, printer, to=out_dir, sub_dir=sub_dir)
# Run the stub generation only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

127
run_network_test.py Normal file
View file

@ -0,0 +1,127 @@
import time
import numpy as np
import torch
import json
from jsmin import jsmin
import os
from torch.utils.tensorboard import SummaryWriter
from tools.make_network import make_network
from tools.get_the_data import get_the_data
from tools.loss_function import loss_function
from tools.make_optimize import make_optimize
def main(
    rand_seed: int = 21,
    only_print_network: bool = False,
    iterations: int = 20,
    model_iterations: int = 20,
    config_network_filename: str = "config_network.json",
    config_data_filename: str = "config_data.json",
    config_lr_parameter_filename: str = "config_lr_parameter.json",
) -> None:
    """Evaluate a stored network on the test set.

    The model is loaded from
    ``Models/Model_seed_{rand_seed}_{model_iterations}.pt``. Every layer that
    has an ``iterations`` attribute is set to ``iterations`` before the test
    data is processed. The number of correctly classified test samples is
    written to TensorBoard and the accuracy is printed.

    Args:
        rand_seed: Seed for torch and numpy; also part of the model file name.
        only_print_network: Print the loaded network and exit without testing.
        iterations: Value assigned to the ``iterations`` attribute of layers.
        model_iterations: Part of the model file name that is loaded.
        config_network_filename: Not used by this function; kept so the
            signature stays compatible.
        config_data_filename: JSON file (comments allowed, stripped by jsmin)
            with the data set settings.
        config_lr_parameter_filename: Not used by this function; kept so the
            signature stays compatible.
    """
    os.makedirs("Models", exist_ok=True)

    device: torch.device = (
        torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
    )
    torch.set_default_dtype(torch.float32)

    # Some parameters
    with open(config_data_filename, "r") as file:
        config_data = json.loads(jsmin(file.read()))

    torch.manual_seed(rand_seed)
    torch.cuda.manual_seed(rand_seed)
    np.random.seed(rand_seed)

    if str(config_data["dataset"]) in ("MNIST", "FashionMNIST"):
        input_dim_x: int = 24
        input_dim_y: int = 24
    else:
        input_dim_x = 28
        input_dim_y = 28

    # Only the test loader and its processing chain are needed here.
    _, test_dataloader, _, test_processing_chain = get_the_data(
        str(config_data["dataset"]),
        int(config_data["batch_size_train"]),
        int(config_data["batch_size_test"]),
        device,
        input_dim_x,
        input_dim_y,
        flip_p=float(config_data["flip_p"]),
        jitter_brightness=float(config_data["jitter_brightness"]),
        jitter_contrast=float(config_data["jitter_contrast"]),
        jitter_saturation=float(config_data["jitter_saturation"]),
        jitter_hue=float(config_data["jitter_hue"]),
        da_auto_mode=bool(config_data["da_auto_mode"]),
    )

    default_path: str = f"seed_{rand_seed}_{model_iterations}"
    log_dir: str = f"test_log_{default_path}_{iterations}"

    # NOTE(security): weights_only=False unpickles a complete module and can
    # execute arbitrary code. Only load model files from a trusted source.
    network = torch.load(f"Models/Model_{default_path}.pt", weights_only=False)
    network = network.to(device=device)
    # Switch the network into evaluation mode
    network.eval()

    print(f"Layers are set to {iterations} iterations.")
    for layer in network:
        if hasattr(layer, "iterations"):
            layer.iterations = iterations

    if only_print_network:
        print(network)
        # Same effect as exit(), without depending on the site module.
        raise SystemExit

    tb = SummaryWriter(log_dir=log_dir)

    print()
    t_start: float = time.perf_counter()

    test_correct: int = 0
    test_number: int = 0

    with torch.no_grad():
        for image, target in test_dataloader:
            output = network(test_processing_chain(image))
            # .item() keeps the counter a plain Python int.
            test_correct += int((output.argmax(dim=1) == target).sum().item())
            test_number += target.shape[0]

    t_testing: float = time.perf_counter()

    performance_test_correct: float = 100.0 * test_correct / test_number

    tb.add_scalar("Test Number Correct", test_correct, 0)

    print(f"Testing: Correct={performance_test_correct:.2f}%")
    print(f"Time: Testing={(t_testing - t_start):.1f}sec")

    tb.flush()
    tb.close()

235
run_network_train.py Normal file
View file

@ -0,0 +1,235 @@
import time
import numpy as np
import torch
import json
from jsmin import jsmin
import os
from torch.utils.tensorboard import SummaryWriter
from tools.make_network import make_network
from tools.get_the_data import get_the_data
from tools.loss_function import loss_function
from tools.make_optimize import make_optimize
def main(
    rand_seed: int = 21,
    only_print_network: bool = False,
    config_network_filename: str = "config_network.json",
    config_data_filename: str = "config_data.json",
    config_lr_parameter_filename: str = "config_lr_parameter.json",
) -> None:
    """Train a network and store it as ``Models/Model_seed_{rand_seed}.pt``.

    The network is built from the network config, trained for up to
    ``number_of_epoch`` epochs and evaluated on the test set after every
    epoch. Loss and the number of correct classifications are logged to
    TensorBoard. Training stops early once the largest current learning rate
    falls below ``lr_limit``.

    The complete module (not only its ``state_dict``) is saved on both exit
    paths, because the evaluation script in this repository loads the file
    with ``torch.load`` and uses the result directly as a module.

    Args:
        rand_seed: Seed for torch and numpy; also part of the output names.
        only_print_network: Print the network and parameter counts, then exit.
        config_network_filename: Config file handed to ``make_network``.
        config_data_filename: JSON file (comments allowed, stripped by jsmin)
            with the data set settings.
        config_lr_parameter_filename: JSON file (comments allowed) with the
            learning-rate, loss and epoch settings.
    """
    os.makedirs("Models", exist_ok=True)

    device: torch.device = (
        torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
    )
    torch.set_default_dtype(torch.float32)

    # Some parameters
    with open(config_data_filename, "r") as file:
        config_data = json.loads(jsmin(file.read()))

    with open(config_lr_parameter_filename, "r") as file:
        config_lr_parameter = json.loads(jsmin(file.read()))

    torch.manual_seed(rand_seed)
    torch.cuda.manual_seed(rand_seed)
    np.random.seed(rand_seed)

    if str(config_data["dataset"]) in ("MNIST", "FashionMNIST"):
        input_number_of_channel: int = 1
        input_dim_x: int = 24
        input_dim_y: int = 24
    else:
        input_number_of_channel = 3
        input_dim_x = 28
        input_dim_y = 28

    train_dataloader, test_dataloader, train_processing_chain, test_processing_chain = (
        get_the_data(
            str(config_data["dataset"]),
            int(config_data["batch_size_train"]),
            int(config_data["batch_size_test"]),
            device,
            input_dim_x,
            input_dim_y,
            flip_p=float(config_data["flip_p"]),
            jitter_brightness=float(config_data["jitter_brightness"]),
            jitter_contrast=float(config_data["jitter_contrast"]),
            jitter_saturation=float(config_data["jitter_saturation"]),
            jitter_hue=float(config_data["jitter_hue"]),
            da_auto_mode=bool(config_data["da_auto_mode"]),
        )
    )

    (
        network,
        parameters,
        name_list,
    ) = make_network(
        input_dim_x=input_dim_x,
        input_dim_y=input_dim_y,
        input_number_of_channel=input_number_of_channel,
        device=device,
        config_network_filename=config_network_filename,
    )

    print(network)

    print()
    print("Information about used parameters:")
    number_of_parameter: int = 0
    for i, parameter_list in enumerate(parameters):
        count_parameter: int = sum(
            parameter_element.numel() for parameter_element in parameter_list
        )
        print(f"{name_list[i]}: {count_parameter}")
        number_of_parameter += count_parameter
    print(f"total number of parameter: {number_of_parameter}")

    if only_print_network:
        # Same effect as exit(), without depending on the site module.
        raise SystemExit

    (
        optimizers,
        lr_schedulers,
    ) = make_optimize(
        parameters=parameters,
        lr_initial=[
            float(config_lr_parameter["lr_initial_neuron_a"]),
            float(config_lr_parameter["lr_initial_neuron_b"]),
            float(config_lr_parameter["lr_initial_norm"]),
            float(config_lr_parameter["lr_initial_batchnorm2d"]),
        ],
    )

    default_path: str = f"seed_{rand_seed}"
    model_path: str = f"Models/Model_{default_path}.pt"
    log_dir: str = f"log_{default_path}"

    tb = SummaryWriter(log_dir=log_dir)

    for epoch_id in range(0, int(config_lr_parameter["number_of_epoch"])):
        print()
        print(f"Epoch: {epoch_id}")
        t_start: float = time.perf_counter()

        train_loss: float = 0.0
        train_correct: int = 0
        train_number: int = 0
        test_correct: int = 0
        test_number: int = 0

        # Switch the network into training mode
        network.train()

        # This runs in total for one epoch split up into mini-batches
        for image, target in train_dataloader:
            # Clean the gradient
            for optimizer in optimizers:
                if optimizer is not None:
                    optimizer.zero_grad()  # type: ignore

            output = network(train_processing_chain(image))

            loss = loss_function(
                h=output,
                labels=target,
                number_of_output_neurons=output.shape[1],
                loss_mode=int(config_lr_parameter["loss_mode"]),
                loss_coeffs_mse=float(config_lr_parameter["loss_coeffs_mse"]),
                loss_coeffs_kldiv=float(config_lr_parameter["loss_coeffs_kldiv"]),
            )

            assert loss is not None
            train_loss += loss.item()
            # .item() keeps the counter a plain Python int.
            train_correct += int((output.argmax(dim=1) == target).sum().item())
            train_number += target.shape[0]

            # Calculate backprop
            loss.backward()

            # Update the parameter
            for optimizer in optimizers:
                if optimizer is not None:
                    optimizer.step()  # type: ignore

        performance_train_correct: float = 100.0 * train_correct / train_number

        # Update the learning rate
        for lr_scheduler in lr_schedulers:
            if lr_scheduler is not None:
                lr_scheduler.step(train_loss)  # type: ignore

        # Current learning rates of the active schedulers.
        current_lrs: list[float] = [
            lr_scheduler.get_last_lr()[0]  # type: ignore
            for lr_scheduler in lr_schedulers
            if lr_scheduler is not None
        ]

        lr_report: str = "Actual lr: " + "".join(
            (
                f" {lr_scheduler.get_last_lr()[0]:.4e} "  # type: ignore
                if lr_scheduler is not None
                else " --- "
            )
            for lr_scheduler in lr_schedulers
        )
        print(lr_report)

        t_training: float = time.perf_counter()

        # Switch the network into evaluation mode
        network.eval()

        with torch.no_grad():
            for image, target in test_dataloader:
                output = network(test_processing_chain(image))
                test_correct += int((output.argmax(dim=1) == target).sum().item())
                test_number += target.shape[0]

        t_testing: float = time.perf_counter()

        performance_test_correct: float = 100.0 * test_correct / test_number

        tb.add_scalar("Train Loss", train_loss / float(train_number), epoch_id)
        tb.add_scalar("Train Number Correct", train_correct, epoch_id)
        tb.add_scalar("Test Number Correct", test_correct, epoch_id)

        print(
            f"Training: Loss={train_loss / float(train_number):.5f} Correct={performance_train_correct:.2f}%"
        )
        print(f"Testing: Correct={performance_test_correct:.2f}%")
        print(
            f"Time: Training={(t_training - t_start):.1f}sec, Testing={(t_testing - t_training):.1f}sec"
        )

        tb.flush()

        # Stop once every active learning rate is below the limit. With no
        # active scheduler there is nothing to compare, so training goes on.
        if current_lrs and max(current_lrs) < float(config_lr_parameter["lr_limit"]):
            torch.save(network, model_path)
            tb.close()
            print("Done (lr_limit)")
            return

    # Save the complete module, matching the lr_limit exit above, so the
    # stored file can be loaded and used as a network in both cases.
    torch.save(network, model_path)
    print()

    tb.close()
    print("Done (loop end)")

    return

View file

@ -0,0 +1,33 @@
# Builds the pybind11 extension module Py$(name)$(type) one directory up.
#
# CC, O_DIRS, PYBIN, PARAMETERS_O_CPU and PARAMETERS_Linker_CPU are not
# defined in this file; they are expected to come from ../.env.
include ../.env
export

name = SpikeGeneration
type = CPU

# File-name suffix for Python extension modules and the pybind11 include flags,
# both queried from the Python installation selected through PYBIN.
PYPOSTFIX := $(shell $(PYBIN)python3-config --extension-suffix)
PYBIND11INCLUDE := $(shell $(PYBIN)python3 -m pybind11 --includes)
PARAMETERS_O = $(PARAMETERS_O_CPU) $(PYBIND11INCLUDE)
PARAMETERS_Linker = $(PARAMETERS_Linker_CPU)

so_file = Py$(name)$(type)$(PYPOSTFIX)
pyi_file = Py$(name)$(type).pyi

# all and clean do not produce files of the same name; declaring them phony
# keeps them working even if a file called "all" or "clean" exists.
.PHONY: all clean

all: ../$(so_file)

# Object file of the implementation.
$(O_DIRS)$(name)$(type).o: $(name)$(type).h $(name)$(type).cpp
	mkdir -p $(O_DIRS)
	$(CC) $(PARAMETERS_O) -c $(name)$(type).cpp -o $(O_DIRS)$(name)$(type).o

# Object file of the pybind11 binding.
$(O_DIRS)Py$(name)$(type).o: $(name)$(type).h Py$(name)$(type).cpp
	mkdir -p $(O_DIRS)
	$(CC) $(PARAMETERS_O) -c Py$(name)$(type).cpp -o $(O_DIRS)Py$(name)$(type).o

# Link both object files into the extension module.
../$(so_file): $(O_DIRS)$(name)$(type).o $(O_DIRS)Py$(name)$(type).o
	$(CC) $(PARAMETERS_Linker) -o ../$(so_file) $(O_DIRS)$(name)$(type).o $(O_DIRS)Py$(name)$(type).o

#######################
# Remove the object directory, the extension module and the stub file.
clean:
	rm -rf $(O_DIRS)
	rm -f ../$(so_file)
	rm -f ../$(pyi_file)

View file

@ -0,0 +1,19 @@
#include <pybind11/pybind11.h>
#include "SpikeGenerationCPU.h"
namespace py = pybind11;
// Python module PySpikeGenerationCPU: exposes the SpikeGenerationCPU class.
PYBIND11_MODULE(PySpikeGenerationCPU, m)
{
    m.doc() = "SpikeGenerationCPU Module";

    py::class_<SpikeGenerationCPU> spike_class(m, "SpikeGenerationCPU");

    // Default constructor.
    spike_class.def(py::init<>());

    spike_class.def("gpu_occupancy_export",
                    &SpikeGenerationCPU::gpu_occupancy_export);
    spike_class.def("gpu_occupancy_import",
                    &SpikeGenerationCPU::gpu_occupancy_import);

    // The Python method is called spike_generation but is bound to the
    // public C++ entrypoint().
    spike_class.def("spike_generation",
                    &SpikeGenerationCPU::entrypoint);
}

View file

@ -0,0 +1,220 @@
#include "SpikeGenerationCPU.h"
#include <omp.h>
#include <stdio.h>
#include <string.h>
#include <algorithm>
#include <cassert>
#include <iostream>
// The class declares no data members, so construction and destruction
// have nothing to do.
SpikeGenerationCPU::SpikeGenerationCPU() = default;

SpikeGenerationCPU::~SpikeGenerationCPU() = default;
// Generates spikes for all patterns, distributing the patterns over OpenMP
// threads.
//
// The three buffers are passed as integer addresses and are interpreted as
// float (input, random values) and int64_t (output). Each buffer is indexed
// as a densely packed 4D array with the last dimension varying fastest; the
// strides are derived from the given dimensions.
//
//   input_*          address and the four dimensions of the input buffer;
//                    dimension 0 is the number of patterns, dimension 1 is
//                    the length searched for every spike
//   random_values_*  address and the four dimensions of the random values
//   output_*         address and the four dimensions of the output buffer;
//                    dimension 1 is the number of spikes per position,
//                    dimensions 2 and 3 are the spatial extent
//   number_of_cpu_processes  number of OpenMP threads to use (> 0)
//
// Preconditions are checked with assert() only, i.e. not in NDEBUG builds.
void SpikeGenerationCPU::entrypoint(
    int64_t input_pointer_addr,
    int64_t input_dim_0,
    int64_t input_dim_1,
    int64_t input_dim_2,
    int64_t input_dim_3,
    int64_t random_values_pointer_addr,
    int64_t random_values_dim_0,
    int64_t random_values_dim_1,
    int64_t random_values_dim_2,
    int64_t random_values_dim_3,
    int64_t output_pointer_addr,
    int64_t output_dim_0,
    int64_t output_dim_1,
    int64_t output_dim_2,
    int64_t output_dim_3,
    int64_t number_of_cpu_processes)
{
    float* input_pointer = reinterpret_cast<float*>(input_pointer_addr);
    float* random_values_pointer =
        reinterpret_cast<float*>(random_values_pointer_addr);
    int64_t* output_pointer = reinterpret_cast<int64_t*>(output_pointer_addr);

    // Input
    assert((input_pointer != nullptr));
    assert((input_dim_0 > 0));
    assert((input_dim_1 > 0));
    assert((input_dim_2 > 0));
    assert((input_dim_3 > 0));

    // Random
    assert((random_values_pointer != nullptr));
    assert((random_values_dim_0 > 0));
    assert((random_values_dim_1 > 0));
    assert((random_values_dim_2 > 0));
    assert((random_values_dim_3 > 0));

    // Output
    assert((output_pointer != nullptr));
    assert((output_dim_0 > 0));
    assert((output_dim_1 > 0));
    assert((output_dim_2 > 0));
    assert((output_dim_3 > 0));

    // Strides (in elements) of dimensions 0, 1 and 2 of each buffer.
    // Input
    size_t input_dim_c0 = input_dim_1 * input_dim_2 * input_dim_3;
    size_t input_dim_c1 = input_dim_2 * input_dim_3;
    size_t input_dim_c2 = input_dim_3;

    // Random
    size_t random_values_dim_c0 =
        random_values_dim_1 * random_values_dim_2 * random_values_dim_3;
    size_t random_values_dim_c1 =
        random_values_dim_2 * random_values_dim_3;
    size_t random_values_dim_c2 = random_values_dim_3;

    // Output
    size_t output_dim_c0 =
        output_dim_1 * output_dim_2 * output_dim_3;
    size_t output_dim_c1 = output_dim_2 * output_dim_3;
    size_t output_dim_c2 = output_dim_3;

    size_t number_of_pattern = input_dim_0;
    size_t h_dim = input_dim_1;
    size_t spike_dim = output_dim_1;
    size_t x_dim = output_dim_2;
    // The y index runs along the last dimension (it is added without a
    // stride in spike_generation), so its extent is output_dim_3. Using
    // output_dim_2 here was only correct for square outputs.
    size_t y_dim = output_dim_3;

    assert((number_of_cpu_processes > 0));

    omp_set_num_threads(static_cast<int>(number_of_cpu_processes));
    // DEBUG:
    // omp_set_num_threads(1);

    // Every pattern writes to its own part of the output buffer.
#pragma omp parallel for
    for (size_t pattern_id = 0; pattern_id < number_of_pattern; pattern_id++)
    {
        spike_generation(
            input_pointer,
            input_dim_c0,
            input_dim_c1,
            input_dim_c2,
            random_values_pointer,
            random_values_dim_c0,
            random_values_dim_c1,
            random_values_dim_c2,
            output_pointer,
            output_dim_c0,
            output_dim_c1,
            output_dim_c2,
            x_dim,
            y_dim,
            spike_dim,
            h_dim,
            pattern_id);
    }

    return;
}
// Generates the spikes of a single pattern.
//
// For every (x, y) position and every spike index, the random value at that
// place is looked up with lower_bound() in the h_dim input values of the
// position (spaced input_dim_c1 elements apart). The resulting index is
// stored in the output buffer. The *_c0, *_c1 and *_c2 arguments are the
// element strides of dimensions 0, 1 and 2 of the respective buffer.
void SpikeGenerationCPU::spike_generation(
    float* input_pointer,
    size_t input_dim_c0,
    size_t input_dim_c1,
    size_t input_dim_c2,
    float* random_values_pointer,
    size_t random_values_dim_c0,
    size_t random_values_dim_c1,
    size_t random_values_dim_c2,
    int64_t* output_pointer,
    size_t output_dim_c0,
    size_t output_dim_c1,
    size_t output_dim_c2,
    size_t x_dim,
    size_t y_dim,
    size_t spike_dim,
    size_t h_dim,
    size_t pattern_id)
{
    // Start of this pattern inside each of the three buffers.
    float* const pattern_input = input_pointer + pattern_id * input_dim_c0;
    int64_t* const pattern_output = output_pointer + pattern_id * output_dim_c0;
    float* const pattern_random =
        random_values_pointer + pattern_id * random_values_dim_c0;

    for (size_t x = 0; x < x_dim; ++x)
    {
        for (size_t y = 0; y < y_dim; ++y)
        {
            // First element of position (x, y); dimension 1 is walked
            // below through the *_c1 strides.
            float* const position_input = pattern_input + x * input_dim_c2 + y;
            int64_t* const position_output =
                pattern_output + x * output_dim_c2 + y;
            float* const position_random =
                pattern_random + x * random_values_dim_c2 + y;

            for (size_t spike = 0; spike < spike_dim; ++spike)
            {
                const float threshold =
                    position_random[spike * random_values_dim_c1];

                position_output[spike * output_dim_c1] = lower_bound(
                    position_input, h_dim, input_dim_c1, threshold);
            }
        }
    }

    return;
}
// Binary search over strided data; the algorithmic idea is taken from libc++.
//
// Returns the index of the first of the data_length elements (spaced
// data_ptr_stride apart) that is not smaller than compare_to_value. If all
// elements are smaller, data_length is returned. The search halves the range
// in every step and therefore expects the elements in ascending order.
size_t SpikeGenerationCPU::lower_bound(float* data_ptr,
                                       size_t data_length,
                                       size_t data_ptr_stride,
                                       float compare_to_value)
{
    size_t first = 0;
    size_t remaining = data_length;

    while (remaining > 0)
    {
        const size_t step = remaining / 2;
        const size_t probe = first + step;

        if (data_ptr[probe * data_ptr_stride] < compare_to_value)
        {
            // The answer lies right of the probed element.
            first = probe + 1;
            remaining -= step + 1;
        }
        else
        {
            // The probed element may itself be the answer; keep the left part.
            remaining = step;
        }
    }

    return first;
}
// Does nothing in the CPU implementation; all arguments are ignored.
// NOTE(review): presumably kept so the CPU class offers the same methods as
// a GPU counterpart - confirm.
void SpikeGenerationCPU::gpu_occupancy_export(
    size_t /*dim_x*/,
    size_t /*dim_y*/,
    size_t /*number_of_pattern*/,
    size_t /*spike_dim*/,
    int64_t /*setting_memory_addr*/,
    size_t /*setting_dim_0*/,
    size_t /*setting_dim_1*/)
{
}
// Does nothing in the CPU implementation; all arguments are ignored.
// NOTE(review): presumably kept so the CPU class offers the same methods as
// a GPU counterpart - confirm.
void SpikeGenerationCPU::gpu_occupancy_import(
    int64_t /*setting_memory_addr*/,
    size_t /*setting_dim_0*/,
    size_t /*setting_dim_1*/)
{
}

View file

@ -0,0 +1,74 @@
#ifndef SPIKEGENERATIONCPU
#define SPIKEGENERATIONCPU

#include <unistd.h>

#include <cctype>
// size_t and int64_t are used below; include their headers directly
// instead of relying on other headers to pull them in.
#include <cstddef>
#include <cstdint>
#include <iostream>

// CPU spike generation that is exposed to Python through pybind11.
// The class has no data members.
class SpikeGenerationCPU
{
public:
    SpikeGenerationCPU();
    ~SpikeGenerationCPU();

    // Generates spikes for all patterns using OpenMP threads.
    //
    // Buffers are given as integer addresses together with their four
    // dimensions. The implementation reads input and random values as float
    // and writes the output as int64_t.
    // number_of_cpu_processes is the number of OpenMP threads to use.
    void entrypoint(
        int64_t input_pointer_addr,
        int64_t input_dim_0,
        int64_t input_dim_1,
        int64_t input_dim_2,
        int64_t input_dim_3,
        int64_t random_values_pointer_addr,
        int64_t random_values_dim_0,
        int64_t random_values_dim_1,
        int64_t random_values_dim_2,
        int64_t random_values_dim_3,
        int64_t output_pointer_addr,
        int64_t output_dim_0,
        int64_t output_dim_1,
        int64_t output_dim_2,
        int64_t output_dim_3,
        int64_t number_of_cpu_processes);

    // Implemented as a no-op in SpikeGenerationCPU.cpp.
    void gpu_occupancy_export(
        size_t dim_x,
        size_t dim_y,
        size_t number_of_pattern,
        size_t spike_dim,
        int64_t setting_memory_addr,
        size_t setting_dim_0,
        size_t setting_dim_1);

    // Implemented as a no-op in SpikeGenerationCPU.cpp.
    void gpu_occupancy_import(
        int64_t setting_memory_addr,
        size_t setting_dim_0,
        size_t setting_dim_1);

private:
    // Generates the spikes of the single pattern pattern_id. The *_c0, *_c1
    // and *_c2 arguments are the element strides of dimensions 0, 1 and 2
    // of the respective buffer.
    void spike_generation(
        float* input_pointer,
        size_t input_dim_c0,
        size_t input_dim_c1,
        size_t input_dim_c2,
        float* random_values_pointer,
        size_t random_values_dim_c0,
        size_t random_values_dim_c1,
        size_t random_values_dim_c2,
        int64_t* output_pointer,
        size_t output_dim_c0,
        size_t output_dim_c1,
        size_t output_dim_c2,
        size_t x_dim,
        size_t y_dim,
        size_t spike_dim,
        size_t h_dim,
        size_t pattern_id);

    // Binary search in strided data: index of the first of data_length
    // elements that is not smaller than compare_to_value, or data_length
    // if there is none.
    size_t lower_bound(
        float* data_ptr,
        size_t data_length,
        size_t data_ptr_stride,
        float compare_to_value);
};

#endif /* SPIKEGENERATIONCPU */