Compare commits
36 Commits
v0.5.0 ... feature/tr

SHA1:
17b45249f4, 4f1c879528, 2272c55092, b03c9b1d3c, 0c28eda706, 7bc0bfa3ab,
827958a28a, 8200e1d3d8, 729b20e9ab, ca8ac7a43b, b724a28a6f, 1e0a8392a2,
2eb7b05653, d8a0b2dfcc, 2a7394b593, b1e64c8b8b, 70cf17607e, b1568a550a,
e8e803e8ef, 2c453265fe, 7336d35fee, bc18952c05, 8e8d0b9c2c, 5a7da2b40b,
b6d38f442b, 8e8851d962, 27b43b06a7, ff69eb1256, 4ca581909a, 2722d976f5,
946cda00d2, 8227525c82, e61ae73749, 040d1ee9e8, 7f0da894fa, 62726df278
.github/ISSUE_TEMPLATE/bug_report.md (vendored, 6 changed lines)
@@ -23,9 +23,9 @@ A clear and concise description of what you expected to happen.
 If applicable, add screenshots to help explain your problem.
 
 **Desktop (please complete the following information):**
-- OS: [e.g. Ubuntu 20.10]
-- Prototorch Version: [e.g. v0.4.0]
-- Python Version: [e.g. 3.9.5]
+- OS: [e.g. Ubuntu 20.10]
+- Prototorch Version: [e.g. v0.4.0]
+- Python Version: [e.g. 3.9.5]
 
 **Additional context**
 Add any other context about the problem here.
@@ -2,17 +2,15 @@
 
 ## Release 0.5.0
 
-- Breaking: Removed deprecated `prototorch.modules.Prototypes1D`
-- Use `prototorch.components.LabeledComponents` instead
+- Breaking: Removed deprecated `prototorch.modules.Prototypes1D`.
+- Use `prototorch.components.LabeledComponents` instead.
 
 ## Release 0.2.0
 
 ### Includes
 - Fixes in example scripts.
 
 ## Release 0.1.1-dev0
 
 ### Includes
 - Minor bugfixes.
 - 100% line coverage.
@@ -11,6 +11,36 @@ from prototorch.components.initializers import (ClassAwareInitializer,
                                                  ZeroReasoningsInitializer)
 from torch.nn.parameter import Parameter
 
+from .initializers import parse_data_arg
+
+
+def get_labels_object(distribution):
+    if isinstance(distribution, dict):
+        if "num_classes" in distribution.keys():
+            labels = EqualLabelsInitializer(
+                distribution["num_classes"],
+                distribution["prototypes_per_class"])
+        else:
+            labels = CustomLabelsInitializer(distribution)
+    elif isinstance(distribution, tuple):
+        num_classes, prototypes_per_class = distribution
+        labels = EqualLabelsInitializer(num_classes, prototypes_per_class)
+    elif isinstance(distribution, list):
+        labels = UnequalLabelsInitializer(distribution)
+    else:
+        msg = f"`distribution` not understood." \
+            f"You have provided: {distribution=}."
+        raise ValueError(msg)
+    return labels
+
+
+def _precheck_initializer(initializer):
+    if not isinstance(initializer, ComponentsInitializer):
+        emsg = f"`initializer` has to be some subtype of " \
+            f"{ComponentsInitializer}. " \
+            f"You have provided: {initializer=} instead."
+        raise TypeError(emsg)
+
+
 class Components(torch.nn.Module):
     """Components is a set of learnable Tensors."""
@@ -21,29 +51,46 @@ class Components(torch.nn.Module):
                  initialized_components=None):
         super().__init__()
 
-        self.num_components = num_components
-
         # Ignore all initialization settings if initialized_components is given.
         if initialized_components is not None:
-            self.register_parameter("_components",
-                                    Parameter(initialized_components))
+            self._register_components(initialized_components)
             if num_components is not None or initializer is not None:
                 wmsg = "Arguments ignored while initializing Components"
                 warnings.warn(wmsg)
         else:
-            self._initialize_components(initializer)
+            self._initialize_components(num_components, initializer)
 
-    def _precheck_initializer(self, initializer):
-        if not isinstance(initializer, ComponentsInitializer):
-            emsg = f"`initializer` has to be some subtype of " \
-                f"{ComponentsInitializer}. " \
-                f"You have provided: {initializer=} instead."
-            raise TypeError(emsg)
+    @property
+    def num_components(self):
+        return len(self._components)
 
-    def _initialize_components(self, initializer):
-        self._precheck_initializer(initializer)
-        _components = initializer.generate(self.num_components)
-        self.register_parameter("_components", Parameter(_components))
+    def _register_components(self, components):
+        self.register_parameter("_components", Parameter(components))
+
+    def _initialize_components(self, num_components, initializer):
+        _precheck_initializer(initializer)
+        _components = initializer.generate(num_components)
+        self._register_components(_components)
+
+    def add_components(self,
+                       num=1,
+                       initializer=None,
+                       *,
+                       initialized_components=None):
+        if initialized_components is not None:
+            _components = torch.cat([self._components, initialized_components])
+        else:
+            _precheck_initializer(initializer)
+            _new = initializer.generate(num)
+            _components = torch.cat([self._components, _new])
+        self._register_components(_components)
+
+    def remove_components(self, indices=None):
+        mask = torch.ones(self.num_components, dtype=torch.bool)
+        mask[indices] = False
+        _components = self._components[mask]
+        self._register_components(_components)
+        return mask
 
     @property
     def components(self):
@@ -54,7 +101,7 @@ class Components(torch.nn.Module):
         return self._components
 
     def extra_repr(self):
-        return f"components.shape: {tuple(self._components.shape)}"
+        return f"(components): (shape: {tuple(self._components.shape)})"
 
 
 class LabeledComponents(Components):
@@ -68,39 +115,60 @@ class LabeledComponents(Components):
                  *,
                  initialized_components=None):
         if initialized_components is not None:
-            components, component_labels = initialized_components
+            components, component_labels = parse_data_arg(
+                initialized_components)
             super().__init__(initialized_components=components)
             self._labels = component_labels
         else:
-            _labels = self._initialize_labels(distribution)
+            labels = get_labels_object(distribution)
+            self.initial_distribution = labels.distribution
+            _labels = labels.generate()
             super().__init__(len(_labels), initializer=initializer)
-            self.register_buffer("_labels", _labels)
+            self._register_labels(_labels)
 
+    def _register_labels(self, labels):
+        self.register_buffer("_labels", labels)
+
+    @property
+    def distribution(self):
+        clabels, counts = torch.unique(self._labels,
+                                       sorted=True,
+                                       return_counts=True)
+        return dict(zip(clabels.tolist(), counts.tolist()))
+
-    def _initialize_components(self, initializer):
+    def _initialize_components(self, num_components, initializer):
         if isinstance(initializer, ClassAwareInitializer):
-            self._precheck_initializer(initializer)
-            _components = initializer.generate(self.num_components,
-                                               self.distribution)
-            self.register_parameter("_components", Parameter(_components))
+            _precheck_initializer(initializer)
+            _components = initializer.generate(num_components,
+                                               self.initial_distribution)
+            self._register_components(_components)
         else:
-            super()._initialize_components(initializer)
+            super()._initialize_components(num_components, initializer)
 
-    def _initialize_labels(self, distribution):
-        if type(distribution) == dict:
-            if "num_classes" in distribution.keys():
-                labels = EqualLabelsInitializer(
-                    distribution["num_classes"],
-                    distribution["prototypes_per_class"])
-            else:
-                labels = CustomLabelsInitializer(distribution)
-        elif type(distribution) == tuple:
-            num_classes, prototypes_per_class = distribution
-            labels = EqualLabelsInitializer(num_classes, prototypes_per_class)
-        elif type(distribution) == list:
-            labels = UnequalLabelsInitializer(distribution)
-
-        self.distribution = labels.distribution
-        return labels.generate()
+    def add_components(self, distribution, initializer):
+        _precheck_initializer(initializer)
+
+        # Labels
+        labels = get_labels_object(distribution)
+        new_labels = labels.generate()
+        _labels = torch.cat([self._labels, new_labels])
+        self._register_labels(_labels)
+
+        # Components
+        if isinstance(initializer, ClassAwareInitializer):
+            _new = initializer.generate(len(new_labels), labels.distribution)
+        else:
+            _new = initializer.generate(len(new_labels))
+        _components = torch.cat([self._components, _new])
+        self._register_components(_components)
+
+    def remove_components(self, indices=None):
+        # Components
+        mask = super().remove_components(indices)
+
+        # Labels
+        _labels = self._labels[mask]
+        self._register_labels(_labels)
 
     @property
     def component_labels(self):
@@ -141,7 +209,7 @@ class ReasoningComponents(Components):
         super().__init__(len(self._reasonings), initializer=initializer)
 
     def _initialize_reasonings(self, reasonings):
-        if type(reasonings) == tuple:
+        if isinstance(reasonings, tuple):
             num_classes, num_components = reasonings
             reasonings = ZeroReasoningsInitializer(num_classes, num_components)
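For orientation, the `distribution` argument that `get_labels_object` parses above accepts a tuple, a dict, or a per-class list. A minimal sketch against the post-merge API; the exact constructor signature of `LabeledComponents` is partly inferred from the diff:

```python
from prototorch.components import LabeledComponents
from prototorch.components.initializers import ZerosInitializer

# Three equivalent spellings for 3 classes with 2 prototypes each:
dist_tuple = (3, 2)  # (num_classes, prototypes_per_class)
dist_dict = {"num_classes": 3, "prototypes_per_class": 2}
dist_list = [2, 2, 2]  # one count per class

components = LabeledComponents(distribution=dist_tuple,
                               initializer=ZerosInitializer(2))
print(components.num_components)  # 6, via the new property
print(components.distribution)    # {0: 2, 1: 2, 2: 2}
```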
@@ -1,4 +1,5 @@
-"""ProtoTroch Initializers."""
+"""ProtoTroch Component and Label Initializers."""
 
 import warnings
 from collections.abc import Iterable
 from itertools import chain
@@ -13,21 +14,30 @@ def parse_data_arg(data_arg):
 
     if isinstance(data_arg, DataLoader):
         data = torch.tensor([])
-        labels = torch.tensor([])
+        targets = torch.tensor([])
         for x, y in data_arg:
             data = torch.cat([data, x])
-            labels = torch.cat([labels, y])
+            targets = torch.cat([targets, y])
     else:
-        data, labels = data_arg
+        data, targets = data_arg
         if not isinstance(data, torch.Tensor):
             wmsg = f"Converting data to {torch.Tensor}."
             warnings.warn(wmsg)
             data = torch.Tensor(data)
-    if not isinstance(labels, torch.Tensor):
-        wmsg = f"Converting labels to {torch.Tensor}."
+    if not isinstance(targets, torch.Tensor):
+        wmsg = f"Converting targets to {torch.Tensor}."
         warnings.warn(wmsg)
-        labels = torch.Tensor(labels)
-    return data, labels
+        targets = torch.Tensor(targets)
+    return data, targets
+
+
+def get_subinitializers(data, targets, clabels, subinit_type):
+    initializers = dict()
+    for clabel in clabels:
+        class_data = data[targets == clabel]
+        class_initializer = subinit_type(class_data)
+        initializers[clabel] = (class_initializer)
+    return initializers
 
 
 # Components
@@ -37,18 +47,22 @@ class ComponentsInitializer(object):
 
 
 class DimensionAwareInitializer(ComponentsInitializer):
-    def __init__(self, c_dims):
+    def __init__(self, dims):
         super().__init__()
-        if isinstance(c_dims, Iterable):
-            self.components_dims = tuple(c_dims)
+        if isinstance(dims, Iterable):
+            self.components_dims = tuple(dims)
         else:
-            self.components_dims = (c_dims, )
+            self.components_dims = (dims, )
 
 
 class OnesInitializer(DimensionAwareInitializer):
+    def __init__(self, dims, scale=1.0):
+        super().__init__(dims)
+        self.scale = scale
+
     def generate(self, length):
         gen_dims = (length, ) + self.components_dims
-        return torch.ones(gen_dims)
+        return torch.ones(gen_dims) * self.scale
 
 
 class ZerosInitializer(DimensionAwareInitializer):
@@ -58,78 +72,73 @@ class ZerosInitializer(DimensionAwareInitializer):
 
 
 class UniformInitializer(DimensionAwareInitializer):
-    def __init__(self, c_dims, min=0.0, max=1.0):
-        super().__init__(c_dims)
-
-        self.min = min
-        self.max = max
+    def __init__(self, dims, minimum=0.0, maximum=1.0, scale=1.0):
+        super().__init__(dims)
+        self.minimum = minimum
+        self.maximum = maximum
+        self.scale = scale
 
     def generate(self, length):
         gen_dims = (length, ) + self.components_dims
-        return torch.ones(gen_dims).uniform_(self.min, self.max)
+        return torch.ones(gen_dims).uniform_(self.minimum,
+                                             self.maximum) * self.scale
 
 
 class DataAwareInitializer(ComponentsInitializer):
-    def __init__(self, data):
+    def __init__(self, data, transform=torch.nn.Identity()):
         super().__init__()
         self.data = data
+        self.transform = transform
 
     def __del__(self):
         del self.data
 
 
 class SelectionInitializer(DataAwareInitializer):
     def generate(self, length):
         indices = torch.LongTensor(length).random_(0, len(self.data))
-        return self.data[indices]
+        return self.transform(self.data[indices])
 
 
 class MeanInitializer(DataAwareInitializer):
     def generate(self, length):
         mean = torch.mean(self.data, dim=0)
         repeat_dim = [length] + [1] * len(mean.shape)
-        return mean.repeat(repeat_dim)
+        return self.transform(mean.repeat(repeat_dim))
 
 
-class ClassAwareInitializer(ComponentsInitializer):
+class ClassAwareInitializer(DataAwareInitializer):
     def __init__(self, data, transform=torch.nn.Identity()):
-        super().__init__()
-        data, labels = parse_data_arg(data)
-        self.data = data
-        self.labels = labels
-
-        self.transform = transform
-
-        self.clabels = torch.unique(self.labels)
+        data, targets = parse_data_arg(data)
+        super().__init__(data, transform)
+        self.targets = targets
+        self.clabels = torch.unique(self.targets).int().tolist()
         self.num_classes = len(self.clabels)
 
     def _get_samples_from_initializer(self, length, dist):
         if not dist:
             per_class = length // self.num_classes
-            dist = self.num_classes * [per_class]
-        if type(dist) == dict:
-            dist = dist.values()
-        samples_list = [
-            init.generate(n) for init, n in zip(self.initializers, dist)
-        ]
-        out = torch.vstack(samples_list)
+            dist = dict(zip(self.clabels, self.num_classes * [per_class]))
+        if isinstance(dist, list):
+            dist = dict(zip(self.clabels, dist))
+        samples = [self.initializers[k].generate(n) for k, n in dist.items()]
+        out = torch.vstack(samples)
         with torch.no_grad():
             out = self.transform(out)
         return out
 
     def __del__(self):
         del self.data
-        del self.labels
+        del self.targets
 
 
 class StratifiedMeanInitializer(ClassAwareInitializer):
     def __init__(self, data, **kwargs):
         super().__init__(data, **kwargs)
+        self.initializers = get_subinitializers(self.data, self.targets,
+                                                self.clabels, MeanInitializer)
 
-        self.initializers = []
-        for clabel in self.clabels:
-            class_data = self.data[self.labels == clabel]
-            class_initializer = MeanInitializer(class_data)
-            self.initializers.append(class_initializer)
-
-    def generate(self, length, dist=[]):
+    def generate(self, length, dist):
         samples = self._get_samples_from_initializer(length, dist)
         return samples
@@ -138,12 +147,9 @@ class StratifiedSelectionInitializer(ClassAwareInitializer):
     def __init__(self, data, noise=None, **kwargs):
         super().__init__(data, **kwargs)
         self.noise = noise
-
-        self.initializers = []
-        for clabel in self.clabels:
-            class_data = self.data[self.labels == clabel]
-            class_initializer = SelectionInitializer(class_data)
-            self.initializers.append(class_initializer)
+        self.initializers = get_subinitializers(self.data, self.targets,
+                                                self.clabels,
+                                                SelectionInitializer)
 
     def add_noise_v1(self, x):
         return x + self.noise
@@ -155,7 +161,7 @@ class StratifiedSelectionInitializer(ClassAwareInitializer):
         mask = torch.bernoulli(n1) - torch.bernoulli(n2)
         return x + (self.noise * mask)
 
-    def generate(self, length, dist=[]):
+    def generate(self, length, dist):
         samples = self._get_samples_from_initializer(length, dist)
         if self.noise is not None:
             samples = self.add_noise_v1(samples)
@@ -181,8 +187,8 @@ class UnequalLabelsInitializer(LabelsInitializer):
         clabels = range(len(self.dist))
         if not dist:
             dist = self.dist
-        labels = list(chain(*[[i] * n for i, n in zip(clabels, dist)]))
-        return torch.LongTensor(labels)
+        targets = list(chain(*[[i] * n for i, n in zip(clabels, dist)]))
+        return torch.LongTensor(targets)
 
 
 class EqualLabelsInitializer(LabelsInitializer):
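To illustrate the reworked class-aware machinery, here is a sketch of `StratifiedMeanInitializer` on a toy `(data, targets)` tuple, the form handled by `parse_data_arg`:

```python
import torch
from prototorch.components.initializers import StratifiedMeanInitializer

x = torch.Tensor([[0.0, 0.0], [1.0, 1.0], [9.0, 9.0], [11.0, 11.0]])
y = torch.LongTensor([0, 0, 1, 1])

init = StratifiedMeanInitializer((x, y))
protos = init.generate(length=2, dist={0: 1, 1: 1})  # one prototype per class
print(protos)  # tensor([[ 0.5,  0.5], [10.0, 10.0]]) -- the class means
```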
@@ -1,8 +1,6 @@
 """ProtoTorch datasets."""
 
 from .abstract import NumpyDataset
-from .iris import Iris
+from .sklearn import Blobs, Circles, Iris, Moons, Random
 from .spiral import Spiral
 from .tecator import Tecator
-
-__all__ = ['Iris', 'Spiral', 'Tecator']
@@ -15,9 +15,9 @@ import torch
 class NumpyDataset(torch.utils.data.TensorDataset):
     """Create a PyTorch TensorDataset from NumPy arrays."""
     def __init__(self, data, targets):
-        self.data = data
-        self.targets = targets
-        tensors = [torch.Tensor(data), torch.Tensor(targets)]
+        self.data = torch.Tensor(data)
+        self.targets = torch.LongTensor(targets)
+        tensors = [self.data, self.targets]
         super().__init__(*tensors)
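With this change, `NumpyDataset` eagerly converts its inputs, so integer targets come out as `torch.long` rather than `torch.float`. A small usage sketch:

```python
import numpy as np
from prototorch.datasets import NumpyDataset

x = np.random.rand(100, 4)
y = np.random.randint(0, 3, size=(100, ))

ds = NumpyDataset(x, y)
sample, target = ds[0]  # TensorDataset-style indexing
print(sample.dtype, target.dtype)  # torch.float32 torch.int64
```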
@@ -1,40 +0,0 @@
-"""Thin wrapper for the Iris classification dataset from sklearn.
-
-URL:
-https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_iris.html
-
-"""
-
-from typing import Sequence
-
-from prototorch.datasets.abstract import NumpyDataset
-from sklearn.datasets import load_iris
-
-
-class Iris(NumpyDataset):
-    """
-    Iris Dataset by Ronald Fisher introduced in 1936.
-
-    The dataset contains four measurements from flowers of three species of iris.
-
-    .. list-table:: Iris
-       :header-rows: 1
-
-       * - dimensions
-         - classes
-         - training size
-         - validation size
-         - test size
-       * - 4
-         - 3
-         - 150
-         - 0
-         - 0
-
-    :param dims: select a subset of dimensions
-    """
-    def __init__(self, dims: Sequence[int] = None):
-        x, y = load_iris(return_X_y=True)
-        if dims:
-            x = x[:, dims]
-        super().__init__(x, y)
prototorch/datasets/sklearn.py (new file, 137 lines)
@@ -0,0 +1,137 @@
+"""Thin wrappers for a few scikit-learn datasets.
+
+URL:
+https://scikit-learn.org/stable/modules/classes.html#module-sklearn.datasets
+
+"""
+
+import warnings
+from typing import Sequence, Union
+
+from prototorch.datasets.abstract import NumpyDataset
+from sklearn.datasets import (load_iris, make_blobs, make_circles,
+                              make_classification, make_moons)
+
+
+class Iris(NumpyDataset):
+    """Iris Dataset by Ronald Fisher introduced in 1936.
+
+    The dataset contains four measurements from flowers of three species of iris.
+
+    .. list-table:: Iris
+       :header-rows: 1
+
+       * - dimensions
+         - classes
+         - training size
+         - validation size
+         - test size
+       * - 4
+         - 3
+         - 150
+         - 0
+         - 0
+
+    :param dims: select a subset of dimensions
+    """
+    def __init__(self, dims: Sequence[int] = None):
+        x, y = load_iris(return_X_y=True)
+        if dims:
+            x = x[:, dims]
+        super().__init__(x, y)
+
+
+class Blobs(NumpyDataset):
+    """Generate isotropic Gaussian blobs for clustering.
+
+    Read more at
+    https://scikit-learn.org/stable/datasets/sample_generators.html#sample-generators.
+
+    """
+    def __init__(self,
+                 num_samples: int = 300,
+                 num_features: int = 2,
+                 seed: Union[None, int] = 0):
+        x, y = make_blobs(num_samples,
+                          num_features,
+                          centers=None,
+                          random_state=seed,
+                          shuffle=False)
+        super().__init__(x, y)
+
+
+class Random(NumpyDataset):
+    """Generate a random n-class classification problem.
+
+    Read more at
+    https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_classification.html.
+
+    Note: `n_classes * n_clusters_per_class` must not exceed `2**n_informative`.
+    """
+    def __init__(self,
+                 num_samples: int = 300,
+                 num_features: int = 2,
+                 num_classes: int = 2,
+                 num_clusters: int = 2,
+                 num_informative: Union[None, int] = None,
+                 separation: float = 1.0,
+                 seed: Union[None, int] = 0):
+        if not num_informative:
+            import math
+            num_informative = math.ceil(math.log2(num_classes * num_clusters))
+        if num_features < num_informative:
+            warnings.warn("Generating more features than requested.")
+            num_features = num_informative
+        x, y = make_classification(num_samples,
+                                   num_features,
+                                   n_informative=num_informative,
+                                   n_redundant=0,
+                                   n_classes=num_classes,
+                                   n_clusters_per_class=num_clusters,
+                                   class_sep=separation,
+                                   random_state=seed,
+                                   shuffle=False)
+        super().__init__(x, y)
+
+
+class Circles(NumpyDataset):
+    """Make a large circle containing a smaller circle in 2D.
+
+    A simple toy dataset to visualize clustering and classification algorithms.
+
+    Read more at
+    https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_circles.html
+
+    """
+    def __init__(self,
+                 num_samples: int = 300,
+                 noise: float = 0.3,
+                 factor: float = 0.8,
+                 seed: Union[None, int] = 0):
+        x, y = make_circles(num_samples,
+                            noise=noise,
+                            factor=factor,
+                            random_state=seed,
+                            shuffle=False)
+        super().__init__(x, y)
+
+
+class Moons(NumpyDataset):
+    """Make two interleaving half circles.
+
+    A simple toy dataset to visualize clustering and classification algorithms.
+
+    Read more at
+    https://scikit-learn.org/stable/modules/generated/sklearn.datasets.make_moons.html
+
+    """
+    def __init__(self,
+                 num_samples: int = 300,
+                 noise: float = 0.3,
+                 seed: Union[None, int] = 0):
+        x, y = make_moons(num_samples,
+                          noise=noise,
+                          random_state=seed,
+                          shuffle=False)
+        super().__init__(x, y)
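The new wrappers compose directly with `torch.utils.data.DataLoader`; a usage sketch with the constructor arguments defined above:

```python
from torch.utils.data import DataLoader
from prototorch.datasets import Iris, Moons

train_ds = Moons(num_samples=300, noise=0.3, seed=42)
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
x, y = next(iter(train_loader))
print(x.shape, y.shape)  # torch.Size([32, 2]) torch.Size([32])

petals = Iris(dims=[2, 3])  # keep only petal length and width
print(petals.data.shape)    # torch.Size([150, 2])
```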
@@ -101,12 +101,12 @@ class Tecator(ProtoDataset):
         x_train, y_train = f["x_train"], f["y_train"]
         x_test, y_test = f["x_test"], f["y_test"]
         training_set = [
-            torch.tensor(x_train, dtype=torch.float32),
-            torch.tensor(y_train),
+            torch.Tensor(x_train),
+            torch.LongTensor(y_train),
         ]
         test_set = [
-            torch.tensor(x_test, dtype=torch.float32),
-            torch.tensor(y_test),
+            torch.Tensor(x_test),
+            torch.LongTensor(y_test),
         ]
 
         with open(os.path.join(self.processed_folder, self.training_file),
@@ -2,11 +2,4 @@
 
 from .activations import identity, sigmoid_beta, swish_beta
 from .competitions import knnc, wtac
-
-__all__ = [
-    "identity",
-    "sigmoid_beta",
-    "swish_beta",
-    "knnc",
-    "wtac",
-]
+from .pooling import *
@@ -5,17 +5,14 @@ import torch
 ACTIVATIONS = dict()
 
 
-# def register_activation(scriptf):
-#     ACTIVATIONS[scriptf.name] = scriptf
-#     return scriptf
-def register_activation(function):
+def register_activation(fn):
     """Add the activation function to the registry."""
-    ACTIVATIONS[function.__name__] = function
-    return function
+    name = fn.__name__
+    ACTIVATIONS[name] = fn
+    return fn
 
 
 @register_activation
-# @torch.jit.script
 def identity(x, beta=0.0):
     """Identity activation function.
 
@@ -29,7 +26,6 @@ def identity(x, beta=0.0):
 
 
 @register_activation
-# @torch.jit.script
 def sigmoid_beta(x, beta=10.0):
     r"""Sigmoid activation function with scaling.
 
@@ -44,7 +40,6 @@ def sigmoid_beta(x, beta=10.0):
 
 
 @register_activation
-# @torch.jit.script
 def swish_beta(x, beta=10.0):
     r"""Swish activation function with scaling.
@@ -3,42 +3,26 @@
 import torch
 
 
-def stratified_min(distances, labels):
-    clabels = torch.unique(labels, dim=0)
-    num_classes = clabels.size()[0]
-    if distances.size()[1] == num_classes:
-        # skip if only one prototype per class
-        return distances
-    batch_size = distances.size()[0]
-    winning_distances = torch.zeros(num_classes, batch_size)
-    inf = torch.full_like(distances.T, fill_value=float("inf"))
-    # distances_to_wpluses = torch.where(matcher, distances, inf)
-    for i, cl in enumerate(clabels):
-        # cdists = distances.T[labels == cl]
-        matcher = torch.eq(labels.unsqueeze(dim=1), cl)
-        if labels.ndim == 2:
-            # if the labels are one-hot vectors
-            matcher = torch.eq(torch.sum(matcher, dim=-1), num_classes)
-        cdists = torch.where(matcher, distances.T, inf).T
-        winning_distances[i] = torch.min(cdists, dim=1,
-                                         keepdim=True).values.squeeze()
-    if labels.ndim == 2:
-        # Transpose to return with `batch_size` first and
-        # reverse the columns to fix the ordering of the classes
-        return torch.flip(winning_distances.T, dims=(1, ))
-
-    return winning_distances.T  # return with `batch_size` first
-
-
-def wtac(distances, labels):
+def wtac(distances: torch.Tensor,
+         labels: torch.LongTensor) -> (torch.LongTensor):
+    """Winner-Takes-All-Competition.
+
+    Returns the labels corresponding to the winners.
+
+    """
     winning_indices = torch.min(distances, dim=1).indices
     winning_labels = labels[winning_indices].squeeze()
     return winning_labels
 
 
-def knnc(distances, labels, k=1):
+def knnc(distances: torch.Tensor,
+         labels: torch.LongTensor,
+         k: int = 1) -> (torch.LongTensor):
     """K-Nearest-Neighbors-Competition.
 
     Returns the labels corresponding to the winners.
 
     """
     winning_indices = torch.topk(-distances, k=k, dim=1).indices
-    # winning_labels = torch.mode(labels[winning_indices].squeeze(),
-    #                             dim=1).values
     winning_labels = torch.mode(labels[winning_indices], dim=1).values
     return winning_labels
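A tiny worked example of the two competitions, consistent with the test expectations later in this diff:

```python
import torch
from prototorch.functions.competitions import knnc, wtac

d = torch.Tensor([[2.0, 3.0, 1.99, 3.01],
                  [2.0, 3.0, 2.01, 3.00]])  # distances to 4 prototypes
labels = torch.LongTensor([0, 1, 2, 3])     # one label per prototype

print(wtac(d, labels))       # tensor([2, 0]): labels of the closest prototypes
print(knnc(d, labels, k=1))  # tensor([2, 0]): identical to wtac for k=1
```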
@@ -57,3 +57,38 @@ def lvq21_loss(distances, target_labels, prototype_labels):
     mu = dp - dm
 
     return mu
+
+
+# Probabilistic
+def _get_class_probabilities(probabilities, targets, prototype_labels):
+    # Create Label Mapping
+    uniques = prototype_labels.unique(sorted=True).tolist()
+    key_val = {key: val for key, val in zip(uniques, range(len(uniques)))}
+
+    target_indices = torch.LongTensor(list(map(key_val.get, targets.tolist())))
+
+    whole = probabilities.sum(dim=1)
+    correct = probabilities[torch.arange(len(probabilities)), target_indices]
+    wrong = whole - correct
+
+    return whole, correct, wrong
+
+
+def nllr_loss(probabilities, targets, prototype_labels):
+    """Compute the Negative Log-Likelihood Ratio loss."""
+    _, correct, wrong = _get_class_probabilities(probabilities, targets,
+                                                 prototype_labels)
+
+    likelihood = correct / wrong
+    log_likelihood = torch.log(likelihood)
+    return -1.0 * log_likelihood
+
+
+def rslvq_loss(probabilities, targets, prototype_labels):
+    """Compute the Robust Soft Learning Vector Quantization (RSLVQ) loss."""
+    whole, correct, _ = _get_class_probabilities(probabilities, targets,
+                                                 prototype_labels)
+
+    likelihood = correct / whole
+    log_likelihood = torch.log(likelihood)
+    return -1.0 * log_likelihood
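In words: given class-pooled probabilities (one column per class), `nllr_loss` returns -log(correct / wrong) and `rslvq_loss` returns -log(correct / whole). A worked sketch:

```python
import torch
from prototorch.functions.losses import nllr_loss, rslvq_loss

probs = torch.Tensor([[0.7, 0.2, 0.1],
                      [0.1, 0.1, 0.8]])  # one column per class label
targets = torch.LongTensor([0, 2])
plabels = torch.LongTensor([0, 1, 2])

print(rslvq_loss(probs, targets, plabels))  # [-log(0.7/1.0), -log(0.8/1.0)]
print(nllr_loss(probs, targets, plabels))   # [-log(0.7/0.3), -log(0.8/0.2)]
```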
prototorch/functions/pooling.py (new file, 80 lines)
@@ -0,0 +1,80 @@
+"""ProtoTorch pooling functions."""
+
+from typing import Callable
+
+import torch
+
+
+def stratify_with(values: torch.Tensor,
+                  labels: torch.LongTensor,
+                  fn: Callable,
+                  fill_value: float = 0.0) -> (torch.Tensor):
+    """Apply an arbitrary stratification strategy on the columns of `values`.
+
+    The outputs correspond to sorted labels.
+    """
+    clabels = torch.unique(labels, dim=0, sorted=True)
+    num_classes = clabels.size()[0]
+    if values.size()[1] == num_classes:
+        # skip if stratification is trivial
+        return values
+    batch_size = values.size()[0]
+    winning_values = torch.zeros(num_classes, batch_size, device=labels.device)
+    filler = torch.full_like(values.T, fill_value=fill_value)
+    for i, cl in enumerate(clabels):
+        matcher = torch.eq(labels.unsqueeze(dim=1), cl)
+        if labels.ndim == 2:
+            # if the labels are one-hot vectors
+            matcher = torch.eq(torch.sum(matcher, dim=-1), num_classes)
+        cdists = torch.where(matcher, values.T, filler).T
+        winning_values[i] = fn(cdists)
+    if labels.ndim == 2:
+        # Transpose to return with `batch_size` first and
+        # reverse the columns to fix the ordering of the classes
+        return torch.flip(winning_values.T, dims=(1, ))
+
+    return winning_values.T  # return with `batch_size` first
+
+
+def stratified_sum_pooling(values: torch.Tensor,
+                           labels: torch.LongTensor) -> (torch.Tensor):
+    """Group-wise sum."""
+    winning_values = stratify_with(
+        values,
+        labels,
+        fn=lambda x: torch.sum(x, dim=1, keepdim=True).squeeze(),
+        fill_value=0.0)
+    return winning_values
+
+
+def stratified_min_pooling(values: torch.Tensor,
+                           labels: torch.LongTensor) -> (torch.Tensor):
+    """Group-wise minimum."""
+    winning_values = stratify_with(
+        values,
+        labels,
+        fn=lambda x: torch.min(x, dim=1, keepdim=True).values.squeeze(),
+        fill_value=float("inf"))
+    return winning_values
+
+
+def stratified_max_pooling(values: torch.Tensor,
+                           labels: torch.LongTensor) -> (torch.Tensor):
+    """Group-wise maximum."""
+    winning_values = stratify_with(
+        values,
+        labels,
+        fn=lambda x: torch.max(x, dim=1, keepdim=True).values.squeeze(),
+        fill_value=-1.0 * float("inf"))
+    return winning_values
+
+
+def stratified_prod_pooling(values: torch.Tensor,
+                            labels: torch.LongTensor) -> (torch.Tensor):
+    """Group-wise product."""
+    winning_values = stratify_with(
+        values,
+        labels,
+        fn=lambda x: torch.prod(x, dim=1, keepdim=True).squeeze(),
+        fill_value=1.0)
+    return winning_values
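A quick demonstration of the pooling semantics, matching the new tests at the bottom of this diff: each output column corresponds to one class, in sorted label order.

```python
import torch
from prototorch.functions.pooling import (stratified_min_pooling,
                                          stratified_sum_pooling)

values = torch.Tensor([[1.0, 0.0, 2.0, 3.0],
                       [9.0, 8.0, 0.0, 1.0]])  # one column per prototype
labels = torch.LongTensor([0, 0, 1, 2])        # prototype -> class

print(stratified_min_pooling(values, labels))
# tensor([[ 0., 2., 3.], [ 8., 0., 1.]])  per-class minimum
print(stratified_sum_pooling(values, labels))
# tensor([[ 1., 2., 3.], [17., 0., 1.]])  per-class sum
```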
prototorch/functions/transforms.py (new file, 5 lines)
@@ -0,0 +1,5 @@
+import torch
+
+
+def gaussian(distance, variance):
+    return torch.exp(-(distance * distance) / (2 * variance))
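The `gaussian` transform maps distances into similarities in (0, 1], which is one way to obtain the probabilities consumed by `nllr_loss`/`rslvq_loss` above. For example:

```python
import torch
from prototorch.functions.transforms import gaussian

distances = torch.Tensor([[0.0, 1.0, 2.0]])
print(gaussian(distances, variance=0.5))
# tensor([[1.0000, 0.3679, 0.0183]]) == exp(-d**2) for variance 0.5
```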
@@ -1 +1,7 @@
 """ProtoTorch modules."""
+
+from .competitions import *
+from .initializers import *
+from .pooling import *
+from .transformations import *
+from .wrappers import LambdaLayer, LossLayer
prototorch/modules/competitions.py (new file, 41 lines)
@@ -0,0 +1,41 @@
+"""ProtoTorch Competition Modules."""
+
+import torch
+from prototorch.functions.competitions import knnc, wtac
+
+
+class WTAC(torch.nn.Module):
+    """Winner-Takes-All-Competition Layer.
+
+    Thin wrapper over the `wtac` function.
+
+    """
+    def forward(self, distances, labels):
+        return wtac(distances, labels)
+
+
+class LTAC(torch.nn.Module):
+    """Loser-Takes-All-Competition Layer.
+
+    Thin wrapper over the `wtac` function.
+
+    """
+    def forward(self, probs, labels):
+        return wtac(-1.0 * probs, labels)
+
+
+class KNNC(torch.nn.Module):
+    """K-Nearest-Neighbors-Competition.
+
+    Thin wrapper over the `knnc` function.
+
+    """
+    def __init__(self, k=1, **kwargs):
+        super().__init__(**kwargs)
+        self.k = k
+
+    def forward(self, distances, labels):
+        return knnc(distances, labels, k=self.k)
+
+    def extra_repr(self):
+        return f"k: {self.k}"
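These layers are stateless wrappers (apart from `k`), so they drop into module hierarchies without adding parameters. A sketch:

```python
import torch
from prototorch.modules.competitions import KNNC, WTAC

distances = torch.Tensor([[2.0, 3.0, 1.99, 3.01]])
plabels = torch.LongTensor([0, 1, 2, 3])

wtac_layer = WTAC()
print(wtac_layer(distances, plabels))  # tensor(2)

knn_layer = KNNC(k=3)
print(knn_layer)  # extra_repr shows "k: 3"
```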
prototorch/modules/initializers.py (new file, 61 lines)
@@ -0,0 +1,61 @@
+"""ProtoTroch Module Initializers."""
+
+import torch
+
+
+# Transformations
+class MatrixInitializer(object):
+    def __init__(self, *args, **kwargs):
+        ...
+
+    def generate(self, shape):
+        raise NotImplementedError("Subclasses should implement this!")
+
+
+class ZerosInitializer(MatrixInitializer):
+    def generate(self, shape):
+        return torch.zeros(shape)
+
+
+class OnesInitializer(MatrixInitializer):
+    def __init__(self, scale=1.0):
+        super().__init__()
+        self.scale = scale
+
+    def generate(self, shape):
+        return torch.ones(shape) * self.scale
+
+
+class UniformInitializer(MatrixInitializer):
+    def __init__(self, minimum=0.0, maximum=1.0, scale=1.0):
+        super().__init__()
+        self.minimum = minimum
+        self.maximum = maximum
+        self.scale = scale
+
+    def generate(self, shape):
+        return torch.ones(shape).uniform_(self.minimum,
+                                          self.maximum) * self.scale
+
+
+class DataAwareInitializer(MatrixInitializer):
+    def __init__(self, data, transform=torch.nn.Identity()):
+        super().__init__()
+        self.data = data
+        self.transform = transform
+
+    def __del__(self):
+        del self.data
+
+
+class EigenVectorInitializer(DataAwareInitializer):
+    def generate(self, shape):
+        # TODO
+        raise NotImplementedError()
+
+
+# Aliases
+EV = EigenVectorInitializer
+Random = RandomInitializer = UniformInitializer
+Zeros = ZerosInitializer
+Ones = OnesInitializer
@@ -1,7 +1,6 @@
 """ProtoTorch losses."""
 
 import torch
-
 from prototorch.functions.activations import get_activation
 from prototorch.functions.losses import glvq_loss
 
@@ -21,8 +20,8 @@ class GLVQLoss(torch.nn.Module):
 
 
 class NeuralGasEnergy(torch.nn.Module):
-    def __init__(self, lm):
-        super().__init__()
+    def __init__(self, lm, **kwargs):
+        super().__init__(**kwargs)
         self.lm = lm
 
     def forward(self, d):
@@ -38,3 +37,22 @@ class NeuralGasEnergy(torch.nn.Module):
     @staticmethod
     def _nghood_fn(rankings, lm):
         return torch.exp(-rankings / lm)
+
+
+class GrowingNeuralGasEnergy(NeuralGasEnergy):
+    def __init__(self, topology_layer, **kwargs):
+        super().__init__(**kwargs)
+        self.topology_layer = topology_layer
+
+    @staticmethod
+    def _nghood_fn(rankings, topology):
+        winner = rankings[:, 0]
+
+        weights = torch.zeros_like(rankings, dtype=torch.float)
+        weights[torch.arange(rankings.shape[0]), winner] = 1.0
+
+        neighbours = topology.get_neighbours(winner)
+
+        weights[neighbours] = 0.1
+
+        return weights
prototorch/modules/pooling.py (new file, 31 lines)
@@ -0,0 +1,31 @@
+"""ProtoTorch Pooling Modules."""
+
+import torch
+from prototorch.functions.pooling import (stratified_max_pooling,
+                                          stratified_min_pooling,
+                                          stratified_prod_pooling,
+                                          stratified_sum_pooling)
+
+
+class StratifiedSumPooling(torch.nn.Module):
+    """Thin wrapper over the `stratified_sum_pooling` function."""
+    def forward(self, values, labels):
+        return stratified_sum_pooling(values, labels)
+
+
+class StratifiedProdPooling(torch.nn.Module):
+    """Thin wrapper over the `stratified_prod_pooling` function."""
+    def forward(self, values, labels):
+        return stratified_prod_pooling(values, labels)
+
+
+class StratifiedMinPooling(torch.nn.Module):
+    """Thin wrapper over the `stratified_min_pooling` function."""
+    def forward(self, values, labels):
+        return stratified_min_pooling(values, labels)
+
+
+class StratifiedMaxPooling(torch.nn.Module):
+    """Thin wrapper over the `stratified_max_pooling` function."""
+    def forward(self, values, labels):
+        return stratified_max_pooling(values, labels)
prototorch/modules/transformations.py (new file, 49 lines)
@@ -0,0 +1,49 @@
+"""ProtoTorch Transformation Layers."""
+
+import torch
+from torch.nn.parameter import Parameter
+
+from .initializers import MatrixInitializer
+
+
+def _precheck_initializer(initializer):
+    if not isinstance(initializer, MatrixInitializer):
+        emsg = f"`initializer` has to be some subtype of " \
+            f"{MatrixInitializer}. " \
+            f"You have provided: {initializer=} instead."
+        raise TypeError(emsg)
+
+
+class Omega(torch.nn.Module):
+    """The Omega mapping used in GMLVQ."""
+    def __init__(self,
+                 num_replicas=1,
+                 input_dim=None,
+                 latent_dim=None,
+                 initializer=None,
+                 *,
+                 initialized_weights=None):
+        super().__init__()
+
+        if initialized_weights is not None:
+            self._register_weights(initialized_weights)
+        else:
+            if num_replicas == 1:
+                shape = (input_dim, latent_dim)
+            else:
+                shape = (num_replicas, input_dim, latent_dim)
+            self._initialize_weights(shape, initializer)
+
+    def _register_weights(self, weights):
+        self.register_parameter("_omega", Parameter(weights))
+
+    def _initialize_weights(self, shape, initializer):
+        _precheck_initializer(initializer)
+        _omega = initializer.generate(shape)
+        self._register_weights(_omega)
+
+    def forward(self):
+        return self._omega
+
+    def extra_repr(self):
+        return f"(omega): (shape: {tuple(self._omega.shape)})"
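A sketch of wiring `Omega` to one of the matrix initializers defined above, GMLVQ-style (the dimensions here are chosen arbitrarily for illustration):

```python
import torch
from prototorch.modules.initializers import UniformInitializer
from prototorch.modules.transformations import Omega

omega_layer = Omega(input_dim=100,
                    latent_dim=2,
                    initializer=UniformInitializer(minimum=-1.0, maximum=1.0))
omega = omega_layer()  # forward() returns the (100, 2) parameter matrix
x = torch.rand(32, 100)
print((x @ omega).shape)  # torch.Size([32, 2]): batch projected to latent space
```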
prototorch/modules/wrappers.py (new file, 36 lines)
@@ -0,0 +1,36 @@
+"""ProtoTorch Wrappers."""
+
+import torch
+
+
+class LambdaLayer(torch.nn.Module):
+    def __init__(self, fn, name=None):
+        super().__init__()
+        self.fn = fn
+        self.name = name or fn.__name__  # lambda fns get <lambda>
+
+    def forward(self, *args, **kwargs):
+        return self.fn(*args, **kwargs)
+
+    def extra_repr(self):
+        return self.name
+
+
+class LossLayer(torch.nn.modules.loss._Loss):
+    def __init__(self,
+                 fn,
+                 name=None,
+                 size_average=None,
+                 reduce=None,
+                 reduction: str = "mean") -> None:
+        super().__init__(size_average=size_average,
+                         reduce=reduce,
+                         reduction=reduction)
+        self.fn = fn
+        self.name = name or fn.__name__  # lambda fns get <lambda>
+
+    def forward(self, *args, **kwargs):
+        return self.fn(*args, **kwargs)
+
+    def extra_repr(self):
+        return self.name
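The wrappers let plain functions, such as the losses added earlier in this diff, participate in module hierarchies. A sketch:

```python
import torch
from prototorch.functions.losses import rslvq_loss
from prototorch.modules.wrappers import LambdaLayer, LossLayer

normalize = LambdaLayer(lambda x: x / x.sum(dim=1, keepdim=True),
                        name="normalize")
loss_layer = LossLayer(rslvq_loss)

probs = normalize(torch.Tensor([[7.0, 2.0, 1.0]]))  # -> [[0.7, 0.2, 0.1]]
targets = torch.LongTensor([0])
plabels = torch.LongTensor([0, 1, 2])
print(loss_layer(probs, targets, plabels))  # tensor([0.3567]) == -log(0.7)
```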
@@ -1,243 +0,0 @@
-"""Utilities that provide various small functionalities."""
-
-import os
-import pickle
-import sys
-from time import time
-
-import matplotlib.pyplot as plt
-import numpy as np
-
-
-def progressbar(title, value, end, bar_width=20):
-    percent = float(value) / end
-    arrow = "=" * int(round(percent * bar_width) - 1) + ">"
-    spaces = "." * (bar_width - len(arrow))
-    sys.stdout.write("\r{}: [{}] {}%".format(title, arrow + spaces,
-                                             int(round(percent * 100))))
-    sys.stdout.flush()
-    if percent == 1.0:
-        print()
-
-
-def prettify_string(inputs, start="", sep=" ", end="\n"):
-    outputs = start + " ".join(inputs.split()) + end
-    return outputs
-
-
-def pretty_print(inputs):
-    print(prettify_string(inputs))
-
-
-def writelog(self, *logs, logdir="./logs", logfile="run.txt"):
-    f = os.path.join(logdir, logfile)
-    with open(f, "a+") as fh:
-        for log in logs:
-            fh.write(log)
-            fh.write("\n")
-
-
-def start_tensorboard(self, logdir="./logs"):
-    cmd = f"tensorboard --logdir={logdir} --port=6006"
-    os.system(cmd)
-
-
-def make_directory(save_dir):
-    if not os.path.exists(save_dir):
-        print(f"Making directory {save_dir}.")
-        os.mkdir(save_dir)
-
-
-def make_gif(filenames, duration, output_file=None):
-    try:
-        import imageio
-    except ModuleNotFoundError as e:
-        print("Please install Protoflow with [other] extra requirements.")
-        raise (e)
-
-    images = list()
-    for filename in filenames:
-        images.append(imageio.imread(filename))
-    if not output_file:
-        output_file = f"makegif.gif"
-    if images:
-        imageio.mimwrite(output_file, images, duration=duration)
-
-
-def gif_from_dir(directory,
-                 duration,
-                 prefix="",
-                 output_file=None,
-                 verbose=True):
-    images = os.listdir(directory)
-    if verbose:
-        print(f"Making gif from {len(images)} images under {directory}.")
-    filenames = list()
-    # Sort images
-    images = sorted(
-        images,
-        key=lambda img: int(os.path.splitext(img)[0].replace(prefix, "")))
-    for image in images:
-        fname = os.path.join(directory, image)
-        filenames.append(fname)
-    if not output_file:
-        output_file = os.path.join(directory, "makegif.gif")
-    make_gif(filenames=filenames, duration=duration, output_file=output_file)
-
-
-def accuracy_score(y_true, y_pred):
-    accuracy = np.sum(y_true == y_pred)
-    normalized_acc = accuracy / float(len(y_true))
-    return normalized_acc
-
-
-def predict_and_score(clf,
-                      x_test,
-                      y_test,
-                      verbose=False,
-                      title="Test accuracy"):
-    y_pred = clf.predict(x_test)
-    accuracy = np.sum(y_test == y_pred)
-    normalized_acc = accuracy / float(len(y_test))
-    if verbose:
-        print(f"{title}: {normalized_acc * 100:06.04f}%")
-    return normalized_acc
-
-
-def remove_nan_rows(arr):
-    """Remove all rows with `nan` values in `arr`."""
-    mask = np.isnan(arr).any(axis=1)
-    return arr[~mask]
-
-
-def remove_nan_cols(arr):
-    """Remove all columns with `nan` values in `arr`."""
-    mask = np.isnan(arr).any(axis=0)
-    return arr[~mask]
-
-
-def replace_in(arr, replacement_dict, inplace=False):
-    """Replace the keys found in `arr` with the values from
-    the `replacement_dict`.
-    """
-    if inplace:
-        new_arr = arr
-    else:
-        import copy
-
-        new_arr = copy.deepcopy(arr)
-    for k, v in replacement_dict.items():
-        new_arr[arr == k] = v
-    return new_arr
-
-
-def train_test_split(data, train=0.7, val=0.15, shuffle=None, return_xy=False):
-    """Split a classification dataset in such a way so as to
-    preserve the class distribution in subsamples of the dataset.
-    """
-    if train + val > 1.0:
-        raise ValueError("Invalid split values for train and val.")
-    Y = data[:, -1]
-    labels = set(Y)
-    hist = dict()
-    for l in labels:
-        data_l = data[Y == l]
-        nl = len(data_l)
-        nl_train = int(nl * train)
-        nl_val = int(nl * val)
-        nl_test = nl - (nl_train + nl_val)
-        hist[l] = (nl_train, nl_val, nl_test)
-
-    train_data = list()
-    val_data = list()
-    test_data = list()
-    for l, (nl_train, nl_val, nl_test) in hist.items():
-        data_l = data[Y == l]
-        if shuffle:
-            np.random.shuffle(data_l)
-        train_l = data_l[:nl_train]
-        val_l = data_l[nl_train:nl_train + nl_val]
-        test_l = data_l[nl_train + nl_val:nl_train + nl_val + nl_test]
-        train_data.append(train_l)
-        val_data.append(val_l)
-        test_data.append(test_l)
-
-    def _squash(data_list):
-        data = np.array(data_list[0])
-        for item in data_list[1:]:
-            data = np.vstack((data, np.array(item)))
-        return data
-
-    train_data = _squash(train_data)
-    if val_data:
-        val_data = _squash(val_data)
-    if test_data:
-        test_data = _squash(test_data)
-    if return_xy:
-        x_train = train_data[:, :-1]
-        y_train = train_data[:, -1]
-        x_val = val_data[:, :-1]
-        y_val = val_data[:, -1]
-        x_test = test_data[:, :-1]
-        y_test = test_data[:, -1]
-        return (x_train, y_train), (x_val, y_val), (x_test, y_test)
-    return train_data, val_data, test_data
-
-
-def class_histogram(data, title="Untitled"):
-    plt.figure(title)
-    plt.clf()
-    plt.title(title)
-    dist, counts = np.unique(data[:, -1], return_counts=True)
-    plt.bar(dist, counts)
-    plt.xticks(dist)
-    print("Call matplotlib.pyplot.show() to see the plot.")
-
-
-def ntimer(n=10):
-    """Wraps a function which wraps another function to time it."""
-    if n < 1:
-        raise (Exception(f"Invalid n = {n} given."))
-
-    def timer(func):
-        """Wraps `func` with a timer and returns the wrapped `func`."""
-        def wrapper(*args, **kwargs):
-            rv = None
-            before = time()
-            for _ in range(n):
-                rv = func(*args, **kwargs)
-            after = time()
-            elapsed = after - before
-            print(f"Elapsed: {elapsed*1e3:02.02f} ms")
-            return rv
-
-        return wrapper
-
-    return timer
-
-
-def memoize(verbose=True):
-    """Wraps a function which wraps another function that memoizes."""
-    def memoizer(func):
-        """Memoize (cache) return values of `func`.
-        Wraps `func` and returns the wrapped `func` so that `func`
-        is executed when the results are not available in the cache.
-        """
-        cache = {}
-
-        def wrapper(*args, **kwargs):
-            t = (pickle.dumps(args), pickle.dumps(kwargs))
-            if t not in cache:
-                if verbose:
-                    print(f"Adding NEW rv {func.__name__}{args}{kwargs} "
-                          "to cache.")
-                cache[t] = func(*args, **kwargs)
-            else:
-                if verbose:
-                    print(f"Using OLD rv {func.__name__}{args}{kwargs} "
-                          "from cache.")
-            return cache[t]
-
-        return wrapper
-
-    return memoizer
@@ -5,7 +5,7 @@ import unittest
 import numpy as np
 import torch
 from prototorch.functions import (activations, competitions, distances,
-                                  initializers, losses)
+                                  initializers, losses, pooling)
 
 
 class TestActivations(unittest.TestCase):
@@ -104,10 +104,28 @@ class TestCompetitions(unittest.TestCase):
                                                         decimal=5)
         self.assertIsNone(mismatch)
 
+    def test_knnc_k1(self):
+        d = torch.tensor([[2.0, 3.0, 1.99, 3.01], [2.0, 3.0, 2.01, 3.0]])
+        labels = torch.tensor([0, 1, 2, 3])
+        actual = competitions.knnc(d, labels, k=1)
+        desired = torch.tensor([2, 0])
+        mismatch = np.testing.assert_array_almost_equal(actual,
+                                                        desired,
+                                                        decimal=5)
+        self.assertIsNone(mismatch)
+
     def tearDown(self):
         pass
 
+
+class TestPooling(unittest.TestCase):
+    def setUp(self):
+        pass
+
     def test_stratified_min(self):
         d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
         labels = torch.tensor([0, 0, 1, 2])
-        actual = competitions.stratified_min(d, labels)
+        actual = pooling.stratified_min_pooling(d, labels)
         desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
         mismatch = np.testing.assert_array_almost_equal(actual,
                                                         desired,
@@ -118,28 +136,70 @@ class TestCompetitions(unittest.TestCase):
         d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
         labels = torch.tensor([0, 0, 1, 2])
         labels = torch.eye(3)[labels]
-        actual = competitions.stratified_min(d, labels)
+        actual = pooling.stratified_min_pooling(d, labels)
         desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
         mismatch = np.testing.assert_array_almost_equal(actual,
                                                         desired,
                                                         decimal=5)
         self.assertIsNone(mismatch)
 
-    def test_stratified_min_simple(self):
+    def test_stratified_min_trivial(self):
         d = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0, 1]])
         labels = torch.tensor([0, 1, 2])
-        actual = competitions.stratified_min(d, labels)
+        actual = pooling.stratified_min_pooling(d, labels)
         desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
         mismatch = np.testing.assert_array_almost_equal(actual,
                                                         desired,
                                                         decimal=5)
         self.assertIsNone(mismatch)
 
-    def test_knnc_k1(self):
-        d = torch.tensor([[2.0, 3.0, 1.99, 3.01], [2.0, 3.0, 2.01, 3.0]])
-        labels = torch.tensor([0, 1, 2, 3])
-        actual = competitions.knnc(d, labels, k=1)
-        desired = torch.tensor([2, 0])
+    def test_stratified_max(self):
+        d = torch.tensor([[1.0, 0.0, 2.0, 3.0, 9.0], [9.0, 8.0, 0, 1, 7.0]])
+        labels = torch.tensor([0, 0, 3, 2, 0])
+        actual = pooling.stratified_max_pooling(d, labels)
+        desired = torch.tensor([[9.0, 3.0, 2.0], [9.0, 1.0, 0.0]])
+        mismatch = np.testing.assert_array_almost_equal(actual,
+                                                        desired,
+                                                        decimal=5)
+        self.assertIsNone(mismatch)
+
+    def test_stratified_max_one_hot(self):
+        d = torch.tensor([[1.0, 0.0, 2.0, 3.0, 9.0], [9.0, 8.0, 0, 1, 7.0]])
+        labels = torch.tensor([0, 0, 2, 1, 0])
+        labels = torch.nn.functional.one_hot(labels, num_classes=3)
+        actual = pooling.stratified_max_pooling(d, labels)
+        desired = torch.tensor([[9.0, 3.0, 2.0], [9.0, 1.0, 0.0]])
+        mismatch = np.testing.assert_array_almost_equal(actual,
+                                                        desired,
+                                                        decimal=5)
+        self.assertIsNone(mismatch)
+
+    def test_stratified_sum(self):
+        d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
+        labels = torch.LongTensor([0, 0, 1, 2])
+        actual = pooling.stratified_sum_pooling(d, labels)
+        desired = torch.tensor([[1.0, 2.0, 3.0], [17.0, 0.0, 1.0]])
+        mismatch = np.testing.assert_array_almost_equal(actual,
+                                                        desired,
+                                                        decimal=5)
+        self.assertIsNone(mismatch)
+
+    def test_stratified_sum_one_hot(self):
+        d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
+        labels = torch.tensor([0, 0, 1, 2])
+        labels = torch.eye(3)[labels]
+        actual = pooling.stratified_sum_pooling(d, labels)
+        desired = torch.tensor([[1.0, 2.0, 3.0], [17.0, 0.0, 1.0]])
+        mismatch = np.testing.assert_array_almost_equal(actual,
+                                                        desired,
+                                                        decimal=5)
+        self.assertIsNone(mismatch)
+
+    def test_stratified_prod(self):
+        d = torch.tensor([[1.0, 0.0, 2.0, 3.0, 9.0], [9.0, 8.0, 0, 1, 7.0]])
+        labels = torch.tensor([0, 0, 3, 2, 0])
+        actual = pooling.stratified_prod_pooling(d, labels)
+        desired = torch.tensor([[0.0, 3.0, 2.0], [504.0, 1.0, 0.0]])
         mismatch = np.testing.assert_array_almost_equal(actual,
                                                         desired,
                                                         decimal=5)