refactor(api)!: merge the new api changes into dev
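
Migrate the models from the old `prototorch.components` /
`prototorch.functions` / `prototorch.modules` namespaces to the
restructured `prototorch.core` and `prototorch.nn` subpackages.

BREAKING CHANGE: the `prototype_initializer` and `initialized_prototypes`
kwargs are replaced by a single `prototypes_initializer`,
`proto_layer.component_labels` becomes `proto_layer.labels`, CBC is
rebuilt on `ReasoningComponents` with a `CBCC` competition layer, and
`Vis2DAbstract.get_mesh_input` is replaced by `utils.mesh2d`.

A rough sketch of what the kwarg migration looks like for downstream
code (the `hparams` values and the choice of `LiteralCompInitializer`
below are illustrative, not part of this commit):

    import torch
    from prototorch.core.initializers import LiteralCompInitializer
    from prototorch.models import GLVQ

    # Hypothetical setup: two classes with three prototypes each.
    hparams = dict(distribution=[3, 3], lr=0.01)

    # Old API: pass pre-initialized prototypes directly.
    # model = GLVQ(hparams, initialized_prototypes=protos)

    # New API: wrap the tensor in an initializer object instead.
    protos = torch.zeros(6, 2)  # hypothetical prototype tensor
    model = GLVQ(hparams, prototypes_initializer=LiteralCompInitializer(protos))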
@@ -4,8 +4,19 @@ from importlib.metadata import PackageNotFoundError, version
 
 from .callbacks import PrototypeConvergence, PruneLoserPrototypes
 from .cbc import CBC, ImageCBC
-from .glvq import (GLVQ, GLVQ1, GLVQ21, GMLVQ, GRLVQ, LGMLVQ, LVQMLN,
-                   ImageGLVQ, ImageGMLVQ, SiameseGLVQ, SiameseGMLVQ)
+from .glvq import (
+    GLVQ,
+    GLVQ1,
+    GLVQ21,
+    GMLVQ,
+    GRLVQ,
+    LGMLVQ,
+    LVQMLN,
+    ImageGLVQ,
+    ImageGMLVQ,
+    SiameseGLVQ,
+    SiameseGMLVQ,
+)
 from .knn import KNN
 from .lvq import LVQ1, LVQ21, MedianLVQ
 from .probabilistic import CELVQ, PLVQ, RSLVQ, SLVQ

@@ -5,9 +5,13 @@ from typing import Final, final
 import pytorch_lightning as pl
 import torch
 import torchmetrics
-from prototorch.components import Components, LabeledComponents
-from prototorch.functions.distances import euclidean_distance
-from prototorch.modules import WTAC, LambdaLayer
+
+from ..core.competitions import WTAC
+from ..core.components import Components, LabeledComponents
+from ..core.distances import euclidean_distance
+from ..core.initializers import LabelsInitializer
+from ..core.pooling import stratified_min_pooling
+from ..nn.wrappers import LambdaLayer
 
 
 class ProtoTorchMixin(object):

@@ -85,13 +89,11 @@ class UnsupervisedPrototypeModel(PrototypeModel):
         super().__init__(hparams, **kwargs)
 
         # Layers
-        prototype_initializer = kwargs.get("prototype_initializer", None)
-        initialized_prototypes = kwargs.get("initialized_prototypes", None)
-        if prototype_initializer is not None or initialized_prototypes is not None:
+        prototypes_initializer = kwargs.get("prototypes_initializer", None)
+        if prototypes_initializer is not None:
             self.proto_layer = Components(
                 self.hparams.num_prototypes,
-                initializer=prototype_initializer,
-                initialized_components=initialized_prototypes,
+                initializer=prototypes_initializer,
             )
 
     def compute_distances(self, x):

@@ -109,23 +111,24 @@ class SupervisedPrototypeModel(PrototypeModel):
         super().__init__(hparams, **kwargs)
 
         # Layers
-        prototype_initializer = kwargs.get("prototype_initializer", None)
-        initialized_prototypes = kwargs.get("initialized_prototypes", None)
-        if prototype_initializer is not None or initialized_prototypes is not None:
+        prototypes_initializer = kwargs.get("prototypes_initializer", None)
+        labels_initializer = kwargs.get("labels_initializer",
+                                        LabelsInitializer())
+        if prototypes_initializer is not None:
             self.proto_layer = LabeledComponents(
                 distribution=self.hparams.distribution,
-                initializer=prototype_initializer,
-                initialized_components=initialized_prototypes,
+                components_initializer=prototypes_initializer,
+                labels_initializer=labels_initializer,
             )
         self.competition_layer = WTAC()
 
     @property
     def prototype_labels(self):
-        return self.proto_layer.component_labels.detach().cpu()
+        return self.proto_layer.labels.detach().cpu()
 
     @property
     def num_classes(self):
-        return len(self.proto_layer.distribution)
+        return self.proto_layer.num_classes
 
     def compute_distances(self, x):
         protos, _ = self.proto_layer()

@@ -134,15 +137,14 @@ class SupervisedPrototypeModel(PrototypeModel):
 
     def forward(self, x):
         distances = self.compute_distances(x)
-        y_pred = self.predict_from_distances(distances)
-        # TODO
-        y_pred = torch.eye(self.num_classes, device=self.device)[
-            y_pred.long()]  # depends on labels {0,...,num_classes}
+        plabels = self.proto_layer.labels
+        winning = stratified_min_pooling(distances, plabels)
+        y_pred = torch.nn.functional.softmin(winning)
         return y_pred
 
     def predict_from_distances(self, distances):
         with torch.no_grad():
-            plabels = self.proto_layer.component_labels
+            plabels = self.proto_layer.labels
             y_pred = self.competition_layer(distances, plabels)
         return y_pred

@@ -4,8 +4,9 @@ import logging
 
 import pytorch_lightning as pl
 import torch
-from prototorch.components import Components
 
+from ..core.components import Components
+from ..core.initializers import LiteralCompInitializer
 from .extras import ConnectionTopology
 
 
@@ -16,7 +17,7 @@ class PruneLoserPrototypes(pl.Callback):
                  prune_quota_per_epoch=-1,
                  frequency=1,
                  replace=False,
-                 initializer=None,
+                 prototypes_initializer=None,
                  verbose=False):
         self.threshold = threshold  # minimum win ratio
         self.idle_epochs = idle_epochs  # epochs to wait before pruning

@@ -24,7 +25,7 @@ class PruneLoserPrototypes(pl.Callback):
         self.frequency = frequency
         self.replace = replace
         self.verbose = verbose
-        self.initializer = initializer
+        self.prototypes_initializer = prototypes_initializer
 
     def on_epoch_end(self, trainer, pl_module):
         if (trainer.current_epoch + 1) < self.idle_epochs:

@@ -55,8 +56,9 @@ class PruneLoserPrototypes(pl.Callback):
             if self.verbose:
                 print(f"Re-adding pruned prototypes...")
                 print(f"{distribution=}")
-            pl_module.add_prototypes(distribution=distribution,
-                                     initializer=self.initializer)
+            pl_module.add_prototypes(
+                distribution=distribution,
+                components_initializer=self.prototypes_initializer)
             new_num_protos = pl_module.num_prototypes
             if self.verbose:
                 print(f"`num_prototypes` changed from {cur_num_protos} "

@@ -116,7 +118,8 @@ class GNGCallback(pl.Callback):
 
         # Add component
         pl_module.proto_layer.add_components(
-            initialized_components=new_component.unsqueeze(0))
+            None,
+            initializer=LiteralCompInitializer(new_component.unsqueeze(0)))
 
         # Adjust Topology
         topology.add_prototype()

@@ -1,49 +1,54 @@
 import torch
-import torchmetrics
 
+from ..core.competitions import CBCC
+from ..core.components import ReasoningComponents
+from ..core.initializers import RandomReasoningsInitializer
+from ..core.losses import MarginLoss
+from ..core.similarities import euclidean_similarity
+from ..nn.wrappers import LambdaLayer
 from .abstract import ImagePrototypesMixin
-from .extras import (CosineSimilarity, MarginLoss, ReasoningLayer,
-                     euclidean_similarity, rescaled_cosine_similarity,
-                     shift_activation)
 from .glvq import SiameseGLVQ
 
 
 class CBC(SiameseGLVQ):
     """Classification-By-Components."""
-    def __init__(self, hparams, margin=0.1, **kwargs):
+    def __init__(self, hparams, **kwargs):
         super().__init__(hparams, **kwargs)
-        self.margin = margin
-        self.similarity_fn = kwargs.get("similarity_fn", euclidean_similarity)
-        num_components = self.components.shape[0]
-        self.reasoning_layer = ReasoningLayer(num_components=num_components,
-                                              num_classes=self.num_classes)
-        self.component_layer = self.proto_layer
-
-    @property
-    def components(self):
-        return self.prototypes
+        similarity_fn = kwargs.get("similarity_fn", euclidean_similarity)
+        components_initializer = kwargs.get("components_initializer", None)
+        reasonings_initializer = kwargs.get("reasonings_initializer",
+                                            RandomReasoningsInitializer())
+        self.components_layer = ReasoningComponents(
+            self.hparams.distribution,
+            components_initializer=components_initializer,
+            reasonings_initializer=reasonings_initializer,
+        )
+        self.similarity_layer = LambdaLayer(similarity_fn)
+        self.competition_layer = CBCC()
 
-    @property
-    def reasonings(self):
-        return self.reasoning_layer.reasonings.cpu()
+        # Namespace hook
+        self.proto_layer = self.components_layer
+
+        self.loss = MarginLoss(self.hparams.margin)
 
     def forward(self, x):
-        components, _ = self.component_layer()
+        components, reasonings = self.components_layer()
         latent_x = self.backbone(x)
         self.backbone.requires_grad_(self.both_path_gradients)
         latent_components = self.backbone(components)
         self.backbone.requires_grad_(True)
-        detections = self.similarity_fn(latent_x, latent_components)
-        probs = self.reasoning_layer(detections)
+        detections = self.similarity_layer(latent_x, latent_components)
+        probs = self.competition_layer(detections, reasonings)
         return probs
 
     def shared_step(self, batch, batch_idx, optimizer_idx=None):
         x, y = batch
         # x = x.view(x.size(0), -1)
         y_pred = self(x)
-        num_classes = self.reasoning_layer.num_classes
+        num_classes = self.num_classes
         y_true = torch.nn.functional.one_hot(y.long(), num_classes=num_classes)
-        loss = MarginLoss(self.margin)(y_pred, y_true).mean(dim=0)
+        loss = self.loss(y_pred, y_true).mean(dim=0)
         return y_pred, loss
 
     def training_step(self, batch, batch_idx, optimizer_idx=None):

@@ -70,7 +75,3 @@ class ImageCBC(ImagePrototypesMixin, CBC):
     """CBC model that constrains the components to the range [0, 1] by
     clamping after updates.
     """
-    def __init__(self, hparams, **kwargs):
-        super().__init__(hparams, **kwargs)
-        # Namespace hook
-        self.proto_layer = self.component_layer

@@ -5,23 +5,32 @@ Modules not yet available in prototorch go here temporarily.
 """
 
 import torch
-from prototorch.functions.distances import euclidean_distance
-from prototorch.functions.similarities import cosine_similarity
+
+from ..core.similarities import gaussian
 
 
-def rescaled_cosine_similarity(x, y):
-    """Cosine Similarity rescaled to [0, 1]."""
-    similarities = cosine_similarity(x, y)
-    return (similarities + 1.0) / 2.0
+def rank_scaled_gaussian(distances, lambd):
+    order = torch.argsort(distances, dim=1)
+    ranks = torch.argsort(order, dim=1)
+    return torch.exp(-torch.exp(-ranks / lambd) * distances)
 
 
-def shift_activation(x):
-    return (x + 1.0) / 2.0
+class GaussianPrior(torch.nn.Module):
+    def __init__(self, variance):
+        super().__init__()
+        self.variance = variance
+
+    def forward(self, distances):
+        return gaussian(distances, self.variance)
 
 
-def euclidean_similarity(x, y, variance=1.0):
-    d = euclidean_distance(x, y)
-    return torch.exp(-(d * d) / (2 * variance))
+class RankScaledGaussianPrior(torch.nn.Module):
+    def __init__(self, lambd):
+        super().__init__()
+        self.lambd = lambd
+
+    def forward(self, distances):
+        return rank_scaled_gaussian(distances, self.lambd)
 
 
 class ConnectionTopology(torch.nn.Module):

@@ -79,64 +88,3 @@ class ConnectionTopology(torch.nn.Module):
 
     def extra_repr(self):
         return f"(agelimit): ({self.agelimit})"
-
-
-class CosineSimilarity(torch.nn.Module):
-    def __init__(self, activation=shift_activation):
-        super().__init__()
-        self.activation = activation
-
-    def forward(self, x, y):
-        epsilon = torch.finfo(x.dtype).eps
-        normed_x = (x / x.pow(2).sum(dim=tuple(range(
-            1, x.ndim)), keepdim=True).clamp(min=epsilon).sqrt()).flatten(
-                start_dim=1)
-        normed_y = (y / y.pow(2).sum(dim=tuple(range(
-            1, y.ndim)), keepdim=True).clamp(min=epsilon).sqrt()).flatten(
-                start_dim=1)
-        # normed_x = (x / torch.linalg.norm(x, dim=1))
-        diss = torch.inner(normed_x, normed_y)
-        return self.activation(diss)
-
-
-class MarginLoss(torch.nn.modules.loss._Loss):
-    def __init__(self,
-                 margin=0.3,
-                 size_average=None,
-                 reduce=None,
-                 reduction="mean"):
-        super().__init__(size_average, reduce, reduction)
-        self.margin = margin
-
-    def forward(self, input_, target):
-        dp = torch.sum(target * input_, dim=-1)
-        dm = torch.max(input_ - target, dim=-1).values
-        return torch.nn.functional.relu(dm - dp + self.margin)
-
-
-class ReasoningLayer(torch.nn.Module):
-    def __init__(self, num_components, num_classes, num_replicas=1):
-        super().__init__()
-        self.num_replicas = num_replicas
-        self.num_classes = num_classes
-        probabilities_init = torch.zeros(2, 1, num_components,
-                                         self.num_classes)
-        probabilities_init.uniform_(0.4, 0.6)
-        # TODO Use `self.register_parameter("param", Paramater(param))` instead
-        self.reasoning_probabilities = torch.nn.Parameter(probabilities_init)
-
-    @property
-    def reasonings(self):
-        pk = self.reasoning_probabilities[0]
-        nk = (1 - pk) * self.reasoning_probabilities[1]
-        ik = 1 - pk - nk
-        img = torch.cat([pk, nk, ik], dim=0).permute(1, 0, 2)
-        return img.unsqueeze(1)
-
-    def forward(self, detections):
-        pk = self.reasoning_probabilities[0].clamp(0, 1)
-        nk = (1 - pk) * self.reasoning_probabilities[1].clamp(0, 1)
-        numerator = (detections @ (pk - nk)) + nk.sum(1)
-        probs = numerator / (pk + nk).sum(1)
-        probs = probs.squeeze(0)
-        return probs

@@ -1,16 +1,14 @@
 """Models based on the GLVQ framework."""
 
 import torch
-from prototorch.functions.activations import get_activation
-from prototorch.functions.competitions import wtac
-from prototorch.functions.distances import (lomega_distance, omega_distance,
-                                            squared_euclidean_distance)
-from prototorch.functions.helper import get_flat
-from prototorch.functions.losses import glvq_loss, lvq1_loss, lvq21_loss
-from prototorch.components import LinearMapping
-from prototorch.modules import LambdaLayer, LossLayer
 from torch.nn.parameter import Parameter
 
+from ..core.competitions import wtac
+from ..core.distances import lomega_distance, omega_distance, squared_euclidean_distance
+from ..core.initializers import EyeTransformInitializer
+from ..core.losses import glvq_loss, lvq1_loss, lvq21_loss
+from ..nn.activations import get_activation
+from ..nn.wrappers import LambdaLayer, LossLayer
 from .abstract import ImagePrototypesMixin, SupervisedPrototypeModel
 
 
@@ -30,9 +28,6 @@ class GLVQ(SupervisedPrototypeModel):
         # Loss
         self.loss = LossLayer(glvq_loss)
 
-        # Prototype metrics
-        self.initialize_prototype_win_ratios()
-
     def initialize_prototype_win_ratios(self):
         self.register_buffer(
             "prototype_win_ratios",

@@ -59,7 +54,7 @@ class GLVQ(SupervisedPrototypeModel):
     def shared_step(self, batch, batch_idx, optimizer_idx=None):
         x, y = batch
         out = self.compute_distances(x)
-        plabels = self.proto_layer.component_labels
+        plabels = self.proto_layer.labels
         mu = self.loss(out, y, prototype_labels=plabels)
         batch_loss = self.transfer_layer(mu, beta=self.hparams.transfer_beta)
         loss = batch_loss.sum(dim=0)

@@ -135,7 +130,7 @@ class SiameseGLVQ(GLVQ):
 
     def compute_distances(self, x):
         protos, _ = self.proto_layer()
-        x, protos = get_flat(x, protos)
+        x, protos = [arr.view(arr.size(0), -1) for arr in (x, protos)]
         latent_x = self.backbone(x)
         self.backbone.requires_grad_(self.both_path_gradients)
         latent_protos = self.backbone(protos)

@@ -240,18 +235,14 @@ class GMLVQ(GLVQ):
         super().__init__(hparams, distance_fn=distance_fn, **kwargs)
 
         # Additional parameters
-        omega_initializer = kwargs.get("omega_initializer", None)
-        initialized_omega = kwargs.get("initialized_omega", None)
-        if omega_initializer is not None or initialized_omega is not None:
-            self.omega_layer = LinearMapping(
-                mapping_shape=(self.hparams.input_dim, self.hparams.latent_dim),
-                initializer=omega_initializer,
-                initialized_linearmapping=initialized_omega,
-            )
+        omega_initializer = kwargs.get("omega_initializer",
+                                       EyeTransformInitializer())
+        omega = omega_initializer.generate(self.hparams.input_dim,
+                                           self.hparams.latent_dim)
+        self.register_parameter("_omega", Parameter(omega))
+        self.backbone = LambdaLayer(lambda x: x @ self._omega,
+                                    name="omega matrix")
 
-        self.register_parameter("_omega", Parameter(self.omega_layer.mapping))
-        self.backbone = LambdaLayer(lambda x: x @ self._omega, name = "omega matrix")
-
     @property
     def omega_matrix(self):
         return self._omega.detach().cpu()

@@ -264,24 +255,6 @@ class GMLVQ(GLVQ):
     def extra_repr(self):
         return f"(omega): (shape: {tuple(self._omega.shape)})"
 
-    def predict_latent(self, x, map_protos=True):
-        """Predict `x` assuming it is already embedded in the latent space.
-
-        Only the prototypes are embedded in the latent space using the
-        backbone.
-
-        """
-        self.eval()
-        with torch.no_grad():
-            protos, plabels = self.proto_layer()
-            if map_protos:
-                protos = self.backbone(protos)
-            d = squared_euclidean_distance(x, protos)
-            y_pred = wtac(d, plabels)
-        return y_pred
-
-
-
 
 class LGMLVQ(GMLVQ):
     """Localized and Generalized Matrix Learning Vector Quantization."""

@@ -2,9 +2,10 @@
 
 import warnings
 
-from prototorch.components import LabeledComponents
-from prototorch.modules import KNNC
-
+from ..core.competitions import KNNC
+from ..core.components import LabeledComponents
+from ..core.initializers import LiteralCompInitializer, LiteralLabelsInitializer
+from ..utils.utils import parse_data_arg
 from .abstract import SupervisedPrototypeModel
 
 
@@ -19,9 +20,13 @@ class KNN(SupervisedPrototypeModel):
         data = kwargs.get("data", None)
         if data is None:
             raise ValueError("KNN requires data, but was not provided!")
+        data, targets = parse_data_arg(data)
 
         # Layers
-        self.proto_layer = LabeledComponents(initialized_components=data)
+        self.proto_layer = LabeledComponents(
+            distribution=[],
+            components_initializer=LiteralCompInitializer(data),
+            labels_initializer=LiteralLabelsInitializer(targets))
         self.competition_layer = KNNC(k=self.hparams.k)
 
     def training_step(self, train_batch, batch_idx, optimizer_idx=None):

@@ -1,7 +1,6 @@
 """LVQ models that are optimized using non-gradient methods."""
 
-from prototorch.functions.losses import _get_dp_dm
-
+from ..core.losses import _get_dp_dm
 from .abstract import NonGradientMixin
 from .glvq import GLVQ
 
@@ -10,7 +9,7 @@ class LVQ1(NonGradientMixin, GLVQ):
     """Learning Vector Quantization 1."""
     def training_step(self, train_batch, batch_idx, optimizer_idx=None):
         protos = self.proto_layer.components
-        plabels = self.proto_layer.component_labels
+        plabels = self.proto_layer.labels
 
         x, y = train_batch
         dis = self.compute_distances(x)

@@ -29,6 +28,8 @@ class LVQ1(NonGradientMixin, GLVQ):
         self.proto_layer.load_state_dict({"_components": updated_protos},
                                          strict=False)
 
+        print(f"{dis=}")
+        print(f"{y=}")
         # Logging
         self.log_acc(dis, y, tag="train_acc")
 
@@ -39,7 +40,7 @@ class LVQ21(NonGradientMixin, GLVQ):
     """Learning Vector Quantization 2.1."""
     def training_step(self, train_batch, batch_idx, optimizer_idx=None):
         protos = self.proto_layer.components
-        plabels = self.proto_layer.component_labels
+        plabels = self.proto_layer.labels
 
         x, y = train_batch
         dis = self.compute_distances(x)

@@ -1,13 +1,11 @@
 """Probabilistic GLVQ methods"""
 
 import torch
-from prototorch.functions.losses import nllr_loss, rslvq_loss
-from prototorch.functions.pooling import (stratified_min_pooling,
-                                          stratified_sum_pooling)
-from prototorch.functions.transforms import (GaussianPrior,
-                                             RankScaledGaussianPrior)
-from prototorch.modules import LambdaLayer, LossLayer
 
+from ..core.losses import nllr_loss, rslvq_loss
+from ..core.pooling import stratified_min_pooling, stratified_sum_pooling
+from ..nn.wrappers import LambdaLayer, LossLayer
+from .extras import GaussianPrior, RankScaledGaussianPrior
 from .glvq import GLVQ, SiameseGMLVQ
 
 
@@ -22,7 +20,7 @@ class CELVQ(GLVQ):
     def shared_step(self, batch, batch_idx, optimizer_idx=None):
         x, y = batch
         out = self.compute_distances(x)  # [None, num_protos]
-        plabels = self.proto_layer.component_labels
+        plabels = self.proto_layer.labels
         winning = stratified_min_pooling(out, plabels)  # [None, num_classes]
         probs = -1.0 * winning
         batch_loss = self.loss(probs, y.long())

@@ -56,7 +54,7 @@ class ProbabilisticLVQ(GLVQ):
     def training_step(self, batch, batch_idx, optimizer_idx=None):
         x, y = batch
         out = self.forward(x)
-        plabels = self.proto_layer.component_labels
+        plabels = self.proto_layer.labels
         batch_loss = self.loss(out, y, plabels)
         loss = batch_loss.sum(dim=0)
         return loss

@@ -89,11 +87,10 @@ class PLVQ(ProbabilisticLVQ, SiameseGMLVQ):
                                              self.hparams.lambd)
         self.loss = torch.nn.KLDivLoss()
 
-    def training_step(self, batch, batch_idx, optimizer_idx=None):
-        x, y = batch
-        out = self.forward(x)
-        y_dist = torch.nn.functional.one_hot(
-            y.long(), num_classes=self.num_classes).float()
-        batch_loss = self.loss(out, y_dist)
-        loss = batch_loss.sum(dim=0)
-        return loss
+    # FIXME
+    # def training_step(self, batch, batch_idx, optimizer_idx=None):
+    #     x, y = batch
+    #     y_pred = self(x)
+    #     batch_loss = self.loss(y_pred, y)
+    #     loss = batch_loss.sum(dim=0)
+    #     return loss

@@ -2,11 +2,11 @@
 
 import numpy as np
 import torch
-from prototorch.functions.competitions import wtac
-from prototorch.functions.distances import squared_euclidean_distance
-from prototorch.modules import LambdaLayer
-from prototorch.modules.losses import NeuralGasEnergy
 
+from ..core.competitions import wtac
+from ..core.distances import squared_euclidean_distance
+from ..core.losses import NeuralGasEnergy
+from ..nn.wrappers import LambdaLayer
 from .abstract import NonGradientMixin, UnsupervisedPrototypeModel
 from .callbacks import GNGCallback
 from .extras import ConnectionTopology

@@ -7,6 +7,8 @@ import torchvision
 from matplotlib import pyplot as plt
 from torch.utils.data import DataLoader, Dataset
 
+from ..utils.utils import mesh2d
+
 
 class Vis2DAbstract(pl.Callback):
     def __init__(self,

@@ -73,23 +75,7 @@ class Vis2DAbstract(pl.Callback):
             ax.axis("off")
         return ax
 
-    def get_mesh_input(self, x):
-        x_shift = self.border * np.ptp(x[:, 0])
-        y_shift = self.border * np.ptp(x[:, 1])
-        x_min, x_max = x[:, 0].min() - x_shift, x[:, 0].max() + x_shift
-        y_min, y_max = x[:, 1].min() - y_shift, x[:, 1].max() + y_shift
-        xx, yy = np.meshgrid(np.linspace(x_min, x_max, self.resolution),
-                             np.linspace(y_min, y_max, self.resolution))
-        mesh_input = np.c_[xx.ravel(), yy.ravel()]
-        return mesh_input, xx, yy
-
-    def perform_pca_2D(self, data):
-        (_, eigVal, eigVec) = torch.pca_lowrank(data, q=2)
-        return data @ eigVec
-
-    def plot_data(self, ax, x, y, pca=False):
-        if pca:
-            x = self.perform_pca_2D(x)
+    def plot_data(self, ax, x, y):
         ax.scatter(
             x[:, 0],
             x[:, 1],

@@ -100,9 +86,7 @@ class Vis2DAbstract(pl.Callback):
             s=30,
         )
 
-    def plot_protos(self, ax, protos, plabels, pca=False):
-        if pca:
-            protos = self.perform_pca_2D(protos)
+    def plot_protos(self, ax, protos, plabels):
         ax.scatter(
             protos[:, 0],
             protos[:, 1],

@@ -146,7 +130,7 @@ class VisGLVQ2D(Vis2DAbstract):
         self.plot_data(ax, x_train, y_train)
         self.plot_protos(ax, protos, plabels)
         x = np.vstack((x_train, protos))
-        mesh_input, xx, yy = self.get_mesh_input(x)
+        mesh_input, xx, yy = mesh2d(x, self.border, self.resolution)
         _components = pl_module.proto_layer._components
         mesh_input = torch.from_numpy(mesh_input).type_as(_components)
         y_pred = pl_module.predict(mesh_input)

@@ -181,9 +165,9 @@ class VisSiameseGLVQ2D(Vis2DAbstract):
         if self.show_protos:
             self.plot_protos(ax, protos, plabels)
             x = np.vstack((x_train, protos))
-            mesh_input, xx, yy = self.get_mesh_input(x)
+            mesh_input, xx, yy = mesh2d(x, self.border, self.resolution)
         else:
-            mesh_input, xx, yy = self.get_mesh_input(x_train)
+            mesh_input, xx, yy = mesh2d(x_train, self.border, self.resolution)
         _components = pl_module.proto_layer._components
         mesh_input = torch.Tensor(mesh_input).type_as(_components)
         y_pred = pl_module.predict_latent(mesh_input,

@@ -194,50 +178,6 @@ class VisSiameseGLVQ2D(Vis2DAbstract):
         self.log_and_display(trainer, pl_module)
 
 
-class VisGMLVQ2D(Vis2DAbstract):
-    def __init__(self, *args, map_protos=True, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.map_protos = map_protos
-
-    def on_epoch_end(self, trainer, pl_module):
-        if not self.precheck(trainer):
-            return True
-
-        protos = pl_module.prototypes
-        plabels = pl_module.prototype_labels
-        x_train, y_train = self.x_train, self.y_train
-        device = pl_module.device
-        with torch.no_grad():
-            x_train = pl_module.backbone(torch.Tensor(x_train).to(device))
-            x_train = x_train.cpu().detach()
-        if self.map_protos:
-            with torch.no_grad():
-                protos = pl_module.backbone(torch.Tensor(protos).to(device))
-                protos = protos.cpu().detach()
-        ax = self.setup_ax()
-        if x_train.shape[1] > 2:
-            self.plot_data(ax, x_train, y_train, pca=True)
-        else:
-            self.plot_data(ax, x_train, y_train, pca=False)
-        if self.show_protos:
-            if protos.shape[1] > 2:
-                self.plot_protos(ax, protos, plabels, pca=True)
-            else:
-                self.plot_protos(ax, protos, plabels, pca=False)
-        ### something to work on: meshgrid with pca
-        # x = np.vstack((x_train, protos))
-        # mesh_input, xx, yy = self.get_mesh_input(x)
-        #else:
-        #    mesh_input, xx, yy = self.get_mesh_input(x_train)
-        #_components = pl_module.proto_layer._components
-        #mesh_input = torch.Tensor(mesh_input).type_as(_components)
-        #y_pred = pl_module.predict_latent(mesh_input,
-        #                                  map_protos=self.map_protos)
-        #y_pred = y_pred.cpu().reshape(xx.shape)
-        #ax.contourf(xx, yy, y_pred, cmap=self.cmap, alpha=0.35)
-        self.log_and_display(trainer, pl_module)
-
-
 class VisCBC2D(Vis2DAbstract):
     def on_epoch_end(self, trainer, pl_module):
         if not self.precheck(trainer):

@@ -250,8 +190,8 @@ class VisCBC2D(Vis2DAbstract):
         self.plot_data(ax, x_train, y_train)
         self.plot_protos(ax, protos, "w")
         x = np.vstack((x_train, protos))
-        mesh_input, xx, yy = self.get_mesh_input(x)
-        _components = pl_module.component_layer._components
+        mesh_input, xx, yy = mesh2d(x, self.border, self.resolution)
+        _components = pl_module.components_layer._components
         y_pred = pl_module.predict(
             torch.Tensor(mesh_input).type_as(_components))
         y_pred = y_pred.cpu().reshape(xx.shape)