feat(model): implement MedianLVQ
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
"""LVQ models that are optimized using non-gradient methods."""
|
||||
|
||||
from ..core.losses import _get_dp_dm
|
||||
from ..nn.activations import get_activation
|
||||
from ..nn.wrappers import LambdaLayer
|
||||
from .abstract import NonGradientMixin
|
||||
from .glvq import GLVQ
|
||||
|
||||
@@ -66,4 +68,61 @@ class LVQ21(NonGradientMixin, GLVQ):
|
||||
|
||||
|
||||
class MedianLVQ(NonGradientMixin, GLVQ):
|
||||
"""Median LVQ"""
|
||||
"""Median LVQ
|
||||
|
||||
# TODO Avoid computing distances over and over
|
||||
|
||||
"""
|
||||
def __init__(self, hparams, verbose=True, **kwargs):
|
||||
self.verbose = verbose
|
||||
super().__init__(hparams, **kwargs)
|
||||
|
||||
self.transfer_layer = LambdaLayer(
|
||||
get_activation(self.hparams.transfer_fn))
|
||||
|
||||
def _f(self, x, y, protos, plabels):
|
||||
d = self.distance_layer(x, protos)
|
||||
dp, dm = _get_dp_dm(d, y, plabels)
|
||||
mu = (dp - dm) / (dp + dm)
|
||||
invmu = -1.0 * mu
|
||||
f = self.transfer_layer(invmu, beta=self.hparams.transfer_beta) + 1.0
|
||||
return f
|
||||
|
||||
def expectation(self, x, y, protos, plabels):
|
||||
f = self._f(x, y, protos, plabels)
|
||||
gamma = f / f.sum()
|
||||
return gamma
|
||||
|
||||
def lower_bound(self, x, y, protos, plabels, gamma):
|
||||
f = self._f(x, y, protos, plabels)
|
||||
lower_bound = (gamma * f.log()).sum()
|
||||
return lower_bound
|
||||
|
||||
def training_step(self, train_batch, batch_idx, optimizer_idx=None):
|
||||
protos = self.proto_layer.components
|
||||
plabels = self.proto_layer.labels
|
||||
|
||||
x, y = train_batch
|
||||
dis = self.compute_distances(x)
|
||||
|
||||
for i, _ in enumerate(protos):
|
||||
# Expectation step
|
||||
gamma = self.expectation(x, y, protos, plabels)
|
||||
lower_bound = self.lower_bound(x, y, protos, plabels, gamma)
|
||||
|
||||
# Maximization step
|
||||
_protos = protos + 0
|
||||
for k, xk in enumerate(x):
|
||||
_protos[i] = xk
|
||||
_lower_bound = self.lower_bound(x, y, _protos, plabels, gamma)
|
||||
if _lower_bound > lower_bound:
|
||||
if self.verbose:
|
||||
print(f"Updating prototype {i} to data {k}...")
|
||||
self.proto_layer.load_state_dict({"_components": _protos},
|
||||
strict=False)
|
||||
break
|
||||
|
||||
# Logging
|
||||
self.log_acc(dis, y, tag="train_acc")
|
||||
|
||||
return None
|
||||
|
Reference in New Issue
Block a user