diff --git a/functions/alicorn_data_loader.py b/functions/alicorn_data_loader.py
index 71fb6db..ad62205 100644
--- a/functions/alicorn_data_loader.py
+++ b/functions/alicorn_data_loader.py
@@ -1,107 +1,107 @@
-import torch
-import numpy as np
-import os
-
-
-@torch.no_grad()
-def alicorn_data_loader(
-    num_pfinkel: list[int] | None,
-    load_stimuli_per_pfinkel: int,
-    condition: str,
-    data_path: str,
-    logger=None,
-) -> torch.utils.data.TensorDataset:
-    """
-    - num_pfinkel: list of the angles that should be loaded (ranging from
-    0-90). If None: all pfinkels loaded
-    - stimuli_per_pfinkel: defines amount of stimuli per path angle but
-    for label 0 and label 1 seperatly (e.g., stimuli_per_pfinkel = 1000:
-    1000 stimuli = label 1, 1000 stimuli = label 0)
-    """
-    filename: str | None = None
-    if condition == "Angular":
-        filename = "angular_angle"
-    elif condition == "Coignless":
-        filename = "base_angle"
-    elif condition == "Natural":
-        filename = "corner_angle"
-    else:
-        filename = None
-    assert filename is not None
-    filepaths: str = os.path.join(data_path, f"{condition}")
-
-    stimuli_per_pfinkel: int = 100000
-
-    # ----------------------------
-
-    # for angles and batches
-    if num_pfinkel is None:
-        angle: list[int] = np.arange(0, 100, 10).tolist()
-    else:
-        angle = num_pfinkel
-
-    assert isinstance(angle, list)
-
-    batch: list[int] = np.arange(1, 11, 1).tolist()
-
-    if load_stimuli_per_pfinkel <= (stimuli_per_pfinkel // len(batch)):
-        num_img_per_pfinkel: int = load_stimuli_per_pfinkel
-        num_batches: int = 1
-    else:
-        # handle case where more than 10,000 stimuli per pfinkel needed
-        num_batches = load_stimuli_per_pfinkel // (stimuli_per_pfinkel // len(batch))
-        num_img_per_pfinkel = load_stimuli_per_pfinkel // num_batches
-
-    if logger is not None:
-        logger.info(f"{num_batches} batches")
-        logger.info(f"{num_img_per_pfinkel} stimuli per pfinkel.")
-
-    # initialize data and label tensors:
-    num_stimuli: int = len(angle) * num_batches * num_img_per_pfinkel * 2
-    data_tensor: torch.Tensor = torch.empty(
-        (num_stimuli, 200, 200), dtype=torch.uint8, device=torch.device("cpu")
-    )
-    label_tensor: torch.Tensor = torch.empty(
-        (num_stimuli), dtype=torch.int64, device=torch.device("cpu")
-    )
-
-    if logger is not None:
-        logger.info(f"data tensor shape: {data_tensor.shape}")
-        logger.info(f"label tensor shape: {label_tensor.shape}")
-
-    # append data
-    idx: int = 0
-    for i in range(len(angle)):
-        for j in range(num_batches):
-            # load contour
-            temp_filename: str = (
-                f"{filename}_{angle[i]:03}_b{batch[j]:03}_n10000_RENDERED.npz"
-            )
-            contour_filename: str = os.path.join(filepaths, temp_filename)
-            c_data = np.load(contour_filename)
-            data_tensor[idx : idx + num_img_per_pfinkel, ...] = torch.tensor(
-                c_data["gaborfield"][:num_img_per_pfinkel, ...],
-                dtype=torch.uint8,
-                device=torch.device("cpu"),
-            )
-            label_tensor[idx : idx + num_img_per_pfinkel] = int(1)
-            idx += num_img_per_pfinkel
-
-    # next append distractor stimuli
-    for i in range(len(angle)):
-        for j in range(num_batches):
-            # load distractor
-            temp_filename = (
-                f"{filename}_{angle[i]:03}_dist_b{batch[j]:03}_n10000_RENDERED.npz"
-            )
-            distractor_filename: str = os.path.join(filepaths, temp_filename)
-            nc_data = np.load(distractor_filename)
-            data_tensor[idx : idx + num_img_per_pfinkel, ...] = torch.tensor(
-                nc_data["gaborfield"][:num_img_per_pfinkel, ...],
-                dtype=torch.uint8,
-                device=torch.device("cpu"),
-            )
-            label_tensor[idx : idx + num_img_per_pfinkel] = int(0)
-            idx += num_img_per_pfinkel
-
-    return torch.utils.data.TensorDataset(label_tensor, data_tensor.unsqueeze(1))
+import torch
+import numpy as np
+import os
+
+
+@torch.no_grad()
+def alicorn_data_loader(
+    num_pfinkel: list[int] | None,
+    load_stimuli_per_pfinkel: int,
+    condition: str,
+    logger,
+    data_path: str,
+) -> torch.utils.data.TensorDataset:
+    """
+    - num_pfinkel: list of the angles that should be loaded (ranging from
+      0-90). If None, all pfinkels are loaded.
+    - load_stimuli_per_pfinkel: number of stimuli per path angle, counted
+      separately for label 0 and label 1 (e.g., load_stimuli_per_pfinkel =
+      1000: 1000 stimuli with label 1, 1000 stimuli with label 0).
+    """
+    filename: str | None = None
+    if condition == "Angular":
+        filename = "angular_angle"
+    elif condition == "Coignless":
+        filename = "base_angle"
+    elif condition == "Natural":
+        filename = "corner_angle"
+    else:
+        filename = None
+    assert filename is not None
+    filepaths: str = os.path.join(data_path, f"{condition}")
+
+    stimuli_per_pfinkel: int = 100000
+
+    # ----------------------------
+
+    # for angles and batches
+    if num_pfinkel is None:
+        angle: list[int] = np.arange(0, 100, 10).tolist()
+    else:
+        angle = num_pfinkel
+
+    assert isinstance(angle, list)
+
+    batch: list[int] = np.arange(1, 11, 1).tolist()
+
+    if load_stimuli_per_pfinkel <= (stimuli_per_pfinkel // len(batch)):
+        num_img_per_pfinkel: int = load_stimuli_per_pfinkel
+        num_batches: int = 1
+    else:
+        # handle case where more than 10,000 stimuli per pfinkel needed
+        num_batches = load_stimuli_per_pfinkel // (stimuli_per_pfinkel // len(batch))
+        num_img_per_pfinkel = load_stimuli_per_pfinkel // num_batches
+
+    if logger is not None:
+        logger.info(f"{num_batches} batches")
+        logger.info(f"{num_img_per_pfinkel} stimuli per pfinkel.")
+
+    # initialize data and label tensors:
+    num_stimuli: int = len(angle) * num_batches * num_img_per_pfinkel * 2
+    data_tensor: torch.Tensor = torch.empty(
+        (num_stimuli, 200, 200), dtype=torch.uint8, device=torch.device("cpu")
+    )
+    label_tensor: torch.Tensor = torch.empty(
+        (num_stimuli), dtype=torch.int64, device=torch.device("cpu")
+    )
+
+    if logger is not None:
+        logger.info(f"data tensor shape: {data_tensor.shape}")
+        logger.info(f"label tensor shape: {label_tensor.shape}")
+
+    # append data
+    idx: int = 0
+    for i in range(len(angle)):
+        for j in range(num_batches):
+            # load contour
+            temp_filename: str = (
+                f"{filename}_{angle[i]:03}_b{batch[j]:03}_n10000_RENDERED.npz"
+            )
+            contour_filename: str = os.path.join(filepaths, temp_filename)
+            c_data = np.load(contour_filename)
+            data_tensor[idx : idx + num_img_per_pfinkel, ...] = torch.tensor(
+                c_data["gaborfield"][:num_img_per_pfinkel, ...],
+                dtype=torch.uint8,
+                device=torch.device("cpu"),
+            )
+            label_tensor[idx : idx + num_img_per_pfinkel] = int(1)
+            idx += num_img_per_pfinkel
+
+    # next append distractor stimuli
+    for i in range(len(angle)):
+        for j in range(num_batches):
+            # load distractor
+            temp_filename = (
+                f"{filename}_{angle[i]:03}_dist_b{batch[j]:03}_n10000_RENDERED.npz"
+            )
+            distractor_filename: str = os.path.join(filepaths, temp_filename)
+            nc_data = np.load(distractor_filename)
+            data_tensor[idx : idx + num_img_per_pfinkel, ...] = torch.tensor(
+                nc_data["gaborfield"][:num_img_per_pfinkel, ...],
+                dtype=torch.uint8,
+                device=torch.device("cpu"),
+            )
+            label_tensor[idx : idx + num_img_per_pfinkel] = int(0)
+            idx += num_img_per_pfinkel
+
+    return torch.utils.data.TensorDataset(label_tensor, data_tensor.unsqueeze(1))
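Note on the hunk above: alicorn_data_loader now takes logger before data_path, and logger no longer defaults to None, so existing positional callers must be updated. A minimal sketch of an updated call (the logger name, the "./stimuli" path, and the batch size are illustrative assumptions, not part of this patch):

import logging

import torch

from functions.alicorn_data_loader import alicorn_data_loader

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("alicorn")

# "Coignless" selects the base_angle stimulus files; 1000 stimuli per
# pfinkel angle are loaded for each of the two labels (contour/distractor).
dataset = alicorn_data_loader(
    num_pfinkel=[0, 30, 60, 90],
    load_stimuli_per_pfinkel=1000,
    condition="Coignless",
    logger=logger,
    data_path="./stimuli",
)
# The TensorDataset stores (label, image): data[0] is the label batch,
# data[1] the uint8 image batch of shape [N, 1, 200, 200].
loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)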
diff --git a/functions/make_cnn.py b/functions/make_cnn.py
index 3f84363..866b02c 100644
--- a/functions/make_cnn.py
+++ b/functions/make_cnn.py
@@ -32,9 +32,6 @@ def make_cnn(
         )
     )
 
-    if conv_0_enable_softmax:
-        cnn.append(torch.nn.Softmax(dim=1))
-
     setting_understood: bool = False
     if conv_activation_function.upper() == str("relu").upper():
         cnn.append(torch.nn.ReLU())
@@ -60,6 +57,9 @@
         setting_understood = True
     assert setting_understood
 
+    if conv_0_enable_softmax:
+        cnn.append(torch.nn.Softmax(dim=1))
+
     # Changing structure
     for i in range(1, len(conv_out_channels_list)):
         if i == 1 and not train_conv_0:
diff --git a/functions/test.py b/functions/test.py
index 344630d..8da369f 100644
--- a/functions/test.py
+++ b/functions/test.py
@@ -1,58 +1,58 @@
-import torch
-import logging
-
-
-@torch.no_grad()
-def test(
-    model: torch.nn.modules.container.Sequential,
-    loader: torch.utils.data.dataloader.DataLoader,
-    device: torch.device,
-    tb,
-    epoch: int,
-    logger: logging.Logger,
-    test_accuracy: list[float],
-    test_losses: list[float],
-    scale_data: float,
-) -> float:
-    test_loss: float = 0.0
-    correct: int = 0
-    pattern_count: float = 0.0
-
-    model.eval()
-
-    for data in loader:
-        label = data[0].to(device)
-        image = data[1].type(dtype=torch.float32).to(device)
-        if scale_data > 0:
-            image /= scale_data
-
-        output = model(image)
-
-        # loss and optimization
-        loss = torch.nn.functional.cross_entropy(output, label, reduction="sum")
-        pattern_count += float(label.shape[0])
-        test_loss += loss.item()
-        prediction = output.argmax(dim=1)
-        correct += prediction.eq(label).sum().item()
-
-    logger.info(
-        (
-            "Test set:"
-            f" Average loss: {test_loss / pattern_count:.3e},"
-            f" Accuracy: {correct}/{pattern_count},"
-            f"({100.0 * correct / pattern_count:.2f}%)"
-        )
-    )
-    logger.info("")
-
-    acc = 100.0 * correct / pattern_count
-    test_losses.append(test_loss / pattern_count)
-    test_accuracy.append(acc)
-
-    # add to tb:
-    tb.add_scalar("Test Loss", (test_loss / pattern_count), epoch)
-    tb.add_scalar("Test Performance", 100.0 * correct / pattern_count, epoch)
-    tb.add_scalar("Test Number Correct", correct, epoch)
-    tb.flush()
-
-    return acc
+import torch
+import logging
+
+
+@torch.no_grad()
+def test(
+    model: torch.nn.modules.container.Sequential,
+    loader: torch.utils.data.dataloader.DataLoader,
+    device: torch.device,
+    tb,
+    epoch: int,
+    logger: logging.Logger,
+    test_accuracy: list[float],
+    test_losses: list[float],
+    scale_data: float,
+) -> float:
+    test_loss: float = 0.0
+    correct: int = 0
+    pattern_count: float = 0.0
+
+    model.eval()
+
+    for data in loader:
+        label = data[0].to(device)
+        image = data[1].type(dtype=torch.float32).to(device)
+        if scale_data > 0:
+            image /= scale_data
+
+        output = model(image)
+
+        # loss and optimization
+        loss = torch.nn.functional.cross_entropy(output, label, reduction="sum")
+        pattern_count += float(label.shape[0])
+        test_loss += loss.item()
+        prediction = output.argmax(dim=1)
+        correct += prediction.eq(label).sum().item()
+
+    logger.info(
+        (
+            "Test set:"
+            f" Average loss: {test_loss / pattern_count:.3e},"
+            f" Accuracy: {correct}/{pattern_count},"
+            f"({100.0 * correct / pattern_count:.2f}%)"
+        )
+    )
+    logger.info("")
+
+    acc = 100.0 * correct / pattern_count
+    test_losses.append(test_loss / pattern_count)
+    test_accuracy.append(acc)
+
+    # add to tb:
+    tb.add_scalar("Test Loss", (test_loss / pattern_count), epoch)
+    tb.add_scalar("Test Performance", 100.0 * correct / pattern_count, epoch)
+    tb.add_scalar("Test Number Correct", correct, epoch)
+    tb.flush()
+
+    return acc
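The make_cnn.py hunk only reorders construction: the optional Softmax is now appended after the activation function has been resolved, so a softmax-enabled conv-0 block ends with Softmax instead of having it sit between the convolution and its activation. A minimal sketch of the resulting layer order under the "relu" setting (the channel sizes and kernel size are assumptions for illustration, not values from make_cnn):

import torch

# conv-0 block in the new order when conv_0_enable_softmax is set:
# convolution -> activation -> softmax over the channel dimension
cnn = torch.nn.Sequential()
cnn.append(torch.nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5))
cnn.append(torch.nn.ReLU())          # chosen via conv_activation_function
cnn.append(torch.nn.Softmax(dim=1))  # now appended after the activation

print(cnn)  # Sequential(Conv2d, ReLU, Softmax)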
diff --git a/functions/train.py b/functions/train.py
index 6f13d84..7d49a90 100644
--- a/functions/train.py
+++ b/functions/train.py
@@ -1,80 +1,80 @@
-import torch
-import logging
-
-
-def train(
-    model: torch.nn.modules.container.Sequential,
-    loader: torch.utils.data.dataloader.DataLoader,
-    optimizer: torch.optim.Adam | torch.optim.SGD,
-    epoch: int,
-    device: torch.device,
-    tb,
-    test_acc,
-    logger: logging.Logger,
-    train_accuracy: list[float],
-    train_losses: list[float],
-    train_loss: list[float],
-    scale_data: float,
-) -> float:
-    num_train_pattern: int = 0
-    running_loss: float = 0.0
-    correct: int = 0
-    pattern_count: float = 0.0
-
-    model.train()
-    for data in loader:
-        label = data[0].to(device)
-        image = data[1].type(dtype=torch.float32).to(device)
-        if scale_data > 0:
-            image /= scale_data
-
-        optimizer.zero_grad()
-        output = model(image)
-        loss = torch.nn.functional.cross_entropy(output, label, reduction="sum")
-        loss.backward()
-
-        optimizer.step()
-
-        # for loss and accuracy plotting:
-        num_train_pattern += int(label.shape[0])
-        pattern_count += float(label.shape[0])
-        running_loss += float(loss)
-        train_loss.append(float(loss))
-        prediction = output.argmax(dim=1)
-        correct += prediction.eq(label).sum().item()
-
-        total_number_of_pattern: int = int(len(loader)) * int(label.shape[0])
-
-        # infos:
-        logger.info(
-            (
-                "Train Epoch:"
-                f" {epoch}"
-                f" [{int(pattern_count)}/{total_number_of_pattern}"
-                f" ({100.0 * pattern_count / total_number_of_pattern:.2f}%)],"
-                f" Loss: {float(running_loss) / float(num_train_pattern):.4e},"
-                f" Acc: {(100.0 * correct / num_train_pattern):.2f}"
-                f" Test Acc: {test_acc:.2f}%,"
-                f" LR: {optimizer.param_groups[0]['lr']:.2e}"
-            )
-        )
-
-    acc = 100.0 * correct / num_train_pattern
-    train_accuracy.append(acc)
-
-    epoch_loss = running_loss / pattern_count
-    train_losses.append(epoch_loss)
-
-    # add to tb:
-    tb.add_scalar("Train Loss", loss.item(), epoch)
-    tb.add_scalar("Train Performance", torch.tensor(acc), epoch)
-    tb.add_scalar("Train Number Correct", torch.tensor(correct), epoch)
-
-    # for parameters:
-    for name, param in model.named_parameters():
-        if "weight" in name or "bias" in name:
-            tb.add_histogram(f"{name}", param.data.clone(), epoch)
-
-    tb.flush()
-
-    return epoch_loss
+import torch
+import logging
+
+
+def train(
+    model: torch.nn.modules.container.Sequential,
+    loader: torch.utils.data.dataloader.DataLoader,
+    optimizer: torch.optim.Adam | torch.optim.SGD,
+    epoch: int,
+    device: torch.device,
+    tb,
+    test_acc,
+    logger: logging.Logger,
+    train_accuracy: list[float],
+    train_losses: list[float],
+    train_loss: list[float],
+    scale_data: float,
+) -> float:
+    num_train_pattern: int = 0
+    running_loss: float = 0.0
+    correct: int = 0
+    pattern_count: float = 0.0
+
+    model.train()
+    for data in loader:
+        label = data[0].to(device)
+        image = data[1].type(dtype=torch.float32).to(device)
+        if scale_data > 0:
+            image /= scale_data
+
+        optimizer.zero_grad()
+        output = model(image)
+        loss = torch.nn.functional.cross_entropy(output, label, reduction="sum")
+        loss.backward()
+
+        optimizer.step()
+
+        # for loss and accuracy plotting:
+        num_train_pattern += int(label.shape[0])
+        pattern_count += float(label.shape[0])
+        running_loss += float(loss)
+        train_loss.append(float(loss))
+        prediction = output.argmax(dim=1)
+        correct += prediction.eq(label).sum().item()
+
+        total_number_of_pattern: int = int(len(loader)) * int(label.shape[0])
+
+        # infos:
+        logger.info(
+            (
+                "Train Epoch:"
+                f" {epoch}"
+                f" [{int(pattern_count)}/{total_number_of_pattern}"
+                f" ({100.0 * pattern_count / total_number_of_pattern:.2f}%)],"
+                f" Loss: {float(running_loss) / float(num_train_pattern):.4e},"
+                f" Acc: {(100.0 * correct / num_train_pattern):.2f}"
+                f" Test Acc: {test_acc:.2f}%,"
+                f" LR: {optimizer.param_groups[0]['lr']:.2e}"
+            )
+        )
+
+    acc = 100.0 * correct / num_train_pattern
+    train_accuracy.append(acc)
+
+    epoch_loss = running_loss / pattern_count
+    train_losses.append(epoch_loss)
+
+    # add to tb:
+    tb.add_scalar("Train Loss", loss.item(), epoch)
+    tb.add_scalar("Train Performance", torch.tensor(acc), epoch)
+    tb.add_scalar("Train Number Correct", torch.tensor(correct), epoch)
+
+    # for parameters:
+    for name, param in model.named_parameters():
+        if "weight" in name or "bias" in name:
+            tb.add_histogram(f"{name}", param.data.clone(), epoch)
+
+    tb.flush()
+
+    return epoch_loss
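For context, train and test above are per-epoch helpers that share the TensorBoard writer and the bookkeeping lists. A minimal driver sketch under assumed settings (the stand-in model, optimizer, learning rate, epoch count, scale_data value, and data path are illustrative, not taken from this patch):

import logging

import torch
from torch.utils.tensorboard import SummaryWriter

from functions.alicorn_data_loader import alicorn_data_loader
from functions.test import test
from functions.train import train

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("alicorn")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# stand-in model; any Sequential with 2 output classes fits the loaders above
model = torch.nn.Sequential(
    torch.nn.Flatten(),
    torch.nn.Linear(200 * 200, 2),
).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
tb = SummaryWriter()

# illustrative: the same dataset is reused for train and test here
dataset = alicorn_data_loader([0, 90], 1000, "Coignless", logger, "./stimuli")
train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=False)

train_accuracy: list[float] = []
train_losses: list[float] = []
train_loss: list[float] = []
test_accuracy: list[float] = []
test_losses: list[float] = []

test_acc: float = 0.0
for epoch in range(1, 11):
    # train() logs per-batch progress and returns the mean epoch loss;
    # test() returns the accuracy that train() reports in the next epoch.
    train(
        model, train_loader, optimizer, epoch, device, tb, test_acc,
        logger, train_accuracy, train_losses, train_loss, scale_data=255.0,
    )
    test_acc = test(
        model, test_loader, device, tb, epoch,
        logger, test_accuracy, test_losses, scale_data=255.0,
    )
tb.flush()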