36 Commits

Author SHA1 Message Date
Jensun Ravichandran
17b45249f4 Add scaffolding 2021-06-08 01:13:59 +02:00
Jensun Ravichandran
4f1c879528 [BUGFIX] Update unit tests 2021-06-04 22:29:30 +02:00
Jensun Ravichandran
2272c55092 [BUGFIX] Fix typo 2021-06-04 22:24:42 +02:00
Jensun Ravichandran
b03c9b1d3c Add competition and pooling modules 2021-06-04 22:18:46 +02:00
Jensun Ravichandran
0c28eda706 [FEATURE] Remove utility modules and add wrappers instead 2021-06-04 22:16:55 +02:00
Jensun Ravichandran
7bc0bfa3ab Rename loss functions 2021-06-04 22:15:57 +02:00
Jensun Ravichandran
827958a28a [FEATURE] Optional transforms in DataAwareInitializers 2021-06-04 22:14:45 +02:00
Jensun Ravichandran
8200e1d3d8 [FEATURE] Allow initialized_components to be a dataset 2021-06-04 22:13:36 +02:00
Jensun Ravichandran
729b20e9ab [FEATURE] Add scale to random initializer 2021-06-03 16:35:44 +02:00
Alexander Engelsberger
ca8ac7a43b [REFACTOR] Probabilistic losses 2021-06-03 14:01:13 +02:00
Alexander Engelsberger
b724a28a6f [BUGFIX] Stratified functions work on GPU now 2021-06-03 13:19:26 +02:00
Jensun Ravichandran
1e0a8392a2 [QA] Fix for "redefined-builtin" (W0622) 2021-06-02 00:07:44 +02:00
Jensun Ravichandran
2eb7b05653 [FEATURE] Add wrappers for more sklearn datasets 2021-06-01 23:33:51 +02:00
Jensun Ravichandran
d8a0b2dfcc Minor tweaks 2021-06-01 23:28:01 +02:00
Jensun Ravichandran
2a7394b593 [QA] Remove commented-out torch.jit.script decorators 2021-06-01 19:46:21 +02:00
Jensun Ravichandran
b1e64c8b8b [QA] Remove utils.py 2021-06-01 19:41:48 +02:00
Jensun Ravichandran
70cf17607e [BUGFIX] Fix broken _precheck_initializer 2021-06-01 19:41:21 +02:00
Jensun Ravichandran
b1568a550a [QA] Fix for "no-self-use" (R0201) 2021-06-01 19:26:05 +02:00
Jensun Ravichandran
e8e803e8ef [QA] Fix for "dangerous-default-value" (W0102) 2021-06-01 19:24:00 +02:00
Jensun Ravichandran
2c453265fe [QA] Remove duplicate headings 2021-06-01 19:18:37 +02:00
Jensun Ravichandran
7336d35fee [QA] Fix "dangerous-default-value" (W0102) 2021-06-01 19:15:06 +02:00
Jensun Ravichandran
bc18952c05 [QA] Fix "dangerous-default-value" (W0102) 2021-06-01 19:10:53 +02:00
Jensun Ravichandran
8e8d0b9c2c [QA] Fix "list-item-bullet-indent" 2021-06-01 19:08:37 +02:00
Jensun Ravichandran
5a7da2b40b [QA] Fix for "no-value-for-parameter" (E1120) 2021-06-01 19:03:57 +02:00
Jensun Ravichandran
b6d38f442b [QA] Remove trailing whitespace 2021-06-01 19:01:20 +02:00
Jensun Ravichandran
8e8851d962 Dynamically remove components 2021-06-01 18:45:47 +02:00
Jensun Ravichandran
27b43b06a7 Rename functions/transform.py -> functions/transforms.py 2021-06-01 17:43:23 +02:00
Jensun Ravichandran
ff69eb1256 Tecator.data is a Tensor and Tecator.targets is a LongTensor 2021-06-01 17:28:37 +02:00
Alexander Engelsberger
4ca581909a [FEATURE] Change NumpyDataset.data to torch.Tensor 2021-06-01 17:17:42 +02:00
Alexander Engelsberger
2722d976f5 [WIP] Add Growing Neural Gas Energy 2021-06-01 17:16:26 +02:00
Jensun Ravichandran
946cda00d2 Add more competition functions 2021-06-01 12:37:21 +02:00
Jensun Ravichandran
8227525c82 Add LambdaLayer 2021-05-31 16:47:20 +02:00
Jensun Ravichandran
e61ae73749 Make components dynamic 2021-05-31 00:31:40 +02:00
Alexander Engelsberger
040d1ee9e8 Add probabilistic losses
Based on Soft LVQ paper by Seo and Obermayer
2021-05-28 20:38:50 +02:00
Alexander Engelsberger
7f0da894fa Add transformation from distances into gaussian distribution 2021-05-28 16:50:04 +02:00
Alexander Engelsberger
62726df278 Add stratified sum as competition
For example used in RSLVQ
2021-05-28 16:49:39 +02:00
25 changed files with 776 additions and 458 deletions

View File

@@ -23,9 +23,9 @@ A clear and concise description of what you expected to happen.
If applicable, add screenshots to help explain your problem.
**Desktop (please complete the following information):**
- OS: [e.g. Ubuntu 20.10]
- Prototorch Version: [e.g. v0.4.0]
- Python Version: [e.g. 3.9.5]
- OS: [e.g. Ubuntu 20.10]
- Prototorch Version: [e.g. v0.4.0]
- Python Version: [e.g. 3.9.5]
**Additional context**
Add any other context about the problem here.

View File

@@ -2,17 +2,15 @@
## Release 0.5.0
- Breaking: Removed deprecated `prototorch.modules.Prototypes1D`
- Use `prototorch.components.LabeledComponents` instead
- Breaking: Removed deprecated `prototorch.modules.Prototypes1D`.
- Use `prototorch.components.LabeledComponents` instead.
## Release 0.2.0
### Includes
- Fixes in example scripts.
## Release 0.1.1-dev0
### Includes
- Minor bugfixes.
- 100% line coverage.

View File

