138 lines
5.0 KiB
Python
138 lines
5.0 KiB
Python
"""Lightning Callbacks."""
|
|
|
|
import logging
|
|
|
|
import pytorch_lightning as pl
|
|
import torch
|
|
|
|
from ..core.components import Components
|
|
from ..core.initializers import LiteralCompInitializer
|
|
from .extras import ConnectionTopology
|
|
|
|
|
|
class PruneLoserPrototypes(pl.Callback):
    """Remove prototypes whose mean win ratio is below a threshold.

    At the end of an epoch the callback reads
    ``pl_module.prototype_win_ratios``, averages it over dimension 0 and
    removes every prototype whose averaged ratio is strictly smaller than
    ``threshold``. Optionally, the same number of prototypes per label is
    added back via ``pl_module.add_prototypes``.

    Args:
        threshold: Minimum averaged win ratio a prototype needs to survive.
        idle_epochs: Pruning is skipped while ``current_epoch + 1`` is
            smaller than this value.
        prune_quota_per_epoch: If positive, at most this many prototypes
            (the lowest-indexed candidates) are pruned per invocation.
            Non-positive values disable the limit.
        frequency: Pruning only runs when ``current_epoch + 1`` is a
            multiple of this value.
        replace: If true, re-add one prototype per pruned prototype, grouped
            by the labels of the pruned prototypes.
        prototypes_initializer: Forwarded as ``components_initializer`` to
            ``pl_module.add_prototypes`` when ``replace`` is true.
        verbose: If true, print diagnostic information to stdout.
    """

    def __init__(self,
                 threshold=0.01,
                 idle_epochs=10,
                 prune_quota_per_epoch=-1,
                 frequency=1,
                 replace=False,
                 prototypes_initializer=None,
                 verbose=False):
        super().__init__()
        self.threshold = threshold  # minimum win ratio
        self.idle_epochs = idle_epochs  # epochs to wait before pruning
        self.prune_quota_per_epoch = prune_quota_per_epoch
        self.frequency = frequency
        self.replace = replace
        self.verbose = verbose
        self.prototypes_initializer = prototypes_initializer

    def on_epoch_end(self, trainer, pl_module):
        """Prune (and optionally replace) losing prototypes.

        Returns:
            ``True`` if at least one prototype was pruned, otherwise ``None``.
        """
        epochs_done = trainer.current_epoch + 1
        if epochs_done < self.idle_epochs:
            return None
        if epochs_done % self.frequency:
            return None

        ratios = pl_module.prototype_win_ratios.mean(dim=0)
        # Build the candidate indices on the same device as the mask;
        # indexing a CPU tensor with a mask that lives on another device
        # raises a device-mismatch error.
        all_indices = torch.arange(len(ratios), device=ratios.device)
        to_prune = all_indices[ratios < self.threshold].tolist()
        prune_labels = pl_module.prototype_labels[to_prune]
        if self.prune_quota_per_epoch > 0:
            to_prune = to_prune[:self.prune_quota_per_epoch]
            prune_labels = prune_labels[:self.prune_quota_per_epoch]

        if not to_prune:
            return None

        if self.verbose:
            print(f"\nPrototype win ratios: {ratios}")
            print(f"Pruning prototypes at: {to_prune}")
            print(f"Corresponding labels are: {prune_labels.tolist()}")

        cur_num_protos = pl_module.num_prototypes
        pl_module.remove_prototypes(indices=to_prune)

        if self.replace:
            # Count how many prototypes were removed per label so that the
            # same number can be added back for each label.
            labels, counts = torch.unique(prune_labels,
                                          sorted=True,
                                          return_counts=True)
            distribution = dict(zip(labels.tolist(), counts.tolist()))
            if self.verbose:
                print("Re-adding pruned prototypes...")
                print(f"{distribution=}")
            pl_module.add_prototypes(
                distribution=distribution,
                components_initializer=self.prototypes_initializer)

        new_num_protos = pl_module.num_prototypes
        if self.verbose:
            print(f"`num_prototypes` changed from {cur_num_protos} "
                  f"to {new_num_protos}.")
        return True
|
|
|
|
|
|
class PrototypeConvergence(pl.Callback):
    """Placeholder callback for a prototype-convergence check.

    The convergence criterion itself is not implemented yet (see the TODO in
    ``on_epoch_end``); ``min_delta`` is stored but not read by this class.

    Args:
        min_delta: Stored on the instance; currently unused here.
        idle_epochs: The hook does nothing while ``current_epoch + 1`` is
            smaller than this value.
        verbose: If true, print a message once the idle period is over.
    """

    def __init__(self, min_delta=0.01, idle_epochs=10, verbose=False):
        self.verbose = verbose
        self.idle_epochs = idle_epochs  # epochs to wait
        self.min_delta = min_delta

    def on_epoch_end(self, trainer, pl_module):
        """Return ``None`` during the idle period, ``True`` afterwards."""
        epochs_done = trainer.current_epoch + 1
        still_idle = epochs_done < self.idle_epochs
        if still_idle:
            return None

        if self.verbose:
            print("Stopping...")
        # TODO
        return True
|
|
|
|
|
|
class GNGCallback(pl.Callback):
    """GNG Callback.

    Applies growing algorithm based on accumulated error and topology.

    Based on "A Growing Neural Gas Network Learns Topologies" by Bernd Fritzke.

    Args:
        reduction: Factor applied to the errors of the two units between
            which a new unit is inserted.
        freq: A new unit is inserted whenever ``current_epoch + 1`` is a
            multiple of this value.
    """

    def __init__(self, reduction=0.1, freq=10):
        self.freq = freq
        self.reduction = reduction

    def on_epoch_end(self, trainer: pl.Trainer, pl_module):
        """Insert a new component between the two highest-error neighbors."""
        # Only grow every `freq` epochs.
        if (trainer.current_epoch + 1) % self.freq != 0:
            return

        # Gather the state the insertion step works on.
        errors = pl_module.errors
        topology: ConnectionTopology = pl_module.topology_layer
        components: Components = pl_module.proto_layer.components

        # The unit with the largest error marks the insertion point.
        worst = torch.argmax(errors)
        neighbors = topology.get_neighbors(worst)[0]

        if len(neighbors) == 0:
            logging.log(level=logging.INFO, msg="No neighbor-pairs found!")
            return

        # Among its neighbors, pick the one with the largest error.
        worst_neighbor = neighbors[torch.argmax(errors[neighbors])]

        # The new component is the midpoint of the two selected components.
        midpoint = (components[worst] + components[worst_neighbor]) * 0.5
        pl_module.proto_layer.add_components(
            None,
            initializer=LiteralCompInitializer(midpoint.unsqueeze(0)))

        # Connect the new unit (addressed as -1) to both selected units and
        # drop the connection between those two.
        topology.add_prototype()
        topology.add_connection(worst, -1)
        topology.add_connection(worst_neighbor, -1)
        topology.remove_connection(worst, worst_neighbor)

        # Append an error entry for the new unit (copied from `worst` before
        # scaling), then scale the errors of both selected units.
        inherited_error = errors[worst].unsqueeze(0)
        pl_module.errors = torch.cat([pl_module.errors, inherited_error])
        pl_module.errors[worst] = errors[worst] * self.reduction
        pl_module.errors[worst_neighbor] = (errors[worst_neighbor] *
                                            self.reduction)

        trainer.accelerator_backend.setup_optimizers(trainer)
|