feat: remove old architecture

This commit is contained in:
Alexander Engelsberger 2022-08-15 12:14:14 +02:00
parent bcf9c6bdb1
commit 5a89f24c10
No known key found for this signature in database
GPG Key ID: DE8669706B6AC2E7
44 changed files with 371 additions and 3757 deletions

View File

@ -1,67 +0,0 @@
"""CBC example using the Iris dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
from prototorch.models import CBC, VisCBC2D
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = pt.datasets.Iris(dims=[0, 2])
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=32)
# Hyperparameters
hparams = dict(
distribution=[1, 0, 3],
margin=0.1,
proto_lr=0.01,
bb_lr=0.01,
)
# Initialize the model
model = CBC(
hparams,
components_initializer=pt.initializers.SSCI(train_ds, noise=0.1),
reasonings_initializer=pt.initializers.
PurePositiveReasoningsInitializer(),
)
# Callbacks
vis = VisCBC2D(
data=train_ds,
title="CBC Iris Example",
resolution=100,
axis_off=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
],
detect_anomaly=True,
log_every_n_steps=1,
max_epochs=1000,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,99 +0,0 @@
"""Dynamically prune 'loser' prototypes in GLVQ-type models."""
import argparse
import logging
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import (
CELVQ,
PruneLoserPrototypes,
VisGLVQ2D,
)
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
num_classes = 4
num_features = 2
num_clusters = 1
train_ds = pt.datasets.Random(
num_samples=500,
num_classes=num_classes,
num_features=num_features,
num_clusters=num_clusters,
separation=3.0,
seed=42,
)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=256)
# Hyperparameters
prototypes_per_class = num_clusters * 5
hparams = dict(
distribution=(num_classes, prototypes_per_class),
lr=0.2,
)
# Initialize the model
model = CELVQ(
hparams,
prototypes_initializer=pt.initializers.FVCI(2, 3.0),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Summary
logging.info(model)
# Callbacks
vis = VisGLVQ2D(train_ds)
pruning = PruneLoserPrototypes(
threshold=0.01, # prune prototype if it wins less than 1%
idle_epochs=20, # pruning too early may cause problems
prune_quota_per_epoch=2, # prune at most 2 prototypes per epoch
frequency=1, # prune every epoch
verbose=True,
)
es = EarlyStopping(
monitor="train_loss",
min_delta=0.001,
patience=20,
mode="min",
verbose=True,
check_on_train_epoch_end=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
pruning,
es,
],
detect_anomaly=True,
log_every_n_steps=1,
max_epochs=1000,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,79 +0,0 @@
"""GLVQ example using the Iris dataset."""
import argparse
import logging
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import GLVQ, VisGLVQ2D
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=PossibleUserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = pt.datasets.Iris(dims=[0, 2])
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=64, num_workers=4)
# Hyperparameters
hparams = dict(
distribution={
"num_classes": 3,
"per_class": 4
},
lr=0.01,
)
# Initialize the model
model = GLVQ(
hparams,
optimizer=torch.optim.Adam,
prototypes_initializer=pt.initializers.SMCI(train_ds),
lr_scheduler=ExponentialLR,
lr_scheduler_kwargs=dict(gamma=0.99, verbose=False),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Callbacks
vis = VisGLVQ2D(data=train_ds)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
],
max_epochs=100,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)
# Manual save
trainer.save_checkpoint("./glvq_iris.ckpt")
# Load saved model
new_model = GLVQ.load_from_checkpoint(
checkpoint_path="./glvq_iris.ckpt",
strict=False,
)
logging.info(new_model)

View File

@ -1,73 +1,134 @@
"""GMLVQ example using the Iris dataset."""
import logging
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import GMLVQ, VisGMLVQ2D
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.data import DataLoader
import torchmetrics
from prototorch.core import SMCI
from prototorch.datasets import Iris
from prototorch.models.architectures.base import Steps
from prototorch.models.callbacks import (
LogTorchmetricCallback,
PlotLambdaMatrixToTensorboard,
VisGMLVQ2D,
)
from prototorch.models.library.gmlvq import GMLVQ
from pytorch_lightning.callbacks import EarlyStopping
from torch.utils.data import DataLoader, random_split
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
logging.basicConfig(level=logging.INFO)
if __name__ == "__main__":
# ##############################################################################
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
def main():
# ------------------------------------------------------------
# DATA
# ------------------------------------------------------------
# Dataset
train_ds = pt.datasets.Iris()
full_dataset = Iris()
full_count = len(full_dataset)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=64)
train_count = int(full_count * 0.5)
val_count = int(full_count * 0.4)
test_count = int(full_count * 0.1)
# Hyperparameters
hparams = dict(
train_dataset, val_dataset, test_dataset = random_split(
full_dataset, (train_count, val_count, test_count))
# Dataloader
train_loader = DataLoader(
train_dataset,
batch_size=1,
num_workers=4,
shuffle=True,
)
val_loader = DataLoader(
val_dataset,
batch_size=1,
num_workers=4,
shuffle=False,
)
test_loader = DataLoader(
test_dataset,
batch_size=1,
num_workers=0,
shuffle=False,
)
# ------------------------------------------------------------
# HYPERPARAMETERS
# ------------------------------------------------------------
# Select Initializer
components_initializer = SMCI(full_dataset)
# Define Hyperparameters
hyperparameters = GMLVQ.HyperParameters(
lr=dict(components_layer=0.1, _omega=0),
input_dim=4,
latent_dim=4,
distribution={
"num_classes": 3,
"per_class": 2
},
proto_lr=0.01,
bb_lr=0.01,
distribution=dict(
num_classes=3,
per_class=1,
),
component_initializer=components_initializer,
)
# Initialize the model
model = GMLVQ(
hparams,
optimizer=torch.optim.Adam,
prototypes_initializer=pt.initializers.SMCI(train_ds),
lr_scheduler=ExponentialLR,
lr_scheduler_kwargs=dict(gamma=0.99, verbose=False),
# Create Model
model = GMLVQ(hyperparameters)
# ------------------------------------------------------------
# TRAINING
# ------------------------------------------------------------
# Controlling Callbacks
recall = LogTorchmetricCallback(
'training_recall',
torchmetrics.Recall,
num_classes=3,
step=Steps.TRAINING,
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 4)
stopping_criterion = LogTorchmetricCallback(
'validation_recall',
torchmetrics.Recall,
num_classes=3,
step=Steps.VALIDATION,
)
# Callbacks
vis = VisGMLVQ2D(data=train_ds)
es = EarlyStopping(
monitor=stopping_criterion.name,
mode="max",
patience=10,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
# Visualization Callback
vis = VisGMLVQ2D(data=full_dataset)
# Define trainer
trainer = pl.Trainer(
callbacks=[
vis,
recall,
stopping_criterion,
es,
PlotLambdaMatrixToTensorboard(),
],
max_epochs=100,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)
# Train
trainer.fit(model, train_loader, val_loader)
trainer.test(model, test_loader)
# Manual save
trainer.save_checkpoint("./y_arch.ckpt")
# Load saved model
new_model = GMLVQ.load_from_checkpoint(
checkpoint_path="./y_arch.ckpt",
strict=True,
)
if __name__ == "__main__":
main()

View File

@ -1,112 +0,0 @@
"""GMLVQ example using the MNIST dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import (
ImageGMLVQ,
PruneLoserPrototypes,
VisImgComp,
)
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = MNIST(
"~/datasets",
train=True,
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
]),
)
test_ds = MNIST(
"~/datasets",
train=False,
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
]),
)
# Dataloaders
train_loader = DataLoader(train_ds, num_workers=4, batch_size=256)
test_loader = DataLoader(test_ds, num_workers=4, batch_size=256)
# Hyperparameters
num_classes = 10
prototypes_per_class = 10
hparams = dict(
input_dim=28 * 28,
latent_dim=28 * 28,
distribution=(num_classes, prototypes_per_class),
proto_lr=0.01,
bb_lr=0.01,
)
# Initialize the model
model = ImageGMLVQ(
hparams,
optimizer=torch.optim.Adam,
prototypes_initializer=pt.initializers.SMCI(train_ds),
)
# Callbacks
vis = VisImgComp(
data=train_ds,
num_columns=10,
show=False,
tensorboard=True,
random_data=100,
add_embedding=True,
embedding_data=200,
flatten_data=False,
)
pruning = PruneLoserPrototypes(
threshold=0.01,
idle_epochs=1,
prune_quota_per_epoch=10,
frequency=1,
verbose=True,
)
es = EarlyStopping(
monitor="train_loss",
min_delta=0.001,
patience=15,
mode="min",
check_on_train_epoch_end=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
pruning,
es,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,94 +0,0 @@
"""GMLVQ example using the spiral dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import (
GMLVQ,
PruneLoserPrototypes,
VisGLVQ2D,
)
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = pt.datasets.Spiral(num_samples=500, noise=0.5)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=256)
# Hyperparameters
num_classes = 2
prototypes_per_class = 10
hparams = dict(
distribution=(num_classes, prototypes_per_class),
transfer_function="swish_beta",
transfer_beta=10.0,
proto_lr=0.1,
bb_lr=0.1,
input_dim=2,
latent_dim=2,
)
# Initialize the model
model = GMLVQ(
hparams,
optimizer=torch.optim.Adam,
prototypes_initializer=pt.initializers.SSCI(train_ds, noise=1e-2),
)
# Callbacks
vis = VisGLVQ2D(
train_ds,
show_last_only=False,
block=False,
)
pruning = PruneLoserPrototypes(
threshold=0.01,
idle_epochs=10,
prune_quota_per_epoch=5,
frequency=5,
replace=True,
prototypes_initializer=pt.initializers.SSCI(train_ds, noise=1e-1),
verbose=True,
)
es = EarlyStopping(
monitor="train_loss",
min_delta=1.0,
patience=5,
mode="min",
check_on_train_epoch_end=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
es,
pruning,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,65 +0,0 @@
"""Growing Neural Gas example using the Iris dataset."""
import argparse
import logging
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import GrowingNeuralGas, VisNG2D
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Reproducibility
seed_everything(seed=42)
# Prepare the data
train_ds = pt.datasets.Iris(dims=[0, 2])
train_loader = DataLoader(train_ds, batch_size=64)
# Hyperparameters
hparams = dict(
num_prototypes=5,
input_dim=2,
lr=0.1,
)
# Initialize the model
model = GrowingNeuralGas(
hparams,
prototypes_initializer=pt.initializers.ZCI(2),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Model summary
logging.info(model)
# Callbacks
vis = VisNG2D(data=train_loader)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
],
max_epochs=100,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,116 +0,0 @@
"""GTLVQ example using the MNIST dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import (
ImageGTLVQ,
PruneLoserPrototypes,
VisImgComp,
)
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = MNIST(
"~/datasets",
train=True,
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
]),
)
test_ds = MNIST(
"~/datasets",
train=False,
download=True,
transform=transforms.Compose([
transforms.ToTensor(),
]),
)
# Dataloaders
train_loader = DataLoader(train_ds, num_workers=0, batch_size=256)
test_loader = DataLoader(test_ds, num_workers=0, batch_size=256)
# Hyperparameters
num_classes = 10
prototypes_per_class = 1
hparams = dict(
input_dim=28 * 28,
latent_dim=28,
distribution=(num_classes, prototypes_per_class),
proto_lr=0.01,
bb_lr=0.01,
)
# Initialize the model
model = ImageGTLVQ(
hparams,
optimizer=torch.optim.Adam,
prototypes_initializer=pt.initializers.SMCI(train_ds),
#Use one batch of data for subspace initiator.
omega_initializer=pt.initializers.PCALinearTransformInitializer(
next(iter(train_loader))[0].reshape(256, 28 * 28)))
# Callbacks
vis = VisImgComp(
data=train_ds,
num_columns=10,
show=False,
tensorboard=True,
random_data=100,
add_embedding=True,
embedding_data=200,
flatten_data=False,
)
pruning = PruneLoserPrototypes(
threshold=0.01,
idle_epochs=1,
prune_quota_per_epoch=10,
frequency=1,
verbose=True,
)
es = EarlyStopping(
monitor="train_loss",
min_delta=0.001,
patience=15,
mode="min",
check_on_train_epoch_end=True,
)
# Setup trainer
# using GPUs here is strongly recommended!
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
pruning,
es,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,76 +0,0 @@
"""Localized-GTLVQ example using the Moons dataset."""
import argparse
import logging
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import GTLVQ, VisGLVQ2D
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Reproducibility
seed_everything(seed=2)
# Dataset
train_ds = pt.datasets.Moons(num_samples=300, noise=0.2, seed=42)
# Dataloaders
train_loader = DataLoader(
train_ds,
batch_size=256,
shuffle=True,
)
# Hyperparameters
# Latent_dim should be lower than input dim.
hparams = dict(distribution=[1, 3], input_dim=2, latent_dim=1)
# Initialize the model
model = GTLVQ(hparams,
prototypes_initializer=pt.initializers.SMCI(train_ds))
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Summary
logging.info(model)
# Callbacks
vis = VisGLVQ2D(data=train_ds)
es = EarlyStopping(
monitor="train_acc",
min_delta=0.001,
patience=20,
mode="max",
verbose=False,
check_on_train_epoch_end=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
es,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,81 +0,0 @@
"""k-NN example using the Iris dataset from scikit-learn."""
import argparse
import logging
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import KNN, VisGLVQ2D
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
X, y = load_iris(return_X_y=True)
X = X[:, 0:3:2]
X_train, X_test, y_train, y_test = train_test_split(
X,
y,
test_size=0.5,
random_state=42,
)
train_ds = pt.datasets.NumpyDataset(X_train, y_train)
test_ds = pt.datasets.NumpyDataset(X_test, y_test)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=16)
test_loader = DataLoader(test_ds, batch_size=16)
# Hyperparameters
hparams = dict(k=5)
# Initialize the model
model = KNN(hparams, data=train_ds)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Summary
logging.info(model)
# Callbacks
vis = VisGLVQ2D(
data=(X_train, y_train),
resolution=200,
block=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
max_epochs=1,
callbacks=[
vis,
],
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
# This is only for visualization. k-NN has no training phase.
trainer.fit(model, train_loader)
# Recall
y_pred = model.predict(torch.tensor(X_train))
logging.info(y_pred)
# Test
trainer.test(model, dataloaders=test_loader)

View File

@ -1,118 +0,0 @@
"""Kohonen Self Organizing Map."""
import argparse
import logging
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from matplotlib import pyplot as plt
from prototorch.models import KohonenSOM
from prototorch.utils.colors import hex_to_rgb
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader, TensorDataset
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
class Vis2DColorSOM(pl.Callback):
def __init__(self, data, title="ColorSOMe", pause_time=0.1):
super().__init__()
self.title = title
self.fig = plt.figure(self.title)
self.data = data
self.pause_time = pause_time
def on_train_epoch_end(self, trainer, pl_module: KohonenSOM):
ax = self.fig.gca()
ax.cla()
ax.set_title(self.title)
h, w = pl_module._grid.shape[:2]
protos = pl_module.prototypes.view(h, w, 3)
ax.imshow(protos)
ax.axis("off")
# Overlay color names
d = pl_module.compute_distances(self.data)
wp = pl_module.predict_from_distances(d)
for i, iloc in enumerate(wp):
plt.text(
iloc[1],
iloc[0],
color_names[i],
ha="center",
va="center",
bbox=dict(facecolor="white", alpha=0.5, lw=0),
)
if trainer.current_epoch != trainer.max_epochs - 1:
plt.pause(self.pause_time)
else:
plt.show(block=True)
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Reproducibility
seed_everything(seed=42)
# Prepare the data
hex_colors = [
"#000000", "#0000ff", "#00007f", "#1f86ff", "#5466aa", "#997fff",
"#00ff00", "#ff0000", "#00ffff", "#ff00ff", "#ffff00", "#ffffff",
"#545454", "#7f7f7f", "#a8a8a8", "#808000", "#800080", "#ffa500"
]
color_names = [
"black", "blue", "darkblue", "skyblue", "greyblue", "lilac", "green",
"red", "cyan", "magenta", "yellow", "white", "darkgrey", "mediumgrey",
"lightgrey", "olive", "purple", "orange"
]
colors = list(hex_to_rgb(hex_colors))
data = torch.Tensor(colors) / 255.0
train_ds = TensorDataset(data)
train_loader = DataLoader(train_ds, batch_size=8)
# Hyperparameters
hparams = dict(
shape=(18, 32),
alpha=1.0,
sigma=16,
lr=0.1,
)
# Initialize the model
model = KohonenSOM(
hparams,
prototypes_initializer=pt.initializers.RNCI(3),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 3)
# Model summary
logging.info(model)
# Callbacks
vis = Vis2DColorSOM(data=data)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
max_epochs=500,
callbacks=[
vis,
],
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,77 +0,0 @@
"""Localized-GMLVQ example using the Moons dataset."""
import argparse
import logging
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import LGMLVQ, VisGLVQ2D
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Reproducibility
seed_everything(seed=2)
# Dataset
train_ds = pt.datasets.Moons(num_samples=300, noise=0.2, seed=42)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=256, shuffle=True)
# Hyperparameters
hparams = dict(
distribution=[1, 3],
input_dim=2,
latent_dim=2,
)
# Initialize the model
model = LGMLVQ(
hparams,
prototypes_initializer=pt.initializers.SMCI(train_ds),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Summary
logging.info(model)
# Callbacks
vis = VisGLVQ2D(data=train_ds)
es = EarlyStopping(
monitor="train_acc",
min_delta=0.001,
patience=20,
mode="max",
verbose=False,
check_on_train_epoch_end=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
es,
],
log_every_n_steps=1,
max_epochs=1000,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,103 +0,0 @@
"""LVQMLN example using all four dimensions of the Iris dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import (
LVQMLN,
PruneLoserPrototypes,
VisSiameseGLVQ2D,
)
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
class Backbone(torch.nn.Module):
def __init__(self, input_size=4, hidden_size=10, latent_size=2):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.latent_size = latent_size
self.dense1 = torch.nn.Linear(self.input_size, self.hidden_size)
self.dense2 = torch.nn.Linear(self.hidden_size, self.latent_size)
self.activation = torch.nn.Sigmoid()
def forward(self, x):
x = self.activation(self.dense1(x))
out = self.activation(self.dense2(x))
return out
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = pt.datasets.Iris()
# Reproducibility
seed_everything(seed=42)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=150)
# Hyperparameters
hparams = dict(
distribution=[3, 4, 5],
proto_lr=0.001,
bb_lr=0.001,
)
# Initialize the backbone
backbone = Backbone()
# Initialize the model
model = LVQMLN(
hparams,
prototypes_initializer=pt.initializers.SSCI(
train_ds,
transform=backbone,
),
backbone=backbone,
)
# Callbacks
vis = VisSiameseGLVQ2D(
data=train_ds,
map_protos=False,
border=0.1,
resolution=500,
axis_off=True,
)
pruning = PruneLoserPrototypes(
threshold=0.01,
idle_epochs=20,
prune_quota_per_epoch=2,
frequency=10,
verbose=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
pruning,
],
log_every_n_steps=1,
max_epochs=1000,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,68 +0,0 @@
"""Median-LVQ example using the Iris dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import MedianLVQ, VisGLVQ2D
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = pt.datasets.Iris(dims=[0, 2])
# Dataloaders
train_loader = DataLoader(
train_ds,
batch_size=len(train_ds), # MedianLVQ cannot handle mini-batches
)
# Initialize the model
model = MedianLVQ(
hparams=dict(distribution=(3, 2), lr=0.01),
prototypes_initializer=pt.initializers.SSCI(train_ds),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Callbacks
vis = VisGLVQ2D(data=train_ds)
es = EarlyStopping(
monitor="train_acc",
min_delta=0.01,
patience=5,
mode="max",
verbose=True,
check_on_train_epoch_end=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
es,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,74 +0,0 @@
"""Neural Gas example using the Iris dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import NeuralGas, VisNG2D
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Prepare and pre-process the dataset
x_train, y_train = load_iris(return_X_y=True)
x_train = x_train[:, 0:3:2]
scaler = StandardScaler()
scaler.fit(x_train)
x_train = scaler.transform(x_train)
train_ds = pt.datasets.NumpyDataset(x_train, y_train)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=150)
# Hyperparameters
hparams = dict(
num_prototypes=30,
input_dim=2,
lr=0.03,
)
# Initialize the model
model = NeuralGas(
hparams,
prototypes_initializer=pt.core.ZCI(2),
lr_scheduler=ExponentialLR,
lr_scheduler_kwargs=dict(gamma=0.99, verbose=False),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Callbacks
vis = VisNG2D(data=train_ds)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,68 +0,0 @@
"""RSLVQ example using the Iris dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import RSLVQ, VisGLVQ2D
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Reproducibility
seed_everything(seed=42)
# Dataset
train_ds = pt.datasets.Iris(dims=[0, 2])
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=64)
# Hyperparameters
hparams = dict(
distribution=[2, 2, 3],
proto_lr=0.05,
lambd=0.1,
variance=1.0,
input_dim=2,
latent_dim=2,
bb_lr=0.01,
)
# Initialize the model
model = RSLVQ(
hparams,
optimizer=torch.optim.Adam,
prototypes_initializer=pt.initializers.SSCI(train_ds, noise=0.2),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Callbacks
vis = VisGLVQ2D(data=train_ds)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
],
detect_anomaly=True,
max_epochs=100,
log_every_n_steps=1,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,83 +0,0 @@
"""Siamese GLVQ example using all four dimensions of the Iris dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import SiameseGLVQ, VisSiameseGLVQ2D
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
class Backbone(torch.nn.Module):
def __init__(self, input_size=4, hidden_size=10, latent_size=2):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.latent_size = latent_size
self.dense1 = torch.nn.Linear(self.input_size, self.hidden_size)
self.dense2 = torch.nn.Linear(self.hidden_size, self.latent_size)
self.activation = torch.nn.Sigmoid()
def forward(self, x):
x = self.activation(self.dense1(x))
out = self.activation(self.dense2(x))
return out
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = pt.datasets.Iris()
# Reproducibility
seed_everything(seed=2)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=150)
# Hyperparameters
hparams = dict(
distribution=[1, 2, 3],
proto_lr=0.01,
bb_lr=0.01,
)
# Initialize the backbone
backbone = Backbone()
# Initialize the model
model = SiameseGLVQ(
hparams,
prototypes_initializer=pt.initializers.SMCI(train_ds),
backbone=backbone,
both_path_gradients=False,
)
# Callbacks
vis = VisSiameseGLVQ2D(data=train_ds, border=0.1)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,85 +0,0 @@
"""Siamese GTLVQ example using all four dimensions of the Iris dataset."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import SiameseGTLVQ, VisSiameseGLVQ2D
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
warnings.filterwarnings("ignore", category=UserWarning)
class Backbone(torch.nn.Module):
def __init__(self, input_size=4, hidden_size=10, latent_size=2):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.latent_size = latent_size
self.dense1 = torch.nn.Linear(self.input_size, self.hidden_size)
self.dense2 = torch.nn.Linear(self.hidden_size, self.latent_size)
self.activation = torch.nn.Sigmoid()
def forward(self, x):
x = self.activation(self.dense1(x))
out = self.activation(self.dense2(x))
return out
if __name__ == "__main__":
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Dataset
train_ds = pt.datasets.Iris()
# Reproducibility
seed_everything(seed=2)
# Dataloaders
train_loader = DataLoader(train_ds, batch_size=150)
# Hyperparameters
hparams = dict(
distribution=[1, 2, 3],
proto_lr=0.01,
bb_lr=0.01,
input_dim=2,
latent_dim=1,
)
# Initialize the backbone
backbone = Backbone(latent_size=hparams["input_dim"])
# Initialize the model
model = SiameseGTLVQ(
hparams,
prototypes_initializer=pt.initializers.SMCI(train_ds),
backbone=backbone,
both_path_gradients=False,
)
# Callbacks
vis = VisSiameseGLVQ2D(data=train_ds, border=0.1)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,124 +0,0 @@
"""Warm-starting GLVQ with prototypes from Growing Neural Gas."""
import argparse
import warnings
import prototorch as pt
import pytorch_lightning as pl
import torch
from prototorch.models import (
GLVQ,
KNN,
GrowingNeuralGas,
PruneLoserPrototypes,
VisGLVQ2D,
)
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.utilities.seed import seed_everything
from pytorch_lightning.utilities.warnings import PossibleUserWarning
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.data import DataLoader
warnings.filterwarnings("ignore", category=PossibleUserWarning)
if __name__ == "__main__":
# Reproducibility
seed_everything(seed=4)
# Command-line arguments
parser = argparse.ArgumentParser()
parser = pl.Trainer.add_argparse_args(parser)
args = parser.parse_args()
# Prepare the data
train_ds = pt.datasets.Iris(dims=[0, 2])
train_loader = DataLoader(train_ds, batch_size=64, num_workers=0)
# Initialize the gng
gng = GrowingNeuralGas(
hparams=dict(num_prototypes=5, insert_freq=2, lr=0.1),
prototypes_initializer=pt.initializers.ZCI(2),
lr_scheduler=ExponentialLR,
lr_scheduler_kwargs=dict(gamma=0.99, verbose=False),
)
# Callbacks
es = EarlyStopping(
monitor="loss",
min_delta=0.001,
patience=20,
mode="min",
verbose=False,
check_on_train_epoch_end=True,
)
# Setup trainer for GNG
trainer = pl.Trainer(
max_epochs=1000,
callbacks=[
es,
],
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(gng, train_loader)
# Hyperparameters
hparams = dict(
distribution=[],
lr=0.01,
)
# Warm-start prototypes
knn = KNN(dict(k=1), data=train_ds)
prototypes = gng.prototypes
plabels = knn.predict(prototypes)
# Initialize the model
model = GLVQ(
hparams,
optimizer=torch.optim.Adam,
prototypes_initializer=pt.initializers.LCI(prototypes),
labels_initializer=pt.initializers.LLI(plabels),
lr_scheduler=ExponentialLR,
lr_scheduler_kwargs=dict(gamma=0.99, verbose=False),
)
# Compute intermediate input and output sizes
model.example_input_array = torch.zeros(4, 2)
# Callbacks
vis = VisGLVQ2D(data=train_ds)
pruning = PruneLoserPrototypes(
threshold=0.02,
idle_epochs=2,
prune_quota_per_epoch=5,
frequency=1,
verbose=True,
)
es = EarlyStopping(
monitor="train_loss",
min_delta=0.001,
patience=10,
mode="min",
verbose=True,
check_on_train_epoch_end=True,
)
# Setup trainer
trainer = pl.Trainer.from_argparse_args(
args,
callbacks=[
vis,
pruning,
es,
],
max_epochs=1000,
log_every_n_steps=1,
detect_anomaly=True,
)
# Training loop
trainer.fit(model, train_loader)

View File

@ -1,134 +0,0 @@
import logging
import prototorch as pt
import pytorch_lightning as pl
import torchmetrics
from prototorch.core import SMCI
from prototorch.y.architectures.base import Steps
from prototorch.y.callbacks import (
LogTorchmetricCallback,
PlotLambdaMatrixToTensorboard,
VisGMLVQ2D,
)
from prototorch.y.library.gmlvq import GMLVQ
from pytorch_lightning.callbacks import EarlyStopping
from torch.utils.data import DataLoader, random_split
logging.basicConfig(level=logging.INFO)
# ##############################################################################
def main():
# ------------------------------------------------------------
# DATA
# ------------------------------------------------------------
# Dataset
full_dataset = pt.datasets.Iris()
full_count = len(full_dataset)
train_count = int(full_count * 0.5)
val_count = int(full_count * 0.4)
test_count = int(full_count * 0.1)
train_dataset, val_dataset, test_dataset = random_split(
full_dataset, (train_count, val_count, test_count))
# Dataloader
train_loader = DataLoader(
train_dataset,
batch_size=1,
num_workers=4,
shuffle=True,
)
val_loader = DataLoader(
val_dataset,
batch_size=1,
num_workers=4,
shuffle=False,
)
test_loader = DataLoader(
test_dataset,
batch_size=1,
num_workers=0,
shuffle=False,
)
# ------------------------------------------------------------
# HYPERPARAMETERS
# ------------------------------------------------------------
# Select Initializer
components_initializer = SMCI(full_dataset)
# Define Hyperparameters
hyperparameters = GMLVQ.HyperParameters(
lr=dict(components_layer=0.1, _omega=0),
input_dim=4,
distribution=dict(
num_classes=3,
per_class=1,
),
component_initializer=components_initializer,
)
# Create Model
model = GMLVQ(hyperparameters)
# ------------------------------------------------------------
# TRAINING
# ------------------------------------------------------------
# Controlling Callbacks
recall = LogTorchmetricCallback(
'training_recall',
torchmetrics.Recall,
num_classes=3,
step=Steps.TRAINING,
)
stopping_criterion = LogTorchmetricCallback(
'validation_recall',
torchmetrics.Recall,
num_classes=3,
step=Steps.VALIDATION,
)
es = EarlyStopping(
monitor=stopping_criterion.name,
mode="max",
patience=10,
)
# Visualization Callback
vis = VisGMLVQ2D(data=full_dataset)
# Define trainer
trainer = pl.Trainer(
callbacks=[
vis,
recall,
stopping_criterion,
es,
PlotLambdaMatrixToTensorboard(),
],
max_epochs=100,
)
# Train
trainer.fit(model, train_loader, val_loader)
trainer.test(model, test_loader)
# Manual save
trainer.save_checkpoint("./y_arch.ckpt")
# Load saved model
new_model = GMLVQ.load_from_checkpoint(
checkpoint_path="./y_arch.ckpt",
strict=True,
)
if __name__ == "__main__":
main()

View File

@ -1,39 +1,25 @@
"""`models` plugin for the `prototorch` package."""
from .architectures.base import BaseYArchitecture
from .architectures.comparison import (
OmegaComparisonMixin,
SimpleComparisonMixin,
)
from .architectures.competition import WTACompetitionMixin
from .architectures.components import SupervisedArchitecture
from .architectures.loss import GLVQLossMixin
from .architectures.optimization import (
MultipleLearningRateMixin,
SingleLearningRateMixin,
)
from .callbacks import PrototypeConvergence, PruneLoserPrototypes
from .cbc import CBC, ImageCBC
from .glvq import (
GLVQ,
GLVQ1,
GLVQ21,
GMLVQ,
GRLVQ,
GTLVQ,
LGMLVQ,
LVQMLN,
ImageGLVQ,
ImageGMLVQ,
ImageGTLVQ,
SiameseGLVQ,
SiameseGMLVQ,
SiameseGTLVQ,
)
from .knn import KNN
from .lvq import (
LVQ1,
LVQ21,
MedianLVQ,
)
from .probabilistic import (
CELVQ,
RSLVQ,
SLVQ,
)
from .unsupervised import (
GrowingNeuralGas,
KohonenSOM,
NeuralGas,
)
from .vis import *
__all__ = [
'BaseYArchitecture',
"OmegaComparisonMixin",
"SimpleComparisonMixin",
"SingleLearningRateMixin",
"MultipleLearningRateMixin",
"SupervisedArchitecture",
"WTACompetitionMixin",
"GLVQLossMixin",
]
__version__ = "1.0.0-a5"
__version__ = "1.0.0"

View File

@ -1,219 +0,0 @@
"""Abstract classes to be inherited by prototorch models."""
import logging
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
import torchmetrics
from prototorch.core.competitions import WTAC
from prototorch.core.components import (
AbstractComponents,
Components,
LabeledComponents,
)
from prototorch.core.distances import euclidean_distance
from prototorch.core.initializers import (
LabelsInitializer,
ZerosCompInitializer,
)
from prototorch.core.pooling import stratified_min_pooling
from prototorch.nn.wrappers import LambdaLayer
class ProtoTorchBolt(pl.LightningModule):
"""All ProtoTorch models are ProtoTorch Bolts.
hparams:
- lr: learning rate
kwargs:
- optimizer: optimizer class
- lr_scheduler: learning rate scheduler class
- lr_scheduler_kwargs: learning rate scheduler kwargs
"""
def __init__(self, hparams, **kwargs):
super().__init__()
# Hyperparameters
self.save_hyperparameters(hparams)
# Default hparams
self.hparams.setdefault("lr", 0.01)
# Default config
self.optimizer = kwargs.get("optimizer", torch.optim.Adam)
self.lr_scheduler = kwargs.get("lr_scheduler", None)
self.lr_scheduler_kwargs = kwargs.get("lr_scheduler_kwargs", dict())
def configure_optimizers(self):
optimizer = self.optimizer(self.parameters(), lr=self.hparams["lr"])
if self.lr_scheduler is not None:
scheduler = self.lr_scheduler(optimizer,
**self.lr_scheduler_kwargs)
sch = {
"scheduler": scheduler,
"interval": "step",
} # called after each training step
return [optimizer], [sch]
else:
return optimizer
def reconfigure_optimizers(self):
if self.trainer:
self.trainer.strategy.setup_optimizers(self.trainer)
else:
logging.warning("No trainer to reconfigure optimizers!")
def __repr__(self):
surep = super().__repr__()
indented = "".join([f"\t{line}\n" for line in surep.splitlines()])
wrapped = f"ProtoTorch Bolt(\n{indented})"
return wrapped
class PrototypeModel(ProtoTorchBolt):
"""Abstract Prototype Model
kwargs:
- distance_fn: distance function
"""
proto_layer: AbstractComponents
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
distance_fn = kwargs.get("distance_fn", euclidean_distance)
self.distance_layer = LambdaLayer(distance_fn)
@property
def num_prototypes(self):
return len(self.proto_layer.components)
@property
def prototypes(self):
return self.proto_layer.components.detach().cpu()
@property
def components(self):
"""Only an alias for the prototypes."""
return self.prototypes
def add_prototypes(self, *args, **kwargs):
self.proto_layer.add_components(*args, **kwargs)
self.hparams["distribution"] = self.proto_layer.distribution
self.reconfigure_optimizers()
def remove_prototypes(self, indices):
self.proto_layer.remove_components(indices)
self.hparams["distribution"] = self.proto_layer.distribution
self.reconfigure_optimizers()
class UnsupervisedPrototypeModel(PrototypeModel):
proto_layer: Components
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
# Layers
prototypes_initializer = kwargs.get("prototypes_initializer", None)
if prototypes_initializer is not None:
self.proto_layer = Components(
self.hparams["num_prototypes"],
initializer=prototypes_initializer,
)
def compute_distances(self, x):
protos = self.proto_layer().type_as(x)
distances = self.distance_layer(x, protos)
return distances
def forward(self, x):
distances = self.compute_distances(x)
return distances
class SupervisedPrototypeModel(PrototypeModel):
proto_layer: LabeledComponents
def __init__(self, hparams, skip_proto_layer=False, **kwargs):
super().__init__(hparams, **kwargs)
# Layers
distribution = hparams.get("distribution", None)
prototypes_initializer = kwargs.get("prototypes_initializer", None)
labels_initializer = kwargs.get("labels_initializer",
LabelsInitializer())
if not skip_proto_layer:
# when subclasses do not need a customized prototype layer
if prototypes_initializer is not None:
# when building a new model
self.proto_layer = LabeledComponents(
distribution=distribution,
components_initializer=prototypes_initializer,
labels_initializer=labels_initializer,
)
proto_shape = self.proto_layer.components.shape[1:]
self.hparams["initialized_proto_shape"] = proto_shape
else:
# when restoring a checkpointed model
self.proto_layer = LabeledComponents(
distribution=distribution,
components_initializer=ZerosCompInitializer(
self.hparams["initialized_proto_shape"]),
)
self.competition_layer = WTAC()
@property
def prototype_labels(self):
return self.proto_layer.labels.detach().cpu()
@property
def num_classes(self):
return self.proto_layer.num_classes
def compute_distances(self, x):
protos, _ = self.proto_layer()
distances = self.distance_layer(x, protos)
return distances
def forward(self, x):
distances = self.compute_distances(x)
_, plabels = self.proto_layer()
winning = stratified_min_pooling(distances, plabels)
y_pred = F.softmin(winning, dim=1)
return y_pred
def predict_from_distances(self, distances):
with torch.no_grad():
_, plabels = self.proto_layer()
y_pred = self.competition_layer(distances, plabels)
return y_pred
def predict(self, x):
with torch.no_grad():
distances = self.compute_distances(x)
y_pred = self.predict_from_distances(distances)
return y_pred
def log_acc(self, distances, targets, tag):
preds = self.predict_from_distances(distances)
accuracy = torchmetrics.functional.accuracy(preds.int(), targets.int())
# `.int()` because FloatTensors are assumed to be class probabilities
self.log(tag,
accuracy,
on_step=False,
on_epoch=True,
prog_bar=True,
logger=True)
def test_step(self, batch, batch_idx):
x, targets = batch
preds = self.predict(x)
accuracy = torchmetrics.functional.accuracy(preds.int(), targets.int())
self.log("test_acc", accuracy)

View File

@ -1,7 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Callable, Dict
from typing import Callable
import torch
from prototorch.core.distances import euclidean_distance
@ -9,8 +9,8 @@ from prototorch.core.initializers import (
AbstractLinearTransformInitializer,
EyeLinearTransformInitializer,
)
from prototorch.models.architectures.base import BaseYArchitecture
from prototorch.nn.wrappers import LambdaLayer
from prototorch.y.architectures.base import BaseYArchitecture
from torch import Tensor
from torch.nn.parameter import Parameter
@ -19,11 +19,12 @@ class SimpleComparisonMixin(BaseYArchitecture):
"""
Simple Comparison
A comparison layer that only uses the positions of the components and the batch for dissimilarity computation.
A comparison layer that only uses the positions of the components
and the batch for dissimilarity computation.
"""
# HyperParameters
# ----------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------
@dataclass
class HyperParameters(BaseYArchitecture.HyperParameters):
"""
@ -36,7 +37,7 @@ class SimpleComparisonMixin(BaseYArchitecture):
comparison_parameters: dict = field(default_factory=lambda: dict())
# Steps
# ----------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------
def init_comparison(self, hparams: HyperParameters):
self.comparison_layer = LambdaLayer(
fn=hparams.comparison_fn,
@ -64,19 +65,22 @@ class OmegaComparisonMixin(SimpleComparisonMixin):
"""
Omega Comparison
A comparison layer that uses the positions of the components and the batch for dissimilarity computation.
A comparison layer that uses the positions of the components
and the batch for dissimilarity computation.
"""
_omega: torch.Tensor
# HyperParameters
# ----------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------
@dataclass
class HyperParameters(SimpleComparisonMixin.HyperParameters):
"""
input_dim: Necessary Field: The dimensionality of the input.
latent_dim: The dimensionality of the latent space. Default: 2.
omega_initializer: The initializer to use for the omega matrix. Default: EyeLinearTransformInitializer.
latent_dim:
The dimensionality of the latent space. Default: 2.
omega_initializer:
The initializer to use for the omega matrix. Default: EyeLinearTransformInitializer.
"""
input_dim: int | None = None
latent_dim: int = 2
@ -84,7 +88,7 @@ class OmegaComparisonMixin(SimpleComparisonMixin):
AbstractLinearTransformInitializer] = EyeLinearTransformInitializer
# Steps
# ----------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------
def init_comparison(self, hparams: HyperParameters) -> None:
super().init_comparison(hparams)
@ -100,13 +104,34 @@ class OmegaComparisonMixin(SimpleComparisonMixin):
self.comparison_kwargs = dict(omega=self._omega)
# Properties
# ----------------------------------------------------------------------------------------------------
# ----------------------------------------------------------------------------------------------
@property
def omega_matrix(self):
'''
Omega Matrix. Mapping applied to data and prototypes.
'''
return self._omega.detach().cpu()
@property
def lambda_matrix(self):
'''
Lambda Matrix.
'''
omega = self._omega.detach()
lam = omega @ omega.T
return lam.detach().cpu()
@property
def relevance_profile(self):
'''
Relevance Profile. Main Diagonal of the Lambda Matrix.
'''
return self.lambda_matrix.diag().abs()
@property
def classification_influence_profile(self):
'''
Classification Influence Profile. Influence of each dimension.
'''
lam = self.lambda_matrix
return lam.abs().sum(0)

View File

@ -1,7 +1,7 @@
from dataclasses import dataclass
from prototorch.core.competitions import WTAC
from prototorch.y.architectures.base import BaseYArchitecture
from prototorch.models.architectures.base import BaseYArchitecture
class WTACompetitionMixin(BaseYArchitecture):

View File

@ -6,7 +6,7 @@ from prototorch.core.initializers import (
LabelsInitializer,
ZerosCompInitializer,
)
from prototorch.y import BaseYArchitecture
from prototorch.models import BaseYArchitecture
class SupervisedArchitecture(BaseYArchitecture):

View File

@ -1,7 +1,7 @@
from dataclasses import dataclass, field
from prototorch.core.losses import GLVQLoss
from prototorch.y.architectures.base import BaseYArchitecture
from prototorch.models.architectures.base import BaseYArchitecture
class GLVQLossMixin(BaseYArchitecture):

View File

@ -2,7 +2,7 @@ from dataclasses import dataclass, field
from typing import Type
import torch
from prototorch.y import BaseYArchitecture
from prototorch.models import BaseYArchitecture
from torch.nn.parameter import Parameter

View File

@ -1,152 +1,217 @@
"""Lightning Callbacks."""
import logging
from typing import TYPE_CHECKING
import warnings
from typing import Optional, Type
import numpy as np
import pytorch_lightning as pl
import torch
from prototorch.core.initializers import LiteralCompInitializer
import torchmetrics
from matplotlib import pyplot as plt
from prototorch.models.architectures.base import BaseYArchitecture, Steps
from prototorch.models.library.gmlvq import GMLVQ
from prototorch.models.vis import Vis2DAbstract
from prototorch.utils.utils import mesh2d
from pytorch_lightning.loggers import TensorBoardLogger
from .extras import ConnectionTopology
if TYPE_CHECKING:
from prototorch.models import GLVQ, GrowingNeuralGas
DIVERGING_COLOR_MAPS = [
'PiYG',
'PRGn',
'BrBG',
'PuOr',
'RdGy',
'RdBu',
'RdYlBu',
'RdYlGn',
'Spectral',
'coolwarm',
'bwr',
'seismic',
]
class PruneLoserPrototypes(pl.Callback):
class LogTorchmetricCallback(pl.Callback):
def __init__(
self,
threshold=0.01,
idle_epochs=10,
prune_quota_per_epoch=-1,
frequency=1,
replace=False,
prototypes_initializer=None,
verbose=False,
):
self.threshold = threshold # minimum win ratio
self.idle_epochs = idle_epochs # epochs to wait before pruning
self.prune_quota_per_epoch = prune_quota_per_epoch
self.frequency = frequency
self.replace = replace
self.verbose = verbose
self.prototypes_initializer = prototypes_initializer
name,
metric: Type[torchmetrics.Metric],
step: str = Steps.TRAINING,
**metric_kwargs,
) -> None:
self.name = name
self.metric = metric
self.metric_kwargs = metric_kwargs
self.step = step
def on_train_epoch_end(self, trainer, pl_module: "GLVQ"):
if (trainer.current_epoch + 1) < self.idle_epochs:
return None
if (trainer.current_epoch + 1) % self.frequency:
return None
ratios = pl_module.prototype_win_ratios.mean(dim=0)
to_prune_tensor = torch.arange(len(ratios))[ratios < self.threshold]
to_prune = to_prune_tensor.tolist()
prune_labels = pl_module.prototype_labels[to_prune]
if self.prune_quota_per_epoch > 0:
to_prune = to_prune[:self.prune_quota_per_epoch]
prune_labels = prune_labels[:self.prune_quota_per_epoch]
if len(to_prune) > 0:
logging.debug(f"\nPrototype win ratios: {ratios}")
logging.debug(f"Pruning prototypes at: {to_prune}")
logging.debug(f"Corresponding labels are: {prune_labels.tolist()}")
cur_num_protos = pl_module.num_prototypes
pl_module.remove_prototypes(indices=to_prune)
if self.replace:
labels, counts = torch.unique(prune_labels,
sorted=True,
return_counts=True)
distribution = dict(zip(labels.tolist(), counts.tolist()))
logging.info(f"Re-adding pruned prototypes...")
logging.debug(f"distribution={distribution}")
pl_module.add_prototypes(
distribution=distribution,
components_initializer=self.prototypes_initializer)
new_num_protos = pl_module.num_prototypes
logging.info(f"`num_prototypes` changed from {cur_num_protos} "
f"to {new_num_protos}.")
return True
class PrototypeConvergence(pl.Callback):
def __init__(self, min_delta=0.01, idle_epochs=10, verbose=False):
self.min_delta = min_delta
self.idle_epochs = idle_epochs # epochs to wait
self.verbose = verbose
def on_train_epoch_end(self, trainer, pl_module):
if (trainer.current_epoch + 1) < self.idle_epochs:
return None
logging.info("Stopping...")
# TODO
return True
class GNGCallback(pl.Callback):
"""GNG Callback.
Applies growing algorithm based on accumulated error and topology.
Based on "A Growing Neural Gas Network Learns Topologies" by Bernd Fritzke.
"""
def __init__(self, reduction=0.1, freq=10):
self.reduction = reduction
self.freq = freq
def on_train_epoch_end(
def setup(
self,
trainer: pl.Trainer,
pl_module: "GrowingNeuralGas",
):
if (trainer.current_epoch + 1) % self.freq == 0:
# Get information
errors = pl_module.errors
topology: ConnectionTopology = pl_module.topology_layer
components = pl_module.proto_layer.components
# Insertion point
worst = torch.argmax(errors)
neighbors = topology.get_neighbors(worst)[0]
if len(neighbors) == 0:
logging.log(level=20, msg="No neighbor-pairs found!")
return
neighbors_errors = errors[neighbors]
worst_neighbor = neighbors[torch.argmax(neighbors_errors)]
# New Prototype
new_component = 0.5 * (components[worst] +
components[worst_neighbor])
# Add component
pl_module.proto_layer.add_components(
1,
initializer=LiteralCompInitializer(new_component.unsqueeze(0)),
pl_module: BaseYArchitecture,
stage: Optional[str] = None,
) -> None:
pl_module.register_torchmetric(
self,
self.metric,
step=self.step,
**self.metric_kwargs,
)
# Adjust Topology
topology.add_prototype()
topology.add_connection(worst, -1)
topology.add_connection(worst_neighbor, -1)
topology.remove_connection(worst, worst_neighbor)
def __call__(self, value, pl_module: BaseYArchitecture):
pl_module.log(self.name, value)
# New errors
worst_error = errors[worst].unsqueeze(0)
pl_module.errors = torch.cat([pl_module.errors, worst_error])
pl_module.errors[worst] = errors[worst] * self.reduction
pl_module.errors[
worst_neighbor] = errors[worst_neighbor] * self.reduction
trainer.strategy.setup_optimizers(trainer)
class LogConfusionMatrix(LogTorchmetricCallback):
def __init__(
self,
num_classes,
name="confusion",
on='prediction',
**kwargs,
):
super().__init__(
name,
torchmetrics.ConfusionMatrix,
on=on,
num_classes=num_classes,
**kwargs,
)
def __call__(self, value, pl_module: BaseYArchitecture):
fig, ax = plt.subplots()
ax.imshow(value.detach().cpu().numpy())
# Show all ticks and label them with the respective list entries
# ax.set_xticks(np.arange(len(farmers)), labels=farmers)
# ax.set_yticks(np.arange(len(vegetables)), labels=vegetables)
# Rotate the tick labels and set their alignment.
plt.setp(
ax.get_xticklabels(),
rotation=45,
ha="right",
rotation_mode="anchor",
)
# Loop over data dimensions and create text annotations.
for i in range(len(value)):
for j in range(len(value)):
text = ax.text(
j,
i,
value[i, j].item(),
ha="center",
va="center",
color="w",
)
ax.set_title(self.name)
fig.tight_layout()
pl_module.logger.experiment.add_figure(
tag=self.name,
figure=fig,
close=True,
global_step=pl_module.global_step,
)
class VisGLVQ2D(Vis2DAbstract):
def visualize(self, pl_module):
protos = pl_module.prototypes
plabels = pl_module.prototype_labels
x_train, y_train = self.x_train, self.y_train
ax = self.setup_ax()
self.plot_protos(ax, protos, plabels)
if x_train is not None:
self.plot_data(ax, x_train, y_train)
mesh_input, xx, yy = mesh2d(
np.vstack([x_train, protos]),
self.border,
self.resolution,
)
else:
mesh_input, xx, yy = mesh2d(protos, self.border, self.resolution)
_components = pl_module.components_layer.components
mesh_input = torch.from_numpy(mesh_input).type_as(_components)
y_pred = pl_module.predict(mesh_input)
y_pred = y_pred.cpu().reshape(xx.shape)
ax.contourf(xx, yy, y_pred, cmap=self.cmap, alpha=0.35)
class VisGMLVQ2D(Vis2DAbstract):
def __init__(self, *args, ev_proj=True, **kwargs):
super().__init__(*args, **kwargs)
self.ev_proj = ev_proj
def visualize(self, pl_module):
protos = pl_module.prototypes
plabels = pl_module.prototype_labels
x_train, y_train = self.x_train, self.y_train
device = pl_module.device
omega = pl_module._omega.detach()
lam = omega @ omega.T
u, _, _ = torch.pca_lowrank(lam, q=2)
with torch.no_grad():
x_train = torch.Tensor(x_train).to(device)
x_train = x_train @ u
x_train = x_train.cpu().detach()
if self.show_protos:
with torch.no_grad():
protos = torch.Tensor(protos).to(device)
protos = protos @ u
protos = protos.cpu().detach()
ax = self.setup_ax()
self.plot_data(ax, x_train, y_train)
if self.show_protos:
self.plot_protos(ax, protos, plabels)
class PlotLambdaMatrixToTensorboard(pl.Callback):
def __init__(self, cmap='seismic') -> None:
super().__init__()
self.cmap = cmap
if self.cmap not in DIVERGING_COLOR_MAPS and type(self.cmap) is str:
warnings.warn(
f"{self.cmap} is not a diverging color map. We recommend to use one of the following: {DIVERGING_COLOR_MAPS}"
)
def on_train_start(self, trainer, pl_module: GMLVQ):
self.plot_lambda(trainer, pl_module)
def on_train_epoch_end(self, trainer, pl_module: GMLVQ):
self.plot_lambda(trainer, pl_module)
def plot_lambda(self, trainer, pl_module: GMLVQ):
self.fig, self.ax = plt.subplots(1, 1)
# plot lambda matrix
l_matrix = pl_module.lambda_matrix
# normalize lambda matrix
l_matrix = l_matrix / torch.max(torch.abs(l_matrix))
# plot lambda matrix
self.ax.imshow(l_matrix.detach().numpy(), self.cmap, vmin=-1, vmax=1)
self.fig.colorbar(self.ax.images[-1])
# add title
self.ax.set_title('Lambda Matrix')
# add to tensorboard
if isinstance(trainer.logger, TensorBoardLogger):
trainer.logger.experiment.add_figure(
f"lambda_matrix",
self.fig,
trainer.global_step,
)
else:
warnings.warn(
f"{self.__class__.__name__} is not compatible with {trainer.logger.__class__.__name__} as logger. Use TensorBoardLogger instead."
)

View File

@ -1,80 +0,0 @@
import torch
import torch.nn.functional as F
import torchmetrics
from prototorch.core.competitions import CBCC
from prototorch.core.components import ReasoningComponents
from prototorch.core.initializers import RandomReasoningsInitializer
from prototorch.core.losses import MarginLoss
from prototorch.core.similarities import euclidean_similarity
from prototorch.nn.wrappers import LambdaLayer
from .glvq import SiameseGLVQ
from .mixins import ImagePrototypesMixin
class CBC(SiameseGLVQ):
"""Classification-By-Components."""
proto_layer: ReasoningComponents
def __init__(self, hparams, **kwargs):
super().__init__(hparams, skip_proto_layer=True, **kwargs)
similarity_fn = kwargs.get("similarity_fn", euclidean_similarity)
components_initializer = kwargs.get("components_initializer", None)
reasonings_initializer = kwargs.get("reasonings_initializer",
RandomReasoningsInitializer())
self.components_layer = ReasoningComponents(
self.hparams["distribution"],
components_initializer=components_initializer,
reasonings_initializer=reasonings_initializer,
)
self.similarity_layer = LambdaLayer(similarity_fn)
self.competition_layer = CBCC()
# Namespace hook
self.proto_layer = self.components_layer
self.loss = MarginLoss(self.hparams["margin"])
def forward(self, x):
components, reasonings = self.components_layer()
latent_x = self.backbone(x)
self.backbone.requires_grad_(self.both_path_gradients)
latent_components = self.backbone(components)
self.backbone.requires_grad_(True)
detections = self.similarity_layer(latent_x, latent_components)
probs = self.competition_layer(detections, reasonings)
return probs
def shared_step(self, batch, batch_idx, optimizer_idx=None):
x, y = batch
y_pred = self(x)
num_classes = self.num_classes
y_true = F.one_hot(y.long(), num_classes=num_classes)
loss = self.loss(y_pred, y_true).mean()
return y_pred, loss
def training_step(self, batch, batch_idx, optimizer_idx=None):
y_pred, train_loss = self.shared_step(batch, batch_idx, optimizer_idx)
preds = torch.argmax(y_pred, dim=1)
accuracy = torchmetrics.functional.accuracy(preds.int(),
batch[1].int())
self.log("train_acc",
accuracy,
on_step=False,
on_epoch=True,
prog_bar=True,
logger=True)
return train_loss
def predict(self, x):
with torch.no_grad():
y_pred = self(x)
y_pred = torch.argmax(y_pred, dim=1)
return y_pred
class ImageCBC(ImagePrototypesMixin, CBC):
"""CBC model that constrains the components to the range [0, 1] by
clamping after updates.
"""

View File

@ -1,130 +0,0 @@
"""prototorch.models.extras
Modules not yet available in prototorch go here temporarily.
"""
import torch
from prototorch.core.similarities import gaussian
def rank_scaled_gaussian(distances, lambd):
order = torch.argsort(distances, dim=1)
ranks = torch.argsort(order, dim=1)
return torch.exp(-torch.exp(-ranks / lambd) * distances)
def orthogonalization(tensors):
"""Orthogonalization via polar decomposition """
u, _, v = torch.svd(tensors, compute_uv=True)
u_shape = tuple(list(u.shape))
v_shape = tuple(list(v.shape))
# reshape to (num x N x M)
u = torch.reshape(u, (-1, u_shape[-2], u_shape[-1]))
v = torch.reshape(v, (-1, v_shape[-2], v_shape[-1]))
out = u @ v.permute([0, 2, 1])
out = torch.reshape(out, u_shape[:-1] + (v_shape[-2], ))
return out
def ltangent_distance(x, y, omegas):
r"""Localized Tangent distance.
Compute Orthogonal Complement: math:`\bm P_k = \bm I - \Omega_k \Omega_k^T`
Compute Tangent Distance: math:`{\| \bm P \bm x - \bm P_k \bm y_k \|}_2`
:param `torch.tensor` omegas: Three dimensional matrix
:rtype: `torch.tensor`
"""
x, y = [arr.view(arr.size(0), -1) for arr in (x, y)]
p = torch.eye(omegas.shape[-2], device=omegas.device) - torch.bmm(
omegas, omegas.permute([0, 2, 1]))
projected_x = x @ p
projected_y = torch.diagonal(y @ p).T
expanded_y = torch.unsqueeze(projected_y, dim=1)
batchwise_difference = expanded_y - projected_x
differences_squared = batchwise_difference**2
distances = torch.sqrt(torch.sum(differences_squared, dim=2))
distances = distances.permute(1, 0)
return distances
class GaussianPrior(torch.nn.Module):
def __init__(self, variance):
super().__init__()
self.variance = variance
def forward(self, distances):
return gaussian(distances, self.variance)
class RankScaledGaussianPrior(torch.nn.Module):
def __init__(self, lambd):
super().__init__()
self.lambd = lambd
def forward(self, distances):
return rank_scaled_gaussian(distances, self.lambd)
class ConnectionTopology(torch.nn.Module):
def __init__(self, agelimit, num_prototypes):
super().__init__()
self.agelimit = agelimit
self.num_prototypes = num_prototypes
self.cmat = torch.zeros((self.num_prototypes, self.num_prototypes))
self.age = torch.zeros_like(self.cmat)
def forward(self, d):
order = torch.argsort(d, dim=1)
for element in order:
i0, i1 = element[0], element[1]
self.cmat[i0][i1] = 1
self.cmat[i1][i0] = 1
self.age[i0][i1] = 0
self.age[i1][i0] = 0
self.age[i0][self.cmat[i0] == 1] += 1
self.age[i1][self.cmat[i1] == 1] += 1
self.cmat[i0][self.age[i0] > self.agelimit] = 0
self.cmat[i1][self.age[i1] > self.agelimit] = 0
def get_neighbors(self, position):
return torch.where(self.cmat[position])
def add_prototype(self):
new_cmat = torch.zeros([dim + 1 for dim in self.cmat.shape])
new_cmat[:-1, :-1] = self.cmat
self.cmat = new_cmat
new_age = torch.zeros([dim + 1 for dim in self.age.shape])
new_age[:-1, :-1] = self.age
self.age = new_age
def add_connection(self, a, b):
self.cmat[a][b] = 1
self.cmat[b][a] = 1
self.age[a][b] = 0
self.age[b][a] = 0
def remove_connection(self, a, b):
self.cmat[a][b] = 0
self.cmat[b][a] = 0
self.age[a][b] = 0
self.age[b][a] = 0
def extra_repr(self):
return f"(agelimit): ({self.agelimit})"

View File

@ -1,414 +0,0 @@
"""Models based on the GLVQ framework."""
import torch
from prototorch.core.competitions import wtac
from prototorch.core.distances import (
lomega_distance,
omega_distance,
squared_euclidean_distance,
)
from prototorch.core.initializers import EyeLinearTransformInitializer
from prototorch.core.losses import (
GLVQLoss,
lvq1_loss,
lvq21_loss,
)
from prototorch.core.transforms import LinearTransform
from prototorch.nn.wrappers import LambdaLayer, LossLayer
from torch.nn.parameter import Parameter
from .abstract import SupervisedPrototypeModel
from .extras import ltangent_distance, orthogonalization
from .mixins import ImagePrototypesMixin
class GLVQ(SupervisedPrototypeModel):
"""Generalized Learning Vector Quantization."""
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
# Default hparams
self.hparams.setdefault("margin", 0.0)
self.hparams.setdefault("transfer_fn", "identity")
self.hparams.setdefault("transfer_beta", 10.0)
# Loss
self.loss = GLVQLoss(
margin=self.hparams["margin"],
transfer_fn=self.hparams["transfer_fn"],
beta=self.hparams["transfer_beta"],
)
# def on_save_checkpoint(self, checkpoint):
# if "prototype_win_ratios" in checkpoint["state_dict"]:
# del checkpoint["state_dict"]["prototype_win_ratios"]
def initialize_prototype_win_ratios(self):
self.register_buffer(
"prototype_win_ratios",
torch.zeros(self.num_prototypes, device=self.device),
)
def on_train_epoch_start(self):
self.initialize_prototype_win_ratios()
def log_prototype_win_ratios(self, distances):
batch_size = len(distances)
prototype_wc = torch.zeros(
self.num_prototypes,
dtype=torch.long,
device=self.device,
)
wi, wc = torch.unique(
distances.min(dim=-1).indices,
sorted=True,
return_counts=True,
)
prototype_wc[wi] = wc
prototype_wr = prototype_wc / batch_size
self.prototype_win_ratios = torch.vstack([
self.prototype_win_ratios,
prototype_wr,
])
def shared_step(self, batch, batch_idx, optimizer_idx=None):
x, y = batch
out = self.compute_distances(x)
_, plabels = self.proto_layer()
loss = self.loss(out, y, plabels)
return out, loss
def training_step(self, batch, batch_idx, optimizer_idx=None):
out, train_loss = self.shared_step(batch, batch_idx, optimizer_idx)
self.log_prototype_win_ratios(out)
self.log("train_loss", train_loss)
self.log_acc(out, batch[-1], tag="train_acc")
return train_loss
def validation_step(self, batch, batch_idx):
out, val_loss = self.shared_step(batch, batch_idx)
self.log("val_loss", val_loss)
self.log_acc(out, batch[-1], tag="val_acc")
return val_loss
def test_step(self, batch, batch_idx):
out, test_loss = self.shared_step(batch, batch_idx)
self.log_acc(out, batch[-1], tag="test_acc")
return test_loss
def test_epoch_end(self, outputs):
test_loss = 0.0
for batch_loss in outputs:
test_loss += batch_loss.item()
self.log("test_loss", test_loss)
class SiameseGLVQ(GLVQ):
"""GLVQ in a Siamese setting.
GLVQ model that applies an arbitrary transformation on the inputs and the
prototypes before computing the distances between them. The weights in the
transformation pipeline are only learned from the inputs.
"""
def __init__(
self,
hparams,
backbone=torch.nn.Identity(),
both_path_gradients=False,
**kwargs,
):
distance_fn = kwargs.pop("distance_fn", squared_euclidean_distance)
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
self.backbone = backbone
self.both_path_gradients = both_path_gradients
def configure_optimizers(self):
proto_opt = self.optimizer(
self.proto_layer.parameters(),
lr=self.hparams["proto_lr"],
)
# Only add a backbone optimizer if backbone has trainable parameters
bb_params = list(self.backbone.parameters())
if (bb_params):
bb_opt = self.optimizer(bb_params, lr=self.hparams["bb_lr"])
optimizers = [proto_opt, bb_opt]
else:
optimizers = [proto_opt]
if self.lr_scheduler is not None:
schedulers = []
for optimizer in optimizers:
scheduler = self.lr_scheduler(optimizer,
**self.lr_scheduler_kwargs)
schedulers.append(scheduler)
return optimizers, schedulers
else:
return optimizers
def compute_distances(self, x):
protos, _ = self.proto_layer()
x, protos = [arr.view(arr.size(0), -1) for arr in (x, protos)]
latent_x = self.backbone(x)
bb_grad = any([el.requires_grad for el in self.backbone.parameters()])
self.backbone.requires_grad_(bb_grad and self.both_path_gradients)
latent_protos = self.backbone(protos)
self.backbone.requires_grad_(bb_grad)
distances = self.distance_layer(latent_x, latent_protos)
return distances
def predict_latent(self, x, map_protos=True):
"""Predict `x` assuming it is already embedded in the latent space.
Only the prototypes are embedded in the latent space using the
backbone.
"""
self.eval()
with torch.no_grad():
protos, plabels = self.proto_layer()
if map_protos:
protos = self.backbone(protos)
d = self.distance_layer(x, protos)
y_pred = wtac(d, plabels)
return y_pred
class LVQMLN(SiameseGLVQ):
"""Learning Vector Quantization Multi-Layer Network.
GLVQ model that applies an arbitrary transformation on the inputs, BUT NOT
on the prototypes before computing the distances between them. This of
course, means that the prototypes no longer live the input space, but
rather in the embedding space.
"""
def compute_distances(self, x):
latent_protos, _ = self.proto_layer()
latent_x = self.backbone(x)
distances = self.distance_layer(latent_x, latent_protos)
return distances
class GRLVQ(SiameseGLVQ):
"""Generalized Relevance Learning Vector Quantization.
Implemented as a Siamese network with a linear transformation backbone.
TODO Make a RelevanceLayer. `bb_lr` is ignored otherwise.
"""
_relevances: torch.Tensor
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
# Additional parameters
relevances = torch.ones(self.hparams["input_dim"], device=self.device)
self.register_parameter("_relevances", Parameter(relevances))
# Override the backbone
self.backbone = LambdaLayer(lambda x: x @ torch.diag(self._relevances),
name="relevance scaling")
@property
def relevance_profile(self):
return self._relevances.detach().cpu()
def extra_repr(self):
return f"(relevances): (shape: {tuple(self._relevances.shape)})"
class SiameseGMLVQ(SiameseGLVQ):
"""Generalized Matrix Learning Vector Quantization.
Implemented as a Siamese network with a linear transformation backbone.
"""
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
# Override the backbone
omega_initializer = kwargs.get("omega_initializer",
EyeLinearTransformInitializer())
self.backbone = LinearTransform(
self.hparams["input_dim"],
self.hparams["latent_dim"],
initializer=omega_initializer,
)
@property
def omega_matrix(self):
return self.backbone.weights
@property
def lambda_matrix(self):
omega = self.backbone.weights # (input_dim, latent_dim)
lam = omega @ omega.T
return lam.detach().cpu()
class GMLVQ(GLVQ):
"""Generalized Matrix Learning Vector Quantization.
Implemented as a regular GLVQ network that simply uses a different distance
function. This makes it easier to implement a localized variant.
"""
# Parameters
_omega: torch.Tensor
def __init__(self, hparams, **kwargs):
distance_fn = kwargs.pop("distance_fn", omega_distance)
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
# Additional parameters
omega_initializer = kwargs.get(
"omega_initializer",
EyeLinearTransformInitializer(),
)
omega = omega_initializer.generate(
self.hparams["input_dim"],
self.hparams["latent_dim"],
)
self.register_parameter("_omega", Parameter(omega))
self.backbone = LambdaLayer(
lambda x: x @ self._omega,
name="omega matrix",
)
@property
def omega_matrix(self):
return self._omega.detach().cpu()
@property
def lambda_matrix(self):
omega = self._omega.detach() # (input_dim, latent_dim)
lam = omega @ omega.T
return lam.detach().cpu()
def compute_distances(self, x):
protos, _ = self.proto_layer()
distances = self.distance_layer(x, protos, self._omega)
return distances
def extra_repr(self):
return f"(omega): (shape: {tuple(self._omega.shape)})"
class LGMLVQ(GMLVQ):
"""Localized and Generalized Matrix Learning Vector Quantization."""
def __init__(self, hparams, **kwargs):
distance_fn = kwargs.pop("distance_fn", lomega_distance)
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
# Re-register `_omega` to override the one from the super class.
omega = torch.randn(
self.num_prototypes,
self.hparams["input_dim"],
self.hparams["latent_dim"],
device=self.device,
)
self.register_parameter("_omega", Parameter(omega))
class GTLVQ(LGMLVQ):
"""Localized and Generalized Tangent Learning Vector Quantization."""
def __init__(self, hparams, **kwargs):
distance_fn = kwargs.pop("distance_fn", ltangent_distance)
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
omega_initializer = kwargs.get("omega_initializer")
if omega_initializer is not None:
subspace = omega_initializer.generate(
self.hparams["input_dim"],
self.hparams["latent_dim"],
)
omega = torch.repeat_interleave(
subspace.unsqueeze(0),
self.num_prototypes,
dim=0,
)
else:
omega = torch.rand(
self.num_prototypes,
self.hparams["input_dim"],
self.hparams["latent_dim"],
device=self.device,
)
# Re-register `_omega` to override the one from the super class.
self.register_parameter("_omega", Parameter(omega))
def on_train_batch_end(self, outputs, batch, batch_idx):
with torch.no_grad():
self._omega.copy_(orthogonalization(self._omega))
class SiameseGTLVQ(SiameseGLVQ, GTLVQ):
"""Generalized Tangent Learning Vector Quantization.
Implemented as a Siamese network with a linear transformation backbone.
"""
class GLVQ1(GLVQ):
"""Generalized Learning Vector Quantization 1."""
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
self.loss = LossLayer(lvq1_loss)
self.optimizer = torch.optim.SGD
class GLVQ21(GLVQ):
"""Generalized Learning Vector Quantization 2.1."""
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
self.loss = LossLayer(lvq21_loss)
self.optimizer = torch.optim.SGD
class ImageGLVQ(ImagePrototypesMixin, GLVQ):
"""GLVQ for training on image data.
GLVQ model that constrains the prototypes to the range [0, 1] by clamping
after updates.
"""
class ImageGMLVQ(ImagePrototypesMixin, GMLVQ):
"""GMLVQ for training on image data.
GMLVQ model that constrains the prototypes to the range [0, 1] by clamping
after updates.
"""
class ImageGTLVQ(ImagePrototypesMixin, GTLVQ):
"""GTLVQ for training on image data.
GTLVQ model that constrains the prototypes to the range [0, 1] by clamping
after updates.
"""
def on_train_batch_end(self, outputs, batch, batch_idx):
"""Constrain the components to the range [0, 1] by clamping after updates."""
self.proto_layer.components.data.clamp_(0.0, 1.0)
with torch.no_grad():
self._omega.copy_(orthogonalization(self._omega))

View File

@ -1,45 +0,0 @@
"""ProtoTorch KNN model."""
import warnings
from prototorch.core.competitions import KNNC
from prototorch.core.components import LabeledComponents
from prototorch.core.initializers import (
LiteralCompInitializer,
LiteralLabelsInitializer,
)
from prototorch.utils.utils import parse_data_arg
from .abstract import SupervisedPrototypeModel
class KNN(SupervisedPrototypeModel):
"""K-Nearest-Neighbors classification algorithm."""
def __init__(self, hparams, **kwargs):
super().__init__(hparams, skip_proto_layer=True, **kwargs)
# Default hparams
self.hparams.setdefault("k", 1)
data = kwargs.get("data", None)
if data is None:
raise ValueError("KNN requires data, but was not provided!")
data, targets = parse_data_arg(data)
# Layers
self.proto_layer = LabeledComponents(
distribution=len(data) * [1],
components_initializer=LiteralCompInitializer(data),
labels_initializer=LiteralLabelsInitializer(targets))
self.competition_layer = KNNC(k=self.hparams.k)
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
return 1 # skip training step
def on_train_batch_start(self, train_batch, batch_idx):
warnings.warn("k-NN has no training, skipping!")
return -1
def configure_optimizers(self):
return None

View File

@ -1,12 +1,12 @@
from dataclasses import dataclass
from prototorch.y import (
from prototorch.models import (
SimpleComparisonMixin,
SingleLearningRateMixin,
SupervisedArchitecture,
WTACompetitionMixin,
)
from prototorch.y.architectures.loss import GLVQLossMixin
from prototorch.models.architectures.loss import GLVQLossMixin
class GLVQ(

View File

@ -5,7 +5,7 @@ from typing import Callable
import torch
from prototorch.core.distances import omega_distance
from prototorch.y import (
from prototorch.models import (
GLVQLossMixin,
MultipleLearningRateMixin,
OmegaComparisonMixin,

View File

@ -1,138 +0,0 @@
"""LVQ models that are optimized using non-gradient methods."""
import logging
from collections import OrderedDict
from prototorch.core.losses import _get_dp_dm
from prototorch.nn.activations import get_activation
from prototorch.nn.wrappers import LambdaLayer
from .glvq import GLVQ
from .mixins import NonGradientMixin
class LVQ1(NonGradientMixin, GLVQ):
"""Learning Vector Quantization 1."""
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
protos, plabels = self.proto_layer()
x, y = train_batch
dis = self.compute_distances(x)
# TODO Vectorized implementation
for xi, yi in zip(x, y):
d = self.compute_distances(xi.view(1, -1))
preds = self.competition_layer(d, plabels)
w = d.argmin(1)
if yi == preds:
shift = xi - protos[w]
else:
shift = protos[w] - xi
updated_protos = protos + 0.0
updated_protos[w] = protos[w] + (self.hparams["lr"] * shift)
self.proto_layer.load_state_dict(
OrderedDict(_components=updated_protos),
strict=False,
)
logging.debug(f"dis={dis}")
logging.debug(f"y={y}")
# Logging
self.log_acc(dis, y, tag="train_acc")
return None
class LVQ21(NonGradientMixin, GLVQ):
"""Learning Vector Quantization 2.1."""
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
protos, plabels = self.proto_layer()
x, y = train_batch
dis = self.compute_distances(x)
# TODO Vectorized implementation
for xi, yi in zip(x, y):
xi = xi.view(1, -1)
yi = yi.view(1, )
d = self.compute_distances(xi)
(_, wp), (_, wn) = _get_dp_dm(d, yi, plabels, with_indices=True)
shiftp = xi - protos[wp]
shiftn = protos[wn] - xi
updated_protos = protos + 0.0
updated_protos[wp] = protos[wp] + (self.hparams["lr"] * shiftp)
updated_protos[wn] = protos[wn] + (self.hparams["lr"] * shiftn)
self.proto_layer.load_state_dict(
OrderedDict(_components=updated_protos),
strict=False,
)
# Logging
self.log_acc(dis, y, tag="train_acc")
return None
class MedianLVQ(NonGradientMixin, GLVQ):
"""Median LVQ
# TODO Avoid computing distances over and over
"""
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
self.transfer_layer = LambdaLayer(
get_activation(self.hparams["transfer_fn"]))
def _f(self, x, y, protos, plabels):
d = self.distance_layer(x, protos)
dp, dm = _get_dp_dm(d, y, plabels, with_indices=False)
mu = (dp - dm) / (dp + dm)
negative_mu = -1.0 * mu
f = self.transfer_layer(
negative_mu,
beta=self.hparams["transfer_beta"],
) + 1.0
return f
def expectation(self, x, y, protos, plabels):
f = self._f(x, y, protos, plabels)
gamma = f / f.sum()
return gamma
def lower_bound(self, x, y, protos, plabels, gamma):
f = self._f(x, y, protos, plabels)
lower_bound = (gamma * f.log()).sum()
return lower_bound
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
protos, plabels = self.proto_layer()
x, y = train_batch
dis = self.compute_distances(x)
for i, _ in enumerate(protos):
# Expectation step
gamma = self.expectation(x, y, protos, plabels)
lower_bound = self.lower_bound(x, y, protos, plabels, gamma)
# Maximization step
_protos = protos + 0
for k, xk in enumerate(x):
_protos[i] = xk
_lower_bound = self.lower_bound(x, y, _protos, plabels, gamma)
if _lower_bound > lower_bound:
logging.debug(f"Updating prototype {i} to data {k}...")
self.proto_layer.load_state_dict(
OrderedDict(_components=_protos),
strict=False,
)
break
# Logging
self.log_acc(dis, y, tag="train_acc")
return None

View File

@ -1,35 +0,0 @@
import pytorch_lightning as pl
import torch
from prototorch.core.components import Components
class ProtoTorchMixin(pl.LightningModule):
"""All mixins are ProtoTorchMixins."""
class NonGradientMixin(ProtoTorchMixin):
"""Mixin for custom non-gradient optimization."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.automatic_optimization = False
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
raise NotImplementedError
class ImagePrototypesMixin(ProtoTorchMixin):
"""Mixin for models with image prototypes."""
proto_layer: Components
components: torch.Tensor
def on_train_batch_end(self, outputs, batch, batch_idx):
"""Constrain the components to the range [0, 1] by clamping after updates."""
self.proto_layer.components.data.clamp_(0.0, 1.0)
def get_prototype_grid(self, num_columns=2, return_channels_last=True):
from torchvision.utils import make_grid
grid = make_grid(self.components, nrow=num_columns)
if return_channels_last:
grid = grid.permute((1, 2, 0))
return grid.cpu()

View File

@ -1,131 +0,0 @@
"""Probabilistic GLVQ methods"""
import torch
from prototorch.core.losses import nllr_loss, rslvq_loss
from prototorch.core.pooling import (
stratified_min_pooling,
stratified_sum_pooling,
)
from prototorch.nn.wrappers import LossLayer
from .extras import GaussianPrior, RankScaledGaussianPrior
from .glvq import GLVQ, SiameseGMLVQ
class CELVQ(GLVQ):
"""Cross-Entropy Learning Vector Quantization."""
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
# Loss
self.loss = torch.nn.CrossEntropyLoss()
def shared_step(self, batch, batch_idx, optimizer_idx=None):
x, y = batch
out = self.compute_distances(x) # [None, num_protos]
_, plabels = self.proto_layer()
winning = stratified_min_pooling(out, plabels) # [None, num_classes]
probs = -1.0 * winning
batch_loss = self.loss(probs, y.long())
loss = batch_loss.sum()
return out, loss
class ProbabilisticLVQ(GLVQ):
def __init__(self, hparams, rejection_confidence=0.0, **kwargs):
super().__init__(hparams, **kwargs)
self.rejection_confidence = rejection_confidence
self._conditional_distribution = None
def forward(self, x):
distances = self.compute_distances(x)
conditional = self.conditional_distribution(distances)
prior = (1. / self.num_prototypes) * torch.ones(self.num_prototypes,
device=self.device)
posterior = conditional * prior
plabels = self.proto_layer._labels
if isinstance(plabels, torch.LongTensor) or isinstance(
plabels, torch.cuda.LongTensor): # type: ignore
y_pred = stratified_sum_pooling(posterior, plabels) # type: ignore
else:
raise ValueError("Labels must be LongTensor.")
return y_pred
def predict(self, x):
y_pred = self.forward(x)
confidence, prediction = torch.max(y_pred, dim=1)
prediction[confidence < self.rejection_confidence] = -1
return prediction
def training_step(self, batch, batch_idx, optimizer_idx=None):
x, y = batch
out = self.forward(x)
_, plabels = self.proto_layer()
batch_loss = self.loss(out, y, plabels)
loss = batch_loss.sum()
return loss
def conditional_distribution(self, distances):
"""Conditional distribution of distances."""
if self._conditional_distribution is None:
raise ValueError("Conditional distribution is not set.")
return self._conditional_distribution(distances)
class SLVQ(ProbabilisticLVQ):
"""Soft Learning Vector Quantization."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Default hparams
self.hparams.setdefault("variance", 1.0)
variance = self.hparams.get("variance")
self._conditional_distribution = GaussianPrior(variance)
self.loss = LossLayer(nllr_loss)
class RSLVQ(ProbabilisticLVQ):
"""Robust Soft Learning Vector Quantization."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Default hparams
self.hparams.setdefault("variance", 1.0)
variance = self.hparams.get("variance")
self._conditional_distribution = GaussianPrior(variance)
self.loss = LossLayer(rslvq_loss)
class PLVQ(ProbabilisticLVQ, SiameseGMLVQ):
"""Probabilistic Learning Vector Quantization.
TODO: Use Backbone LVQ instead
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Default hparams
self.hparams.setdefault("lambda", 1.0)
lam = self.hparams.get("lambda", 1.0)
self.conditional_distribution = RankScaledGaussianPrior(lam)
self.loss = torch.nn.KLDivLoss()
# FIXME
# def training_step(self, batch, batch_idx, optimizer_idx=None):
# x, y = batch
# y_pred = self(x)
# batch_loss = self.loss(y_pred, y)
# loss = batch_loss.sum()
# return loss

View File

@ -1,155 +0,0 @@
"""Unsupervised prototype learning algorithms."""
import numpy as np
import torch
from prototorch.core.competitions import wtac
from prototorch.core.distances import squared_euclidean_distance
from prototorch.core.losses import NeuralGasEnergy
from .abstract import UnsupervisedPrototypeModel
from .callbacks import GNGCallback
from .extras import ConnectionTopology
from .mixins import NonGradientMixin
class KohonenSOM(NonGradientMixin, UnsupervisedPrototypeModel):
"""Kohonen Self-Organizing-Map.
TODO Allow non-2D grids
"""
_grid: torch.Tensor
def __init__(self, hparams, **kwargs):
h, w = hparams.get("shape")
# Ignore `num_prototypes`
hparams["num_prototypes"] = h * w
distance_fn = kwargs.pop("distance_fn", squared_euclidean_distance)
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
# Hyperparameters
self.save_hyperparameters(hparams)
# Default hparams
self.hparams.setdefault("alpha", 0.3)
self.hparams.setdefault("sigma", max(h, w) / 2.0)
# Additional parameters
x, y = torch.arange(h), torch.arange(w)
grid = torch.stack(torch.meshgrid(x, y, indexing="ij"), dim=-1)
self.register_buffer("_grid", grid)
self._sigma = self.hparams.sigma
self._lr = self.hparams.lr
def predict_from_distances(self, distances):
grid = self._grid.view(-1, 2)
wp = wtac(distances, grid)
return wp
def training_step(self, train_batch, batch_idx):
# x = train_batch
# TODO Check if the batch has labels
x = train_batch[0]
d = self.compute_distances(x)
wp = self.predict_from_distances(d)
grid = self._grid.view(-1, 2)
gd = squared_euclidean_distance(wp, grid)
nh = torch.exp(-gd / self._sigma**2)
protos = self.proto_layer()
diff = x.unsqueeze(dim=1) - protos
delta = self._lr * self.hparams.alpha * nh.unsqueeze(-1) * diff
updated_protos = protos + delta.sum(dim=0)
self.proto_layer.load_state_dict(
{"_components": updated_protos},
strict=False,
)
def training_epoch_end(self, training_step_outputs):
self._sigma = self.hparams.sigma * np.exp(
-self.current_epoch / self.trainer.max_epochs)
def extra_repr(self):
return f"(grid): (shape: {tuple(self._grid.shape)})"
class HeskesSOM(UnsupervisedPrototypeModel):
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
def training_step(self, train_batch, batch_idx):
# TODO Implement me!
raise NotImplementedError()
class NeuralGas(UnsupervisedPrototypeModel):
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
# Hyperparameters
self.save_hyperparameters(hparams)
# Default hparams
self.hparams.setdefault("age_limit", 10)
self.hparams.setdefault("lm", 1)
self.energy_layer = NeuralGasEnergy(lm=self.hparams["lm"])
self.topology_layer = ConnectionTopology(
agelimit=self.hparams["age_limit"],
num_prototypes=self.hparams["num_prototypes"],
)
def training_step(self, train_batch, batch_idx):
# x = train_batch
# TODO Check if the batch has labels
x = train_batch[0]
d = self.compute_distances(x)
loss, _ = self.energy_layer(d)
self.topology_layer(d)
self.log("loss", loss)
return loss
class GrowingNeuralGas(NeuralGas):
errors: torch.Tensor
def __init__(self, hparams, **kwargs):
super().__init__(hparams, **kwargs)
# Defaults
self.hparams.setdefault("step_reduction", 0.5)
self.hparams.setdefault("insert_reduction", 0.1)
self.hparams.setdefault("insert_freq", 10)
errors = torch.zeros(
self.hparams["num_prototypes"],
device=self.device,
)
self.register_buffer("errors", errors)
def training_step(self, train_batch, _batch_idx):
# x = train_batch
# TODO Check if the batch has labels
x = train_batch[0]
d = self.compute_distances(x)
loss, order = self.energy_layer(d)
winner = order[:, 0]
mask = torch.zeros_like(d)
mask[torch.arange(len(mask)), winner] = 1.0
dp = d * mask
self.errors += torch.sum(dp * dp)
self.errors *= self.hparams["step_reduction"]
self.topology_layer(d)
self.log("loss", loss)
return loss
def configure_callbacks(self):
return [
GNGCallback(
reduction=self.hparams["insert_reduction"],
freq=self.hparams["insert_freq"],
)
]

View File

@ -1,6 +1,5 @@
"""Visualization Callbacks."""
import os
import warnings
from typing import Sized
@ -33,10 +32,6 @@ class Vis2DAbstract(pl.Callback):
tensorboard=False,
show_last_only=False,
pause_time=0.1,
save=False,
save_dir="./img",
fig_size=(5, 4),
dpi=500,
block=False):
super().__init__()
@ -80,16 +75,8 @@ class Vis2DAbstract(pl.Callback):
self.tensorboard = tensorboard
self.show_last_only = show_last_only
self.pause_time = pause_time
self.save = save
self.save_dir = save_dir
self.fig_size = fig_size
self.dpi = dpi
self.block = block
if save:
if not os.path.exists(save_dir):
os.makedirs(save_dir)
def precheck(self, trainer):
if self.show_last_only:
if trainer.current_epoch != trainer.max_epochs - 1:
@ -138,11 +125,6 @@ class Vis2DAbstract(pl.Callback):
def log_and_display(self, trainer, pl_module):
if self.tensorboard:
self.add_to_tensorboard(trainer, pl_module)
if self.save:
plt.tight_layout()
self.fig.set_size_inches(*self.fig_size, forward=False)
plt.savefig(f"{self.save_dir}/{trainer.current_epoch}.png",
dpi=self.dpi)
if self.show:
if not self.block:
plt.pause(self.pause_time)
@ -169,13 +151,13 @@ class VisGLVQ2D(Vis2DAbstract):
plabels = pl_module.prototype_labels
x_train, y_train = self.x_train, self.y_train
ax = self.setup_ax()
self.plot_protos(ax, protos, plabels)
if x_train is not None:
self.plot_data(ax, x_train, y_train)
mesh_input, xx, yy = mesh2d(np.vstack([x_train, protos]),
self.border, self.resolution)
else:
mesh_input, xx, yy = mesh2d(protos, self.border, self.resolution)
self.plot_protos(ax, protos, plabels)
_components = pl_module.proto_layer._components
mesh_input = torch.from_numpy(mesh_input).type_as(_components)
y_pred = pl_module.predict(mesh_input)

View File

@ -1,23 +0,0 @@
from .architectures.base import BaseYArchitecture
from .architectures.comparison import (
OmegaComparisonMixin,
SimpleComparisonMixin,
)
from .architectures.competition import WTACompetitionMixin
from .architectures.components import SupervisedArchitecture
from .architectures.loss import GLVQLossMixin
from .architectures.optimization import (
MultipleLearningRateMixin,
SingleLearningRateMixin,
)
__all__ = [
'BaseYArchitecture',
"OmegaComparisonMixin",
"SimpleComparisonMixin",
"SingleLearningRateMixin",
"MultipleLearningRateMixin",
"SupervisedArchitecture",
"WTACompetitionMixin",
"GLVQLossMixin",
]

View File

@ -1,217 +0,0 @@
import logging
import warnings
from typing import Optional, Type
import numpy as np
import pytorch_lightning as pl
import torch
import torchmetrics
from matplotlib import pyplot as plt
from prototorch.models.vis import Vis2DAbstract
from prototorch.utils.utils import mesh2d
from prototorch.y.architectures.base import BaseYArchitecture, Steps
from prototorch.y.library.gmlvq import GMLVQ
from pytorch_lightning.loggers import TensorBoardLogger
DIVERGING_COLOR_MAPS = [
'PiYG',
'PRGn',
'BrBG',
'PuOr',
'RdGy',
'RdBu',
'RdYlBu',
'RdYlGn',
'Spectral',
'coolwarm',
'bwr',
'seismic',
]
class LogTorchmetricCallback(pl.Callback):
def __init__(
self,
name,
metric: Type[torchmetrics.Metric],
step: str = Steps.TRAINING,
**metric_kwargs,
) -> None:
self.name = name
self.metric = metric
self.metric_kwargs = metric_kwargs
self.step = step
def setup(
self,
trainer: pl.Trainer,
pl_module: BaseYArchitecture,
stage: Optional[str] = None,
) -> None:
pl_module.register_torchmetric(
self,
self.metric,
step=self.step,
**self.metric_kwargs,
)
def __call__(self, value, pl_module: BaseYArchitecture):
pl_module.log(self.name, value)
class LogConfusionMatrix(LogTorchmetricCallback):
def __init__(
self,
num_classes,
name="confusion",
on='prediction',
**kwargs,
):
super().__init__(
name,
torchmetrics.ConfusionMatrix,
on=on,
num_classes=num_classes,
**kwargs,
)
def __call__(self, value, pl_module: BaseYArchitecture):
fig, ax = plt.subplots()
ax.imshow(value.detach().cpu().numpy())
# Show all ticks and label them with the respective list entries
# ax.set_xticks(np.arange(len(farmers)), labels=farmers)
# ax.set_yticks(np.arange(len(vegetables)), labels=vegetables)
# Rotate the tick labels and set their alignment.
plt.setp(
ax.get_xticklabels(),
rotation=45,
ha="right",
rotation_mode="anchor",
)
# Loop over data dimensions and create text annotations.
for i in range(len(value)):
for j in range(len(value)):
text = ax.text(
j,
i,
value[i, j].item(),
ha="center",
va="center",
color="w",
)
ax.set_title(self.name)
fig.tight_layout()
pl_module.logger.experiment.add_figure(
tag=self.name,
figure=fig,
close=True,
global_step=pl_module.global_step,
)
class VisGLVQ2D(Vis2DAbstract):
def visualize(self, pl_module):
protos = pl_module.prototypes
plabels = pl_module.prototype_labels
x_train, y_train = self.x_train, self.y_train
ax = self.setup_ax()
self.plot_protos(ax, protos, plabels)
if x_train is not None:
self.plot_data(ax, x_train, y_train)
mesh_input, xx, yy = mesh2d(
np.vstack([x_train, protos]),
self.border,
self.resolution,
)
else:
mesh_input, xx, yy = mesh2d(protos, self.border, self.resolution)
_components = pl_module.components_layer.components
mesh_input = torch.from_numpy(mesh_input).type_as(_components)
y_pred = pl_module.predict(mesh_input)
y_pred = y_pred.cpu().reshape(xx.shape)
ax.contourf(xx, yy, y_pred, cmap=self.cmap, alpha=0.35)
class VisGMLVQ2D(Vis2DAbstract):
def __init__(self, *args, ev_proj=True, **kwargs):
super().__init__(*args, **kwargs)
self.ev_proj = ev_proj
def visualize(self, pl_module):
protos = pl_module.prototypes
plabels = pl_module.prototype_labels
x_train, y_train = self.x_train, self.y_train
device = pl_module.device
omega = pl_module._omega.detach()
lam = omega @ omega.T
u, _, _ = torch.pca_lowrank(lam, q=2)
with torch.no_grad():
x_train = torch.Tensor(x_train).to(device)
x_train = x_train @ u
x_train = x_train.cpu().detach()
if self.show_protos:
with torch.no_grad():
protos = torch.Tensor(protos).to(device)
protos = protos @ u
protos = protos.cpu().detach()
ax = self.setup_ax()
self.plot_data(ax, x_train, y_train)
if self.show_protos:
self.plot_protos(ax, protos, plabels)
class PlotLambdaMatrixToTensorboard(pl.Callback):
def __init__(self, cmap='seismic') -> None:
super().__init__()
self.cmap = cmap
if self.cmap not in DIVERGING_COLOR_MAPS and type(self.cmap) is str:
warnings.warn(
f"{self.cmap} is not a diverging color map. We recommend to use one of the following: {DIVERGING_COLOR_MAPS}"
)
def on_train_start(self, trainer, pl_module: GMLVQ):
self.plot_lambda(trainer, pl_module)
def on_train_epoch_end(self, trainer, pl_module: GMLVQ):
self.plot_lambda(trainer, pl_module)
def plot_lambda(self, trainer, pl_module: GMLVQ):
self.fig, self.ax = plt.subplots(1, 1)
# plot lambda matrix
l_matrix = pl_module.lambda_matrix
# normalize lambda matrix
l_matrix = l_matrix / torch.max(torch.abs(l_matrix))
# plot lambda matrix
self.ax.imshow(l_matrix.detach().numpy(), self.cmap, vmin=-1, vmax=1)
self.fig.colorbar(self.ax.images[-1])
# add title
self.ax.set_title('Lambda Matrix')
# add to tensorboard
if isinstance(trainer.logger, TensorBoardLogger):
trainer.logger.experiment.add_figure(
f"lambda_matrix",
self.fig,
trainer.global_step,
)
else:
warnings.warn(
f"{self.__class__.__name__} is not compatible with {trainer.logger.__class__.__name__} as logger. Use TensorBoardLogger instead."
)

View File

@ -1,195 +0,0 @@
"""prototorch.models test suite."""
import prototorch as pt
import pytest
import torch
def test_glvq_model_build():
model = pt.models.GLVQ(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_glvq1_model_build():
model = pt.models.GLVQ1(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_glvq21_model_build():
model = pt.models.GLVQ1(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_gmlvq_model_build():
model = pt.models.GMLVQ(
{
"distribution": (3, 2),
"input_dim": 2,
"latent_dim": 2,
},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_grlvq_model_build():
model = pt.models.GRLVQ(
{
"distribution": (3, 2),
"input_dim": 2,
},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_gtlvq_model_build():
model = pt.models.GTLVQ(
{
"distribution": (3, 2),
"input_dim": 4,
"latent_dim": 2,
},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_lgmlvq_model_build():
model = pt.models.LGMLVQ(
{
"distribution": (3, 2),
"input_dim": 4,
"latent_dim": 2,
},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_image_glvq_model_build():
model = pt.models.ImageGLVQ(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(16),
)
def test_image_gmlvq_model_build():
model = pt.models.ImageGMLVQ(
{
"distribution": (3, 2),
"input_dim": 16,
"latent_dim": 2,
},
prototypes_initializer=pt.initializers.RNCI(16),
)
def test_image_gtlvq_model_build():
model = pt.models.ImageGMLVQ(
{
"distribution": (3, 2),
"input_dim": 16,
"latent_dim": 2,
},
prototypes_initializer=pt.initializers.RNCI(16),
)
def test_siamese_glvq_model_build():
model = pt.models.SiameseGLVQ(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(4),
)
def test_siamese_gmlvq_model_build():
model = pt.models.SiameseGMLVQ(
{
"distribution": (3, 2),
"input_dim": 4,
"latent_dim": 2,
},
prototypes_initializer=pt.initializers.RNCI(4),
)
def test_siamese_gtlvq_model_build():
model = pt.models.SiameseGTLVQ(
{
"distribution": (3, 2),
"input_dim": 4,
"latent_dim": 2,
},
prototypes_initializer=pt.initializers.RNCI(4),
)
def test_knn_model_build():
train_ds = pt.datasets.Iris(dims=[0, 2])
model = pt.models.KNN(dict(k=3), data=train_ds)
def test_lvq1_model_build():
model = pt.models.LVQ1(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_lvq21_model_build():
model = pt.models.LVQ21(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_median_lvq_model_build():
model = pt.models.MedianLVQ(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_celvq_model_build():
model = pt.models.CELVQ(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_rslvq_model_build():
model = pt.models.RSLVQ(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_slvq_model_build():
model = pt.models.SLVQ(
{"distribution": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_growing_neural_gas_model_build():
model = pt.models.GrowingNeuralGas(
{"num_prototypes": 5},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_kohonen_som_model_build():
model = pt.models.KohonenSOM(
{"shape": (3, 2)},
prototypes_initializer=pt.initializers.RNCI(2),
)
def test_neural_gas_model_build():
model = pt.models.NeuralGas(
{"num_prototypes": 5},
prototypes_initializer=pt.initializers.RNCI(2),
)