@@ -11,6 +11,36 @@ from prototorch.components.initializers import (ClassAwareInitializer,
ZeroReasoningsInitializer)
from torch.nn.parameter import Parameter
from .initializers import parse_data_arg
def get_labels_object(distribution):
if isinstance(distribution, dict):
if "num_classes" in distribution.keys():
labels = EqualLabelsInitializer(
distribution["num_classes"],
distribution["prototypes_per_class"])
else:
labels = CustomLabelsInitializer(distribution)
elif isinstance(distribution, tuple):
num_classes, prototypes_per_class = distribution
labels = EqualLabelsInitializer(num_classes, prototypes_per_class)
elif isinstance(distribution, list):
labels = UnequalLabelsInitializer(distribution)
else:
msg = f"`distribution` not understood." \
f"You have provided: {distribution=}."
raise ValueError(msg)
return labels
def _precheck_initializer(initializer):
if not isinstance(initializer, ComponentsInitializer):
emsg = f"`initializer` has to be some subtype of " \
f"{ComponentsInitializer}. " \
f"You have provided: {initializer=} instead."
raise TypeError(emsg)
class Components(torch.nn.Module):
"""Components is a set of learnable Tensors."""
@@ -21,29 +51,46 @@ class Components(torch.nn.Module):
initialized_components=None):
super().__init__()
self.num_components = num_components
# Ignore all initialization settings if initialized_components is given.
if initialized_components is not None:
self.register_parameter("_components",
Parameter(initialized_components))
self._register_components(initialized_components)
if num_components is not None or initializer is not None:
wmsg = "Arguments ignored while initializing Components"
warnings.warn(wmsg)
else:
self._initialize_components(initializer)
self._initialize_components(num_components, initializer)
def _precheck_initializer(self, initializer):
if not isinstance(initializer, ComponentsInitializer):
emsg = f"`initializer` has to be some subtype of " \
f"{ComponentsInitializer}. " \
f"You have provided: {initializer=} instead."
raise TypeError(emsg)
@property
def num_components(self):
return len(self._components)
def _initialize_components(self, initializer):
self._precheck_initializer(initializer)
_components = initializer.generate(self.num_components)
self.register_parameter("_components", Parameter(_components))
def _register_components(self, components):
self.register_parameter("_components", Parameter(components))
def _initialize_components(self, num_components, initializer):
_precheck_initializer(initializer)
_components = initializer.generate(num_components)
self._register_components(_components)
def add_components(self,
num=1,
initializer=None,
*,
initialized_components=None):
if initialized_components is not None:
_components = torch.cat([self._components, initialized_components])
else:
_precheck_initializer(initializer)
_new = initializer.generate(num)
_components = torch.cat([self._components, _new])
self._register_components(_components)
def remove_components(self, indices=None):
mask = torch.ones(self.num_components, dtype=torch.bool)
mask[indices] = False
_components = self._components[mask]
self._register_components(_components)
return mask
@property
def components(self):
@@ -54,7 +101,7 @@ class Components(torch.nn.Module):
return self._components
def extra_repr(self):
return f"components.shape: {tuple(self._components.shape)}"
return f"(components): (shape: {tuple(self._components.shape)})"
class LabeledComponents(Components):
@@ -68,39 +115,60 @@ class LabeledComponents(Components):
*,
initialized_components=None):
if initialized_components is not None:
components, component_labels = initialized_components
components, component_labels = parse_data_arg(
initialized_components)
super().__init__(initialized_components=components)
self._labels = component_labels
else:
_labels = self._initialize_labels(distribution)
labels = get_labels_object(distribution)
self.initial_distribution = labels.distribution
_labels = labels.generate()
super().__init__(len(_labels), initializer=initializer)
self.register_buffer("_labels", _labels)
self._register_labels(_labels)
def _initialize_components(self, initializer):
def _register_labels(self, labels):
self.register_buffer("_labels", labels)
@property
def distribution(self):
clabels, counts = torch.unique(self._labels,
sorted=True,
return_counts=True)
return dict(zip(clabels.tolist(), counts.tolist()))
def _initialize_components(self, num_components, initializer):
if isinstance(initializer, ClassAwareInitializer):
self._precheck_initializer(initializer)
_components = initializer.generate(self.num_components,
self.distribution)
self.register_parameter("_components", Parameter(_components))
_precheck_initializer(initializer)
_components = initializer.generate(num_components,
self.initial_distribution)
self._register_components(_components)
else:
super()._initialize_components(initializer)
super()._initialize_components(num_components, initializer)
def _initialize_labels(self, distribution):
if type(distribution) == dict:
if "num_classes" in distribution.keys():
labels = EqualLabelsInitializer(
distribution["num_classes"],
distribution["prototypes_per_class"])
else:
labels = CustomLabelsInitializer(distribution)
elif type(distribution) == tuple:
num_classes, prototypes_per_class = distribution
labels = EqualLabelsInitializer(num_classes, prototypes_per_class)
elif type(distribution) == list:
labels = UnequalLabelsInitializer(distribution)
def add_components(self, distribution, initializer):
_precheck_initializer(initializer)
self.distribution = labels.distribution
return labels.generate()
# Labels
labels = get_labels_object(distribution)
new_labels = labels.generate()
_labels = torch.cat([self._labels, new_labels])
self._register_labels(_labels)
# Components
if isinstance(initializer, ClassAwareInitializer):
_new = initializer.generate(len(new_labels), labels.distribution)
else:
_new = initializer.generate(len(new_labels))
_components = torch.cat([self._components, _new])
self._register_components(_components)
def remove_components(self, indices=None):
# Components
mask = super().remove_components(indices)
# Labels
_labels = self._labels[mask]
self._register_labels(_labels)
@property
def component_labels(self):
@@ -141,7 +209,7 @@ class ReasoningComponents(Components):
super().__init__(len(self._reasonings), initializer=initializer)
def _initialize_reasonings(self, reasonings):
if type(reasonings) == tuple:
if isinstance(reasonings, tuple):
num_classes, num_components = reasonings
reasonings = ZeroReasoningsInitializer(num_classes, num_components)

View File

