Compare commits
14 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
696719600b | ||
|
48e7c029fa | ||
|
5de3a480c7 | ||
|
626f51ce80 | ||
|
6d7d93c8e8 | ||
|
93b1d0bd46 | ||
|
b7992c01db | ||
|
23d1a71b31 | ||
|
e922aae432 | ||
|
3e50d0d817 | ||
|
dc4f31d700 | ||
|
02954044d7 | ||
|
8f08ba66ea | ||
|
e0b92e9ac2 |
@ -1,9 +1,11 @@
|
|||||||
[bumpversion]
|
[bumpversion]
|
||||||
current_version = 0.5.2
|
current_version = 1.0.0a2
|
||||||
commit = True
|
commit = True
|
||||||
tag = True
|
tag = True
|
||||||
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
|
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)((?P<release>[a-zA-Z0-9_.-]+))?
|
||||||
serialize = {major}.{minor}.{patch}
|
serialize =
|
||||||
|
{major}.{minor}.{patch}-{release}
|
||||||
|
{major}.{minor}.{patch}
|
||||||
message = build: bump version {current_version} → {new_version}
|
message = build: bump version {current_version} → {new_version}
|
||||||
|
|
||||||
[bumpversion:file:setup.py]
|
[bumpversion:file:setup.py]
|
||||||
|
@ -6,6 +6,7 @@ repos:
|
|||||||
rev: v4.2.0
|
rev: v4.2.0
|
||||||
hooks:
|
hooks:
|
||||||
- id: trailing-whitespace
|
- id: trailing-whitespace
|
||||||
|
exclude: (^\.bumpversion\.cfg$|cli_messages\.py)
|
||||||
- id: end-of-file-fixer
|
- id: end-of-file-fixer
|
||||||
- id: check-yaml
|
- id: check-yaml
|
||||||
- id: check-added-large-files
|
- id: check-added-large-files
|
||||||
|
@ -23,7 +23,7 @@ author = "Jensun Ravichandran"
|
|||||||
|
|
||||||
# The full version, including alpha/beta/rc tags
|
# The full version, including alpha/beta/rc tags
|
||||||
#
|
#
|
||||||
release = "0.5.2"
|
release = "1.0.0-a2"
|
||||||
|
|
||||||
# -- General configuration ---------------------------------------------------
|
# -- General configuration ---------------------------------------------------
|
||||||
|
|
||||||
|
88
examples/y_architecture_example.py
Normal file
88
examples/y_architecture_example.py
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
import prototorch as pt
|
||||||
|
import pytorch_lightning as pl
|
||||||
|
import torchmetrics
|
||||||
|
from prototorch.core import SMCI
|
||||||
|
from prototorch.y.callbacks import (
|
||||||
|
LogTorchmetricCallback,
|
||||||
|
PlotLambdaMatrixToTensorboard,
|
||||||
|
VisGMLVQ2D,
|
||||||
|
)
|
||||||
|
from prototorch.y.library.gmlvq import GMLVQ
|
||||||
|
from pytorch_lightning.callbacks import EarlyStopping
|
||||||
|
from torch.utils.data import DataLoader
|
||||||
|
|
||||||
|
# ##############################################################################
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
|
||||||
|
# ------------------------------------------------------------
|
||||||
|
# DATA
|
||||||
|
# ------------------------------------------------------------
|
||||||
|
|
||||||
|
# Dataset
|
||||||
|
train_ds = pt.datasets.Iris()
|
||||||
|
|
||||||
|
# Dataloader
|
||||||
|
train_loader = DataLoader(
|
||||||
|
train_ds,
|
||||||
|
batch_size=32,
|
||||||
|
num_workers=0,
|
||||||
|
shuffle=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------
|
||||||
|
# HYPERPARAMETERS
|
||||||
|
# ------------------------------------------------------------
|
||||||
|
|
||||||
|
# Select Initializer
|
||||||
|
components_initializer = SMCI(train_ds)
|
||||||
|
|
||||||
|
# Define Hyperparameters
|
||||||
|
hyperparameters = GMLVQ.HyperParameters(
|
||||||
|
lr=dict(components_layer=0.1, _omega=0),
|
||||||
|
input_dim=4,
|
||||||
|
distribution=dict(
|
||||||
|
num_classes=3,
|
||||||
|
per_class=1,
|
||||||
|
),
|
||||||
|
component_initializer=components_initializer,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create Model
|
||||||
|
model = GMLVQ(hyperparameters)
|
||||||
|
|
||||||
|
print(model)
|
||||||
|
|
||||||
|
# ------------------------------------------------------------
|
||||||
|
# TRAINING
|
||||||
|
# ------------------------------------------------------------
|
||||||
|
|
||||||
|
# Controlling Callbacks
|
||||||
|
stopping_criterion = LogTorchmetricCallback(
|
||||||
|
'recall',
|
||||||
|
torchmetrics.Recall,
|
||||||
|
num_classes=3,
|
||||||
|
)
|
||||||
|
|
||||||
|
es = EarlyStopping(
|
||||||
|
monitor=stopping_criterion.name,
|
||||||
|
mode="max",
|
||||||
|
patience=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Visualization Callback
|
||||||
|
vis = VisGMLVQ2D(data=train_ds)
|
||||||
|
|
||||||
|
# Define trainer
|
||||||
|
trainer = pl.Trainer(
|
||||||
|
callbacks=[
|
||||||
|
vis,
|
||||||
|
stopping_criterion,
|
||||||
|
es,
|
||||||
|
PlotLambdaMatrixToTensorboard(),
|
||||||
|
],
|
||||||
|
max_epochs=1000,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Train
|
||||||
|
trainer.fit(model, train_loader)
|
@ -36,4 +36,4 @@ from .unsupervised import (
|
|||||||
)
|
)
|
||||||
from .vis import *
|
from .vis import *
|
||||||
|
|
||||||
__version__ = "0.5.2"
|
__version__ = "1.0.0-a2"
|
||||||
|
@ -22,7 +22,16 @@ from prototorch.nn.wrappers import LambdaLayer
|
|||||||
|
|
||||||
|
|
||||||
class ProtoTorchBolt(pl.LightningModule):
|
class ProtoTorchBolt(pl.LightningModule):
|
||||||
"""All ProtoTorch models are ProtoTorch Bolts."""
|
"""All ProtoTorch models are ProtoTorch Bolts.
|
||||||
|
|
||||||
|
hparams:
|
||||||
|
- lr: learning rate
|
||||||
|
|
||||||
|
kwargs:
|
||||||
|
- optimizer: optimizer class
|
||||||
|
- lr_scheduler: learning rate scheduler class
|
||||||
|
- lr_scheduler_kwargs: learning rate scheduler kwargs
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, hparams, **kwargs):
|
def __init__(self, hparams, **kwargs):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@ -65,6 +74,11 @@ class ProtoTorchBolt(pl.LightningModule):
|
|||||||
|
|
||||||
|
|
||||||
class PrototypeModel(ProtoTorchBolt):
|
class PrototypeModel(ProtoTorchBolt):
|
||||||
|
"""Abstract Prototype Model
|
||||||
|
|
||||||
|
kwargs:
|
||||||
|
- distance_fn: distance function
|
||||||
|
"""
|
||||||
proto_layer: AbstractComponents
|
proto_layer: AbstractComponents
|
||||||
|
|
||||||
def __init__(self, hparams, **kwargs):
|
def __init__(self, hparams, **kwargs):
|
||||||
@ -203,35 +217,3 @@ class SupervisedPrototypeModel(PrototypeModel):
|
|||||||
accuracy = torchmetrics.functional.accuracy(preds.int(), targets.int())
|
accuracy = torchmetrics.functional.accuracy(preds.int(), targets.int())
|
||||||
|
|
||||||
self.log("test_acc", accuracy)
|
self.log("test_acc", accuracy)
|
||||||
|
|
||||||
|
|
||||||
class ProtoTorchMixin(object):
|
|
||||||
"""All mixins are ProtoTorchMixins."""
|
|
||||||
|
|
||||||
|
|
||||||
class NonGradientMixin(ProtoTorchMixin):
|
|
||||||
"""Mixin for custom non-gradient optimization."""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
self.automatic_optimization = False
|
|
||||||
|
|
||||||
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
|
|
||||||
class ImagePrototypesMixin(ProtoTorchMixin):
|
|
||||||
"""Mixin for models with image prototypes."""
|
|
||||||
proto_layer: Components
|
|
||||||
components: torch.Tensor
|
|
||||||
|
|
||||||
def on_train_batch_end(self, outputs, batch, batch_idx):
|
|
||||||
"""Constrain the components to the range [0, 1] by clamping after updates."""
|
|
||||||
self.proto_layer.components.data.clamp_(0.0, 1.0)
|
|
||||||
|
|
||||||
def get_prototype_grid(self, num_columns=2, return_channels_last=True):
|
|
||||||
from torchvision.utils import make_grid
|
|
||||||
grid = make_grid(self.components, nrow=num_columns)
|
|
||||||
if return_channels_last:
|
|
||||||
grid = grid.permute((1, 2, 0))
|
|
||||||
return grid.cpu()
|
|
||||||
|
@ -40,8 +40,8 @@ class PruneLoserPrototypes(pl.Callback):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
ratios = pl_module.prototype_win_ratios.mean(dim=0)
|
ratios = pl_module.prototype_win_ratios.mean(dim=0)
|
||||||
to_prune = torch.arange(len(ratios))[ratios < self.threshold]
|
to_prune_tensor = torch.arange(len(ratios))[ratios < self.threshold]
|
||||||
to_prune = to_prune.tolist()
|
to_prune = to_prune_tensor.tolist()
|
||||||
prune_labels = pl_module.prototype_labels[to_prune]
|
prune_labels = pl_module.prototype_labels[to_prune]
|
||||||
if self.prune_quota_per_epoch > 0:
|
if self.prune_quota_per_epoch > 0:
|
||||||
to_prune = to_prune[:self.prune_quota_per_epoch]
|
to_prune = to_prune[:self.prune_quota_per_epoch]
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
import torch
|
import torch
|
||||||
|
import torch.nn.functional as F
|
||||||
import torchmetrics
|
import torchmetrics
|
||||||
from prototorch.core.competitions import CBCC
|
from prototorch.core.competitions import CBCC
|
||||||
from prototorch.core.components import ReasoningComponents
|
from prototorch.core.components import ReasoningComponents
|
||||||
@ -7,12 +8,13 @@ from prototorch.core.losses import MarginLoss
|
|||||||
from prototorch.core.similarities import euclidean_similarity
|
from prototorch.core.similarities import euclidean_similarity
|
||||||
from prototorch.nn.wrappers import LambdaLayer
|
from prototorch.nn.wrappers import LambdaLayer
|
||||||
|
|
||||||
from .abstract import ImagePrototypesMixin
|
|
||||||
from .glvq import SiameseGLVQ
|
from .glvq import SiameseGLVQ
|
||||||
|
from .mixins import ImagePrototypesMixin
|
||||||
|
|
||||||
|
|
||||||
class CBC(SiameseGLVQ):
|
class CBC(SiameseGLVQ):
|
||||||
"""Classification-By-Components."""
|
"""Classification-By-Components."""
|
||||||
|
proto_layer: ReasoningComponents
|
||||||
|
|
||||||
def __init__(self, hparams, **kwargs):
|
def __init__(self, hparams, **kwargs):
|
||||||
super().__init__(hparams, skip_proto_layer=True, **kwargs)
|
super().__init__(hparams, skip_proto_layer=True, **kwargs)
|
||||||
@ -22,7 +24,7 @@ class CBC(SiameseGLVQ):
|
|||||||
reasonings_initializer = kwargs.get("reasonings_initializer",
|
reasonings_initializer = kwargs.get("reasonings_initializer",
|
||||||
RandomReasoningsInitializer())
|
RandomReasoningsInitializer())
|
||||||
self.components_layer = ReasoningComponents(
|
self.components_layer = ReasoningComponents(
|
||||||
self.hparams.distribution,
|
self.hparams["distribution"],
|
||||||
components_initializer=components_initializer,
|
components_initializer=components_initializer,
|
||||||
reasonings_initializer=reasonings_initializer,
|
reasonings_initializer=reasonings_initializer,
|
||||||
)
|
)
|
||||||
@ -32,7 +34,7 @@ class CBC(SiameseGLVQ):
|
|||||||
# Namespace hook
|
# Namespace hook
|
||||||
self.proto_layer = self.components_layer
|
self.proto_layer = self.components_layer
|
||||||
|
|
||||||
self.loss = MarginLoss(self.hparams.margin)
|
self.loss = MarginLoss(self.hparams["margin"])
|
||||||
|
|
||||||
def forward(self, x):
|
def forward(self, x):
|
||||||
components, reasonings = self.components_layer()
|
components, reasonings = self.components_layer()
|
||||||
@ -48,7 +50,7 @@ class CBC(SiameseGLVQ):
|
|||||||
x, y = batch
|
x, y = batch
|
||||||
y_pred = self(x)
|
y_pred = self(x)
|
||||||
num_classes = self.num_classes
|
num_classes = self.num_classes
|
||||||
y_true = torch.nn.functional.one_hot(y.long(), num_classes=num_classes)
|
y_true = F.one_hot(y.long(), num_classes=num_classes)
|
||||||
loss = self.loss(y_pred, y_true).mean()
|
loss = self.loss(y_pred, y_true).mean()
|
||||||
return y_pred, loss
|
return y_pred, loss
|
||||||
|
|
||||||
|
@ -17,8 +17,9 @@ from prototorch.core.transforms import LinearTransform
|
|||||||
from prototorch.nn.wrappers import LambdaLayer, LossLayer
|
from prototorch.nn.wrappers import LambdaLayer, LossLayer
|
||||||
from torch.nn.parameter import Parameter
|
from torch.nn.parameter import Parameter
|
||||||
|
|
||||||
from .abstract import ImagePrototypesMixin, SupervisedPrototypeModel
|
from .abstract import SupervisedPrototypeModel
|
||||||
from .extras import ltangent_distance, orthogonalization
|
from .extras import ltangent_distance, orthogonalization
|
||||||
|
from .mixins import ImagePrototypesMixin
|
||||||
|
|
||||||
|
|
||||||
class GLVQ(SupervisedPrototypeModel):
|
class GLVQ(SupervisedPrototypeModel):
|
||||||
@ -46,19 +47,24 @@ class GLVQ(SupervisedPrototypeModel):
|
|||||||
def initialize_prototype_win_ratios(self):
|
def initialize_prototype_win_ratios(self):
|
||||||
self.register_buffer(
|
self.register_buffer(
|
||||||
"prototype_win_ratios",
|
"prototype_win_ratios",
|
||||||
torch.zeros(self.num_prototypes, device=self.device))
|
torch.zeros(self.num_prototypes, device=self.device),
|
||||||
|
)
|
||||||
|
|
||||||
def on_train_epoch_start(self):
|
def on_train_epoch_start(self):
|
||||||
self.initialize_prototype_win_ratios()
|
self.initialize_prototype_win_ratios()
|
||||||
|
|
||||||
def log_prototype_win_ratios(self, distances):
|
def log_prototype_win_ratios(self, distances):
|
||||||
batch_size = len(distances)
|
batch_size = len(distances)
|
||||||
prototype_wc = torch.zeros(self.num_prototypes,
|
prototype_wc = torch.zeros(
|
||||||
|
self.num_prototypes,
|
||||||
dtype=torch.long,
|
dtype=torch.long,
|
||||||
device=self.device)
|
device=self.device,
|
||||||
wi, wc = torch.unique(distances.min(dim=-1).indices,
|
)
|
||||||
|
wi, wc = torch.unique(
|
||||||
|
distances.min(dim=-1).indices,
|
||||||
sorted=True,
|
sorted=True,
|
||||||
return_counts=True)
|
return_counts=True,
|
||||||
|
)
|
||||||
prototype_wc[wi] = wc
|
prototype_wc[wi] = wc
|
||||||
prototype_wr = prototype_wc / batch_size
|
prototype_wr = prototype_wc / batch_size
|
||||||
self.prototype_win_ratios = torch.vstack([
|
self.prototype_win_ratios = torch.vstack([
|
||||||
@ -81,14 +87,12 @@ class GLVQ(SupervisedPrototypeModel):
|
|||||||
return train_loss
|
return train_loss
|
||||||
|
|
||||||
def validation_step(self, batch, batch_idx):
|
def validation_step(self, batch, batch_idx):
|
||||||
# `model.eval()` and `torch.no_grad()` handled by pl
|
|
||||||
out, val_loss = self.shared_step(batch, batch_idx)
|
out, val_loss = self.shared_step(batch, batch_idx)
|
||||||
self.log("val_loss", val_loss)
|
self.log("val_loss", val_loss)
|
||||||
self.log_acc(out, batch[-1], tag="val_acc")
|
self.log_acc(out, batch[-1], tag="val_acc")
|
||||||
return val_loss
|
return val_loss
|
||||||
|
|
||||||
def test_step(self, batch, batch_idx):
|
def test_step(self, batch, batch_idx):
|
||||||
# `model.eval()` and `torch.no_grad()` handled by pl
|
|
||||||
out, test_loss = self.shared_step(batch, batch_idx)
|
out, test_loss = self.shared_step(batch, batch_idx)
|
||||||
self.log_acc(out, batch[-1], tag="test_acc")
|
self.log_acc(out, batch[-1], tag="test_acc")
|
||||||
return test_loss
|
return test_loss
|
||||||
@ -99,10 +103,6 @@ class GLVQ(SupervisedPrototypeModel):
|
|||||||
test_loss += batch_loss.item()
|
test_loss += batch_loss.item()
|
||||||
self.log("test_loss", test_loss)
|
self.log("test_loss", test_loss)
|
||||||
|
|
||||||
# TODO
|
|
||||||
# def predict_step(self, batch, batch_idx, dataloader_idx=None):
|
|
||||||
# pass
|
|
||||||
|
|
||||||
|
|
||||||
class SiameseGLVQ(GLVQ):
|
class SiameseGLVQ(GLVQ):
|
||||||
"""GLVQ in a Siamese setting.
|
"""GLVQ in a Siamese setting.
|
||||||
@ -113,19 +113,23 @@ class SiameseGLVQ(GLVQ):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self,
|
def __init__(
|
||||||
|
self,
|
||||||
hparams,
|
hparams,
|
||||||
backbone=torch.nn.Identity(),
|
backbone=torch.nn.Identity(),
|
||||||
both_path_gradients=False,
|
both_path_gradients=False,
|
||||||
**kwargs):
|
**kwargs,
|
||||||
|
):
|
||||||
distance_fn = kwargs.pop("distance_fn", squared_euclidean_distance)
|
distance_fn = kwargs.pop("distance_fn", squared_euclidean_distance)
|
||||||
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
|
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
|
||||||
self.backbone = backbone
|
self.backbone = backbone
|
||||||
self.both_path_gradients = both_path_gradients
|
self.both_path_gradients = both_path_gradients
|
||||||
|
|
||||||
def configure_optimizers(self):
|
def configure_optimizers(self):
|
||||||
proto_opt = self.optimizer(self.proto_layer.parameters(),
|
proto_opt = self.optimizer(
|
||||||
lr=self.hparams["proto_lr"])
|
self.proto_layer.parameters(),
|
||||||
|
lr=self.hparams["proto_lr"],
|
||||||
|
)
|
||||||
# Only add a backbone optimizer if backbone has trainable parameters
|
# Only add a backbone optimizer if backbone has trainable parameters
|
||||||
bb_params = list(self.backbone.parameters())
|
bb_params = list(self.backbone.parameters())
|
||||||
if (bb_params):
|
if (bb_params):
|
||||||
@ -266,13 +270,19 @@ class GMLVQ(GLVQ):
|
|||||||
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
|
super().__init__(hparams, distance_fn=distance_fn, **kwargs)
|
||||||
|
|
||||||
# Additional parameters
|
# Additional parameters
|
||||||
omega_initializer = kwargs.get("omega_initializer",
|
omega_initializer = kwargs.get(
|
||||||
EyeLinearTransformInitializer())
|
"omega_initializer",
|
||||||
omega = omega_initializer.generate(self.hparams["input_dim"],
|
EyeLinearTransformInitializer(),
|
||||||
self.hparams["latent_dim"])
|
)
|
||||||
|
omega = omega_initializer.generate(
|
||||||
|
self.hparams["input_dim"],
|
||||||
|
self.hparams["latent_dim"],
|
||||||
|
)
|
||||||
self.register_parameter("_omega", Parameter(omega))
|
self.register_parameter("_omega", Parameter(omega))
|
||||||
self.backbone = LambdaLayer(lambda x: x @ self._omega,
|
self.backbone = LambdaLayer(
|
||||||
name="omega matrix")
|
lambda x: x @ self._omega,
|
||||||
|
name="omega matrix",
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def omega_matrix(self):
|
def omega_matrix(self):
|
||||||
|
@ -1,20 +1,21 @@
|
|||||||
"""LVQ models that are optimized using non-gradient methods."""
|
"""LVQ models that are optimized using non-gradient methods."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from collections import OrderedDict
|
||||||
|
|
||||||
from prototorch.core.losses import _get_dp_dm
|
from prototorch.core.losses import _get_dp_dm
|
||||||
from prototorch.nn.activations import get_activation
|
from prototorch.nn.activations import get_activation
|
||||||
from prototorch.nn.wrappers import LambdaLayer
|
from prototorch.nn.wrappers import LambdaLayer
|
||||||
|
|
||||||
from .abstract import NonGradientMixin
|
|
||||||
from .glvq import GLVQ
|
from .glvq import GLVQ
|
||||||
|
from .mixins import NonGradientMixin
|
||||||
|
|
||||||
|
|
||||||
class LVQ1(NonGradientMixin, GLVQ):
|
class LVQ1(NonGradientMixin, GLVQ):
|
||||||
"""Learning Vector Quantization 1."""
|
"""Learning Vector Quantization 1."""
|
||||||
|
|
||||||
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
|
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
|
||||||
protos, plables = self.proto_layer()
|
protos, plabels = self.proto_layer()
|
||||||
x, y = train_batch
|
x, y = train_batch
|
||||||
dis = self.compute_distances(x)
|
dis = self.compute_distances(x)
|
||||||
# TODO Vectorized implementation
|
# TODO Vectorized implementation
|
||||||
@ -28,9 +29,11 @@ class LVQ1(NonGradientMixin, GLVQ):
|
|||||||
else:
|
else:
|
||||||
shift = protos[w] - xi
|
shift = protos[w] - xi
|
||||||
updated_protos = protos + 0.0
|
updated_protos = protos + 0.0
|
||||||
updated_protos[w] = protos[w] + (self.hparams.lr * shift)
|
updated_protos[w] = protos[w] + (self.hparams["lr"] * shift)
|
||||||
self.proto_layer.load_state_dict({"_components": updated_protos},
|
self.proto_layer.load_state_dict(
|
||||||
strict=False)
|
OrderedDict(_components=updated_protos),
|
||||||
|
strict=False,
|
||||||
|
)
|
||||||
|
|
||||||
logging.debug(f"dis={dis}")
|
logging.debug(f"dis={dis}")
|
||||||
logging.debug(f"y={y}")
|
logging.debug(f"y={y}")
|
||||||
@ -58,10 +61,12 @@ class LVQ21(NonGradientMixin, GLVQ):
|
|||||||
shiftp = xi - protos[wp]
|
shiftp = xi - protos[wp]
|
||||||
shiftn = protos[wn] - xi
|
shiftn = protos[wn] - xi
|
||||||
updated_protos = protos + 0.0
|
updated_protos = protos + 0.0
|
||||||
updated_protos[wp] = protos[wp] + (self.hparams.lr * shiftp)
|
updated_protos[wp] = protos[wp] + (self.hparams["lr"] * shiftp)
|
||||||
updated_protos[wn] = protos[wn] + (self.hparams.lr * shiftn)
|
updated_protos[wn] = protos[wn] + (self.hparams["lr"] * shiftn)
|
||||||
self.proto_layer.load_state_dict({"_components": updated_protos},
|
self.proto_layer.load_state_dict(
|
||||||
strict=False)
|
OrderedDict(_components=updated_protos),
|
||||||
|
strict=False,
|
||||||
|
)
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
self.log_acc(dis, y, tag="train_acc")
|
self.log_acc(dis, y, tag="train_acc")
|
||||||
@ -80,14 +85,17 @@ class MedianLVQ(NonGradientMixin, GLVQ):
|
|||||||
super().__init__(hparams, **kwargs)
|
super().__init__(hparams, **kwargs)
|
||||||
|
|
||||||
self.transfer_layer = LambdaLayer(
|
self.transfer_layer = LambdaLayer(
|
||||||
get_activation(self.hparams.transfer_fn))
|
get_activation(self.hparams["transfer_fn"]))
|
||||||
|
|
||||||
def _f(self, x, y, protos, plabels):
|
def _f(self, x, y, protos, plabels):
|
||||||
d = self.distance_layer(x, protos)
|
d = self.distance_layer(x, protos)
|
||||||
dp, dm = _get_dp_dm(d, y, plabels)
|
dp, dm = _get_dp_dm(d, y, plabels, with_indices=False)
|
||||||
mu = (dp - dm) / (dp + dm)
|
mu = (dp - dm) / (dp + dm)
|
||||||
invmu = -1.0 * mu
|
negative_mu = -1.0 * mu
|
||||||
f = self.transfer_layer(invmu, beta=self.hparams.transfer_beta) + 1.0
|
f = self.transfer_layer(
|
||||||
|
negative_mu,
|
||||||
|
beta=self.hparams["transfer_beta"],
|
||||||
|
) + 1.0
|
||||||
return f
|
return f
|
||||||
|
|
||||||
def expectation(self, x, y, protos, plabels):
|
def expectation(self, x, y, protos, plabels):
|
||||||
@ -118,8 +126,10 @@ class MedianLVQ(NonGradientMixin, GLVQ):
|
|||||||
_lower_bound = self.lower_bound(x, y, _protos, plabels, gamma)
|
_lower_bound = self.lower_bound(x, y, _protos, plabels, gamma)
|
||||||
if _lower_bound > lower_bound:
|
if _lower_bound > lower_bound:
|
||||||
logging.debug(f"Updating prototype {i} to data {k}...")
|
logging.debug(f"Updating prototype {i} to data {k}...")
|
||||||
self.proto_layer.load_state_dict({"_components": _protos},
|
self.proto_layer.load_state_dict(
|
||||||
strict=False)
|
OrderedDict(_components=_protos),
|
||||||
|
strict=False,
|
||||||
|
)
|
||||||
break
|
break
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
|
35
prototorch/models/mixins.py
Normal file
35
prototorch/models/mixins.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
import pytorch_lightning as pl
|
||||||
|
import torch
|
||||||
|
from prototorch.core.components import Components
|
||||||
|
|
||||||
|
|
||||||
|
class ProtoTorchMixin(pl.LightningModule):
|
||||||
|
"""All mixins are ProtoTorchMixins."""
|
||||||
|
|
||||||
|
|
||||||
|
class NonGradientMixin(ProtoTorchMixin):
|
||||||
|
"""Mixin for custom non-gradient optimization."""
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.automatic_optimization = False
|
||||||
|
|
||||||
|
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
class ImagePrototypesMixin(ProtoTorchMixin):
|
||||||
|
"""Mixin for models with image prototypes."""
|
||||||
|
proto_layer: Components
|
||||||
|
components: torch.Tensor
|
||||||
|
|
||||||
|
def on_train_batch_end(self, outputs, batch, batch_idx):
|
||||||
|
"""Constrain the components to the range [0, 1] by clamping after updates."""
|
||||||
|
self.proto_layer.components.data.clamp_(0.0, 1.0)
|
||||||
|
|
||||||
|
def get_prototype_grid(self, num_columns=2, return_channels_last=True):
|
||||||
|
from torchvision.utils import make_grid
|
||||||
|
grid = make_grid(self.components, nrow=num_columns)
|
||||||
|
if return_channels_last:
|
||||||
|
grid = grid.permute((1, 2, 0))
|
||||||
|
return grid.cpu()
|
@ -6,9 +6,10 @@ from prototorch.core.competitions import wtac
|
|||||||
from prototorch.core.distances import squared_euclidean_distance
|
from prototorch.core.distances import squared_euclidean_distance
|
||||||
from prototorch.core.losses import NeuralGasEnergy
|
from prototorch.core.losses import NeuralGasEnergy
|
||||||
|
|
||||||
from .abstract import NonGradientMixin, UnsupervisedPrototypeModel
|
from .abstract import UnsupervisedPrototypeModel
|
||||||
from .callbacks import GNGCallback
|
from .callbacks import GNGCallback
|
||||||
from .extras import ConnectionTopology
|
from .extras import ConnectionTopology
|
||||||
|
from .mixins import NonGradientMixin
|
||||||
|
|
||||||
|
|
||||||
class KohonenSOM(NonGradientMixin, UnsupervisedPrototypeModel):
|
class KohonenSOM(NonGradientMixin, UnsupervisedPrototypeModel):
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
"""Visualization Callbacks."""
|
"""Visualization Callbacks."""
|
||||||
|
|
||||||
|
import os
|
||||||
import warnings
|
import warnings
|
||||||
from typing import Sized
|
from typing import Sized
|
||||||
|
|
||||||
@ -32,6 +33,10 @@ class Vis2DAbstract(pl.Callback):
|
|||||||
tensorboard=False,
|
tensorboard=False,
|
||||||
show_last_only=False,
|
show_last_only=False,
|
||||||
pause_time=0.1,
|
pause_time=0.1,
|
||||||
|
save=False,
|
||||||
|
save_dir="./img",
|
||||||
|
fig_size=(5, 4),
|
||||||
|
dpi=500,
|
||||||
block=False):
|
block=False):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
|
||||||
@ -75,8 +80,16 @@ class Vis2DAbstract(pl.Callback):
|
|||||||
self.tensorboard = tensorboard
|
self.tensorboard = tensorboard
|
||||||
self.show_last_only = show_last_only
|
self.show_last_only = show_last_only
|
||||||
self.pause_time = pause_time
|
self.pause_time = pause_time
|
||||||
|
self.save = save
|
||||||
|
self.save_dir = save_dir
|
||||||
|
self.fig_size = fig_size
|
||||||
|
self.dpi = dpi
|
||||||
self.block = block
|
self.block = block
|
||||||
|
|
||||||
|
if save:
|
||||||
|
if not os.path.exists(save_dir):
|
||||||
|
os.makedirs(save_dir)
|
||||||
|
|
||||||
def precheck(self, trainer):
|
def precheck(self, trainer):
|
||||||
if self.show_last_only:
|
if self.show_last_only:
|
||||||
if trainer.current_epoch != trainer.max_epochs - 1:
|
if trainer.current_epoch != trainer.max_epochs - 1:
|
||||||
@ -125,6 +138,11 @@ class Vis2DAbstract(pl.Callback):
|
|||||||
def log_and_display(self, trainer, pl_module):
|
def log_and_display(self, trainer, pl_module):
|
||||||
if self.tensorboard:
|
if self.tensorboard:
|
||||||
self.add_to_tensorboard(trainer, pl_module)
|
self.add_to_tensorboard(trainer, pl_module)
|
||||||
|
if self.save:
|
||||||
|
plt.tight_layout()
|
||||||
|
self.fig.set_size_inches(*self.fig_size, forward=False)
|
||||||
|
plt.savefig(f"{self.save_dir}/{trainer.current_epoch}.png",
|
||||||
|
dpi=self.dpi)
|
||||||
if self.show:
|
if self.show:
|
||||||
if not self.block:
|
if not self.block:
|
||||||
plt.pause(self.pause_time)
|
plt.pause(self.pause_time)
|
||||||
|
23
prototorch/y/__init__.py
Normal file
23
prototorch/y/__init__.py
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
from .architectures.base import BaseYArchitecture
|
||||||
|
from .architectures.comparison import (
|
||||||
|
OmegaComparisonMixin,
|
||||||
|
SimpleComparisonMixin,
|
||||||
|
)
|
||||||
|
from .architectures.competition import WTACompetitionMixin
|
||||||
|
from .architectures.components import SupervisedArchitecture
|
||||||
|
from .architectures.loss import GLVQLossMixin
|
||||||
|
from .architectures.optimization import (
|
||||||
|
MultipleLearningRateMixin,
|
||||||
|
SingleLearningRateMixin,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'BaseYArchitecture',
|
||||||
|
"OmegaComparisonMixin",
|
||||||
|
"SimpleComparisonMixin",
|
||||||
|
"SingleLearningRateMixin",
|
||||||
|
"MultipleLearningRateMixin",
|
||||||
|
"SupervisedArchitecture",
|
||||||
|
"WTACompetitionMixin",
|
||||||
|
"GLVQLossMixin",
|
||||||
|
]
|
212
prototorch/y/architectures/base.py
Normal file
212
prototorch/y/architectures/base.py
Normal file
@ -0,0 +1,212 @@
|
|||||||
|
"""
|
||||||
|
Proto Y Architecture
|
||||||
|
|
||||||
|
Network architecture for Component based Learning.
|
||||||
|
"""
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import (
|
||||||
|
Dict,
|
||||||
|
Set,
|
||||||
|
Type,
|
||||||
|
)
|
||||||
|
|
||||||
|
import pytorch_lightning as pl
|
||||||
|
import torch
|
||||||
|
from torchmetrics import Metric
|
||||||
|
from torchmetrics.classification.accuracy import Accuracy
|
||||||
|
|
||||||
|
|
||||||
|
class BaseYArchitecture(pl.LightningModule):
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters:
|
||||||
|
...
|
||||||
|
|
||||||
|
registered_metrics: Dict[Type[Metric], Metric] = {}
|
||||||
|
registered_metric_names: Dict[Type[Metric], Set[str]] = {}
|
||||||
|
|
||||||
|
components_layer: torch.nn.Module
|
||||||
|
|
||||||
|
def __init__(self, hparams) -> None:
|
||||||
|
super().__init__()
|
||||||
|
|
||||||
|
# Common Steps
|
||||||
|
self.init_components(hparams)
|
||||||
|
self.init_latent(hparams)
|
||||||
|
self.init_comparison(hparams)
|
||||||
|
self.init_competition(hparams)
|
||||||
|
|
||||||
|
# Train Steps
|
||||||
|
self.init_loss(hparams)
|
||||||
|
|
||||||
|
# Inference Steps
|
||||||
|
self.init_inference(hparams)
|
||||||
|
|
||||||
|
# Initialize Model Metrics
|
||||||
|
self.init_model_metrics()
|
||||||
|
|
||||||
|
# internal API, called by models and callbacks
|
||||||
|
def register_torchmetric(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
metric: Type[Metric],
|
||||||
|
**metric_kwargs,
|
||||||
|
):
|
||||||
|
if metric not in self.registered_metrics:
|
||||||
|
self.registered_metrics[metric] = metric(**metric_kwargs)
|
||||||
|
self.registered_metric_names[metric] = {name}
|
||||||
|
else:
|
||||||
|
self.registered_metric_names[metric].add(name)
|
||||||
|
|
||||||
|
# external API
|
||||||
|
def get_competition(self, batch, components):
|
||||||
|
latent_batch, latent_components = self.latent(batch, components)
|
||||||
|
# TODO: => Latent Hook
|
||||||
|
comparison_tensor = self.comparison(latent_batch, latent_components)
|
||||||
|
# TODO: => Comparison Hook
|
||||||
|
return comparison_tensor
|
||||||
|
|
||||||
|
def forward(self, batch):
|
||||||
|
if isinstance(batch, torch.Tensor):
|
||||||
|
batch = (batch, None)
|
||||||
|
# TODO: manage different datatypes?
|
||||||
|
components = self.components_layer()
|
||||||
|
# TODO: => Component Hook
|
||||||
|
comparison_tensor = self.get_competition(batch, components)
|
||||||
|
# TODO: => Competition Hook
|
||||||
|
return self.inference(comparison_tensor, components)
|
||||||
|
|
||||||
|
def predict(self, batch):
|
||||||
|
"""
|
||||||
|
Alias for forward
|
||||||
|
"""
|
||||||
|
return self.forward(batch)
|
||||||
|
|
||||||
|
def forward_comparison(self, batch):
|
||||||
|
if isinstance(batch, torch.Tensor):
|
||||||
|
batch = (batch, None)
|
||||||
|
# TODO: manage different datatypes?
|
||||||
|
components = self.components_layer()
|
||||||
|
# TODO: => Component Hook
|
||||||
|
return self.get_competition(batch, components)
|
||||||
|
|
||||||
|
def loss_forward(self, batch):
|
||||||
|
# TODO: manage different datatypes?
|
||||||
|
components = self.components_layer()
|
||||||
|
# TODO: => Component Hook
|
||||||
|
comparison_tensor = self.get_competition(batch, components)
|
||||||
|
# TODO: => Competition Hook
|
||||||
|
return self.loss(comparison_tensor, batch, components)
|
||||||
|
|
||||||
|
# Empty Initialization
|
||||||
|
# TODO: Type hints
|
||||||
|
# TODO: Docs
|
||||||
|
def init_components(self, hparams: HyperParameters) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
def init_latent(self, hparams: HyperParameters) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
def init_comparison(self, hparams: HyperParameters) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
def init_competition(self, hparams: HyperParameters) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
def init_loss(self, hparams: HyperParameters) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
def init_inference(self, hparams: HyperParameters) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
def init_model_metrics(self) -> None:
|
||||||
|
self.register_torchmetric('accuracy', Accuracy)
|
||||||
|
|
||||||
|
# Empty Steps
|
||||||
|
# TODO: Type hints
|
||||||
|
def components(self):
|
||||||
|
"""
|
||||||
|
This step has no input.
|
||||||
|
|
||||||
|
It returns the components.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError(
|
||||||
|
"The components step has no reasonable default.")
|
||||||
|
|
||||||
|
def latent(self, batch, components):
|
||||||
|
"""
|
||||||
|
The latent step receives the data batch and the components.
|
||||||
|
It can transform both by an arbitrary function.
|
||||||
|
|
||||||
|
It returns the transformed batch and components, each of the same length as the original input.
|
||||||
|
"""
|
||||||
|
return batch, components
|
||||||
|
|
||||||
|
def comparison(self, batch, components):
|
||||||
|
"""
|
||||||
|
Takes a batch of size N and the component set of size M.
|
||||||
|
|
||||||
|
It returns an NxMxD tensor containing D (usually 1) pairwise comparison measures.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError(
|
||||||
|
"The comparison step has no reasonable default.")
|
||||||
|
|
||||||
|
def competition(self, comparison_measures, components):
|
||||||
|
"""
|
||||||
|
Takes the tensor of comparison measures.
|
||||||
|
|
||||||
|
Assigns a competition vector to each class.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError(
|
||||||
|
"The competition step has no reasonable default.")
|
||||||
|
|
||||||
|
def loss(self, comparison_measures, batch, components):
|
||||||
|
"""
|
||||||
|
Takes the tensor of competition measures.
|
||||||
|
|
||||||
|
Calculates a single loss value
|
||||||
|
"""
|
||||||
|
raise NotImplementedError("The loss step has no reasonable default.")
|
||||||
|
|
||||||
|
def inference(self, comparison_measures, components):
|
||||||
|
"""
|
||||||
|
Takes the tensor of competition measures.
|
||||||
|
|
||||||
|
Returns the inferred vector.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError(
|
||||||
|
"The inference step has no reasonable default.")
|
||||||
|
|
||||||
|
def update_metrics_step(self, batch):
|
||||||
|
x, y = batch
|
||||||
|
|
||||||
|
# Prediction Metrics
|
||||||
|
preds = self(x)
|
||||||
|
for metric in self.registered_metrics:
|
||||||
|
instance = self.registered_metrics[metric].to(self.device)
|
||||||
|
instance(y, preds)
|
||||||
|
|
||||||
|
def update_metrics_epoch(self):
|
||||||
|
for metric in self.registered_metrics:
|
||||||
|
instance = self.registered_metrics[metric].to(self.device)
|
||||||
|
value = instance.compute()
|
||||||
|
|
||||||
|
for name in self.registered_metric_names[metric]:
|
||||||
|
self.log(name, value)
|
||||||
|
|
||||||
|
instance.reset()
|
||||||
|
|
||||||
|
# Lightning Hooks
|
||||||
|
def training_step(self, batch, batch_idx, optimizer_idx=None):
|
||||||
|
self.update_metrics_step(batch)
|
||||||
|
|
||||||
|
return self.loss_forward(batch)
|
||||||
|
|
||||||
|
def training_epoch_end(self, outs) -> None:
|
||||||
|
self.update_metrics_epoch()
|
||||||
|
|
||||||
|
def validation_step(self, batch, batch_idx):
|
||||||
|
return self.loss_forward(batch)
|
||||||
|
|
||||||
|
def test_step(self, batch, batch_idx):
|
||||||
|
return self.loss_forward(batch)
|
112
prototorch/y/architectures/comparison.py
Normal file
112
prototorch/y/architectures/comparison.py
Normal file
@ -0,0 +1,112 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Callable, Dict
|
||||||
|
|
||||||
|
import torch
|
||||||
|
from prototorch.core.distances import euclidean_distance
|
||||||
|
from prototorch.core.initializers import (
|
||||||
|
AbstractLinearTransformInitializer,
|
||||||
|
EyeLinearTransformInitializer,
|
||||||
|
)
|
||||||
|
from prototorch.nn.wrappers import LambdaLayer
|
||||||
|
from prototorch.y.architectures.base import BaseYArchitecture
|
||||||
|
from torch import Tensor
|
||||||
|
from torch.nn.parameter import Parameter
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleComparisonMixin(BaseYArchitecture):
|
||||||
|
"""
|
||||||
|
Simple Comparison
|
||||||
|
|
||||||
|
A comparison layer that only uses the positions of the components and the batch for dissimilarity computation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# HyperParameters
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters(BaseYArchitecture.HyperParameters):
|
||||||
|
"""
|
||||||
|
comparison_fn: The comparison / dissimilarity function to use. Default: euclidean_distance.
|
||||||
|
comparison_args: Keyword arguments for the comparison function. Default: {}.
|
||||||
|
"""
|
||||||
|
comparison_fn: Callable = euclidean_distance
|
||||||
|
comparison_args: dict = field(default_factory=lambda: dict())
|
||||||
|
|
||||||
|
comparison_parameters: dict = field(default_factory=lambda: dict())
|
||||||
|
|
||||||
|
# Steps
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def init_comparison(self, hparams: HyperParameters):
|
||||||
|
self.comparison_layer = LambdaLayer(
|
||||||
|
fn=hparams.comparison_fn,
|
||||||
|
**hparams.comparison_args,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.comparison_kwargs: dict[str, Tensor] = dict()
|
||||||
|
|
||||||
|
def comparison(self, batch, components):
|
||||||
|
comp_tensor, _ = components
|
||||||
|
batch_tensor, _ = batch
|
||||||
|
|
||||||
|
comp_tensor = comp_tensor.unsqueeze(1)
|
||||||
|
|
||||||
|
distances = self.comparison_layer(
|
||||||
|
batch_tensor,
|
||||||
|
comp_tensor,
|
||||||
|
**self.comparison_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
return distances
|
||||||
|
|
||||||
|
|
||||||
|
class OmegaComparisonMixin(SimpleComparisonMixin):
|
||||||
|
"""
|
||||||
|
Omega Comparison
|
||||||
|
|
||||||
|
A comparison layer that uses the positions of the components and the batch for dissimilarity computation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_omega: torch.Tensor
|
||||||
|
|
||||||
|
# HyperParameters
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters(SimpleComparisonMixin.HyperParameters):
|
||||||
|
"""
|
||||||
|
input_dim: Necessary Field: The dimensionality of the input.
|
||||||
|
latent_dim: The dimensionality of the latent space. Default: 2.
|
||||||
|
omega_initializer: The initializer to use for the omega matrix. Default: EyeLinearTransformInitializer.
|
||||||
|
"""
|
||||||
|
input_dim: int | None = None
|
||||||
|
latent_dim: int = 2
|
||||||
|
omega_initializer: type[
|
||||||
|
AbstractLinearTransformInitializer] = EyeLinearTransformInitializer
|
||||||
|
|
||||||
|
# Steps
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def init_comparison(self, hparams: HyperParameters) -> None:
|
||||||
|
super().init_comparison(hparams)
|
||||||
|
|
||||||
|
# Initialize the omega matrix
|
||||||
|
if hparams.input_dim is None:
|
||||||
|
raise ValueError("input_dim must be specified.")
|
||||||
|
else:
|
||||||
|
omega = hparams.omega_initializer().generate(
|
||||||
|
hparams.input_dim,
|
||||||
|
hparams.latent_dim,
|
||||||
|
)
|
||||||
|
self.register_parameter("_omega", Parameter(omega))
|
||||||
|
self.comparison_kwargs = dict(omega=self._omega)
|
||||||
|
|
||||||
|
# Properties
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@property
|
||||||
|
def omega_matrix(self):
|
||||||
|
return self._omega.detach().cpu()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def lambda_matrix(self):
|
||||||
|
omega = self._omega.detach()
|
||||||
|
lam = omega @ omega.T
|
||||||
|
return lam.detach().cpu()
|
29
prototorch/y/architectures/competition.py
Normal file
29
prototorch/y/architectures/competition.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from prototorch.core.competitions import WTAC
|
||||||
|
from prototorch.y.architectures.base import BaseYArchitecture
|
||||||
|
|
||||||
|
|
||||||
|
class WTACompetitionMixin(BaseYArchitecture):
|
||||||
|
"""
|
||||||
|
Winner Take All Competition
|
||||||
|
|
||||||
|
A competition layer that uses the winner-take-all strategy.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# HyperParameters
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters(BaseYArchitecture.HyperParameters):
|
||||||
|
"""
|
||||||
|
No hyperparameters.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Steps
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def init_inference(self, hparams: HyperParameters):
|
||||||
|
self.competition_layer = WTAC()
|
||||||
|
|
||||||
|
def inference(self, comparison_measures, components):
|
||||||
|
comp_labels = components[1]
|
||||||
|
return self.competition_layer(comparison_measures, comp_labels)
|
53
prototorch/y/architectures/components.py
Normal file
53
prototorch/y/architectures/components.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from prototorch.core.components import LabeledComponents
|
||||||
|
from prototorch.core.initializers import (
|
||||||
|
AbstractComponentsInitializer,
|
||||||
|
LabelsInitializer,
|
||||||
|
)
|
||||||
|
from prototorch.y import BaseYArchitecture
|
||||||
|
|
||||||
|
|
||||||
|
class SupervisedArchitecture(BaseYArchitecture):
|
||||||
|
"""
|
||||||
|
Supervised Architecture
|
||||||
|
|
||||||
|
An architecture that uses labeled Components as component Layer.
|
||||||
|
"""
|
||||||
|
components_layer: LabeledComponents
|
||||||
|
|
||||||
|
# HyperParameters
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters:
|
||||||
|
"""
|
||||||
|
distribution: A valid prototype distribution. No default possible.
|
||||||
|
components_initializer: An implementation of AbstractComponentsInitializer. No default possible.
|
||||||
|
"""
|
||||||
|
distribution: "dict[str, int]"
|
||||||
|
component_initializer: AbstractComponentsInitializer
|
||||||
|
|
||||||
|
# Steps
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def init_components(self, hparams: HyperParameters):
|
||||||
|
self.components_layer = LabeledComponents(
|
||||||
|
distribution=hparams.distribution,
|
||||||
|
components_initializer=hparams.component_initializer,
|
||||||
|
labels_initializer=LabelsInitializer(),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Properties
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@property
|
||||||
|
def prototypes(self):
|
||||||
|
"""
|
||||||
|
Returns the position of the prototypes.
|
||||||
|
"""
|
||||||
|
return self.components_layer.components.detach().cpu()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def prototype_labels(self):
|
||||||
|
"""
|
||||||
|
Returns the labels of the prototypes.
|
||||||
|
"""
|
||||||
|
return self.components_layer.labels.detach().cpu()
|
42
prototorch/y/architectures/loss.py
Normal file
42
prototorch/y/architectures/loss.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
from prototorch.core.losses import GLVQLoss
|
||||||
|
from prototorch.y.architectures.base import BaseYArchitecture
|
||||||
|
|
||||||
|
|
||||||
|
class GLVQLossMixin(BaseYArchitecture):
|
||||||
|
"""
|
||||||
|
GLVQ Loss
|
||||||
|
|
||||||
|
A loss layer that uses the Generalized Learning Vector Quantization (GLVQ) loss.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# HyperParameters
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters(BaseYArchitecture.HyperParameters):
|
||||||
|
"""
|
||||||
|
margin: The margin of the GLVQ loss. Default: 0.0.
|
||||||
|
transfer_fn: Transfer function to use. Default: sigmoid_beta.
|
||||||
|
transfer_args: Keyword arguments for the transfer function. Default: {beta: 10.0}.
|
||||||
|
"""
|
||||||
|
margin: float = 0.0
|
||||||
|
|
||||||
|
transfer_fn: str = "sigmoid_beta"
|
||||||
|
transfer_args: dict = field(default_factory=lambda: dict(beta=10.0))
|
||||||
|
|
||||||
|
# Steps
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def init_loss(self, hparams: HyperParameters):
|
||||||
|
self.loss_layer = GLVQLoss(
|
||||||
|
margin=hparams.margin,
|
||||||
|
transfer_fn=hparams.transfer_fn,
|
||||||
|
**hparams.transfer_args,
|
||||||
|
)
|
||||||
|
|
||||||
|
def loss(self, comparison_measures, batch, components):
|
||||||
|
target = batch[1]
|
||||||
|
comp_labels = components[1]
|
||||||
|
loss = self.loss_layer(comparison_measures, target, comp_labels)
|
||||||
|
self.log('loss', loss)
|
||||||
|
return loss
|
86
prototorch/y/architectures/optimization.py
Normal file
86
prototorch/y/architectures/optimization.py
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Type
|
||||||
|
|
||||||
|
import torch
|
||||||
|
from prototorch.y import BaseYArchitecture
|
||||||
|
from torch.nn.parameter import Parameter
|
||||||
|
|
||||||
|
|
||||||
|
class SingleLearningRateMixin(BaseYArchitecture):
|
||||||
|
"""
|
||||||
|
Single Learning Rate
|
||||||
|
|
||||||
|
All parameters are updated with a single learning rate.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# HyperParameters
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters(BaseYArchitecture.HyperParameters):
|
||||||
|
"""
|
||||||
|
lr: The learning rate. Default: 0.1.
|
||||||
|
optimizer: The optimizer to use. Default: torch.optim.Adam.
|
||||||
|
"""
|
||||||
|
lr: float = 0.1
|
||||||
|
optimizer: Type[torch.optim.Optimizer] = torch.optim.Adam
|
||||||
|
|
||||||
|
# Steps
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def __init__(self, hparams: HyperParameters) -> None:
|
||||||
|
super().__init__(hparams)
|
||||||
|
self.lr = hparams.lr
|
||||||
|
self.optimizer = hparams.optimizer
|
||||||
|
|
||||||
|
# Hooks
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def configure_optimizers(self):
|
||||||
|
return self.optimizer(self.parameters(), lr=self.lr) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
class MultipleLearningRateMixin(BaseYArchitecture):
|
||||||
|
"""
|
||||||
|
Multiple Learning Rates
|
||||||
|
|
||||||
|
Define Different Learning Rates for different parameters.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# HyperParameters
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters(BaseYArchitecture.HyperParameters):
|
||||||
|
"""
|
||||||
|
lr: The learning rate. Default: 0.1.
|
||||||
|
optimizer: The optimizer to use. Default: torch.optim.Adam.
|
||||||
|
"""
|
||||||
|
lr: dict = field(default_factory=lambda: dict())
|
||||||
|
optimizer: Type[torch.optim.Optimizer] = torch.optim.Adam
|
||||||
|
|
||||||
|
# Steps
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def __init__(self, hparams: HyperParameters) -> None:
|
||||||
|
super().__init__(hparams)
|
||||||
|
self.lr = hparams.lr
|
||||||
|
self.optimizer = hparams.optimizer
|
||||||
|
|
||||||
|
# Hooks
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
def configure_optimizers(self):
|
||||||
|
optimizers = []
|
||||||
|
for name, lr in self.lr.items():
|
||||||
|
if not hasattr(self, name):
|
||||||
|
raise ValueError(f"{name} is not a parameter of {self}")
|
||||||
|
else:
|
||||||
|
model_part = getattr(self, name)
|
||||||
|
if isinstance(model_part, Parameter):
|
||||||
|
optimizers.append(
|
||||||
|
self.optimizer(
|
||||||
|
[model_part],
|
||||||
|
lr=lr, # type: ignore
|
||||||
|
))
|
||||||
|
elif hasattr(model_part, "parameters"):
|
||||||
|
optimizers.append(
|
||||||
|
self.optimizer(
|
||||||
|
model_part.parameters(),
|
||||||
|
lr=lr, # type: ignore
|
||||||
|
))
|
||||||
|
return optimizers
|
149
prototorch/y/callbacks.py
Normal file
149
prototorch/y/callbacks.py
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
import warnings
|
||||||
|
from typing import Optional, Type
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytorch_lightning as pl
|
||||||
|
import torch
|
||||||
|
import torchmetrics
|
||||||
|
from matplotlib import pyplot as plt
|
||||||
|
from prototorch.models.vis import Vis2DAbstract
|
||||||
|
from prototorch.utils.utils import mesh2d
|
||||||
|
from prototorch.y.architectures.base import BaseYArchitecture
|
||||||
|
from prototorch.y.library.gmlvq import GMLVQ
|
||||||
|
from pytorch_lightning.loggers import TensorBoardLogger
|
||||||
|
|
||||||
|
DIVERGING_COLOR_MAPS = [
|
||||||
|
'PiYG', 'PRGn', 'BrBG', 'PuOr', 'RdGy', 'RdBu', 'RdYlBu', 'RdYlGn',
|
||||||
|
'Spectral', 'coolwarm', 'bwr', 'seismic'
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class LogTorchmetricCallback(pl.Callback):
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name,
|
||||||
|
metric: Type[torchmetrics.Metric],
|
||||||
|
on="prediction",
|
||||||
|
**metric_kwargs,
|
||||||
|
) -> None:
|
||||||
|
self.name = name
|
||||||
|
self.metric = metric
|
||||||
|
self.metric_kwargs = metric_kwargs
|
||||||
|
self.on = on
|
||||||
|
|
||||||
|
def setup(
|
||||||
|
self,
|
||||||
|
trainer: pl.Trainer,
|
||||||
|
pl_module: BaseYArchitecture,
|
||||||
|
stage: Optional[str] = None,
|
||||||
|
) -> None:
|
||||||
|
if self.on == "prediction":
|
||||||
|
pl_module.register_torchmetric(
|
||||||
|
self.name,
|
||||||
|
self.metric,
|
||||||
|
**self.metric_kwargs,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"{self.on} is no valid metric hook")
|
||||||
|
|
||||||
|
|
||||||
|
class VisGLVQ2D(Vis2DAbstract):
|
||||||
|
|
||||||
|
def visualize(self, pl_module):
|
||||||
|
protos = pl_module.prototypes
|
||||||
|
plabels = pl_module.prototype_labels
|
||||||
|
x_train, y_train = self.x_train, self.y_train
|
||||||
|
ax = self.setup_ax()
|
||||||
|
self.plot_protos(ax, protos, plabels)
|
||||||
|
if x_train is not None:
|
||||||
|
self.plot_data(ax, x_train, y_train)
|
||||||
|
mesh_input, xx, yy = mesh2d(
|
||||||
|
np.vstack([x_train, protos]),
|
||||||
|
self.border,
|
||||||
|
self.resolution,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
mesh_input, xx, yy = mesh2d(protos, self.border, self.resolution)
|
||||||
|
_components = pl_module.components_layer.components
|
||||||
|
mesh_input = torch.from_numpy(mesh_input).type_as(_components)
|
||||||
|
y_pred = pl_module.predict(mesh_input)
|
||||||
|
y_pred = y_pred.cpu().reshape(xx.shape)
|
||||||
|
ax.contourf(xx, yy, y_pred, cmap=self.cmap, alpha=0.35)
|
||||||
|
|
||||||
|
|
||||||
|
class VisGMLVQ2D(Vis2DAbstract):
|
||||||
|
|
||||||
|
def __init__(self, *args, ev_proj=True, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self.ev_proj = ev_proj
|
||||||
|
|
||||||
|
def visualize(self, pl_module):
|
||||||
|
protos = pl_module.prototypes
|
||||||
|
plabels = pl_module.prototype_labels
|
||||||
|
x_train, y_train = self.x_train, self.y_train
|
||||||
|
device = pl_module.device
|
||||||
|
omega = pl_module._omega.detach()
|
||||||
|
lam = omega @ omega.T
|
||||||
|
u, _, _ = torch.pca_lowrank(lam, q=2)
|
||||||
|
with torch.no_grad():
|
||||||
|
x_train = torch.Tensor(x_train).to(device)
|
||||||
|
x_train = x_train @ u
|
||||||
|
x_train = x_train.cpu().detach()
|
||||||
|
if self.show_protos:
|
||||||
|
with torch.no_grad():
|
||||||
|
protos = torch.Tensor(protos).to(device)
|
||||||
|
protos = protos @ u
|
||||||
|
protos = protos.cpu().detach()
|
||||||
|
ax = self.setup_ax()
|
||||||
|
self.plot_data(ax, x_train, y_train)
|
||||||
|
if self.show_protos:
|
||||||
|
self.plot_protos(ax, protos, plabels)
|
||||||
|
|
||||||
|
|
||||||
|
class PlotLambdaMatrixToTensorboard(pl.Callback):
|
||||||
|
|
||||||
|
def __init__(self, cmap='seismic') -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.cmap = cmap
|
||||||
|
|
||||||
|
if self.cmap not in DIVERGING_COLOR_MAPS and type(self.cmap) is str:
|
||||||
|
warnings.warn(
|
||||||
|
f"{self.cmap} is not a diverging color map. We recommend to use one of the following: {DIVERGING_COLOR_MAPS}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def on_train_start(self, trainer, pl_module: GMLVQ):
|
||||||
|
self.plot_lambda(trainer, pl_module)
|
||||||
|
|
||||||
|
def on_train_epoch_end(self, trainer, pl_module: GMLVQ):
|
||||||
|
self.plot_lambda(trainer, pl_module)
|
||||||
|
|
||||||
|
def plot_lambda(self, trainer, pl_module: GMLVQ):
|
||||||
|
|
||||||
|
self.fig, self.ax = plt.subplots(1, 1)
|
||||||
|
|
||||||
|
# plot lambda matrix
|
||||||
|
l_matrix = pl_module.lambda_matrix
|
||||||
|
|
||||||
|
# normalize lambda matrix
|
||||||
|
l_matrix = l_matrix / torch.max(torch.abs(l_matrix))
|
||||||
|
|
||||||
|
# plot lambda matrix
|
||||||
|
self.ax.imshow(l_matrix.detach().numpy(), self.cmap, vmin=-1, vmax=1)
|
||||||
|
|
||||||
|
self.fig.colorbar(self.ax.images[-1])
|
||||||
|
|
||||||
|
# add title
|
||||||
|
self.ax.set_title('Lambda Matrix')
|
||||||
|
|
||||||
|
# add to tensorboard
|
||||||
|
if isinstance(trainer.logger, TensorBoardLogger):
|
||||||
|
trainer.logger.experiment.add_figure(
|
||||||
|
f"lambda_matrix",
|
||||||
|
self.fig,
|
||||||
|
trainer.global_step,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
warnings.warn(
|
||||||
|
f"{self.__class__.__name__} is not compatible with {trainer.logger.__class__.__name__} as logger. Use TensorBoardLogger instead."
|
||||||
|
)
|
5
prototorch/y/library/__init__.py
Normal file
5
prototorch/y/library/__init__.py
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
from .glvq import GLVQ
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"GLVQ",
|
||||||
|
]
|
35
prototorch/y/library/glvq.py
Normal file
35
prototorch/y/library/glvq.py
Normal file
@ -0,0 +1,35 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from prototorch.y import (
|
||||||
|
SimpleComparisonMixin,
|
||||||
|
SingleLearningRateMixin,
|
||||||
|
SupervisedArchitecture,
|
||||||
|
WTACompetitionMixin,
|
||||||
|
)
|
||||||
|
from prototorch.y.architectures.loss import GLVQLossMixin
|
||||||
|
|
||||||
|
|
||||||
|
class GLVQ(
|
||||||
|
SupervisedArchitecture,
|
||||||
|
SimpleComparisonMixin,
|
||||||
|
GLVQLossMixin,
|
||||||
|
WTACompetitionMixin,
|
||||||
|
SingleLearningRateMixin,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Generalized Learning Vector Quantization (GLVQ)
|
||||||
|
|
||||||
|
A GLVQ architecture that uses the winner-take-all strategy and the GLVQ loss.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters(
|
||||||
|
SimpleComparisonMixin.HyperParameters,
|
||||||
|
SingleLearningRateMixin.HyperParameters,
|
||||||
|
GLVQLossMixin.HyperParameters,
|
||||||
|
WTACompetitionMixin.HyperParameters,
|
||||||
|
SupervisedArchitecture.HyperParameters,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
No hyperparameters.
|
||||||
|
"""
|
50
prototorch/y/library/gmlvq.py
Normal file
50
prototorch/y/library/gmlvq.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
import torch
|
||||||
|
from prototorch.core.distances import omega_distance
|
||||||
|
from prototorch.y import (
|
||||||
|
GLVQLossMixin,
|
||||||
|
MultipleLearningRateMixin,
|
||||||
|
OmegaComparisonMixin,
|
||||||
|
SupervisedArchitecture,
|
||||||
|
WTACompetitionMixin,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class GMLVQ(
|
||||||
|
SupervisedArchitecture,
|
||||||
|
OmegaComparisonMixin,
|
||||||
|
GLVQLossMixin,
|
||||||
|
WTACompetitionMixin,
|
||||||
|
MultipleLearningRateMixin,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Generalized Matrix Learning Vector Quantization (GMLVQ)
|
||||||
|
|
||||||
|
A GMLVQ architecture that uses the winner-take-all strategy and the GLVQ loss.
|
||||||
|
"""
|
||||||
|
# HyperParameters
|
||||||
|
# ----------------------------------------------------------------------------------------------------
|
||||||
|
@dataclass
|
||||||
|
class HyperParameters(
|
||||||
|
MultipleLearningRateMixin.HyperParameters,
|
||||||
|
OmegaComparisonMixin.HyperParameters,
|
||||||
|
GLVQLossMixin.HyperParameters,
|
||||||
|
WTACompetitionMixin.HyperParameters,
|
||||||
|
SupervisedArchitecture.HyperParameters,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
comparison_fn: The comparison / dissimilarity function to use. Override Default: omega_distance.
|
||||||
|
comparison_args: Keyword arguments for the comparison function. Override Default: {}.
|
||||||
|
"""
|
||||||
|
comparison_fn: Callable = omega_distance
|
||||||
|
comparison_args: dict = field(default_factory=lambda: dict())
|
||||||
|
optimizer: type[torch.optim.Optimizer] = torch.optim.Adam
|
||||||
|
|
||||||
|
lr: dict = field(default_factory=lambda: dict(
|
||||||
|
components_layer=0.1,
|
||||||
|
_omega=0.5,
|
||||||
|
))
|
2
setup.py
2
setup.py
@ -55,7 +55,7 @@ ALL = CLI + DEV + DOCS + EXAMPLES + TESTS
|
|||||||
|
|
||||||
setup(
|
setup(
|
||||||
name=safe_name("prototorch_" + PLUGIN_NAME),
|
name=safe_name("prototorch_" + PLUGIN_NAME),
|
||||||
version="0.5.2",
|
version="1.0.0-a2",
|
||||||
description="Pre-packaged prototype-based "
|
description="Pre-packaged prototype-based "
|
||||||
"machine learning models using ProtoTorch and PyTorch-Lightning.",
|
"machine learning models using ProtoTorch and PyTorch-Lightning.",
|
||||||
long_description=long_description,
|
long_description=long_description,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user