157 lines
5.4 KiB
Python
157 lines
5.4 KiB
Python
import pytorch_lightning as pl
|
|
import torch
|
|
import torchmetrics
|
|
from prototorch.components.components import Components
|
|
from prototorch.functions.distances import euclidean_distance
|
|
from prototorch.functions.similarities import cosine_similarity
|
|
|
|
from .abstract import (AbstractPrototypeModel, PrototypeImageModel,
|
|
SiamesePrototypeModel)
|
|
from .glvq import SiameseGLVQ
|
|
|
|
|
|
def rescaled_cosine_similarity(x, y):
|
|
"""Cosine Similarity rescaled to [0, 1]."""
|
|
similarities = cosine_similarity(x, y)
|
|
return (similarities + 1.0) / 2.0
|
|
|
|
|
|
def shift_activation(x):
|
|
return (x + 1.0) / 2.0
|
|
|
|
|
|
def euclidean_similarity(x, y, beta=3):
|
|
d = euclidean_distance(x, y)
|
|
return torch.exp(-d * beta)
|
|
|
|
|
|
class CosineSimilarity(torch.nn.Module):
|
|
def __init__(self, activation=shift_activation):
|
|
super().__init__()
|
|
self.activation = activation
|
|
|
|
def forward(self, x, y):
|
|
epsilon = torch.finfo(x.dtype).eps
|
|
normed_x = (x / x.pow(2).sum(dim=tuple(range(
|
|
1, x.ndim)), keepdim=True).clamp(min=epsilon).sqrt()).flatten(
|
|
start_dim=1)
|
|
normed_y = (y / y.pow(2).sum(dim=tuple(range(
|
|
1, y.ndim)), keepdim=True).clamp(min=epsilon).sqrt()).flatten(
|
|
start_dim=1)
|
|
# normed_x = (x / torch.linalg.norm(x, dim=1))
|
|
diss = torch.inner(normed_x, normed_y)
|
|
return self.activation(diss)
|
|
|
|
|
|
class MarginLoss(torch.nn.modules.loss._Loss):
|
|
def __init__(self,
|
|
margin=0.3,
|
|
size_average=None,
|
|
reduce=None,
|
|
reduction="mean"):
|
|
super().__init__(size_average, reduce, reduction)
|
|
self.margin = margin
|
|
|
|
def forward(self, input_, target):
|
|
dp = torch.sum(target * input_, dim=-1)
|
|
dm = torch.max(input_ - target, dim=-1).values
|
|
return torch.nn.functional.relu(dm - dp + self.margin)
|
|
|
|
|
|
class ReasoningLayer(torch.nn.Module):
|
|
def __init__(self, num_components, num_classes, n_replicas=1):
|
|
super().__init__()
|
|
self.n_replicas = n_replicas
|
|
self.num_classes = num_classes
|
|
probabilities_init = torch.zeros(2, 1, num_components,
|
|
self.num_classes)
|
|
probabilities_init.uniform_(0.4, 0.6)
|
|
self.reasoning_probabilities = torch.nn.Parameter(probabilities_init)
|
|
|
|
@property
|
|
def reasonings(self):
|
|
pk = self.reasoning_probabilities[0]
|
|
nk = (1 - pk) * self.reasoning_probabilities[1]
|
|
ik = 1 - pk - nk
|
|
img = torch.cat([pk, nk, ik], dim=0).permute(1, 0, 2)
|
|
return img.unsqueeze(1)
|
|
|
|
def forward(self, detections):
|
|
pk = self.reasoning_probabilities[0].clamp(0, 1)
|
|
nk = (1 - pk) * self.reasoning_probabilities[1].clamp(0, 1)
|
|
epsilon = torch.finfo(pk.dtype).eps
|
|
numerator = (detections @ (pk - nk)) + nk.sum(1)
|
|
probs = numerator / (pk + nk).sum(1)
|
|
probs = probs.squeeze(0)
|
|
return probs
|
|
|
|
|
|
class CBC(SiameseGLVQ):
|
|
"""Classification-By-Components."""
|
|
def __init__(self,
|
|
hparams,
|
|
margin=0.1,
|
|
similarity=euclidean_similarity,
|
|
**kwargs):
|
|
super().__init__(hparams, **kwargs)
|
|
self.margin = margin
|
|
self.similarity_fn = kwargs.get("similarity_fn", euclidean_similarity)
|
|
num_components = self.components.shape[0]
|
|
self.reasoning_layer = ReasoningLayer(num_components=num_components,
|
|
num_classes=self.num_classes)
|
|
self.component_layer = self.proto_layer
|
|
|
|
@property
|
|
def components(self):
|
|
return self.prototypes
|
|
|
|
@property
|
|
def reasonings(self):
|
|
return self.reasoning_layer.reasonings.cpu()
|
|
|
|
def forward(self, x):
|
|
components, _ = self.component_layer()
|
|
latent_x = self.backbone(x)
|
|
self.backbone.requires_grad_(self.both_path_gradients)
|
|
latent_components = self.backbone(components)
|
|
self.backbone.requires_grad_(True)
|
|
detections = self.similarity_fn(latent_x, latent_components)
|
|
probs = self.reasoning_layer(detections)
|
|
return probs
|
|
|
|
def shared_step(self, batch, batch_idx, optimizer_idx=None):
|
|
x, y = batch
|
|
# x = x.view(x.size(0), -1)
|
|
y_pred = self(x)
|
|
nclasses = self.reasoning_layer.num_classes
|
|
y_true = torch.nn.functional.one_hot(y.long(), num_classes=nclasses)
|
|
loss = MarginLoss(self.margin)(y_pred, y_true).mean(dim=0)
|
|
return y_pred, loss
|
|
|
|
def training_step(self, batch, batch_idx, optimizer_idx=None):
|
|
y_pred, train_loss = self.shared_step(batch, batch_idx, optimizer_idx)
|
|
preds = torch.argmax(y_pred, dim=1)
|
|
self.acc_metric(preds.int(), batch[1].int())
|
|
self.log("train_acc",
|
|
self.acc_metric,
|
|
on_step=False,
|
|
on_epoch=True,
|
|
prog_bar=True,
|
|
logger=True)
|
|
return train_loss
|
|
|
|
def predict(self, x):
|
|
with torch.no_grad():
|
|
y_pred = self(x)
|
|
y_pred = torch.argmax(y_pred, dim=1)
|
|
return y_pred
|
|
|
|
|
|
class ImageCBC(CBC):
|
|
"""CBC model that constrains the components to the range [0, 1] by
|
|
clamping after updates.
|
|
"""
|
|
def on_train_batch_end(self, outputs, batch, batch_idx, dataloader_idx):
|
|
# super().on_train_batch_end(outputs, batch, batch_idx, dataloader_idx)
|
|
self.component_layer.components.data.clamp_(0.0, 1.0)
|