@@ -1,4 +1,5 @@
"""ProtoTroch Initializers."""
"""ProtoTroch Component and Label Initializers."""
import warnings
from collections.abc import Iterable
from itertools import chain
@@ -13,21 +14,30 @@ def parse_data_arg(data_arg):
if isinstance(data_arg, DataLoader):
data = torch.tensor([])
labels = torch.tensor([])
targets = torch.tensor([])
for x, y in data_arg:
data = torch.cat([data, x])
labels = torch.cat([labels, y])
targets = torch.cat([targets, y])
else:
data, labels = data_arg
data, targets = data_arg
if not isinstance(data, torch.Tensor):
wmsg = f"Converting data to {torch.Tensor}."
warnings.warn(wmsg)
data = torch.Tensor(data)
if not isinstance(labels, torch.Tensor):
wmsg = f"Converting labels to {torch.Tensor}."
if not isinstance(targets, torch.Tensor):
wmsg = f"Converting targets to {torch.Tensor}."
warnings.warn(wmsg)
labels = torch.Tensor(labels)
return data, labels
targets = torch.Tensor(targets)
return data, targets
def get_subinitializers(data, targets, clabels, subinit_type):
initializers = dict()
for clabel in clabels:
class_data = data[targets == clabel]
class_initializer = subinit_type(class_data)
initializers[clabel] = (class_initializer)
return initializers
# Components
@@ -37,18 +47,22 @@ class ComponentsInitializer(object):
class DimensionAwareInitializer(ComponentsInitializer):
def __init__(self, c_dims):
def __init__(self, dims):
super().__init__()
if isinstance(c_dims, Iterable):
self.components_dims = tuple(c_dims)
if isinstance(dims, Iterable):
self.components_dims = tuple(dims)
else:
self.components_dims = (c_dims, )
self.components_dims = (dims, )
class OnesInitializer(DimensionAwareInitializer):
def __init__(self, dims, scale=1.0):
super().__init__(dims)
self.scale = scale
def generate(self, length):
gen_dims = (length, ) + self.components_dims
return torch.ones(gen_dims)
return torch.ones(gen_dims) * self.scale
class ZerosInitializer(DimensionAwareInitializer):
@@ -58,78 +72,73 @@ class ZerosInitializer(DimensionAwareInitializer):
class UniformInitializer(DimensionAwareInitializer):
def __init__(self, c_dims, min=0.0, max=1.0):
super().__init__(c_dims)
self.min = min
self.max = max
def __init__(self, dims, minimum=0.0, maximum=1.0, scale=1.0):
super().__init__(dims)
self.minimum = minimum
self.maximum = maximum
self.scale = scale
def generate(self, length):
gen_dims = (length, ) + self.components_dims
return torch.ones(gen_dims).uniform_(self.min, self.max)
return torch.ones(gen_dims).uniform_(self.minimum,
self.maximum) * self.scale
class DataAwareInitializer(ComponentsInitializer):
def __init__(self, data):
def __init__(self, data, transform=torch.nn.Identity()):
super().__init__()
self.data = data
self.transform = transform
def __del__(self):
del self.data
class SelectionInitializer(DataAwareInitializer):
def generate(self, length):
indices = torch.LongTensor(length).random_(0, len(self.data))
return self.data[indices]
return self.transform(self.data[indices])
class MeanInitializer(DataAwareInitializer):
def generate(self, length):
mean = torch.mean(self.data, dim=0)
repeat_dim = [length] + [1] * len(mean.shape)
return mean.repeat(repeat_dim)
return self.transform(mean.repeat(repeat_dim))
class ClassAwareInitializer(ComponentsInitializer):
class ClassAwareInitializer(DataAwareInitializer):
def __init__(self, data, transform=torch.nn.Identity()):
super().__init__()
data, labels = parse_data_arg(data)
self.data = data
self.labels = labels
self.transform = transform
self.clabels = torch.unique(self.labels)
data, targets = parse_data_arg(data)
super().__init__(data, transform)
self.targets = targets
self.clabels = torch.unique(self.targets).int().tolist()
self.num_classes = len(self.clabels)
def _get_samples_from_initializer(self, length, dist):
if not dist:
per_class = length // self.num_classes
dist = self.num_classes * [per_class]
if type(dist) == dict:
dist = dist.values()
samples_list = [
init.generate(n) for init, n in zip(self.initializers, dist)
]
out = torch.vstack(samples_list)
dist = dict(zip(self.clabels, self.num_classes * [per_class]))
if isinstance(dist, list):
dist = dict(zip(self.clabels, dist))
samples = [self.initializers[k].generate(n) for k, n in dist.items()]
out = torch.vstack(samples)
with torch.no_grad():
out = self.transform(out)
return out
def __del__(self):
del self.data
del self.labels
del self.targets
class StratifiedMeanInitializer(ClassAwareInitializer):
def __init__(self, data, **kwargs):
super().__init__(data, **kwargs)
self.initializers = get_subinitializers(self.data, self.targets,
self.clabels, MeanInitializer)
self.initializers = []
for clabel in self.clabels:
class_data = self.data[self.labels == clabel]
class_initializer = MeanInitializer(class_data)
self.initializers.append(class_initializer)
def generate(self, length, dist=[]):
def generate(self, length, dist):
samples = self._get_samples_from_initializer(length, dist)
return samples
@@ -138,12 +147,9 @@ class StratifiedSelectionInitializer(ClassAwareInitializer):
def __init__(self, data, noise=None, **kwargs):
super().__init__(data, **kwargs)
self.noise = noise
self.initializers = []
for clabel in self.clabels:
class_data = self.data[self.labels == clabel]
class_initializer = SelectionInitializer(class_data)
self.initializers.append(class_initializer)
self.initializers = get_subinitializers(self.data, self.targets,
self.clabels,
SelectionInitializer)
def add_noise_v1(self, x):
return x + self.noise
@@ -155,7 +161,7 @@ class StratifiedSelectionInitializer(ClassAwareInitializer):
mask = torch.bernoulli(n1) - torch.bernoulli(n2)
return x + (self.noise * mask)
def generate(self, length, dist=[]):
def generate(self, length, dist):
samples = self._get_samples_from_initializer(length, dist)
if self.noise is not None:
samples = self.add_noise_v1(samples)
@@ -181,8 +187,8 @@ class UnequalLabelsInitializer(LabelsInitializer):
clabels = range(len(self.dist))
if not dist:
dist = self.dist
labels = list(chain(*[[i] * n for i, n in zip(clabels, dist)]))
return torch.LongTensor(labels)
targets = list(chain(*[[i] * n for i, n in zip(clabels, dist)]))
return torch.LongTensor(targets)
class EqualLabelsInitializer(LabelsInitializer):

