Add files via upload

This folder contains the current state of the code as I run it on the GPUs (i.e., without softmax, etc.), adapted to the respective stimulus condition.
This commit is contained in:
katharinakorb 2023-07-31 11:48:17 +02:00 committed by GitHub
parent ca8c65e3d9
commit 475746ad41
32 changed files with 2003 additions and 0 deletions

View file

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 David Rotermund
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@@ -0,0 +1,11 @@
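#!/bin/bash
# Submits one qsub training job per (out_channels, kernel_size, stride)
# index combination; the indices select entries from config.json.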
Directory="/home/kk/Documents/Semester4/code/Classic_contour_net_shallow"
Priority="-500"
echo $Directory
mkdir $Directory/argh_log_classic
for out_channels_idx in {0..0}; do
for kernel_size_idx in {0..0}; do
for stride_idx in {0..0}; do
echo "hostname; cd $Directory ; /home/kk/P3.10/bin/python3 cnn_training.py --idx-conv-out-channels-list $out_channels_idx --idx-conv-kernel-sizes $kernel_size_idx --idx-conv-stride-sizes $stride_idx -s \$JOB_ID" | qsub -o $Directory/argh_log_classic -j y -p $Priority -q gp4u,gp3u -N ClassicTraining
done
done
done

View file

@@ -0,0 +1,405 @@
import torch
import numpy as np
import datetime
import argh
import time
import os
import json
from jsmin import jsmin
from functions.alicorn_data_loader import alicorn_data_loader
from functions.train import train
from functions.test import test
from functions.make_cnn import make_cnn
from functions.set_seed import set_seed
from functions.plot_intermediate import plot_intermediate
from functions.create_logger import create_logger
# to disable logging output from Tensorflow
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
from torch.utils.tensorboard import SummaryWriter
def main(
idx_conv_out_channels_list: int = 0,
idx_conv_kernel_sizes: int = 0,
idx_conv_stride_sizes: int = 0,
seed_counter: int = 0,
) -> None:
    config_filename = "config.json"
    with open(config_filename, "r") as file_handle:
config = json.loads(jsmin(file_handle.read()))
# get model information:
output_channels = config["conv_out_channels_list"][idx_conv_out_channels_list]
logger = create_logger(
save_logging_messages=bool(config["save_logging_messages"]),
display_logging_messages=bool(config["display_logging_messages"]),
model_name=str(output_channels),
)
# network settings:
conv_out_channels_list: list[list[int]] = config["conv_out_channels_list"]
conv_kernel_sizes: list[list[int]] = config["conv_kernel_sizes"]
conv_stride_sizes: list[int] = config["conv_stride_sizes"]
num_pfinkel: list = np.arange(
int(config["num_pfinkel_start"]),
int(config["num_pfinkel_stop"]),
int(config["num_pfinkel_step"]),
).tolist()
run_network(
out_channels=conv_out_channels_list[int(idx_conv_out_channels_list)],
kernel_size=conv_kernel_sizes[int(idx_conv_kernel_sizes)],
stride=conv_stride_sizes[int(idx_conv_stride_sizes)],
activation_function=str(config["activation_function"]),
train_first_layer=bool(config["train_first_layer"]),
seed_counter=seed_counter,
minimum_learning_rate=float(config["minimum_learning_rate"]),
conv_0_kernel_size=int(config["conv_0_kernel_size"]),
mp_1_kernel_size=int(config["mp_1_kernel_size"]),
mp_1_stride=int(config["mp_1_stride"]),
batch_size_train=int(config["batch_size_train"]),
batch_size_test=int(config["batch_size_test"]),
learning_rate=float(config["learning_rate"]),
max_epochs=int(config["max_epochs"]),
save_model=bool(config["save_model"]),
stimuli_per_pfinkel=int(config["stimuli_per_pfinkel"]),
num_pfinkel=num_pfinkel,
logger=logger,
save_ever_x_epochs=int(config["save_ever_x_epochs"]),
scheduler_patience=int(config["scheduler_patience"]),
condition=str(config["condition"]),
data_path=str(config["data_path"]),
pooling_type=str(config["pooling_type"]),
conv_0_enable_softmax=bool(config["conv_0_enable_softmax"]),
        scale_data=float(config["scale_data"]),
use_scheduler=bool(config["use_scheduler"]),
use_adam=bool(config["use_adam"]),
use_plot_intermediate=bool(config["use_plot_intermediate"]),
leak_relu_negative_slope=float(config["leak_relu_negative_slope"]),
scheduler_verbose=bool(config["scheduler_verbose"]),
scheduler_factor=float(config["scheduler_factor"]),
precision_100_percent=int(config["precision_100_percent"]),
scheduler_threshold=float(config["scheduler_threshold"]),
)
def run_network(
out_channels: list[int],
kernel_size: list[int],
num_pfinkel: list,
logger,
stride: int,
activation_function: str,
train_first_layer: bool,
seed_counter: int,
minimum_learning_rate: float,
conv_0_kernel_size: int,
mp_1_kernel_size: int,
mp_1_stride: int,
scheduler_patience: int,
batch_size_train: int,
batch_size_test: int,
learning_rate: float,
max_epochs: int,
save_model: bool,
stimuli_per_pfinkel: int,
save_ever_x_epochs: int,
condition: str,
data_path: str,
pooling_type: str,
conv_0_enable_softmax: bool,
scale_data: float,
use_scheduler: bool,
use_adam: bool,
use_plot_intermediate: bool,
leak_relu_negative_slope: float,
scheduler_verbose: bool,
scheduler_factor: float,
precision_100_percent: int,
scheduler_threshold: float,
) -> None:
# define device:
device_str: str = "cuda:0" if torch.cuda.is_available() else "cpu"
logger.info(f"Using {device_str} device")
device: torch.device = torch.device(device_str)
torch.set_default_dtype(torch.float32)
# -------------------------------------------------------------------
logger.info("-==- START -==-")
train_accuracy: list[float] = []
train_losses: list[float] = []
train_loss: list[float] = []
test_accuracy: list[float] = []
test_losses: list[float] = []
# prepare data:
logger.info(num_pfinkel)
logger.info(condition)
logger.info("Loading training data")
data_train = alicorn_data_loader(
num_pfinkel=num_pfinkel,
load_stimuli_per_pfinkel=stimuli_per_pfinkel,
condition=condition,
logger=logger,
data_path=data_path,
)
logger.info("Loading test data")
data_test = alicorn_data_loader(
num_pfinkel=num_pfinkel,
load_stimuli_per_pfinkel=stimuli_per_pfinkel,
condition=condition,
logger=logger,
data_path=data_path,
)
logger.info("Loading done!")
# data loader
loader_train = torch.utils.data.DataLoader(
data_train, shuffle=True, batch_size=batch_size_train
)
loader_test = torch.utils.data.DataLoader(
data_test, shuffle=False, batch_size=batch_size_test
)
previous_test_acc: float = -1
# set seed for reproducibility
set_seed(seed=int(seed_counter), logger=logger)
# number conv layer:
if train_first_layer:
num_conv_layers = len(out_channels)
else:
num_conv_layers = len(out_channels) if len(out_channels) >= 2 else 1
    # construct model name from configuration:
model_name = (
f"ArghCNN_numConvLayers{num_conv_layers}"
f"_outChannels{out_channels}_kernelSize{kernel_size}_"
f"{activation_function}_stride{stride}_"
f"trainFirstConvLayer{train_first_layer}_"
f"seed{seed_counter}_{condition}"
)
current = datetime.datetime.now().strftime("%d%m-%H%M")
# new tb session
os.makedirs("tb_runs", exist_ok=True)
path: str = os.path.join("tb_runs", f"{model_name}")
tb = SummaryWriter(path)
# --------------------------------------------------------------------------
# print network configuration:
logger.info("----------------------------------------------------")
logger.info(f"Number conv layers: {num_conv_layers}")
logger.info(f"Output channels: {out_channels}")
logger.info(f"Kernel sizes: {kernel_size}")
logger.info(f"Stride: {stride}")
logger.info(f"Activation function: {activation_function}")
logger.info(f"Training conv 0: {train_first_layer}")
logger.info(f"Seed: {seed_counter}")
logger.info(f"LR-scheduler patience: {scheduler_patience}")
logger.info(f"Pooling layer kernel: {mp_1_kernel_size}, stride: {mp_1_stride}")
# define model:
model = make_cnn(
conv_out_channels_list=out_channels,
conv_kernel_size=kernel_size,
conv_stride_size=stride,
conv_activation_function=activation_function,
train_conv_0=train_first_layer,
conv_0_kernel_size=conv_0_kernel_size,
mp_1_kernel_size=mp_1_kernel_size,
mp_1_stride=mp_1_stride,
logger=logger,
pooling_type=pooling_type,
conv_0_enable_softmax=conv_0_enable_softmax,
l_relu_negative_slope=leak_relu_negative_slope,
).to(device)
logger.info(model)
old_params: dict = {}
for name, param in model.named_parameters():
old_params[name] = param.data.detach().cpu().clone()
    # parameters for training:
param_list: list = []
for i in range(0, len(model)):
if (not train_first_layer) and (i == 0):
pass
else:
for name, param in model[i].named_parameters():
logger.info(f"Learning parameter: layer: {i} name: {name}")
param_list.append(param)
for name, param in model.named_parameters():
assert (
torch.isfinite(param.data).sum().cpu()
== torch.tensor(param.data.size()).prod()
), name
# optimizer and learning rate scheduler
if use_adam:
optimizer = torch.optim.Adam(param_list, lr=learning_rate)
else:
optimizer = torch.optim.SGD(param_list, lr=learning_rate) # type: ignore
if use_scheduler:
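        # eps is set an order of magnitude below minimum_learning_rate, so the
        # explicit stopping criterion further down (not the scheduler's own
        # eps) ends training.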
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer,
patience=scheduler_patience,
eps=minimum_learning_rate / 10,
verbose=scheduler_verbose,
factor=scheduler_factor,
threshold=scheduler_threshold,
)
# training loop:
logger.info("-==- Data and network loader: Done -==-")
t_dis0 = time.perf_counter()
for epoch in range(1, max_epochs + 1):
# train
logger.info("-==- Training... -==-")
running_loss = train(
model=model,
loader=loader_train,
optimizer=optimizer,
epoch=epoch,
device=device,
tb=tb,
test_acc=previous_test_acc,
logger=logger,
train_accuracy=train_accuracy,
train_losses=train_losses,
train_loss=train_loss,
scale_data=scale_data,
)
# logging:
logger.info("")
logger.info("Check for changes in the weights:")
for name, param in model.named_parameters():
if isinstance(old_params[name], torch.Tensor) and isinstance(
param.data, torch.Tensor
):
temp_torch = param.data.detach().cpu().clone()
if old_params[name].ndim == temp_torch.ndim:
if old_params[name].size() == temp_torch.size():
abs_diff = torch.abs(old_params[name] - temp_torch).max()
logger.info(f"Parameter {name}: {abs_diff:.3e}")
old_params[name] = temp_torch
logger.info("")
logger.info("-==- Testing... -==-")
previous_test_acc = test( # type: ignore
model=model,
loader=loader_test,
device=device,
tb=tb,
epoch=epoch,
logger=logger,
test_accuracy=test_accuracy,
test_losses=test_losses,
scale_data=scale_data,
)
logger.info(f"Time required: {time.perf_counter()-t_dis0:.2e} sec")
        # save the model every save_ever_x_epochs epochs:
if save_model and (epoch % save_ever_x_epochs == 0):
pt_filename: str = f"{model_name}_{epoch}Epoch_{current}.pt"
logger.info("")
logger.info(f"Saved model: {pt_filename}")
os.makedirs("trained_models", exist_ok=True)
torch.save(
model,
os.path.join(
"trained_models",
pt_filename,
),
)
# check nan
for name, param in model.named_parameters():
assert (
torch.isfinite(param.data).sum().cpu()
== torch.tensor(param.data.size()).prod()
), name
# update scheduler
if use_scheduler:
if scheduler_verbose and isinstance(scheduler.best, float):
logger.info(
"Step LR scheduler: "
f"Loss: {running_loss:.2e} "
f"Best: {scheduler.best:.2e} "
f"Delta: {running_loss-scheduler.best:.2e} "
f"Threshold: {scheduler.threshold:.2e} "
f"Number of bad epochs: {scheduler.num_bad_epochs} "
f"Patience: {scheduler.patience} "
)
scheduler.step(running_loss)
# stop learning: lr too small
if optimizer.param_groups[0]["lr"] <= minimum_learning_rate:
logger.info("Learning rate is too small. Stop training.")
break
# stop learning: done
if round(previous_test_acc, precision_100_percent) == 100.0:
logger.info("100% test performance reached. Stop training.")
break
if use_plot_intermediate:
plot_intermediate(
train_accuracy=train_accuracy,
test_accuracy=test_accuracy,
train_losses=train_losses,
test_losses=test_losses,
save_name=model_name,
)
os.makedirs("performance_data", exist_ok=True)
np.savez(
os.path.join("performance_data", f"performances_{model_name}.npz"),
output_channels=np.array(out_channels),
train_accuracy=np.array(train_accuracy),
test_accuracy=np.array(test_accuracy),
train_losses=np.array(train_losses),
test_losses=np.array(test_losses),
)
# end TB session:
tb.close()
    # log and save the final model:
    if save_model:
        logger.info("")
        logger.info(f"Saved model: {model_name}_{epoch}Epoch_{current}")
        os.makedirs("trained_models", exist_ok=True)
torch.save(
model,
os.path.join(
"trained_models",
f"{model_name}_{epoch}Epoch_{current}.pt",
),
)
if __name__ == "__main__":
argh.dispatch_command(main)

View file

@@ -0,0 +1,52 @@
{
"data_path": "/home/kk/Documents/Semester4/code/RenderStimuli/Output/",
"save_logging_messages": true, // (true), false
"display_logging_messages": true, // (true), false
"batch_size_train": 500,
"batch_size_test": 250,
"max_epochs": 2000,
"save_model": true,
"conv_0_kernel_size": 11,
"mp_1_kernel_size": 3,
"mp_1_stride": 2,
"use_plot_intermediate": true, // true, (false)
"stimuli_per_pfinkel": 10000,
"num_pfinkel_start": 0,
"num_pfinkel_stop": 100,
"num_pfinkel_step": 10,
"precision_100_percent": 4, // (4)
"train_first_layer": true, // true, (false)
"save_ever_x_epochs": 10, // (10)
"activation_function": "leaky relu", // tanh, relu, (leaky relu), none
"leak_relu_negative_slope": 0.1, // (0.1)
// LR Scheduler ->
"use_scheduler": true, // (true), false
"scheduler_verbose": true,
"scheduler_factor": 0.1, //(0.1)
"scheduler_patience": 10, // (10)
"scheduler_threshold": 1e-5, // (1e-4)
"minimum_learning_rate": 1e-8,
"learning_rate": 0.0001,
// <- LR Scheduler
"pooling_type": "max", // (max), average, none
"conv_0_enable_softmax": false, // true, (false)
"use_adam": true, // (true) => adam, false => SGD
"condition": "Coignless",
"scale_data": 255.0, // (255.0),
"conv_out_channels_list": [
[
3,
8,
8
]
],
"conv_kernel_sizes": [
[
7,
15
]
],
"conv_stride_sizes": [
1
]
}

View file

@@ -0,0 +1,107 @@
import torch
import numpy as np
import os
@torch.no_grad()
def alicorn_data_loader(
num_pfinkel: list[int] | None,
load_stimuli_per_pfinkel: int,
condition: str,
data_path: str,
logger=None,
) -> torch.utils.data.TensorDataset:
"""
    - num_pfinkel: list of the path angles to load (range 0-90).
      If None, all pfinkels are loaded.
    - load_stimuli_per_pfinkel: number of stimuli per path angle, counted
      separately for label 0 and label 1 (e.g., load_stimuli_per_pfinkel =
      1000: 1000 stimuli with label 1, 1000 stimuli with label 0)
"""
filename: str | None = None
if condition == "Angular":
filename = "angular_angle"
elif condition == "Coignless":
filename = "base_angle"
elif condition == "Natural":
filename = "corner_angle"
else:
filename = None
assert filename is not None
filepaths: str = os.path.join(data_path, f"{condition}")
stimuli_per_pfinkel: int = 100000
# ----------------------------
# for angles and batches
if num_pfinkel is None:
angle: list[int] = np.arange(0, 100, 10).tolist()
else:
angle = num_pfinkel
assert isinstance(angle, list)
batch: list[int] = np.arange(1, 11, 1).tolist()
if load_stimuli_per_pfinkel <= (stimuli_per_pfinkel // len(batch)):
num_img_per_pfinkel: int = load_stimuli_per_pfinkel
num_batches: int = 1
else:
# handle case where more than 10,000 stimuli per pfinkel needed
num_batches = load_stimuli_per_pfinkel // (stimuli_per_pfinkel // len(batch))
num_img_per_pfinkel = load_stimuli_per_pfinkel // num_batches
if logger is not None:
logger.info(f"{num_batches} batches")
logger.info(f"{num_img_per_pfinkel} stimuli per pfinkel.")
# initialize data and label tensors:
num_stimuli: int = len(angle) * num_batches * num_img_per_pfinkel * 2
data_tensor: torch.Tensor = torch.empty(
(num_stimuli, 200, 200), dtype=torch.uint8, device=torch.device("cpu")
)
label_tensor: torch.Tensor = torch.empty(
(num_stimuli), dtype=torch.int64, device=torch.device("cpu")
)
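    # stimuli are stored as uint8 on the CPU to save memory; conversion to
    # float and scaling by scale_data happen later in train/test.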
if logger is not None:
logger.info(f"data tensor shape: {data_tensor.shape}")
logger.info(f"label tensor shape: {label_tensor.shape}")
# append data
idx: int = 0
for i in range(len(angle)):
for j in range(num_batches):
# load contour
temp_filename: str = (
f"{filename}_{angle[i]:03}_b{batch[j]:03}_n10000_RENDERED.npz"
)
contour_filename: str = os.path.join(filepaths, temp_filename)
c_data = np.load(contour_filename)
data_tensor[idx : idx + num_img_per_pfinkel, ...] = torch.tensor(
c_data["gaborfield"][:num_img_per_pfinkel, ...],
dtype=torch.uint8,
device=torch.device("cpu"),
)
label_tensor[idx : idx + num_img_per_pfinkel] = int(1)
idx += num_img_per_pfinkel
# next append distractor stimuli
for i in range(len(angle)):
for j in range(num_batches):
# load distractor
temp_filename = (
f"{filename}_{angle[i]:03}_dist_b{batch[j]:03}_n10000_RENDERED.npz"
)
distractor_filename: str = os.path.join(filepaths, temp_filename)
nc_data = np.load(distractor_filename)
data_tensor[idx : idx + num_img_per_pfinkel, ...] = torch.tensor(
nc_data["gaborfield"][:num_img_per_pfinkel, ...],
dtype=torch.uint8,
device=torch.device("cpu"),
)
label_tensor[idx : idx + num_img_per_pfinkel] = int(0)
idx += num_img_per_pfinkel
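    # the dataset yields (label, image) pairs; unsqueeze adds the channel axis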
return torch.utils.data.TensorDataset(label_tensor, data_tensor.unsqueeze(1))

View file

@@ -0,0 +1,103 @@
import torch
def unfold(
layer: torch.nn.Conv2d | torch.nn.MaxPool2d | torch.nn.AvgPool2d, size: int
) -> torch.Tensor:
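    # Traces connectivity: an index ramp 0..size-1 is pushed through
    # torch.nn.functional.unfold with this layer's geometry, so column j of
    # the result lists the input coordinates that feed output position j
    # (one spatial axis; square kernels/strides are asserted below).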
if isinstance(layer.kernel_size, tuple):
assert layer.kernel_size[0] == layer.kernel_size[1]
kernel_size: int = int(layer.kernel_size[0])
else:
kernel_size = int(layer.kernel_size)
if isinstance(layer.dilation, tuple):
assert layer.dilation[0] == layer.dilation[1]
dilation: int = int(layer.dilation[0])
else:
dilation = int(layer.dilation) # type: ignore
if isinstance(layer.padding, tuple):
assert layer.padding[0] == layer.padding[1]
padding: int = int(layer.padding[0])
else:
padding = int(layer.padding)
if isinstance(layer.stride, tuple):
assert layer.stride[0] == layer.stride[1]
stride: int = int(layer.stride[0])
else:
stride = int(layer.stride)
out = (
torch.nn.functional.unfold(
torch.arange(0, size, dtype=torch.float32)
.unsqueeze(0)
.unsqueeze(0)
.unsqueeze(-1),
kernel_size=(kernel_size, 1),
dilation=(dilation, 1),
padding=(padding, 0),
stride=(stride, 1),
)
.squeeze(0)
.type(torch.int64)
)
return out
def analyse_network(
model: torch.nn.Sequential, input_shape: int
) -> tuple[list, list, list]:
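    # Composes the per-layer coordinate maps from unfold() to determine, for
    # each layer, which input pixels can influence each output position
    # (i.e., the receptive field), plus the number of input pixels used.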
combined_list: list = []
coordinate_list: list = []
layer_type_list: list = []
pixel_used: list[int] = []
size: int = int(input_shape)
for layer_id in range(0, len(model)):
if isinstance(
model[layer_id], (torch.nn.Conv2d, torch.nn.MaxPool2d, torch.nn.AvgPool2d)
):
out = unfold(layer=model[layer_id], size=size)
coordinate_list.append(out)
layer_type_list.append(
str(type(model[layer_id])).split(".")[-1].split("'")[0]
)
size = int(out.shape[-1])
else:
coordinate_list.append(None)
layer_type_list.append(None)
assert coordinate_list[0] is not None
combined_list.append(coordinate_list[0])
for i in range(1, len(coordinate_list)):
if coordinate_list[i] is None:
combined_list.append(combined_list[i - 1])
else:
            idx_shape: int | None = None
            for pos in range(0, coordinate_list[i].shape[-1]):
idx = torch.unique(
torch.flatten(combined_list[i - 1][:, coordinate_list[i][:, pos]])
)
if idx_shape is None:
idx_shape = idx.shape[0]
assert idx_shape == idx.shape[0]
assert idx_shape is not None
temp = torch.zeros((idx_shape, coordinate_list[i].shape[-1]))
for pos in range(0, coordinate_list[i].shape[-1]):
idx = torch.unique(
torch.flatten(combined_list[i - 1][:, coordinate_list[i][:, pos]])
)
temp[:, pos] = idx
combined_list.append(temp)
for i in range(0, len(combined_list)):
pixel_used.append(int(torch.unique(torch.flatten(combined_list[i])).shape[0]))
return combined_list, layer_type_list, pixel_used

View file

@@ -0,0 +1,40 @@
import logging
import datetime
import os
def create_logger(
    save_logging_messages: bool,
    display_logging_messages: bool,
    model_name: str | None,
):
now = datetime.datetime.now()
dt_string_filename = now.strftime("%Y_%m_%d_%H_%M_%S")
logger = logging.getLogger("MyLittleLogger")
logger.setLevel(logging.DEBUG)
if save_logging_messages:
if model_name:
filename = os.path.join(
"logs", f"log_{dt_string_filename}_{model_name}.txt"
)
else:
filename = os.path.join("logs", f"log_{dt_string_filename}.txt")
time_format = "%b %-d %Y %H:%M:%S"
logformat = "%(asctime)s %(message)s"
file_formatter = logging.Formatter(fmt=logformat, datefmt=time_format)
os.makedirs("logs", exist_ok=True)
file_handler = logging.FileHandler(filename)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
if display_logging_messages:
time_format = "%H:%M:%S"
logformat = "%(asctime)s %(message)s"
stream_formatter = logging.Formatter(fmt=logformat, datefmt=time_format)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(stream_formatter)
logger.addHandler(stream_handler)
return logger

View file

@@ -0,0 +1,39 @@
from scipy.stats import fisher_exact
def fisher_excat_upper(
correct_pattern_count: int, number_of_pattern: int, p_threshold: float = 5.0 / 100.0
) -> float:
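    # Width (in percentage points) of the upper error bar: the smallest
    # increase u of the error count whose one-sided Fisher exact test
    # against the observed table is significant at p_threshold.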
error_pattern_count = int(number_of_pattern - correct_pattern_count)
bound = 100.0
for u in range(0, correct_pattern_count):
z = int(error_pattern_count + u)
_, pvalue = fisher_exact(
[[correct_pattern_count, error_pattern_count], [number_of_pattern - z, z]],
alternative="greater",
)
        if pvalue <= p_threshold:
bound = u * 100.0 / number_of_pattern
break
return bound
def fisher_excat_lower(
correct_pattern_count: int, number_of_pattern: int, p_threshold: float = 5.0 / 100.0
) -> float:
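    # Analogous width of the lower error bar: the smallest decrease of the
    # error count that the one-sided test can distinguish.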
error_pattern_count = int(number_of_pattern - correct_pattern_count)
bound = 0.0
for u in range(0, error_pattern_count):
z = int(error_pattern_count - u)
_, pvalue = fisher_exact(
[[correct_pattern_count, error_pattern_count], [number_of_pattern - z, z]],
alternative="less",
)
        if pvalue <= p_threshold:
bound = u * 100.0 / number_of_pattern
break
return bound

View file

@@ -0,0 +1,114 @@
import torch
import numpy as np
def make_cnn(
conv_out_channels_list: list[int],
conv_kernel_size: list[int],
conv_stride_size: int,
conv_activation_function: str,
train_conv_0: bool,
logger,
conv_0_kernel_size: int,
mp_1_kernel_size: int,
mp_1_stride: int,
pooling_type: str,
conv_0_enable_softmax: bool,
l_relu_negative_slope: float,
) -> torch.nn.Sequential:
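    # Architecture: a first conv layer (optionally frozen, with weights
    # loaded from an .npy file below), optional softmax, activation and
    # pooling, then a variable stack of conv + activation blocks, and
    # finally Flatten + LazyLinear(2) as the classifier head.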
assert len(conv_out_channels_list) >= 1
assert len(conv_out_channels_list) == len(conv_kernel_size) + 1
cnn = torch.nn.Sequential()
# Fixed structure
cnn.append(
torch.nn.Conv2d(
in_channels=1,
out_channels=conv_out_channels_list[0] if train_conv_0 else 32,
kernel_size=conv_0_kernel_size,
stride=1,
bias=train_conv_0,
)
)
if conv_0_enable_softmax:
cnn.append(torch.nn.Softmax(dim=1))
setting_understood: bool = False
if conv_activation_function.upper() == str("relu").upper():
cnn.append(torch.nn.ReLU())
setting_understood = True
elif conv_activation_function.upper() == str("leaky relu").upper():
cnn.append(torch.nn.LeakyReLU(negative_slope=l_relu_negative_slope))
setting_understood = True
elif conv_activation_function.upper() == str("tanh").upper():
cnn.append(torch.nn.Tanh())
setting_understood = True
elif conv_activation_function.upper() == str("none").upper():
setting_understood = True
assert setting_understood
setting_understood = False
if pooling_type.upper() == str("max").upper():
cnn.append(torch.nn.MaxPool2d(kernel_size=mp_1_kernel_size, stride=mp_1_stride))
setting_understood = True
elif pooling_type.upper() == str("average").upper():
cnn.append(torch.nn.AvgPool2d(kernel_size=mp_1_kernel_size, stride=mp_1_stride))
setting_understood = True
elif pooling_type.upper() == str("none").upper():
setting_understood = True
assert setting_understood
# Changing structure
for i in range(1, len(conv_out_channels_list)):
if i == 1 and not train_conv_0:
in_channels = 32
else:
in_channels = conv_out_channels_list[i - 1]
cnn.append(
torch.nn.Conv2d(
in_channels=in_channels,
out_channels=conv_out_channels_list[i],
kernel_size=conv_kernel_size[i - 1],
stride=conv_stride_size,
bias=True,
)
)
setting_understood = False
if conv_activation_function.upper() == str("relu").upper():
cnn.append(torch.nn.ReLU())
setting_understood = True
elif conv_activation_function.upper() == str("leaky relu").upper():
cnn.append(torch.nn.LeakyReLU(negative_slope=l_relu_negative_slope))
setting_understood = True
elif conv_activation_function.upper() == str("tanh").upper():
cnn.append(torch.nn.Tanh())
setting_understood = True
elif conv_activation_function.upper() == str("none").upper():
setting_understood = True
assert setting_understood
# Fixed structure
# define fully connected layer:
cnn.append(torch.nn.Flatten(start_dim=1))
cnn.append(torch.nn.LazyLinear(2, bias=True))
# if conv1 not trained:
filename_load_weight_0: str | None = None
if train_conv_0 is False and cnn[0]._parameters["weight"].shape[0] == 32:
filename_load_weight_0 = "weights_radius10.npy"
if train_conv_0 is False and cnn[0]._parameters["weight"].shape[0] == 16:
filename_load_weight_0 = "8orient_2phase_weights.npy"
if filename_load_weight_0 is not None:
logger.info(f"Replace weights in CNN 0 with {filename_load_weight_0}")
cnn[0]._parameters["weight"] = torch.tensor(
np.load(filename_load_weight_0),
dtype=cnn[0]._parameters["weight"].dtype,
requires_grad=False,
device=cnn[0]._parameters["weight"].device,
)
return cnn

View file

@@ -0,0 +1,82 @@
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import os
import re
mpl.rcParams["text.usetex"] = True
mpl.rcParams["font.family"] = "serif"
def plot_intermediate(
train_accuracy: list[float],
test_accuracy: list[float],
train_losses: list[float],
test_losses: list[float],
save_name: str,
reduction_factor: int = 1,
) -> None:
assert len(train_accuracy) == len(test_accuracy)
assert len(train_accuracy) == len(train_losses)
assert len(train_accuracy) == len(test_losses)
# legend:
pattern = r"(outChannels\[\d+(?:, \d+)*\]_kernelSize\[\d+(?:, \d+)*\]_)(\w+)(?=_stride)"
matches = re.findall(pattern, save_name)
legend_label = "".join(["".join(match) for match in matches])
max_epochs: int = len(train_accuracy)
# set stepsize
x = np.arange(1, max_epochs + 1)
stepsize = max_epochs // reduction_factor
# accuracies
plt.figure(figsize=[12, 7])
plt.subplot(2, 1, 1)
plt.plot(x, np.array(train_accuracy), label="Train: " + str(legend_label))
plt.plot(x, np.array(test_accuracy), label="Test: " + str(legend_label))
plt.title("Training and Testing Accuracy", fontsize=18)
plt.xlabel("Epoch", fontsize=18)
plt.ylabel("Accuracy (\\%)", fontsize=18)
plt.legend(fontsize=12)
plt.xticks(
np.concatenate((np.array([1]), np.arange(stepsize, max_epochs + 1, stepsize))),
np.concatenate((np.array([1]), np.arange(stepsize, max_epochs + 1, stepsize))),
)
# Increase tick label font size
plt.xticks(fontsize=16)
plt.yticks(fontsize=16)
plt.grid(True)
# losses
plt.subplot(2, 1, 2)
plt.plot(x, np.array(train_losses), label="Train: " + str(legend_label))
plt.plot(x, np.array(test_losses), label="Test: " + str(legend_label))
plt.title("Training and Testing Losses", fontsize=18)
plt.xlabel("Epoch", fontsize=18)
plt.ylabel("Loss", fontsize=18)
plt.legend(fontsize=14)
plt.xticks(
np.concatenate((np.array([1]), np.arange(stepsize, max_epochs + 1, stepsize))),
np.concatenate((np.array([1]), np.arange(stepsize, max_epochs + 1, stepsize))),
)
# Increase tick label font size
plt.xticks(fontsize=16)
plt.yticks(fontsize=16)
plt.grid(True)
plt.tight_layout()
os.makedirs("performance_plots", exist_ok=True)
plt.savefig(
os.path.join(
"performance_plots",
f"performance_{save_name}.pdf",
),
dpi=300,
bbox_inches="tight",
)
plt.show()

View file

@@ -0,0 +1,11 @@
import torch
import numpy as np
def set_seed(seed: int, logger) -> None:
# set seed for all used modules
logger.info(f"set seed to {seed}")
torch.manual_seed(seed=seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed=seed)
np.random.seed(seed=seed)

View file

@@ -0,0 +1,58 @@
import torch
import logging
@torch.no_grad()
def test(
model: torch.nn.modules.container.Sequential,
loader: torch.utils.data.dataloader.DataLoader,
device: torch.device,
tb,
epoch: int,
logger: logging.Logger,
test_accuracy: list[float],
test_losses: list[float],
scale_data: float,
) -> float:
test_loss: float = 0.0
correct: int = 0
pattern_count: float = 0.0
model.eval()
for data in loader:
label = data[0].to(device)
image = data[1].type(dtype=torch.float32).to(device)
if scale_data > 0:
image /= scale_data
output = model(image)
# loss and optimization
loss = torch.nn.functional.cross_entropy(output, label, reduction="sum")
pattern_count += float(label.shape[0])
test_loss += loss.item()
prediction = output.argmax(dim=1)
correct += prediction.eq(label).sum().item()
logger.info(
(
"Test set:"
f" Average loss: {test_loss / pattern_count:.3e},"
f" Accuracy: {correct}/{pattern_count},"
f"({100.0 * correct / pattern_count:.2f}%)"
)
)
logger.info("")
acc = 100.0 * correct / pattern_count
test_losses.append(test_loss / pattern_count)
test_accuracy.append(acc)
# add to tb:
tb.add_scalar("Test Loss", (test_loss / pattern_count), epoch)
tb.add_scalar("Test Performance", 100.0 * correct / pattern_count, epoch)
tb.add_scalar("Test Number Correct", correct, epoch)
tb.flush()
return acc

View file

@@ -0,0 +1,80 @@
import torch
import logging
def train(
model: torch.nn.modules.container.Sequential,
loader: torch.utils.data.dataloader.DataLoader,
optimizer: torch.optim.Adam | torch.optim.SGD,
epoch: int,
device: torch.device,
tb,
test_acc,
logger: logging.Logger,
train_accuracy: list[float],
train_losses: list[float],
train_loss: list[float],
scale_data: float,
) -> float:
num_train_pattern: int = 0
running_loss: float = 0.0
correct: int = 0
pattern_count: float = 0.0
model.train()
for data in loader:
label = data[0].to(device)
image = data[1].type(dtype=torch.float32).to(device)
if scale_data > 0:
image /= scale_data
optimizer.zero_grad()
output = model(image)
loss = torch.nn.functional.cross_entropy(output, label, reduction="sum")
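        # reduction="sum" accumulates per-sample losses, so dividing the
        # running sum by the pattern count below yields an exact average.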
loss.backward()
optimizer.step()
# for loss and accuracy plotting:
num_train_pattern += int(label.shape[0])
pattern_count += float(label.shape[0])
running_loss += float(loss)
train_loss.append(float(loss))
prediction = output.argmax(dim=1)
correct += prediction.eq(label).sum().item()
total_number_of_pattern: int = int(len(loader)) * int(label.shape[0])
# infos:
logger.info(
(
"Train Epoch:"
f" {epoch}"
f" [{int(pattern_count)}/{total_number_of_pattern}"
f" ({100.0 * pattern_count / total_number_of_pattern:.2f}%)],"
f" Loss: {float(running_loss) / float(num_train_pattern):.4e},"
f" Acc: {(100.0 * correct / num_train_pattern):.2f}"
f" Test Acc: {test_acc:.2f}%,"
f" LR: {optimizer.param_groups[0]['lr']:.2e}"
)
)
acc = 100.0 * correct / num_train_pattern
train_accuracy.append(acc)
epoch_loss = running_loss / pattern_count
train_losses.append(epoch_loss)
# add to tb:
tb.add_scalar("Train Loss", loss.item(), epoch)
tb.add_scalar("Train Performance", torch.tensor(acc), epoch)
tb.add_scalar("Train Number Correct", torch.tensor(correct), epoch)
# for parameters:
for name, param in model.named_parameters():
if "weight" in name or "bias" in name:
tb.add_histogram(f"{name}", param.data.clone(), epoch)
tb.flush()
return epoch_loss

View file

@@ -0,0 +1,164 @@
import torch
import matplotlib.pyplot as plt
import matplotlib.patches as patch
import matplotlib as mpl
mpl.rcParams["text.usetex"] = True
mpl.rcParams["font.family"] = "serif"
import os
import sys
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(parent_dir)
from functions.analyse_network import analyse_network
# define parameters
num_iterations: int = 100000
learning_rate: float = 0.1
apply_input_mask: bool = True
mark_region_in_plot: bool = False
scheduler_patience: int = 500
scheduler_factor: float = 0.9
scheduler_eps = 1e-08
target_image_active: float = 1e4
# path to NN
nn = "ArghCNN_numConvLayers3_outChannels[6, 8, 8]_kernelSize[7, 15]_leaky relu_stride1_trainFirstConvLayerTrue_seed287302_Coignless_801Epoch_2807-0857"
PATH = f"./trained_models/{nn}.pt"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# load and eval model
model = torch.load(PATH).to(device)
model.eval()
print("Full network:")
print(model)
print("")
# enter index to plot:
idx = int(input("Please select layer: "))
print(f"Selected layer {idx}:")
assert idx < len(model)
model = model[: idx + 1]
# random input
input_img = torch.randn(1, 200, 200).to(device)
input_img = input_img.unsqueeze(0)
input_img.requires_grad_(True) # type: ignore
input_shape = input_img.shape
assert input_shape[-2] == input_shape[-1]
coordinate_list, layer_type_list, pixel_used = analyse_network(
model=model, input_shape=int(input_shape[-1])
)
output_shape = model(input_img).shape
target_image = torch.zeros(
(*output_shape,), dtype=input_img.dtype, device=input_img.device
)
input_parameter = torch.nn.Parameter(input_img)
print(
(
f"Available max positions: f:{target_image.shape[1]} "
f"x:{target_image.shape[2]} y:{target_image.shape[3]}"
)
)
# select neuron and plot for all feature maps (?)
neuron_f = int(input("Please select neuron_f: "))
neuron_x = target_image.shape[2] // 2
neuron_y = target_image.shape[3] // 2
print(f"Selected neuron {neuron_f}, {neuron_x}, {neuron_y}")
# Input mask ->
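# The mask restricts optimization to the selected neuron's receptive field;
# its input coordinates come from the analyse_network coordinate map.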
active_input_x = coordinate_list[-1][:, neuron_x].clone()
active_input_y = coordinate_list[-1][:, neuron_y].clone()
input_mask: torch.Tensor = torch.zeros_like(input_img)
input_mask[
:,
:,
active_input_x.type(torch.int64).unsqueeze(-1),
active_input_y.type(torch.int64).unsqueeze(0),
] = 1
rect_x = [int(active_input_x.min()), int(active_input_x.max())]
rect_y = [int(active_input_y.min()), int(active_input_y.max())]
# <- Input mask
if apply_input_mask:
with torch.no_grad():
input_img *= input_mask
optimizer = torch.optim.Adam([{"params": input_parameter}], lr=learning_rate)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer,
    patience=scheduler_patience,
    factor=scheduler_factor,
    eps=scheduler_eps * 0.1,
)
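# Setting one large target activation and minimizing the MSE against it
# drives the input image toward a pattern that maximally excites the
# selected neuron.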
target_image[0, neuron_f, neuron_x, neuron_y] = target_image_active
counter: int = 0
while (optimizer.param_groups[0]["lr"] > scheduler_eps) and (counter < num_iterations):
optimizer.zero_grad()
output = model(input_parameter)
loss = torch.nn.functional.mse_loss(output, target_image)
loss.backward()
if counter % 1000 == 0:
print(
f"{counter} : loss={float(loss):.3e} lr={optimizer.param_groups[0]['lr']:.3e}"
)
optimizer.step()
if apply_input_mask:
with torch.no_grad():
input_parameter.data[torch.where(input_mask == 0)] = 0.0
with torch.no_grad():
max_data = torch.abs(input_parameter.data).max()
if max_data > 1.0:
input_parameter.data /= max_data
if (
torch.isfinite(input_parameter.data).sum().cpu()
!= torch.tensor(input_parameter.data.size()).prod()
):
print(f"Found NaN in step: {counter}, use a smaller initial lr")
exit()
scheduler.step(float(loss))
counter += 1
# plot image:
_, ax = plt.subplots()
ax.imshow(input_img.squeeze().detach().cpu().numpy(), cmap="gray")
if mark_region_in_plot:
edgecolor = "sienna"
kernel = patch.Rectangle(
(rect_y[0], rect_x[0]),
int(rect_y[1] - rect_y[0]),
int(rect_x[1] - rect_x[0]),
linewidth=1.2,
edgecolor=edgecolor,
facecolor="none",
)
ax.add_patch(kernel)
plt.show(block=True)

View file

@@ -0,0 +1,182 @@
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
import matplotlib as mpl
mpl.rcParams["text.usetex"] = True
mpl.rcParams["font.family"] = "serif"
def plot_weights(
plot,
s,
grid_color,
linewidth,
idx,
smallDim,
swap_channels,
activations,
layer,
title,
colorbar,
vmin,
vmax,
):
plt.imshow(plot.T, cmap="gray", origin="lower", vmin=vmin, vmax=vmax)
ax = plt.gca()
a = np.arange(0, plot.shape[1] + 1, s[3])
b = np.arange(0, plot.shape[0] + 1, s[1])
plt.hlines(a - 0.5, -0.5, plot.shape[0] - 0.5, colors=grid_color, lw=linewidth)
plt.vlines(b - 0.5, -0.5, plot.shape[1] - 0.5, colors=grid_color, lw=linewidth)
plt.ylim(-1, plot.shape[1])
plt.xlim(-1, plot.shape[0])
ax.set_xticks(s[1] / 2 + np.arange(-0.5, plot.shape[0] - 1, s[1]))
ax.set_yticks(s[3] / 2 + np.arange(-0.5, plot.shape[1] - 1, s[3]))
if (
idx is not None
and (smallDim is False and swap_channels is False)
or (activations is True)
):
ax.set_xticklabels(idx, fontsize=15)
ax.set_yticklabels(np.arange(s[2]), fontsize=15)
elif idx is not None and layer == "FC1":
ax.set_xticklabels(np.arange(s[0]), fontsize=15)
ax.set_yticklabels(idx, fontsize=15)
elif idx is not None and (smallDim is True or swap_channels is True):
ax.set_xticklabels(np.arange(s[0]), fontsize=15)
ax.set_yticklabels(idx, fontsize=15)
else:
ax.set_xticklabels(np.arange(s[0]), fontsize=15)
ax.set_yticklabels(np.arange(s[2]), fontsize=15)
ax.invert_yaxis()
ax.xaxis.set_label_position("top")
ax.tick_params(axis="x", top=True, bottom=False, labeltop=True, labelbottom=False)
if title is not None:
is_string = isinstance(title, str)
if is_string is True:
plt.title(title)
if colorbar is True:
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size="1.5%", pad=0.05)
cbar = plt.colorbar(ax.get_images()[0], cax=cax)
tick_font_size = 14
cbar.ax.tick_params(labelsize=tick_font_size)
def plot_in_grid(
plot,
fig_size=(10, 10),
swap_channels=False,
title=None,
idx=None,
colorbar=False,
vmin=None,
vmax=None,
grid_color="k",
linewidth=0.75,
savetitle=None,
activations=False,
layer=None,
format="pdf",
bias=None,
plot_bias: bool = False,
):
smallDim = False
if plot.ndim < 4:
smallDim = True
plot = np.swapaxes(plot, 0, 1)
plot = plot[:, :, np.newaxis, np.newaxis]
if vmin is None and vmax is None:
# plot_abs = np.amax(np.abs(plot))
vmin = -(np.amax(np.abs(plot)))
vmax = np.amax(np.abs(plot))
if swap_channels is True:
plot = np.swapaxes(plot, 0, 1)
# print(plot.shape)
plot = np.ascontiguousarray(np.moveaxis(plot, 1, 2))
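    # transpose each kernel patch; the (i - 1)/(j - 1) offsets wrap around
    # via negative indexing and still visit every patch exactly once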
for j in range(plot.shape[2]):
for i in range(plot.shape[0]):
plot[(i - 1), :, (j - 1), :] = plot[(i - 1), :, (j - 1), :].T
s = plot.shape
plot = plot.reshape((s[0] * s[1], s[2] * s[3]))
plt.figure(figsize=fig_size)
if plot_bias and bias is not None:
if swap_channels:
# If axes are swapped, arrange the plots side by side
plt.subplot(1, 2, 1)
plot_weights(
plot=plot,
s=s,
grid_color=grid_color,
linewidth=linewidth,
idx=idx,
smallDim=smallDim,
swap_channels=swap_channels,
activations=activations,
layer=layer,
title=title,
colorbar=colorbar,
vmin=vmin,
vmax=vmax,
)
plt.subplot(1, 2, 2)
plt.plot(bias, np.arange(len(bias)))
plt.ylim(len(bias) - 1, 0)
plt.title("Bias")
plt.tight_layout()
else:
plt.subplot(2, 1, 1)
plot_weights(
plot=plot,
s=s,
grid_color=grid_color,
linewidth=linewidth,
idx=idx,
smallDim=smallDim,
swap_channels=swap_channels,
activations=activations,
layer=layer,
title=title,
colorbar=colorbar,
vmin=vmin,
vmax=vmax,
)
plt.subplot(2, 1, 2)
plt.plot(np.arange(len(bias)), bias)
plt.title("Bias")
else:
plot_weights(
plot=plot,
s=s,
grid_color=grid_color,
linewidth=linewidth,
idx=idx,
smallDim=smallDim,
swap_channels=swap_channels,
activations=activations,
layer=layer,
title=title,
colorbar=colorbar,
vmin=vmin,
vmax=vmax,
)
if savetitle is not None:
plt.savefig(f"plot_as_grid/{savetitle}.{format}")
plt.tight_layout()
plt.show(block=True)

View file

@@ -0,0 +1,72 @@
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import os
import glob
from natsort import natsorted
mpl.rcParams["text.usetex"] = True
mpl.rcParams["font.family"] = "serif"
def plot_performance_across_channels(
filename_list: list[str], channel_idx: int, saveplot: bool
) -> None:
"""
y-axis: accuracies
x-axis: number of output channels in first layer
"""
train_accuracy: list = []
test_accuracy: list = []
output_channels: list = []
for file in filename_list:
data = np.load(file)
output_channels.append(data["output_channels"])
train_accuracy.append(data["train_accuracy"])
test_accuracy.append(data["test_accuracy"])
# get only first output channel:
out_channel_size = [out[channel_idx] for out in output_channels]
# get max accuracy of trained NNs
max_train_acc = [train.max() for train in train_accuracy]
max_test_acc = [test.max() for test in test_accuracy]
plt.figure(figsize=[12, 7])
plt.plot(out_channel_size, np.array(max_train_acc), label="Train")
plt.plot(out_channel_size, np.array(max_test_acc), label="Test")
plt.title("Training and Testing Accuracy", fontsize=18)
plt.xlabel(
f"Number of features in convolutional layer {channel_idx+1}", fontsize=18
)
plt.ylabel("Max. accuracy (\\%)", fontsize=18)
plt.legend(fontsize=14)
# Increase tick label font size
plt.xticks(out_channel_size, fontsize=16)
plt.yticks(fontsize=16)
plt.grid(True)
plt.tight_layout()
if saveplot:
os.makedirs("performance_plots", exist_ok=True)
plt.savefig(
os.path.join(
"performance_plots",
f"feature_perf_{output_channels}.pdf",
),
dpi=300,
bbox_inches="tight",
)
plt.show()
if __name__ == "__main__":
path: str = "/home/kk/Documents/Semester4/code/Classic_contour_net_shallow/performance_data/"
filename_list = natsorted(glob.glob(os.path.join(path, "performances_*.npz")))
plot_performance_across_channels(
filename_list=filename_list, channel_idx=0, saveplot=True
)

View file

@ -0,0 +1,49 @@
import torch
from plot_as_grid import plot_in_grid
import os
import sys
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(parent_dir)
from functions.make_cnn import make_cnn # noqa
# load on cpu
device = torch.device("cpu")
# path to NN
nn = "ArghCNN_numConvLayers3_outChannels[6, 8, 8]_kernelSize[7, 15]_leaky relu_stride1_trainFirstConvLayerTrue_seed287302_Coignless_801Epoch_2807-0857.pt"
PATH = f"../trained_models/{nn}"
# load and evaluate model
model = torch.load(PATH).to(device)
model.eval()
print("Full network:")
print(model)
print("")
# enter index to plot:
idx = int(input("Please select layer: "))
print(f"Selected layer {idx}:")
print(model[idx])
# bias
bias_input = input("Plot bias (y/n): ")
plot_bias: bool = False
if bias_input == "y":
plot_bias = True
bias = model[idx]._parameters["bias"].data
print(bias)
else:
bias = None
# visualize weights:
if idx > 0:
weights = model[idx].weight.cpu().detach().clone().numpy()
plot_in_grid(
weights, colorbar=True, swap_channels=True, bias=bias, plot_bias=plot_bias
)
else:
weights = model[idx].weight.cpu().detach().clone().numpy()
plot_in_grid(weights, colorbar=True, bias=bias, plot_bias=plot_bias)

View file

@@ -0,0 +1,47 @@
import torch
import torchvision as tv
import matplotlib.pyplot as plt
import os
import glob
from natsort import natsorted
# import numpy as np
layer_id: int = 0
scale_each: bool = False
model_path: str = "trained_models"
filename_list: list = natsorted(glob.glob(os.path.join(model_path, str("*.pt"))))
assert len(filename_list) > 0
model_filename: str = filename_list[-1]
print(f"Load filename: {model_filename}")
model = torch.load(model_filename, map_location=torch.device("cpu"))
assert layer_id < len(model)
# ---
weights = model[layer_id]._parameters["weight"].data
bias = model[layer_id]._parameters["bias"].data
weight_grid = tv.utils.make_grid(
weights, nrow=8, padding=2, scale_each=scale_each, pad_value=float("NaN")
)
v_max_abs = torch.abs(weight_grid[0, ...]).max()
plt.subplot(3, 1, (1, 2))
plt.imshow(
weight_grid[0, ...],
vmin=-v_max_abs,
vmax=v_max_abs,
cmap="cool",
)
plt.axis("off")
plt.colorbar()
plt.title("Weights")
plt.subplot(3, 1, 3)
plt.plot(bias)
plt.title("Bias")
plt.show()

View file

@@ -0,0 +1,62 @@
import torch
import torchvision as tv
import matplotlib.pyplot as plt
import os
import glob
from natsort import natsorted
# import numpy as np
layer_id: int = 3
scale_each_inner: bool = False
scale_each_outer: bool = False
model_path: str = "trained_models"
filename_list: list = natsorted(glob.glob(os.path.join(model_path, str("*.pt"))))
assert len(filename_list) > 0
model_filename: str = filename_list[-1]
print(f"Load filename: {model_filename}")
model = torch.load(model_filename, map_location=torch.device("cpu"))
assert layer_id < len(model)
print("Full network:")
print(model)
print("")
print(f"Selected layer {layer_id}:")
print(model[layer_id])
# ---
weights = model[layer_id]._parameters["weight"].data
bias = model[layer_id]._parameters["bias"].data
weight_grid = tv.utils.make_grid(
weights.movedim(0, 1),
nrow=8,
padding=2,
scale_each=scale_each_inner,
pad_value=float("NaN"),
)
weight_grid = tv.utils.make_grid(
weight_grid.unsqueeze(1), nrow=4, padding=2, scale_each=scale_each_outer
)
v_max_abs = torch.abs(weight_grid[0, ...]).max()
plt.subplot(3, 1, (1, 2))
plt.imshow(
weight_grid[0, ...],
vmin=-v_max_abs,
vmax=v_max_abs,
cmap="cool",
)
plt.axis("off")
plt.colorbar()
plt.title("Weights")
plt.subplot(3, 1, 3)
plt.plot(bias)
plt.title("Bias")
plt.show()

View file

@@ -0,0 +1,223 @@
import torch
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
import os
import datetime
import re
import glob
from natsort import natsorted
mpl.rcParams["text.usetex"] = True
mpl.rcParams["font.family"] = "serif"
from functions.alicorn_data_loader import alicorn_data_loader
from functions.create_logger import create_logger
def performance_pfinkel_plot(
performances_list: list[dict], labels: list[str], save_name: str, logger
) -> None:
figure_path: str = "performance_pfinkel"
assert len(performances_list) == len(labels)
plt.figure(figsize=[14, 10])
# plot accuracy
plt.subplot(2, 1, 1)
for id in range(0, len(labels)):
x_values = np.zeros((len(performances_list[id].keys())))
y_values = np.zeros((len(performances_list[id].keys())))
counter = 0
for id_key in performances_list[id].keys():
x_values[counter] = performances_list[id][id_key]["pfinkel"]
y_values[counter] = performances_list[id][id_key]["test_accuracy"]
counter += 1
plt.plot(x_values, y_values, label=labels[id])
plt.xticks(x_values)
plt.title("Average accuracy", fontsize=18)
plt.xlabel("Path angle (in °)", fontsize=17)
plt.ylabel("Accuracy (\\%)", fontsize=17)
plt.legend(fontsize=14)
# Increase tick label font size
plt.xticks(fontsize=16)
plt.yticks(fontsize=16)
plt.grid(True)
# plot loss
plt.subplot(2, 1, 2)
for id in range(0, len(labels)):
x_values = np.zeros((len(performances_list[id].keys())))
y_values = np.zeros((len(performances_list[id].keys())))
counter = 0
for id_key in performances_list[id].keys():
x_values[counter] = performances_list[id][id_key]["pfinkel"]
y_values[counter] = performances_list[id][id_key]["test_losses"]
counter += 1
plt.plot(x_values, y_values, label=labels[id])
plt.xticks(x_values)
plt.title("Average loss", fontsize=18)
plt.xlabel("Path angle (in °)", fontsize=17)
plt.ylabel("Loss", fontsize=17)
plt.legend(fontsize=14)
# Increase tick label font size
plt.xticks(fontsize=16)
plt.yticks(fontsize=16)
plt.grid(True)
plt.tight_layout()
logger.info("")
logger.info("Saved in:")
os.makedirs(figure_path, exist_ok=True)
print(
os.path.join(
figure_path,
f"PerformancePfinkel_{save_name}_{current}.pdf",
)
)
plt.savefig(
os.path.join(
figure_path,
f"PerformancePfinkel_{save_name}_{current}.pdf",
),
dpi=300,
bbox_inches="tight",
)
plt.show()
if __name__ == "__main__":
model_path: str = "trained_models"
data_path: str = "/home/kk/Documents/Semester4/code/RenderStimuli/Output/"
selection_file_id: int = 0
# num stimuli per Pfinkel and batch size
stim_per_pfinkel: int = 10000
batch_size: int = 1000
# stimulus condition:
performances_list: list = []
condition: list[str] = ["Coignless", "Natural", "Angular"]
figure_label: list[str] = ["Classic", "Corner", "Bridge"]
# load test data:
num_pfinkel: list = np.arange(0, 100, 10).tolist()
image_scale: float = 255.0
# ------------------------------------------
# create logger:
logger = create_logger(
save_logging_messages=False,
display_logging_messages=True,
model_name="outChannels[6, 8, 8]"
)
device_str: str = "cuda:0" if torch.cuda.is_available() else "cpu"
logger.info(f"Using {device_str} device")
device: torch.device = torch.device(device_str)
torch.set_default_dtype(torch.float32)
# current time:
current = datetime.datetime.now().strftime("%d%m-%H%M")
# path to NN
list_filenames: list[str] = natsorted(
list(glob.glob(os.path.join(model_path, "*.pt")))
)
assert selection_file_id < len(list_filenames)
# model_filename: str = str(list_filenames[selection_file_id])
model_filename: str = "./trained_models/ArghCNN_numConvLayers3_outChannels[6, 8, 8]_kernelSize[7, 15]_leaky relu_stride1_trainFirstConvLayerTrue_seed287302_Coignless_801Epoch_2807-0857.pt"
logger.info(f"Using model file: {model_filename}")
# shorter saving name:
pattern = r"(outChannels\[.*?\])|(kernelSize\[.*?\])|(_relu)|(_seed\d+)"
matches = re.findall(pattern, model_filename)
save_name = "".join(["".join(match) for match in matches])
# load and evaluate model
model = torch.load(model_filename, map_location=device)
# Set the model to evaluation mode
model.eval()
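    # evaluate the loaded model on every stimulus condition and path angle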
for selected_condition in condition:
# save performances:
logger.info(f"Condition: {selected_condition}")
performances: dict = {}
for pfinkel in num_pfinkel:
test_loss: float = 0.0
correct: int = 0
pattern_count: int = 0
data_test = alicorn_data_loader(
num_pfinkel=[pfinkel],
load_stimuli_per_pfinkel=stim_per_pfinkel,
condition=selected_condition,
logger=logger,
data_path=data_path,
)
loader = torch.utils.data.DataLoader(
data_test, shuffle=False, batch_size=batch_size
)
# start testing network on new stimuli:
logger.info("")
logger.info(f"-==- Start {selected_condition} " f"Pfinkel {pfinkel}° -==-")
with torch.no_grad():
for batch_num, data in enumerate(loader):
label = data[0].to(device)
image = data[1].type(dtype=torch.float32).to(device)
image /= image_scale
# compute prediction error;
output = model(image)
# Label Typecast:
label = label.to(device)
# loss and optimization
loss = torch.nn.functional.cross_entropy(
output, label, reduction="sum"
)
pattern_count += int(label.shape[0])
test_loss += float(loss)
prediction = output.argmax(dim=1)
correct += prediction.eq(label).sum().item()
total_number_of_pattern: int = int(len(loader)) * int(
label.shape[0]
)
# logging:
logger.info(
(
f"{selected_condition},{pfinkel}° "
"Pfinkel: "
f"[{int(pattern_count)}/{total_number_of_pattern} ({100.0 * pattern_count / total_number_of_pattern:.2f}%)],"
f" Average loss: {test_loss / pattern_count:.3e}, "
"Accuracy: "
f"{100.0 * correct / pattern_count:.2f}% "
)
)
performances[pfinkel] = {
"pfinkel": pfinkel,
"test_accuracy": 100 * correct / pattern_count,
"test_losses": float(loss) / pattern_count,
}
performances_list.append(performances)
performance_pfinkel_plot(
performances_list=performances_list,
labels=figure_label,
save_name=save_name,
logger=logger,
)
logger.info("-==- DONE -==-")

View file

@@ -0,0 +1,81 @@
import numpy as np
import glob
import os
from natsort import natsorted
from functions.fisher_exact import fisher_excat_upper, fisher_excat_lower
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.rcParams["text.usetex"] = True
mpl.rcParams["font.family"] = "serif"
p_threshold: float = 1.0 / 100.0
file_selection: int = 0
number_of_pattern: int = 60000
path: str = "performance_data"
data_source: str = "test_accuracy"
filename_list = natsorted(glob.glob(os.path.join(path, "performances_*.npz")))
assert file_selection < len(filename_list)
filename = filename_list[file_selection]
data = np.load(filename)
understand_parameter: bool = False
percentage: bool = True
if data_source.upper() == str("test_accuracy").upper():
to_print = data["test_accuracy"]
understand_parameter = True
percentage = True
elif data_source.upper() == str("train_accuracy").upper():
to_print = data["train_accuracy"]
understand_parameter = True
percentage = True
elif data_source.upper() == str("train_losses").upper():
to_print = data["train_losses"]
understand_parameter = True
percentage = False
elif data_source.upper() == str("test_losses").upper():
to_print = data["test_losses"]
understand_parameter = True
percentage = False
assert understand_parameter
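# For accuracies, convert percentages back to pattern counts and compute
# Fisher exact confidence bounds per epoch; losses are plotted as-is.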
if percentage:
correct_count = np.round(to_print * number_of_pattern / 100.0).astype(np.int64)
upper = np.zeros((correct_count.shape[0]))
lower = np.zeros((correct_count.shape[0]))
for id in range(0, correct_count.shape[0]):
upper[id] = fisher_excat_upper(
correct_pattern_count=correct_count[id],
number_of_pattern=number_of_pattern,
p_threshold=p_threshold,
)
lower[id] = fisher_excat_lower(
correct_pattern_count=correct_count[id],
number_of_pattern=number_of_pattern,
p_threshold=p_threshold,
)
x = np.arange(1, to_print.shape[0] + 1)
y = 100.0 - to_print
plt.plot(x, y + upper, "k--")
plt.plot(x, y - lower, "k--")
plt.plot(x, y, "r")
plt.ylim([0, (100.0 - to_print.min()) * 1.1])
plt.xlim([1, to_print.shape[0]])
plt.xlabel("Epochs")
plt.ylabel("Error [%]")
plt.title(data_source.replace("_", " "))
else:
x = np.arange(1, to_print.shape[0] + 1)
plt.plot(x, to_print)
plt.ylim([0, to_print.max() * 1.1])
plt.xlim([1, to_print.shape[0]])
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.title(data_source.replace("_", " "))
plt.show()