291 lines
8.8 KiB
Python
291 lines
8.8 KiB
Python
"""
|
|
Proto Y Architecture
|
|
|
|
Network architecture for Component based Learning.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import asdict, dataclass
|
|
from typing import Any, Callable
|
|
|
|
import pytorch_lightning as pl
|
|
import torch
|
|
from torchmetrics import Metric
|
|
|
|
|
|
class Steps(enumerate):
    """Names of the Lightning stages, exposed as plain string constants."""

    # NOTE(review): the base class is the builtin ``enumerate``; this looks
    # like it was meant to be an ``Enum``. The attributes below are ordinary
    # ``str`` objects - confirm before changing the base class.

    # Stage in which the model is fitted.
    TRAINING = "training"

    # Stage evaluated during fitting.
    VALIDATION = "validation"

    # Stage evaluated after fitting.
    TEST = "test"

    # Pure inference stage.
    PREDICT = "predict"
|
|
|
|
|
|
class BaseYArchitecture(pl.LightningModule):
    """
    Skeleton of the Y architecture.

    A forward pass chains the following steps:
    ``components_layer`` -> ``backbone`` -> ``comparison`` -> ``inference``
    (``loss`` instead of ``inference`` in :meth:`loss_forward`).

    Subclasses provide the steps that have no default (``comparison``,
    ``loss``, ``inference``) and may override the ``init_*`` hooks, which are
    called once from ``__init__`` with the hyperparameters.
    """

    @dataclass
    class HyperParameters:
        """
        Add all hyperparameters in the inherited class.
        """
        ...

    # Fields
    # NOTE(review): both registries are class attributes, so every instance
    # (and every subclass that does not rebind them) mutates the same dicts.
    # Confirm that sharing metrics between models is intended.

    # step name -> metric class -> metric instance
    registered_metrics: dict[str, dict[type[Metric], Metric]] = {
        Steps.TRAINING: {},
        Steps.VALIDATION: {},
        Steps.TEST: {},
    }
    # step name -> metric class -> callbacks receiving the computed value
    registered_metric_callbacks: dict[str, dict[type[Metric],
                                                set[Callable]]] = {
        Steps.TRAINING: {},
        Steps.VALIDATION: {},
        Steps.TEST: {},
    }

    # Type Hints for Necessary Fields
    components_layer: torch.nn.Module

    def __init__(self, hparams) -> None:
        """
        Store the hyperparameters and run all ``init_*`` hooks.

        Args:
            hparams: Either a ``dict`` of keyword arguments for
                ``HyperParameters`` or a dataclass instance.
        """
        # ``save_hyperparameters`` is called directly from ``__init__`` and
        # before ``super().__init__()``, exactly as in the original ordering.
        if isinstance(hparams, dict):
            self.save_hyperparameters(hparams)
            # TODO: => Move into Component Child
            # Build a filtered copy instead of ``del`` so that the caller's
            # dict is not mutated and a missing key does not raise.
            hparams = {
                key: value
                for key, value in hparams.items()
                if key != "initialized_proto_shape"
            }
            hparams = self.HyperParameters(**hparams)
        else:
            hparams_dict = asdict(hparams)
            # The initializer is blanked out in the *saved* hyperparameters
            # only; the ``hparams`` object handed to the hooks keeps it.
            hparams_dict["component_initializer"] = None
            self.save_hyperparameters(hparams_dict)

        super().__init__()

        # Common Steps
        self.init_components(hparams)
        self.init_backbone(hparams)
        self.init_comparison(hparams)
        self.init_competition(hparams)

        # Train Steps
        self.init_loss(hparams)

        # Inference Steps
        self.init_inference(hparams)

    # external API
    def get_competition(self, batch, components):
        """
        Run the backbone and the comparison step.

        Despite its name, this method does not call ``competition``; it
        returns the tensor produced by ``comparison``.
        """
        latent_batch, latent_components = self.backbone(batch, components)
        # TODO: => Latent Hook
        comparison_tensor = self.comparison(latent_batch, latent_components)
        # TODO: => Comparison Hook
        return comparison_tensor

    def forward(self, batch):
        """
        Returns the prediction.

        A bare tensor is wrapped as ``(batch, None)`` before processing.
        """
        if isinstance(batch, torch.Tensor):
            batch = (batch, None)
        # TODO: manage different datatypes?
        components = self.components_layer()
        # TODO: => Component Hook
        comparison_tensor = self.get_competition(batch, components)
        # TODO: => Competition Hook
        return self.inference(comparison_tensor, components)

    def predict(self, batch):
        """
        Alias for forward
        """
        return self.forward(batch)

    def forward_comparison(self, batch):
        """
        Returns the output of the comparison layer.

        A bare tensor is wrapped as ``(batch, None)`` before processing.
        """
        if isinstance(batch, torch.Tensor):
            batch = (batch, None)
        # TODO: manage different datatypes?
        components = self.components_layer()
        # TODO: => Component Hook
        return self.get_competition(batch, components)

    def loss_forward(self, batch):
        """
        Returns the output of the loss layer.

        Unlike ``forward``, the batch is passed on unchanged (no tensor
        wrapping).
        """
        # TODO: manage different datatypes?
        components = self.components_layer()
        # TODO: => Component Hook
        comparison_tensor = self.get_competition(batch, components)
        # TODO: => Competition Hook
        return self.loss(comparison_tensor, batch, components)

    # Empty Initialization
    def init_components(self, hparams: HyperParameters) -> None:
        """
        All initialization necessary for the components step.
        """

    def init_backbone(self, hparams: HyperParameters) -> None:
        """
        All initialization necessary for the backbone step.
        """

    def init_comparison(self, hparams: HyperParameters) -> None:
        """
        All initialization necessary for the comparison step.
        """

    def init_competition(self, hparams: HyperParameters) -> None:
        """
        All initialization necessary for the competition step.
        """

    def init_loss(self, hparams: HyperParameters) -> None:
        """
        All initialization necessary for the loss step.
        """

    def init_inference(self, hparams: HyperParameters) -> None:
        """
        All initialization necessary for the inference step.
        """

    # Empty Steps
    def components(self):
        """
        This step has no input.

        It returns the components.
        """
        raise NotImplementedError(
            "The components step has no reasonable default.")

    def backbone(self, batch, components):
        """
        The backbone step receives the data batch and the components.
        It can transform both by an arbitrary function.

        It returns the transformed batch and components,
        each of the same length as the original input.

        The default implementation is the identity.
        """
        return batch, components

    def comparison(self, batch, components):
        """
        Takes a batch of size N and the component set of size M.

        It returns an NxMxD tensor containing D (usually 1) pairwise
        comparison measures.
        """
        raise NotImplementedError(
            "The comparison step has no reasonable default.")

    def competition(self, comparison_measures, components):
        """
        Takes the tensor of comparison measures.

        Assigns a competition vector to each class.
        """
        raise NotImplementedError(
            "The competition step has no reasonable default.")

    def loss(self, comparison_measures, batch, components):
        """
        Takes the tensor returned by ``get_competition``, the batch and the
        components.

        Calculates a single loss value.
        """
        raise NotImplementedError("The loss step has no reasonable default.")

    def inference(self, comparison_measures, components):
        """
        Takes the tensor returned by ``get_competition`` and the components.

        Returns the inferred vector.
        """
        raise NotImplementedError(
            "The inference step has no reasonable default.")

    # Y Architecture Hooks

    # internal API, called by models and callbacks
    def register_torchmetric(
        self,
        name: Callable,
        metric: type[Metric],
        step: str = Steps.TRAINING,
        **metric_kwargs,
    ):
        """
        Register a callback for evaluating a torchmetric.

        Args:
            name: Callback invoked as ``name(value, module)`` with the
                computed metric value at the end of each epoch.
            metric: Metric class to evaluate.
            step: Stage the metric is tracked for.
            **metric_kwargs: Forwarded to ``metric`` when the first instance
                for this ``step`` is created; ignored for later registrations
                of the same metric class on the same step.

        Raises:
            ValueError: If ``step`` is ``Steps.PREDICT``.
        """
        if step == Steps.PREDICT:
            raise ValueError("Prediction metrics are not supported.")

        # Look the metric up in the per-step registry. Testing against the
        # outer dict (whose keys are step names) was always true, so every
        # registration replaced the instance and dropped earlier callbacks.
        if metric not in self.registered_metrics[step]:
            self.registered_metrics[step][metric] = metric(**metric_kwargs)
            self.registered_metric_callbacks[step][metric] = {name}
        else:
            self.registered_metric_callbacks[step][metric].add(name)

    def update_metrics_step(self, batch, step):
        """
        Feed the predictions for ``batch`` into every metric of ``step``.

        Args:
            batch: Pair whose second element is the target.
            step: Stage whose registered metrics are updated.
        """
        metrics = self.registered_metrics[step]
        if not metrics:
            # Nothing to update: skip the extra forward pass entirely.
            return

        # Prediction Metrics
        preds = self(batch)

        _, y = batch
        for instance in metrics.values():
            instance = instance.to(self.device)
            # NOTE(review): torchmetrics documents the order
            # ``(preds, target)``; the original order is kept here - confirm.
            instance(y, preds)

    def update_metrics_epoch(self, step):
        """
        Compute every metric of ``step``, pass the value to its callbacks and
        reset the metric.
        """
        for metric, instance in self.registered_metrics[step].items():
            instance = instance.to(self.device)
            value = instance.compute()

            for callback in self.registered_metric_callbacks[step][metric]:
                callback(value, self)

            instance.reset()

    # Lightning steps
    # -------------------------------------------------------------------------
    # >>>> Training
    def training_step(self, batch, batch_idx, optimizer_idx=None):
        """Update the training metrics and return the loss for ``batch``."""
        self.update_metrics_step(batch, Steps.TRAINING)

        return self.loss_forward(batch)

    def training_epoch_end(self, outputs) -> None:
        """Report and reset the training metrics."""
        self.update_metrics_epoch(Steps.TRAINING)

    # >>>> Validation
    def validation_step(self, batch, batch_idx):
        """Update the validation metrics and return the loss for ``batch``."""
        self.update_metrics_step(batch, Steps.VALIDATION)

        return self.loss_forward(batch)

    def validation_epoch_end(self, outputs) -> None:
        """Report and reset the validation metrics."""
        self.update_metrics_epoch(Steps.VALIDATION)

    # >>>> Test
    def test_step(self, batch, batch_idx):
        """Update the test metrics and return the loss for ``batch``."""
        self.update_metrics_step(batch, Steps.TEST)
        return self.loss_forward(batch)

    def test_epoch_end(self, outputs) -> None:
        """Report and reset the test metrics."""
        self.update_metrics_epoch(Steps.TEST)

    # >>>> Prediction
    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        """Return the prediction for ``batch``."""
        return self.predict(batch)

    # Check points
    def on_save_checkpoint(self, checkpoint: dict[str, Any]) -> None:
        """
        Nest the stored hyperparameters under an ``'hparams'`` key.

        Presumably this lets a restore pass them as the single ``hparams``
        argument of ``__init__`` - verify against the loading code.
        """
        # Compatible with Lightning
        checkpoint["hyper_parameters"] = {
            'hparams': checkpoint["hyper_parameters"]
        }
        return super().on_save_checkpoint(checkpoint)
|