View File

@@ -1,8 +1,6 @@
"""ProtoTorch datasets."""
from .abstract import NumpyDataset
from .iris import Iris
from .sklearn import Blobs, Circles, Iris, Moons, Random
from .spiral import Spiral
from .tecator import Tecator
__all__ = ['Iris', 'Spiral', 'Tecator']

View File

@@ -15,9 +15,9 @@ import torch
class NumpyDataset(torch.utils.data.TensorDataset):
"""Create a PyTorch TensorDataset from NumPy arrays."""
def __init__(self, data, targets):
self.data = data
self.targets = targets
tensors = [torch.Tensor(data), torch.Tensor(targets)]
self.data = torch.Tensor(data)
self.targets = torch.LongTensor(targets)
tensors = [self.data, self.targets]
super().__init__(*tensors)

View File

@@ -1,40 +0,0 @@
"""Thin wrapper for the Iris classification dataset from sklearn.
URL:
https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_iris.html
"""
from typing import Sequence
from prototorch.datasets.abstract import NumpyDataset
from sklearn.datasets import load_iris
class Iris(NumpyDataset):
"""
Iris Dataset by Ronald Fisher introduced in 1936.
The dataset contains four measurements from flowers of three species of iris.
.. list-table:: Iris
:header-rows: 1
* - dimensions
- classes
- training size
- validation size
- test size
* - 4
- 3
- 150
- 0
- 0
:param dims: select a subset of dimensions
"""
def __init__(self, dims: Sequence[int] = None):
x, y = load_iris(return_X_y=True)
if dims:
x = x[:, dims]
super().__init__(x, y)

View File

@@ -0,0 +1,137 @@
"""Thin wrappers for a few scikit-learn datasets.
URL:
https://scikit-learn.org/stable/modules/classes.html#module-sklearn.datasets
"""
import warnings
from typing import Sequence, Union
from prototorch.datasets.abstract import NumpyDataset
from sklearn.datasets import (load_iris, make_blobs, make_circles,
make_classification, make_moons)
class Iris(NumpyDataset):
"""Iris Dataset by Ronald Fisher introduced in 1936.
The dataset contains four measurements from flowers of three species of iris.
.. list-table:: Iris
:header-rows: 1
* - dimensions
- classes
- training size
- validation size
- test size
* - 4
- 3
- 150
- 0
- 0
:param dims: select a subset of dimensions
"""
def __init__(self, dims: Sequence[int] = None):
x, y = load_iris(return_X_y=True)
if dims:
x = x[:, dims]
super().__init__(x, y)
class Blobs(NumpyDataset):
"""Generate isotropic Gaussian blobs for clustering.
Read more at
https://scikit-learn.org/stable/datasets/sample_generators.html#sample-generators.
"""
def __init__(self,
num_samples: int = 300,
num_features: int = 2,
seed: Union[None, int] = 0):
x, y = make_blobs(num_samples,
num_features,
centers=None,
random_state=seed,
shuffle=False)
super().__init__(x, y)
class Random(NumpyDataset):
"""Generate a random n-class classification problem.
Read more at
https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_classification.html.
Note: n_classes * n_clusters_per_class <= 2**n_informative must satisfy.
"""
def __init__(self,
num_samples: int = 300,
num_features: int = 2,
num_classes: int = 2,
num_clusters: int = 2,
num_informative: Union[None, int] = None,
separation: float = 1.0,
seed: Union[None, int] = 0):
if not num_informative:
import math
num_informative = math.ceil(math.log2(num_classes * num_clusters))
if num_features < num_informative:
warnings.warn("Generating more features than requested.")
num_features = num_informative
x, y = make_classification(num_samples,
num_features,
n_informative=num_informative,
n_redundant=0,
n_classes=num_classes,
n_clusters_per_class=num_clusters,
class_sep=separation,
random_state=seed,
shuffle=False)
super().__init__(x, y)
class Circles(NumpyDataset):
"""Make a large circle containing a smaller circle in 2D.
A simple toy dataset to visualize clustering and classification algorithms.
Read more at
https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_circles.html
"""
def __init__(self,
num_samples: int = 300,
noise: float = 0.3,
factor: float = 0.8,
seed: Union[None, int] = 0):
x, y = make_circles(num_samples,
noise=noise,
factor=factor,
random_state=seed,
shuffle=False)
super().__init__(x, y)
class Moons(NumpyDataset):
"""Make two interleaving half circles.
A simple toy dataset to visualize clustering and classification algorithms.
Read more at
https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_moons.html
"""
def __init__(self,
num_samples: int = 300,
noise: float = 0.3,
seed: Union[None, int] = 0):
x, y = make_moons(num_samples,
noise=noise,
random_state=seed,
shuffle=False)
super().__init__(x, y)

View File

