Add files via upload

Authored by David Rotermund on 2023-07-22 14:53:46 +02:00; committed by GitHub.
parent 8505df62e3
commit e18690b0b3
4 changed files with 248 additions and 248 deletions


@@ -1,107 +1,107 @@
import torch
import numpy as np
import os


@torch.no_grad()
def alicorn_data_loader(
    num_pfinkel: list[int] | None,
    load_stimuli_per_pfinkel: int,
    condition: str,
-    data_path: str,
-    logger=None,
+    logger,
+    data_path: str,
) -> torch.utils.data.TensorDataset:
    """
    - num_pfinkel: list of the angles that should be loaded (ranging from
      0-90). If None: all pfinkels loaded
    - stimuli_per_pfinkel: defines amount of stimuli per path angle but
      for label 0 and label 1 separately (e.g., stimuli_per_pfinkel = 1000:
      1000 stimuli = label 1, 1000 stimuli = label 0)
    """
    filename: str | None = None
    if condition == "Angular":
        filename = "angular_angle"
    elif condition == "Coignless":
        filename = "base_angle"
    elif condition == "Natural":
        filename = "corner_angle"
    else:
        filename = None
    assert filename is not None
    filepaths: str = os.path.join(data_path, f"{condition}")

    stimuli_per_pfinkel: int = 100000

    # ----------------------------
    # for angles and batches
    if num_pfinkel is None:
        angle: list[int] = np.arange(0, 100, 10).tolist()
    else:
        angle = num_pfinkel
    assert isinstance(angle, list)

    batch: list[int] = np.arange(1, 11, 1).tolist()

    if load_stimuli_per_pfinkel <= (stimuli_per_pfinkel // len(batch)):
        num_img_per_pfinkel: int = load_stimuli_per_pfinkel
        num_batches: int = 1
    else:
        # handle case where more than 10,000 stimuli per pfinkel needed
        num_batches = load_stimuli_per_pfinkel // (stimuli_per_pfinkel // len(batch))
        num_img_per_pfinkel = load_stimuli_per_pfinkel // num_batches

    if logger is not None:
        logger.info(f"{num_batches} batches")
        logger.info(f"{num_img_per_pfinkel} stimuli per pfinkel.")

    # initialize data and label tensors:
    num_stimuli: int = len(angle) * num_batches * num_img_per_pfinkel * 2
    data_tensor: torch.Tensor = torch.empty(
        (num_stimuli, 200, 200), dtype=torch.uint8, device=torch.device("cpu")
    )
    label_tensor: torch.Tensor = torch.empty(
        (num_stimuli), dtype=torch.int64, device=torch.device("cpu")
    )

    if logger is not None:
        logger.info(f"data tensor shape: {data_tensor.shape}")
        logger.info(f"label tensor shape: {label_tensor.shape}")

    # append data
    idx: int = 0
    for i in range(len(angle)):
        for j in range(num_batches):
            # load contour
            temp_filename: str = (
                f"{filename}_{angle[i]:03}_b{batch[j]:03}_n10000_RENDERED.npz"
            )
            contour_filename: str = os.path.join(filepaths, temp_filename)
            c_data = np.load(contour_filename)
            data_tensor[idx : idx + num_img_per_pfinkel, ...] = torch.tensor(
                c_data["gaborfield"][:num_img_per_pfinkel, ...],
                dtype=torch.uint8,
                device=torch.device("cpu"),
            )
            label_tensor[idx : idx + num_img_per_pfinkel] = int(1)
            idx += num_img_per_pfinkel

    # next append distractor stimuli
    for i in range(len(angle)):
        for j in range(num_batches):
            # load distractor
            temp_filename = (
                f"{filename}_{angle[i]:03}_dist_b{batch[j]:03}_n10000_RENDERED.npz"
            )
            distractor_filename: str = os.path.join(filepaths, temp_filename)
            nc_data = np.load(distractor_filename)
            data_tensor[idx : idx + num_img_per_pfinkel, ...] = torch.tensor(
                nc_data["gaborfield"][:num_img_per_pfinkel, ...],
                dtype=torch.uint8,
                device=torch.device("cpu"),
            )
            label_tensor[idx : idx + num_img_per_pfinkel] = int(0)
            idx += num_img_per_pfinkel

    return torch.utils.data.TensorDataset(label_tensor, data_tensor.unsqueeze(1))
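
For orientation, a hedged usage sketch of the loader above; the directory, angle subset, and batch size are illustrative assumptions, and the call assumes alicorn_data_loader is importable from the file this diff touches. Note that after this commit logger is passed before data_path.

# Hypothetical usage sketch; paths and sizes are placeholders, not from the commit.
import torch

dataset = alicorn_data_loader(
    num_pfinkel=[0, 30, 60, 90],      # subset of path angles; None would load all
    load_stimuli_per_pfinkel=1000,    # 1000 contour + 1000 distractor stimuli per angle
    condition="Coignless",
    logger=None,                      # passing None silences the info logging
    data_path="/path/to/stimuli",     # placeholder directory holding the .npz batches
)

# The dataset yields (label, image) pairs: labels are int64, images uint8 (1, 200, 200).
loader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)
label, image = dataset[0]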


@@ -32,9 +32,6 @@ def make_cnn(
        )
    )

-    if conv_0_enable_softmax:
-        cnn.append(torch.nn.Softmax(dim=1))
-
    setting_understood: bool = False
    if conv_activation_function.upper() == str("relu").upper():
        cnn.append(torch.nn.ReLU())

@@ -60,6 +57,9 @@ def make_cnn(
        setting_understood = True
    assert setting_understood

+    if conv_0_enable_softmax:
+        cnn.append(torch.nn.Softmax(dim=1))
+
    # Changing structure
    for i in range(1, len(conv_out_channels_list)):
        if i == 1 and not train_conv_0:
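
The two hunks above relocate the optional Softmax so it is appended after the activation function rather than before it. A minimal sketch of the resulting layer order follows; the conv shape and flag values are hypothetical, since make_cnn's full signature is not part of this diff.

# Sketch of the post-commit ordering; the first conv layer and flags are assumptions.
import torch

conv_activation_function = "relu"   # hypothetical make_cnn argument
conv_0_enable_softmax = True        # hypothetical make_cnn argument

cnn = torch.nn.Sequential()
cnn.append(torch.nn.Conv2d(1, 32, kernel_size=5))  # placeholder first conv block

setting_understood: bool = False
if conv_activation_function.upper() == str("relu").upper():
    cnn.append(torch.nn.ReLU())
    setting_understood = True
assert setting_understood

# After this commit, the softmax follows the activation function:
if conv_0_enable_softmax:
    cnn.append(torch.nn.Softmax(dim=1))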


@@ -1,58 +1,58 @@
import torch
import logging


@torch.no_grad()
def test(
    model: torch.nn.modules.container.Sequential,
    loader: torch.utils.data.dataloader.DataLoader,
    device: torch.device,
    tb,
    epoch: int,
    logger: logging.Logger,
    test_accuracy: list[float],
    test_losses: list[float],
    scale_data: float,
) -> float:
    test_loss: float = 0.0
    correct: int = 0
    pattern_count: float = 0.0

    model.eval()

    for data in loader:
        label = data[0].to(device)
        image = data[1].type(dtype=torch.float32).to(device)
        if scale_data > 0:
            image /= scale_data

        output = model(image)

        # loss and optimization
        loss = torch.nn.functional.cross_entropy(output, label, reduction="sum")
        pattern_count += float(label.shape[0])
        test_loss += loss.item()
        prediction = output.argmax(dim=1)
        correct += prediction.eq(label).sum().item()

    logger.info(
        (
            "Test set:"
            f" Average loss: {test_loss / pattern_count:.3e},"
            f" Accuracy: {correct}/{pattern_count},"
            f"({100.0 * correct / pattern_count:.2f}%)"
        )
    )
    logger.info("")

    acc = 100.0 * correct / pattern_count
    test_losses.append(test_loss / pattern_count)
    test_accuracy.append(acc)

    # add to tb:
    tb.add_scalar("Test Loss", (test_loss / pattern_count), epoch)
    tb.add_scalar("Test Performance", 100.0 * correct / pattern_count, epoch)
    tb.add_scalar("Test Number Correct", correct, epoch)
    tb.flush()

    return acc
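
A sketch of how this evaluation helper might be called; the model, data, and TensorBoard writer below are stand-ins, since the driving script is not shown in this commit.

# Stand-in objects for illustration; only test() itself comes from the diff.
import logging
import torch
from torch.utils.tensorboard import SummaryWriter

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("example")

model = torch.nn.Sequential(torch.nn.Flatten(), torch.nn.Linear(200 * 200, 2))
labels = torch.randint(0, 2, (32,))
images = torch.randint(0, 256, (32, 1, 200, 200), dtype=torch.uint8)
loader = torch.utils.data.DataLoader(
    torch.utils.data.TensorDataset(labels, images), batch_size=8
)

tb = SummaryWriter()
test_accuracy: list[float] = []
test_losses: list[float] = []

acc = test(
    model=model,
    loader=loader,
    device=torch.device("cpu"),
    tb=tb,
    epoch=0,
    logger=logger,
    test_accuracy=test_accuracy,
    test_losses=test_losses,
    scale_data=255.0,  # uint8 images are scaled into [0, 1]
)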


@@ -1,80 +1,80 @@
import torch
import logging


def train(
    model: torch.nn.modules.container.Sequential,
    loader: torch.utils.data.dataloader.DataLoader,
    optimizer: torch.optim.Adam | torch.optim.SGD,
    epoch: int,
    device: torch.device,
    tb,
    test_acc,
    logger: logging.Logger,
    train_accuracy: list[float],
    train_losses: list[float],
    train_loss: list[float],
    scale_data: float,
) -> float:
    num_train_pattern: int = 0
    running_loss: float = 0.0
    correct: int = 0
    pattern_count: float = 0.0

    model.train()

    for data in loader:
        label = data[0].to(device)
        image = data[1].type(dtype=torch.float32).to(device)
        if scale_data > 0:
            image /= scale_data

        optimizer.zero_grad()
        output = model(image)
        loss = torch.nn.functional.cross_entropy(output, label, reduction="sum")
        loss.backward()
        optimizer.step()

        # for loss and accuracy plotting:
        num_train_pattern += int(label.shape[0])
        pattern_count += float(label.shape[0])
        running_loss += float(loss)
        train_loss.append(float(loss))
        prediction = output.argmax(dim=1)
        correct += prediction.eq(label).sum().item()

        total_number_of_pattern: int = int(len(loader)) * int(label.shape[0])

        # infos:
        logger.info(
            (
                "Train Epoch:"
                f" {epoch}"
                f" [{int(pattern_count)}/{total_number_of_pattern}"
                f" ({100.0 * pattern_count / total_number_of_pattern:.2f}%)],"
                f" Loss: {float(running_loss) / float(num_train_pattern):.4e},"
                f" Acc: {(100.0 * correct / num_train_pattern):.2f}"
                f" Test Acc: {test_acc:.2f}%,"
                f" LR: {optimizer.param_groups[0]['lr']:.2e}"
            )
        )

    acc = 100.0 * correct / num_train_pattern
    train_accuracy.append(acc)
    epoch_loss = running_loss / pattern_count
    train_losses.append(epoch_loss)

    # add to tb:
    tb.add_scalar("Train Loss", loss.item(), epoch)
    tb.add_scalar("Train Performance", torch.tensor(acc), epoch)
    tb.add_scalar("Train Number Correct", torch.tensor(correct), epoch)

    # for parameters:
    for name, param in model.named_parameters():
        if "weight" in name or "bias" in name:
            tb.add_histogram(f"{name}", param.data.clone(), epoch)

    tb.flush()
    return epoch_loss
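
Reusing the stand-in model, loader, tb, and logger from the previous sketch, an epoch loop wiring train and test together might look as follows; the optimizer, learning rate, and epoch count are illustrative assumptions, not part of the commit.

# Hypothetical epoch loop; continues the stand-in objects from the test() sketch.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

train_accuracy: list[float] = []
train_losses: list[float] = []
train_loss: list[float] = []
test_acc: float = 0.0

for epoch in range(10):
    epoch_loss = train(
        model=model,
        loader=loader,            # a separate training loader would be used in practice
        optimizer=optimizer,
        epoch=epoch,
        device=torch.device("cpu"),
        tb=tb,
        test_acc=test_acc,
        logger=logger,
        train_accuracy=train_accuracy,
        train_losses=train_losses,
        train_loss=train_loss,
        scale_data=255.0,
    )
    test_acc = test(
        model, loader, torch.device("cpu"), tb, epoch,
        logger, test_accuracy, test_losses, 255.0,
    )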