@@ -101,12 +101,12 @@ class Tecator(ProtoDataset):
x_train, y_train = f["x_train"], f["y_train"]
x_test, y_test = f["x_test"], f["y_test"]
training_set = [
torch.tensor(x_train, dtype=torch.float32),
torch.tensor(y_train),
torch.Tensor(x_train),
torch.LongTensor(y_train),
]
test_set = [
torch.tensor(x_test, dtype=torch.float32),
torch.tensor(y_test),
torch.Tensor(x_test),
torch.LongTensor(y_test),
]
with open(os.path.join(self.processed_folder, self.training_file),

View File

@@ -2,11 +2,4 @@
from .activations import identity, sigmoid_beta, swish_beta
from .competitions import knnc, wtac
__all__ = [
"identity",
"sigmoid_beta",
"swish_beta",
"knnc",
"wtac",
]
from .pooling import *

View File

@@ -5,17 +5,14 @@ import torch
ACTIVATIONS = dict()
# def register_activation(scriptf):
# ACTIVATIONS[scriptf.name] = scriptf
# return scriptf
def register_activation(function):
def register_activation(fn):
"""Add the activation function to the registry."""
ACTIVATIONS[function.__name__] = function
return function
name = fn.__name__
ACTIVATIONS[name] = fn
return fn
@register_activation
# @torch.jit.script
def identity(x, beta=0.0):
"""Identity activation function.
@@ -29,7 +26,6 @@ def identity(x, beta=0.0):
@register_activation
# @torch.jit.script
def sigmoid_beta(x, beta=10.0):
r"""Sigmoid activation function with scaling.
@@ -44,7 +40,6 @@ def sigmoid_beta(x, beta=10.0):
@register_activation
# @torch.jit.script
def swish_beta(x, beta=10.0):
r"""Swish activation function with scaling.

View File

@@ -3,42 +3,26 @@
import torch
def stratified_min(distances, labels):
clabels = torch.unique(labels, dim=0)
num_classes = clabels.size()[0]
if distances.size()[1] == num_classes:
# skip if only one prototype per class
return distances
batch_size = distances.size()[0]
winning_distances = torch.zeros(num_classes, batch_size)
inf = torch.full_like(distances.T, fill_value=float("inf"))
# distances_to_wpluses = torch.where(matcher, distances, inf)
for i, cl in enumerate(clabels):
# cdists = distances.T[labels == cl]
matcher = torch.eq(labels.unsqueeze(dim=1), cl)
if labels.ndim == 2:
# if the labels are one-hot vectors
matcher = torch.eq(torch.sum(matcher, dim=-1), num_classes)
cdists = torch.where(matcher, distances.T, inf).T
winning_distances[i] = torch.min(cdists, dim=1,
keepdim=True).values.squeeze()
if labels.ndim == 2:
# Transpose to return with `batch_size` first and
# reverse the columns to fix the ordering of the classes
return torch.flip(winning_distances.T, dims=(1, ))
def wtac(distances: torch.Tensor,
labels: torch.LongTensor) -> (torch.LongTensor):
"""Winner-Takes-All-Competition.
return winning_distances.T # return with `batch_size` first
Returns the labels corresponding to the winners.
def wtac(distances, labels):
"""
winning_indices = torch.min(distances, dim=1).indices
winning_labels = labels[winning_indices].squeeze()
return winning_labels
def knnc(distances, labels, k=1):
def knnc(distances: torch.Tensor,
labels: torch.LongTensor,
k: int = 1) -> (torch.LongTensor):
"""K-Nearest-Neighbors-Competition.
Returns the labels corresponding to the winners.
"""
winning_indices = torch.topk(-distances, k=k, dim=1).indices
# winning_labels = torch.mode(labels[winning_indices].squeeze(),
# dim=1).values
winning_labels = torch.mode(labels[winning_indices], dim=1).values
return winning_labels

View File

@@ -57,3 +57,38 @@ def lvq21_loss(distances, target_labels, prototype_labels):
mu = dp - dm
return mu
# Probabilistic
def _get_class_probabilities(probabilities, targets, prototype_labels):
# Create Label Mapping
uniques = prototype_labels.unique(sorted=True).tolist()
key_val = {key: val for key, val in zip(uniques, range(len(uniques)))}
target_indices = torch.LongTensor(list(map(key_val.get, targets.tolist())))
whole = probabilities.sum(dim=1)
correct = probabilities[torch.arange(len(probabilities)), target_indices]
wrong = whole - correct
return whole, correct, wrong
def nllr_loss(probabilities, targets, prototype_labels):
"""Compute the Negative Log-Likelihood Ratio loss."""
_, correct, wrong = _get_class_probabilities(probabilities, targets,
prototype_labels)
likelihood = correct / wrong
log_likelihood = torch.log(likelihood)
return -1.0 * log_likelihood
def rslvq_loss(probabilities, targets, prototype_labels):
"""Compute the Robust Soft Learning Vector Quantization (RSLVQ) loss."""
whole, correct, _ = _get_class_probabilities(probabilities, targets,
prototype_labels)
likelihood = correct / whole
log_likelihood = torch.log(likelihood)
return -1.0 * log_likelihood

View File

@@ -0,0 +1,80 @@
"""ProtoTorch pooling functions."""
from typing import Callable
import torch
def stratify_with(values: torch.Tensor,
labels: torch.LongTensor,
fn: Callable,
fill_value: float = 0.0) -> (torch.Tensor):
"""Apply an arbitrary stratification strategy on the columns on `values`.
The outputs correspond to sorted labels.
"""
clabels = torch.unique(labels, dim=0, sorted=True)
num_classes = clabels.size()[0]
if values.size()[1] == num_classes:
# skip if stratification is trivial
return values
batch_size = values.size()[0]
winning_values = torch.zeros(num_classes, batch_size, device=labels.device)
filler = torch.full_like(values.T, fill_value=fill_value)
for i, cl in enumerate(clabels):
matcher = torch.eq(labels.unsqueeze(dim=1), cl)
if labels.ndim == 2:
# if the labels are one-hot vectors
matcher = torch.eq(torch.sum(matcher, dim=-1), num_classes)
cdists = torch.where(matcher, values.T, filler).T
winning_values[i] = fn(cdists)
if labels.ndim == 2:
# Transpose to return with `batch_size` first and
# reverse the columns to fix the ordering of the classes
return torch.flip(winning_values.T, dims=(1, ))
return winning_values.T # return with `batch_size` first
def stratified_sum_pooling(values: torch.Tensor,
labels: torch.LongTensor) -> (torch.Tensor):
"""Group-wise sum."""
winning_values = stratify_with(
values,
labels,
fn=lambda x: torch.sum(x, dim=1, keepdim=True).squeeze(),
fill_value=0.0)
return winning_values
def stratified_min_pooling(values: torch.Tensor,
labels: torch.LongTensor) -> (torch.Tensor):
"""Group-wise minimum."""
winning_values = stratify_with(
values,
labels,
fn=lambda x: torch.min(x, dim=1, keepdim=True).values.squeeze(),
fill_value=float("inf"))
return winning_values
def stratified_max_pooling(values: torch.Tensor,
labels: torch.LongTensor) -> (torch.Tensor):
"""Group-wise maximum."""
winning_values = stratify_with(
values,
labels,
fn=lambda x: torch.max(x, dim=1, keepdim=True).values.squeeze(),
fill_value=-1.0 * float("inf"))
return winning_values
def stratified_prod_pooling(values: torch.Tensor,
labels: torch.LongTensor) -> (torch.Tensor):
"""Group-wise maximum."""
winning_values = stratify_with(
values,
labels,
fn=lambda x: torch.prod(x, dim=1, keepdim=True).squeeze(),
fill_value=1.0)
return winning_values

View File

@@ -0,0 +1,5 @@
import torch
def gaussian(distance, variance):
return torch.exp(-(distance * distance) / (2 * variance))

View File

@@ -1 +1,7 @@
"""ProtoTorch modules."""
from .competitions import *
from .initializers import *
from .pooling import *
from .transformations import *
from .wrappers import LambdaLayer, LossLayer

View File

@@ -0,0 +1,41 @@
"""ProtoTorch Competition Modules."""
import torch
from prototorch.functions.competitions import knnc, wtac
class WTAC(torch.nn.Module):
"""Winner-Takes-All-Competition Layer.
Thin wrapper over the `wtac` function.
"""
def forward(self, distances, labels):
return wtac(distances, labels)
class LTAC(torch.nn.Module):
"""Loser-Takes-All-Competition Layer.
Thin wrapper over the `wtac` function.
"""
def forward(self, probs, labels):
return wtac(-1.0 * probs, labels)
class KNNC(torch.nn.Module):
"""K-Nearest-Neighbors-Competition.
Thin wrapper over the `knnc` function.
"""
def __init__(self, k=1, **kwargs):
super().__init__(**kwargs)
self.k = k
def forward(self, distances, labels):
return knnc(distances, labels, k=self.k)
def extra_repr(self):
return f"k: {self.k}"

View File

@@ -0,0 +1,61 @@
"""ProtoTroch Module Initializers."""
import torch
# Transformations
class MatrixInitializer(object):
def __init__(self, *args, **kwargs):
...
def generate(self, shape):
raise NotImplementedError("Subclasses should implement this!")
class ZerosInitializer(MatrixInitializer):
def generate(self, shape):
return torch.zeros(shape)
class OnesInitializer(MatrixInitializer):
def __init__(self, scale=1.0):
super().__init__()
self.scale = scale
def generate(self, shape):
return torch.ones(shape) * self.scale
class UniformInitializer(MatrixInitializer):
def __init__(self, minimum=0.0, maximum=1.0, scale=1.0):
super().__init__()
self.minimum = minimum
self.maximum = maximum
self.scale = scale
def generate(self, shape):
return torch.ones(shape).uniform_(self.minimum,
self.maximum) * self.scale
class DataAwareInitializer(MatrixInitializer):
def __init__(self, data, transform=torch.nn.Identity()):
super().__init__()
self.data = data
self.transform = transform
def __del__(self):
del self.data
class EigenVectorInitializer(DataAwareInitializer):
def generate(self, shape):
# TODO
raise NotImplementedError()
# Aliases
EV = EigenVectorInitializer
Random = RandomInitializer = UniformInitializer
Zeros = ZerosInitializer
Ones = OnesInitializer

View File

@@ -1,7 +1,6 @@
"""ProtoTorch losses."""
import torch
from prototorch.functions.activations import get_activation
from prototorch.functions.losses import glvq_loss
@@ -21,8 +20,8 @@ class GLVQLoss(torch.nn.Module):
class NeuralGasEnergy(torch.nn.Module):
def __init__(self, lm):
super().__init__()
def __init__(self, lm, **kwargs):
super().__init__(**kwargs)
self.lm = lm
def forward(self, d):
@@ -38,3 +37,22 @@ class NeuralGasEnergy(torch.nn.Module):
@staticmethod
def _nghood_fn(rankings, lm):
return torch.exp(-rankings / lm)
class GrowingNeuralGasEnergy(NeuralGasEnergy):
def __init__(self, topology_layer, **kwargs):
super().__init__(**kwargs)
self.topology_layer = topology_layer
@staticmethod
def _nghood_fn(rankings, topology):
winner = rankings[:, 0]
weights = torch.zeros_like(rankings, dtype=torch.float)
weights[torch.arange(rankings.shape[0]), winner] = 1.0
neighbours = topology.get_neighbours(winner)
weights[neighbours] = 0.1
return weights

View File

@@ -0,0 +1,31 @@
"""ProtoTorch Pooling Modules."""
import torch
from prototorch.functions.pooling import (stratified_max_pooling,
stratified_min_pooling,
stratified_prod_pooling,
stratified_sum_pooling)
class StratifiedSumPooling(torch.nn.Module):
"""Thin wrapper over the `stratified_sum_pooling` function."""
def forward(self, values, labels):
return stratified_sum_pooling(values, labels)
class StratifiedProdPooling(torch.nn.Module):
"""Thin wrapper over the `stratified_prod_pooling` function."""
def forward(self, values, labels):
return stratified_prod_pooling(values, labels)
class StratifiedMinPooling(torch.nn.Module):
"""Thin wrapper over the `stratified_min_pooling` function."""
def forward(self, values, labels):
return stratified_min_pooling(values, labels)
class StratifiedMaxPooling(torch.nn.Module):
"""Thin wrapper over the `stratified_max_pooling` function."""
def forward(self, values, labels):
return stratified_max_pooling(values, labels)

View File

@@ -0,0 +1,49 @@
"""ProtoTorch Transformation Layers."""
import torch
from torch.nn.parameter import Parameter
from .initializers import MatrixInitializer
def _precheck_initializer(initializer):
if not isinstance(initializer, MatrixInitializer):
emsg = f"`initializer` has to be some subtype of " \
f"{MatrixInitializer}. " \
f"You have provided: {initializer=} instead."
raise TypeError(emsg)
class Omega(torch.nn.Module):
"""The Omega mapping used in GMLVQ."""
def __init__(self,
num_replicas=1,
input_dim=None,
latent_dim=None,
initializer=None,
*,
initialized_weights=None):
super().__init__()
if initialized_weights is not None:
self._register_weights(initialized_weights)
else:
if num_replicas == 1:
shape = (input_dim, latent_dim)
else:
shape = (num_replicas, input_dim, latent_dim)
self._initialize_weights(shape, initializer)
def _register_weights(self, weights):
self.register_parameter("_omega", Parameter(weights))
def _initialize_weights(self, shape, initializer):
_precheck_initializer(initializer)
_omega = initializer.generate(shape)
self._register_weights(_omega)
def forward(self):
return self._omega
def extra_repr(self):
return f"(omega): (shape: {tuple(self._omega.shape)})"

View File

@@ -0,0 +1,36 @@
"""ProtoTorch Wrappers."""
import torch
class LambdaLayer(torch.nn.Module):
def __init__(self, fn, name=None):
super().__init__()
self.fn = fn
self.name = name or fn.__name__ # lambda fns get <lambda>
def forward(self, *args, **kwargs):
return self.fn(*args, **kwargs)
def extra_repr(self):
return self.name
class LossLayer(torch.nn.modules.loss._Loss):
def __init__(self,
fn,
name=None,
size_average=None,
reduce=None,
reduction: str = "mean") -> None:
super().__init__(size_average=size_average,
reduce=reduce,
reduction=reduction)
self.fn = fn
self.name = name or fn.__name__ # lambda fns get <lambda>
def forward(self, *args, **kwargs):
return self.fn(*args, **kwargs)
def extra_repr(self):
return self.name

View File

@@ -1,243 +0,0 @@
"""Utilities that provide various small functionalities."""
import os
import pickle
import sys
from time import time
import matplotlib.pyplot as plt
import numpy as np
def progressbar(title, value, end, bar_width=20):
percent = float(value) / end
arrow = "=" * int(round(percent * bar_width) - 1) + ">"
spaces = "." * (bar_width - len(arrow))
sys.stdout.write("\r{}: [{}] {}%".format(title, arrow + spaces,
int(round(percent * 100))))
sys.stdout.flush()
if percent == 1.0:
print()
def prettify_string(inputs, start="", sep=" ", end="\n"):
outputs = start + " ".join(inputs.split()) + end
return outputs
def pretty_print(inputs):
print(prettify_string(inputs))
def writelog(self, *logs, logdir="./logs", logfile="run.txt"):
f = os.path.join(logdir, logfile)
with open(f, "a+") as fh:
for log in logs:
fh.write(log)
fh.write("\n")
def start_tensorboard(self, logdir="./logs"):
cmd = f"tensorboard --logdir={logdir} --port=6006"
os.system(cmd)
def make_directory(save_dir):
if not os.path.exists(save_dir):
print(f"Making directory {save_dir}.")
os.mkdir(save_dir)
def make_gif(filenames, duration, output_file=None):
try:
import imageio
except ModuleNotFoundError as e:
print("Please install Protoflow with [other] extra requirements.")
raise (e)
images = list()
for filename in filenames:
images.append(imageio.imread(filename))
if not output_file:
output_file = f"makegif.gif"
if images:
imageio.mimwrite(output_file, images, duration=duration)
def gif_from_dir(directory,
duration,
prefix="",
output_file=None,
verbose=True):
images = os.listdir(directory)
if verbose:
print(f"Making gif from {len(images)} images under {directory}.")
filenames = list()
# Sort images
images = sorted(
images,
key=lambda img: int(os.path.splitext(img)[0].replace(prefix, "")))
for image in images:
fname = os.path.join(directory, image)
filenames.append(fname)
if not output_file:
output_file = os.path.join(directory, "makegif.gif")
make_gif(filenames=filenames, duration=duration, output_file=output_file)
def accuracy_score(y_true, y_pred):
accuracy = np.sum(y_true == y_pred)
normalized_acc = accuracy / float(len(y_true))
return normalized_acc
def predict_and_score(clf,
x_test,
y_test,
verbose=False,
title="Test accuracy"):
y_pred = clf.predict(x_test)
accuracy = np.sum(y_test == y_pred)
normalized_acc = accuracy / float(len(y_test))
if verbose:
print(f"{title}: {normalized_acc * 100:06.04f}%")
return normalized_acc
def remove_nan_rows(arr):
"""Remove all rows with `nan` values in `arr`."""
mask = np.isnan(arr).any(axis=1)
return arr[~mask]
def remove_nan_cols(arr):
"""Remove all columns with `nan` values in `arr`."""
mask = np.isnan(arr).any(axis=0)
return arr[~mask]
def replace_in(arr, replacement_dict, inplace=False):
"""Replace the keys found in `arr` with the values from
the `replacement_dict`.
"""
if inplace:
new_arr = arr
else:
import copy
new_arr = copy.deepcopy(arr)
for k, v in replacement_dict.items():
new_arr[arr == k] = v
return new_arr
def train_test_split(data, train=0.7, val=0.15, shuffle=None, return_xy=False):
"""Split a classification dataset in such a way so as to
preserve the class distribution in subsamples of the dataset.
"""
if train + val > 1.0:
raise ValueError("Invalid split values for train and val.")
Y = data[:, -1]
labels = set(Y)
hist = dict()
for l in labels:
data_l = data[Y == l]
nl = len(data_l)
nl_train = int(nl * train)
nl_val = int(nl * val)
nl_test = nl - (nl_train + nl_val)
hist[l] = (nl_train, nl_val, nl_test)
train_data = list()
val_data = list()
test_data = list()
for l, (nl_train, nl_val, nl_test) in hist.items():
data_l = data[Y == l]
if shuffle:
np.random.shuffle(data_l)
train_l = data_l[:nl_train]
val_l = data_l[nl_train:nl_train + nl_val]
test_l = data_l[nl_train + nl_val:nl_train + nl_val + nl_test]
train_data.append(train_l)
val_data.append(val_l)
test_data.append(test_l)
def _squash(data_list):
data = np.array(data_list[0])
for item in data_list[1:]:
data = np.vstack((data, np.array(item)))
return data
train_data = _squash(train_data)
if val_data:
val_data = _squash(val_data)
if test_data:
test_data = _squash(test_data)
if return_xy:
x_train = train_data[:, :-1]
y_train = train_data[:, -1]
x_val = val_data[:, :-1]
y_val = val_data[:, -1]
x_test = test_data[:, :-1]
y_test = test_data[:, -1]
return (x_train, y_train), (x_val, y_val), (x_test, y_test)
return train_data, val_data, test_data
def class_histogram(data, title="Untitled"):
plt.figure(title)
plt.clf()
plt.title(title)
dist, counts = np.unique(data[:, -1], return_counts=True)
plt.bar(dist, counts)
plt.xticks(dist)
print("Call matplotlib.pyplot.show() to see the plot.")
def ntimer(n=10):
"""Wraps a function which wraps another function to time it."""
if n < 1:
raise (Exception(f"Invalid n = {n} given."))
def timer(func):
"""Wraps `func` with a timer and returns the wrapped `func`."""
def wrapper(*args, **kwargs):
rv = None
before = time()
for _ in range(n):
rv = func(*args, **kwargs)
after = time()
elapsed = after - before
print(f"Elapsed: {elapsed*1e3:02.02f} ms")
return rv
return wrapper
return timer
def memoize(verbose=True):
"""Wraps a function which wraps another function that memoizes."""
def memoizer(func):
"""Memoize (cache) return values of `func`.
Wraps `func` and returns the wrapped `func` so that `func`
is executed when the results are not available in the cache.
"""
cache = {}
def wrapper(*args, **kwargs):
t = (pickle.dumps(args), pickle.dumps(kwargs))
if t not in cache:
if verbose:
print(f"Adding NEW rv {func.__name__}{args}{kwargs} "
"to cache.")
cache[t] = func(*args, **kwargs)
else:
if verbose:
print(f"Using OLD rv {func.__name__}{args}{kwargs} "
"from cache.")
return cache[t]
return wrapper
return memoizer

View File

@@ -5,7 +5,7 @@ import unittest
import numpy as np
import torch
from prototorch.functions import (activations, competitions, distances,
initializers, losses)
initializers, losses, pooling)
class TestActivations(unittest.TestCase):
@@ -104,10 +104,28 @@ class TestCompetitions(unittest.TestCase):
decimal=5)
self.assertIsNone(mismatch)
def test_knnc_k1(self):
d = torch.tensor([[2.0, 3.0, 1.99, 3.01], [2.0, 3.0, 2.01, 3.0]])
labels = torch.tensor([0, 1, 2, 3])
actual = competitions.knnc(d, labels, k=1)
desired = torch.tensor([2, 0])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
decimal=5)
self.assertIsNone(mismatch)
def tearDown(self):
pass
class TestPooling(unittest.TestCase):
def setUp(self):
pass
def test_stratified_min(self):
d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
labels = torch.tensor([0, 0, 1, 2])
actual = competitions.stratified_min(d, labels)
actual = pooling.stratified_min_pooling(d, labels)
desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
@@ -118,28 +136,70 @@ class TestCompetitions(unittest.TestCase):
d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
labels = torch.tensor([0, 0, 1, 2])
labels = torch.eye(3)[labels]
actual = competitions.stratified_min(d, labels)
actual = pooling.stratified_min_pooling(d, labels)
desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
decimal=5)
self.assertIsNone(mismatch)
def test_stratified_min_simple(self):
def test_stratified_min_trivial(self):
d = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0, 1]])
labels = torch.tensor([0, 1, 2])
actual = competitions.stratified_min(d, labels)
actual = pooling.stratified_min_pooling(d, labels)
desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
decimal=5)
self.assertIsNone(mismatch)
def test_knnc_k1(self):
d = torch.tensor([[2.0, 3.0, 1.99, 3.01], [2.0, 3.0, 2.01, 3.0]])
labels = torch.tensor([0, 1, 2, 3])
actual = competitions.knnc(d, labels, k=1)
desired = torch.tensor([2, 0])
def test_stratified_max(self):
d = torch.tensor([[1.0, 0.0, 2.0, 3.0, 9.0], [9.0, 8.0, 0, 1, 7.0]])
labels = torch.tensor([0, 0, 3, 2, 0])
actual = pooling.stratified_max_pooling(d, labels)
desired = torch.tensor([[9.0, 3.0, 2.0], [9.0, 1.0, 0.0]])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
decimal=5)
self.assertIsNone(mismatch)
def test_stratified_max_one_hot(self):
d = torch.tensor([[1.0, 0.0, 2.0, 3.0, 9.0], [9.0, 8.0, 0, 1, 7.0]])
labels = torch.tensor([0, 0, 2, 1, 0])
labels = torch.nn.functional.one_hot(labels, num_classes=3)
actual = pooling.stratified_max_pooling(d, labels)
desired = torch.tensor([[9.0, 3.0, 2.0], [9.0, 1.0, 0.0]])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
decimal=5)
self.assertIsNone(mismatch)
def test_stratified_sum(self):
d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
labels = torch.LongTensor([0, 0, 1, 2])
actual = pooling.stratified_sum_pooling(d, labels)
desired = torch.tensor([[1.0, 2.0, 3.0], [17.0, 0.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
decimal=5)
self.assertIsNone(mismatch)
def test_stratified_sum_one_hot(self):
d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
labels = torch.tensor([0, 0, 1, 2])
labels = torch.eye(3)[labels]
actual = pooling.stratified_sum_pooling(d, labels)
desired = torch.tensor([[1.0, 2.0, 3.0], [17.0, 0.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
decimal=5)
self.assertIsNone(mismatch)
def test_stratified_prod(self):
d = torch.tensor([[1.0, 0.0, 2.0, 3.0, 9.0], [9.0, 8.0, 0, 1, 7.0]])
labels = torch.tensor([0, 0, 3, 2, 0])
actual = pooling.stratified_prod_pooling(d, labels)
desired = torch.tensor([[0.0, 3.0, 2.0], [504.0, 1.0, 0.0]])
mismatch = np.testing.assert_array_almost_equal(actual,
desired,
decimal=5)