43 Commits

Author SHA1 Message Date
Alexander Engelsberger
09c80e2d54 Merge branch 'master' into kernel_distances 2021-05-11 16:10:56 +02:00
Alexander Engelsberger
bc20acd63b Bump version: 0.4.1 → 0.4.2 2021-05-11 16:08:37 +02:00
Jensun Ravichandran
7bb93f027a Support for unequal prototype distributions 2021-05-11 16:11:11 +02:00
Alexander Engelsberger
a864cf5d4d Bump version: 0.4.0 → 0.4.1 2021-05-11 13:37:54 +02:00
Alexander Engelsberger
2175f524e8 Update bug report issues template. 2021-05-11 13:35:38 +02:00
Alexander Engelsberger
c1c21e92df Add LVQ 1 and LVQ 2.1 loss functions. 2021-05-11 13:25:10 +02:00
Alexander Engelsberger
2b676ee06e Fix travis.yml. 2021-05-10 17:15:05 +02:00
Jensun Ravichandran
dda2f1d779 Clean-up CI setup 2021-05-10 16:37:43 +02:00
Alexander Engelsberger
3a8388e24f Version 0.4.0 2021-05-10 15:13:58 +02:00
Alexander Engelsberger
a9eef8ae6d Bump version: 0.3.1 → 0.4.0 2021-05-10 15:10:07 +02:00
Alexander Engelsberger
ac3091d8da Update Bumpversion config 2021-05-10 15:09:38 +02:00
Jensun Ravichandran
ce3991de94 Accept torch datasets to initialize components 2021-05-07 15:19:22 +02:00
Jensun Ravichandran
47b4b9bcb1 Expose prototorch.datasets 2021-05-07 15:18:33 +02:00
Alexander Engelsberger
19475d7e2b Update Tecator dataset storage id. 2021-05-06 18:42:36 +02:00
Jensun Ravichandran
269eb8ba25 Update unittests to reflect recent changes 2021-05-04 21:17:07 +02:00
Jensun Ravichandran
b06ded683d Update functions/activations.py 2021-05-04 20:55:49 +02:00
Jensun Ravichandran
466e9bde6b Refactor functions/losses.py 2021-05-04 20:36:48 +02:00
Alexander Engelsberger
fc7d64aaea Use Github Default Issue Templates 2021-05-04 11:20:15 +02:00
Jensun Ravichandran
9a7d3192c0 [BUG] GLVQ training is unstable
GLVQ training is unstable when prototypes are initialized exactly to datapoints
without small shifts. Perhaps because of zero distances?
2021-04-29 19:25:28 +02:00
Jensun Ravichandran
e686adbea1 Add spiral dataset 2021-04-29 19:15:35 +02:00
Jensun Ravichandran
b7d53aa5f1 Update initializers 2021-04-29 19:15:27 +02:00
Jensun Ravichandran
9b663477fd Update components 2021-04-29 18:06:26 +02:00
Jensun Ravichandran
a70166280a Update readme 2021-04-29 14:31:36 +02:00
Jensun Ravichandran
a083c4b276 Merge pull request #2 from si-cim/new-components
Create Component and initializer classes.
2021-04-29 13:25:58 +02:00
Alexander Engelsberger
65e0637b17 Fix RBF Kernel Dimensions. 2021-04-27 17:58:05 +02:00
Alexander Engelsberger
209f9e641b Fix kernel dimensions. 2021-04-27 16:56:56 +02:00
Alexander Engelsberger
ba537fe1d5 Automatic formatting. 2021-04-27 15:43:10 +02:00
Alexander Engelsberger
b0cd2de18e Batch Kernel. [Ineficient] 2021-04-27 15:38:34 +02:00
Alexander Engelsberger
7d353f5b5a Kernel Distances. 2021-04-27 12:06:15 +02:00
Alexander Engelsberger
40751aa50a Create Component and initializer classes. 2021-04-26 20:49:50 +02:00
Alexander Engelsberger
7c30ffe2c7 Automatic Formatting. 2021-04-23 17:25:23 +02:00
Alexander Engelsberger
e1d56595c1 Add NumpyDataset. 2021-04-23 17:24:59 +02:00
Alexander Engelsberger
4540c8848e Add neural gas energy function as loss. 2021-04-23 17:24:59 +02:00
Alexander Engelsberger
c88f288d12 Copy utilities for visualization from Protoflow. 2021-04-23 17:24:59 +02:00
Jensun Ravichandran
e2918dffed Add euclidean_distance_v2 2021-04-22 16:55:50 +02:00
Jensun Ravichandran
7d9dfc27ee Add similarities file 2021-04-22 13:12:19 +02:00
Alexander Engelsberger
ae75b9ebf7 Bump version: 0.2.0 → 0.3.0-dev0 2021-04-21 14:57:45 +02:00
Alexander Engelsberger
34973808b8 Improve documentation. 2021-04-21 14:55:54 +02:00
Alexander Engelsberger
c42df6e203 Merge version 0.2.0 into feature/plugin-architecture. 2021-04-19 16:44:26 +02:00
Jensun Ravichandran
101b50f4e6 Update prototypes.py
Changes:
1. Change single-quotes to double-quotes.
2021-04-15 12:35:06 +02:00
Alexander Engelsberger
cd9303267b Use git version. 2021-04-14 13:48:00 +02:00
Alexander Engelsberger
599dfc3fda Fix issue with plugin subpackage import. 2021-04-13 22:55:49 +02:00
Alexander Engelsberger
5b2ab34232 Add plugin loader. 2021-04-13 12:36:22 +02:00
46 changed files with 1718 additions and 487 deletions

View File

@@ -1,20 +1,11 @@
[bumpversion] [bumpversion]
current_version = 0.2.0 current_version = 0.4.2
commit = True commit = True
tag = True tag = True
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-(?P<release>[a-z]+)(?P<build>\d+))? parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
serialize = serialize =
{major}.{minor}.{patch}-{release}{build}
{major}.{minor}.{patch} {major}.{minor}.{patch}
[bumpversion:part:release]
optional_value = prod
first_value = dev
values =
dev
rc
prod
[bumpversion:file:setup.py] [bumpversion:file:setup.py]
[bumpversion:file:./prototorch/__init__.py] [bumpversion:file:./prototorch/__init__.py]

31
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View File

@@ -0,0 +1,31 @@
---
name: Bug report
about: Create a report to help us improve
title: ''
labels: ''
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Install Prototorch by running '...'
2. Run script '...'
3. See errors
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Desktop (please complete the following information):**
- OS: [e.g. Ubuntu 20.10]
- Prototorch Version: [e.g. v0.4.0]
- Python Version: [e.g. 3.9.5]
**Additional context**
Add any other context about the problem here.

View File

@@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for this project
title: ''
labels: ''
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

View File

@@ -23,10 +23,7 @@ jobs:
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install . pip install .[all]
- name: Install extras
run: |
pip install -r requirements.txt
- name: Lint with flake8 - name: Lint with flake8
run: | run: |
pip install flake8 pip install flake8

View File

@@ -5,10 +5,8 @@ python: 3.8
cache: cache:
directories: directories:
- "./tests/artifacts" - "./tests/artifacts"
# - "$HOME/.prototorch/datasets"
install: install:
- pip install . --progress-bar off - pip install .[all] --progress-bar off
- pip install -r requirements.txt
# Generate code coverage report # Generate code coverage report
script: script:
@@ -25,8 +23,8 @@ deploy:
password: password:
secure: rVQNCxKIuiEtMz4zLSsjdt6spG7cf3miKN5eqjxZfcELALHxAV4w/+CideQObOn3u9emmxb87R9XWKcogqK2MXqnuIcY4mWg7HUqaip1bhz/4YiVXjFILcG6itjX9IUF1DrtjKKRk6xryucSZcEB7yTcXz1hQTb768KWlLlKOVTRNwr7j07eyeafexz/L2ANQCqfOZgS4b0k2AMeDBRPykPULtyeneEFlb6MJZ2MxeqtTNVK4b/6VsQSZwQ9jGJNGWonn5Y287gHmzvEcymSJogTe2taxGBWawPnOsibws9v88DEAHdsEvYdnqEE3hFl0R5La2Lkjd8CjNUYegxioQ57i3WNS3iksq10ZLMCbH29lb9YPG7r6Y8z9H85735kV2gKLdf+o7SPS03TRgjSZKN6pn4pLG0VWkxC6l8VfLuJnRNTHX4g6oLQwOWIBbxybn9Zw/yLjAXAJNgBHt5v86H6Jfi1Va4AhEV6itkoH9IM3/uDhrE/mmorqyVled/CPNtBWNTyoDevLNxMUDnbuhH0JzLki+VOjKnTxEfq12JB8X9faFG5BjvU9oGjPPewrp5DGGzg6KDra7dikciWUxE1eTFFDhMyG1CFGcjKlDvlAGHyI6Kih35egGUeq+N/pitr2330ftM9Dm4rWpOTxPyCI89bXKssx/MgmLG7kSM= secure: rVQNCxKIuiEtMz4zLSsjdt6spG7cf3miKN5eqjxZfcELALHxAV4w/+CideQObOn3u9emmxb87R9XWKcogqK2MXqnuIcY4mWg7HUqaip1bhz/4YiVXjFILcG6itjX9IUF1DrtjKKRk6xryucSZcEB7yTcXz1hQTb768KWlLlKOVTRNwr7j07eyeafexz/L2ANQCqfOZgS4b0k2AMeDBRPykPULtyeneEFlb6MJZ2MxeqtTNVK4b/6VsQSZwQ9jGJNGWonn5Y287gHmzvEcymSJogTe2taxGBWawPnOsibws9v88DEAHdsEvYdnqEE3hFl0R5La2Lkjd8CjNUYegxioQ57i3WNS3iksq10ZLMCbH29lb9YPG7r6Y8z9H85735kV2gKLdf+o7SPS03TRgjSZKN6pn4pLG0VWkxC6l8VfLuJnRNTHX4g6oLQwOWIBbxybn9Zw/yLjAXAJNgBHt5v86H6Jfi1Va4AhEV6itkoH9IM3/uDhrE/mmorqyVled/CPNtBWNTyoDevLNxMUDnbuhH0JzLki+VOjKnTxEfq12JB8X9faFG5BjvU9oGjPPewrp5DGGzg6KDra7dikciWUxE1eTFFDhMyG1CFGcjKlDvlAGHyI6Kih35egGUeq+N/pitr2330ftM9Dm4rWpOTxPyCI89bXKssx/MgmLG7kSM=
on: on:
tags: true tags: true
skip_existing: true skip_existing: true
# The password is encrypted with: # The password is encrypted with:
# `cd prototorch && travis encrypt your-pypi-api-token --add deploy.password` # `cd prototorch && travis encrypt your-pypi-api-token --add deploy.password`

View File

@@ -31,15 +31,15 @@ To also install the extras, use
pip install -U prototorch[all] pip install -U prototorch[all]
``` ```
*Note: If you're using [ZSH](https://www.zsh.org/), the square brackets `[ ]` *Note: If you're using [ZSH](https://www.zsh.org/) (which is also the default
have to be escaped like so: `\[\]`, making the install command `pip install -U shell on MacOS now), the square brackets `[ ]` have to be escaped like so:
prototorch\[all\]`.* `\[\]`, making the install command `pip install -U prototorch\[all\]`.*
To install the bleeding-edge features and improvements: To install the bleeding-edge features and improvements:
```bash ```bash
git clone https://github.com/si-cim/prototorch.git git clone https://github.com/si-cim/prototorch.git
git checkout dev
cd prototorch cd prototorch
git checkout dev
pip install -e .[all] pip install -e .[all]
``` ```

View File

@@ -11,8 +11,26 @@ Datasets
Functions Functions
-------------------------------------- --------------------------------------
.. automodule:: prototorch.functions
**Dimensions:**
- :math:`B` ... Batch size
- :math:`P` ... Number of prototypes
- :math:`n_x` ... Data dimension for vectorial data
- :math:`n_w` ... Data dimension for vectorial prototypes
Activations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. automodule:: prototorch.functions.activations
:members: :members:
:exclude-members: register_activation, get_activation
:undoc-members:
Distances
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. automodule:: prototorch.functions.distances
:members:
:exclude-members: sed
:undoc-members: :undoc-members:
Modules Modules

View File

@@ -12,9 +12,8 @@
# #
import os import os
import sys import sys
sys.path.insert(0, os.path.abspath("../../"))
import sphinx_rtd_theme sys.path.insert(0, os.path.abspath("../../"))
# -- Project information ----------------------------------------------------- # -- Project information -----------------------------------------------------
@@ -24,7 +23,7 @@ author = "Jensun Ravichandran"
# The full version, including alpha/beta/rc tags # The full version, including alpha/beta/rc tags
# #
release = "0.2.0" release = "0.4.2"
# -- General configuration --------------------------------------------------- # -- General configuration ---------------------------------------------------
@@ -128,15 +127,12 @@ latex_elements = {
# The paper size ("letterpaper" or "a4paper"). # The paper size ("letterpaper" or "a4paper").
# #
# "papersize": "letterpaper", # "papersize": "letterpaper",
# The font size ("10pt", "11pt" or "12pt"). # The font size ("10pt", "11pt" or "12pt").
# #
# "pointsize": "10pt", # "pointsize": "10pt",
# Additional stuff for the LaTeX preamble. # Additional stuff for the LaTeX preamble.
# #
# "preamble": "", # "preamble": "",
# Latex figure (float) alignment # Latex figure (float) alignment
# #
# "figure_align": "htbp", # "figure_align": "htbp",
@@ -146,15 +142,21 @@ latex_elements = {
# (source start file, target name, title, # (source start file, target name, title,
# author, documentclass [howto, manual, or own class]). # author, documentclass [howto, manual, or own class]).
latex_documents = [ latex_documents = [
(master_doc, "prototorch.tex", "ProtoTorch Documentation", (
"Jensun Ravichandran", "manual"), master_doc,
"prototorch.tex",
"ProtoTorch Documentation",
"Jensun Ravichandran",
"manual",
),
] ]
# -- Options for manual page output --------------------------------------- # -- Options for manual page output ---------------------------------------
# One entry per manual page. List of tuples # One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section). # (source start file, name, description, authors, manual section).
man_pages = [(master_doc, "ProtoTorch", "ProtoTorch Documentation", [author], 1)] man_pages = [(master_doc, "ProtoTorch", "ProtoTorch Documentation", [author],
1)]
# -- Options for Texinfo output ------------------------------------------- # -- Options for Texinfo output -------------------------------------------
@@ -162,9 +164,15 @@ man_pages = [(master_doc, "ProtoTorch", "ProtoTorch Documentation", [author], 1)
# (source start file, target name, title, author, # (source start file, target name, title, author,
# dir menu entry, description, category) # dir menu entry, description, category)
texinfo_documents = [ texinfo_documents = [
(master_doc, "prototorch", "ProtoTorch Documentation", author, "prototorch", (
"Prototype-based machine learning in PyTorch.", master_doc,
"Miscellaneous"), "prototorch",
"ProtoTorch Documentation",
author,
"prototorch",
"Prototype-based machine learning in PyTorch.",
"Miscellaneous",
),
] ]
# Example configuration for intersphinx: refer to the Python standard library. # Example configuration for intersphinx: refer to the Python standard library.

View File

@@ -3,13 +3,14 @@
import numpy as np import numpy as np
import torch import torch
from matplotlib import pyplot as plt from matplotlib import pyplot as plt
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
from torchinfo import summary
from prototorch.functions.competitions import wtac from prototorch.functions.competitions import wtac
from prototorch.functions.distances import euclidean_distance from prototorch.functions.distances import euclidean_distance
from prototorch.modules.losses import GLVQLoss from prototorch.modules.losses import GLVQLoss
from prototorch.modules.prototypes import Prototypes1D from prototorch.modules.prototypes import Prototypes1D
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
from torchinfo import summary
# Prepare and preprocess the data # Prepare and preprocess the data
scaler = StandardScaler() scaler = StandardScaler()
@@ -29,7 +30,8 @@ class Model(torch.nn.Module):
prototypes_per_class=3, prototypes_per_class=3,
nclasses=3, nclasses=3,
prototype_initializer="stratified_random", prototype_initializer="stratified_random",
data=[x_train, y_train]) data=[x_train, y_train],
)
def forward(self, x): def forward(self, x):
protos = self.proto_layer.prototypes protos = self.proto_layer.prototypes
@@ -61,8 +63,10 @@ for epoch in range(70):
with torch.no_grad(): with torch.no_grad():
pred = wtac(dis, plabels) pred = wtac(dis, plabels)
correct = pred.eq(y_in.view_as(pred)).sum().item() correct = pred.eq(y_in.view_as(pred)).sum().item()
acc = 100. * correct / len(x_train) acc = 100.0 * correct / len(x_train)
print(f"Epoch: {epoch + 1:03d} Loss: {loss.item():05.02f} Acc: {acc:05.02f}%") print(
f"Epoch: {epoch + 1:03d} Loss: {loss.item():05.02f} Acc: {acc:05.02f}%"
)
# Take a gradient descent step # Take a gradient descent step
optimizer.zero_grad() optimizer.zero_grad()
@@ -83,13 +87,15 @@ for epoch in range(70):
ax.set_ylabel("Data dimension 2") ax.set_ylabel("Data dimension 2")
cmap = "viridis" cmap = "viridis"
ax.scatter(x_train[:, 0], x_train[:, 1], c=y_train, edgecolor="k") ax.scatter(x_train[:, 0], x_train[:, 1], c=y_train, edgecolor="k")
ax.scatter(protos[:, 0], ax.scatter(
protos[:, 1], protos[:, 0],
c=plabels, protos[:, 1],
cmap=cmap, c=plabels,
edgecolor="k", cmap=cmap,
marker="D", edgecolor="k",
s=50) marker="D",
s=50,
)
# Paint decision regions # Paint decision regions
x = np.vstack((x_train, protos)) x = np.vstack((x_train, protos))

View File

@@ -20,11 +20,13 @@ class Model(torch.nn.Module):
"""GMLVQ model as a siamese network.""" """GMLVQ model as a siamese network."""
super().__init__() super().__init__()
x, y = train_data.data, train_data.targets x, y = train_data.data, train_data.targets
self.p1 = Prototypes1D(input_dim=100, self.p1 = Prototypes1D(
prototypes_per_class=2, input_dim=100,
nclasses=2, prototypes_per_class=2,
prototype_initializer="stratified_random", nclasses=2,
data=[x, y]) prototype_initializer="stratified_random",
data=[x, y],
)
self.omega = torch.nn.Linear(in_features=100, self.omega = torch.nn.Linear(in_features=100,
out_features=100, out_features=100,
bias=False) bias=False)

View File

@@ -13,8 +13,9 @@ import torch
import torch.nn as nn import torch.nn as nn
import torchvision import torchvision
from torchvision import transforms from torchvision import transforms
from prototorch.modules.losses import GLVQLoss
from prototorch.functions.helper import calculate_prototype_accuracy from prototorch.functions.helper import calculate_prototype_accuracy
from prototorch.modules.losses import GLVQLoss
from prototorch.modules.models import GTLVQ from prototorch.modules.models import GTLVQ
# Parameters and options # Parameters and options
@@ -26,32 +27,40 @@ momentum = 0.5
log_interval = 10 log_interval = 10
cuda = "cuda:1" cuda = "cuda:1"
random_seed = 1 random_seed = 1
device = torch.device(cuda if torch.cuda.is_available() else 'cpu') device = torch.device(cuda if torch.cuda.is_available() else "cpu")
# Configures reproducability # Configures reproducability
torch.manual_seed(random_seed) torch.manual_seed(random_seed)
np.random.seed(random_seed) np.random.seed(random_seed)
# Prepare and preprocess the data # Prepare and preprocess the data
train_loader = torch.utils.data.DataLoader(torchvision.datasets.MNIST( train_loader = torch.utils.data.DataLoader(
'./files/', torchvision.datasets.MNIST(
train=True, "./files/",
download=True, train=True,
transform=torchvision.transforms.Compose( download=True,
[transforms.ToTensor(), transform=torchvision.transforms.Compose([
transforms.Normalize((0.1307, ), (0.3081, ))])), transforms.ToTensor(),
batch_size=batch_size_train, transforms.Normalize((0.1307, ), (0.3081, ))
shuffle=True) ]),
),
batch_size=batch_size_train,
shuffle=True,
)
test_loader = torch.utils.data.DataLoader(torchvision.datasets.MNIST( test_loader = torch.utils.data.DataLoader(
'./files/', torchvision.datasets.MNIST(
train=False, "./files/",
download=True, train=False,
transform=torchvision.transforms.Compose( download=True,
[transforms.ToTensor(), transform=torchvision.transforms.Compose([
transforms.Normalize((0.1307, ), (0.3081, ))])), transforms.ToTensor(),
batch_size=batch_size_test, transforms.Normalize((0.1307, ), (0.3081, ))
shuffle=True) ]),
),
batch_size=batch_size_test,
shuffle=True,
)
# Define the GLVQ model plus appropriate feature extractor # Define the GLVQ model plus appropriate feature extractor
@@ -67,25 +76,34 @@ class CNNGTLVQ(torch.nn.Module):
): ):
super(CNNGTLVQ, self).__init__() super(CNNGTLVQ, self).__init__()
#Feature Extractor - Simple CNN # Feature Extractor - Simple CNN
self.fe = nn.Sequential(nn.Conv2d(1, 32, 3, 1), nn.ReLU(), self.fe = nn.Sequential(
nn.Conv2d(32, 64, 3, 1), nn.ReLU(), nn.Conv2d(1, 32, 3, 1),
nn.MaxPool2d(2), nn.Dropout(0.25), nn.ReLU(),
nn.Flatten(), nn.Linear(9216, bottleneck_dim), nn.Conv2d(32, 64, 3, 1),
nn.Dropout(0.5), nn.LeakyReLU(), nn.ReLU(),
nn.LayerNorm(bottleneck_dim)) nn.MaxPool2d(2),
nn.Dropout(0.25),
nn.Flatten(),
nn.Linear(9216, bottleneck_dim),
nn.Dropout(0.5),
nn.LeakyReLU(),
nn.LayerNorm(bottleneck_dim),
)
# Forward pass of subspace and prototype initialization data through feature extractor # Forward pass of subspace and prototype initialization data through feature extractor
subspace_data = self.fe(subspace_data) subspace_data = self.fe(subspace_data)
prototype_data[0] = self.fe(prototype_data[0]) prototype_data[0] = self.fe(prototype_data[0])
# Initialization of GTLVQ # Initialization of GTLVQ
self.gtlvq = GTLVQ(num_classes, self.gtlvq = GTLVQ(
subspace_data, num_classes,
prototype_data, subspace_data,
tangent_projection_type=tangent_projection_type, prototype_data,
feature_dim=bottleneck_dim, tangent_projection_type=tangent_projection_type,
prototypes_per_class=prototypes_per_class) feature_dim=bottleneck_dim,
prototypes_per_class=prototypes_per_class,
)
def forward(self, x): def forward(self, x):
# Feature Extraction # Feature Extraction
@@ -103,20 +121,24 @@ subspace_data = torch.cat(
prototype_data = next(iter(train_loader)) prototype_data = next(iter(train_loader))
# Build the CNN GTLVQ model # Build the CNN GTLVQ model
model = CNNGTLVQ(10, model = CNNGTLVQ(
subspace_data, 10,
prototype_data, subspace_data,
tangent_projection_type="local", prototype_data,
bottleneck_dim=128).to(device) tangent_projection_type="local",
bottleneck_dim=128,
).to(device)
# Optimize using SGD optimizer from `torch.optim` # Optimize using SGD optimizer from `torch.optim`
optimizer = torch.optim.Adam([{ optimizer = torch.optim.Adam(
'params': model.fe.parameters() [{
}, { "params": model.fe.parameters()
'params': model.gtlvq.parameters() }, {
}], "params": model.gtlvq.parameters()
lr=learning_rate) }],
criterion = GLVQLoss(squashing='sigmoid_beta', beta=10) lr=learning_rate,
)
criterion = GLVQLoss(squashing="sigmoid_beta", beta=10)
# Training loop # Training loop
for epoch in range(n_epochs): for epoch in range(n_epochs):
@@ -139,8 +161,8 @@ for epoch in range(n_epochs):
if batch_idx % log_interval == 0: if batch_idx % log_interval == 0:
acc = calculate_prototype_accuracy(distances, y_train, plabels) acc = calculate_prototype_accuracy(distances, y_train, plabels)
print( print(
f'Epoch: {epoch + 1:02d}/{n_epochs:02d} Epoch Progress: {100. * batch_idx / len(train_loader):02.02f} % Loss: {loss.item():02.02f} \ f"Epoch: {epoch + 1:02d}/{n_epochs:02d} Epoch Progress: {100. * batch_idx / len(train_loader):02.02f} % Loss: {loss.item():02.02f} \
Train Acc: {acc.item():02.02f}') Train Acc: {acc.item():02.02f}")
# Test # Test
with torch.no_grad(): with torch.no_grad():
@@ -154,9 +176,9 @@ for epoch in range(n_epochs):
i = torch.argmin(test_distances, 1) i = torch.argmin(test_distances, 1)
correct += torch.sum(y_test == test_plabels[i]) correct += torch.sum(y_test == test_plabels[i])
total += y_test.size(0) total += y_test.size(0)
print('Accuracy of the network on the test images: %d %%' % print("Accuracy of the network on the test images: %d %%" %
(torch.true_divide(correct, total) * 100)) (torch.true_divide(correct, total) * 100))
# Save the model # Save the model
PATH = './glvq_mnist_model.pth' PATH = "./glvq_mnist_model.pth"
torch.save(model.state_dict(), PATH) torch.save(model.state_dict(), PATH)

View File

@@ -22,10 +22,12 @@ class Model(torch.nn.Module):
def __init__(self): def __init__(self):
"""Local-GMLVQ model.""" """Local-GMLVQ model."""
super().__init__() super().__init__()
self.p1 = Prototypes1D(input_dim=2, self.p1 = Prototypes1D(
prototype_distribution=[1, 2, 2], input_dim=2,
prototype_initializer="stratified_random", prototype_distribution=[1, 2, 2],
data=[x_train, y_train]) prototype_initializer="stratified_random",
data=[x_train, y_train],
)
omegas = torch.zeros(5, 2, 2) omegas = torch.zeros(5, 2, 2)
self.omegas = torch.nn.Parameter(omegas) self.omegas = torch.nn.Parameter(omegas)
eye_(self.omegas) eye_(self.omegas)
@@ -76,14 +78,16 @@ for epoch in range(100):
ax.set_xlabel("Data dimension 1") ax.set_xlabel("Data dimension 1")
ax.set_ylabel("Data dimension 2") ax.set_ylabel("Data dimension 2")
cmap = "viridis" cmap = "viridis"
ax.scatter(x_train[:, 0], x_train[:, 1], c=y_train, edgecolor='k') ax.scatter(x_train[:, 0], x_train[:, 1], c=y_train, edgecolor="k")
ax.scatter(protos[:, 0], ax.scatter(
protos[:, 1], protos[:, 0],
c=plabels, protos[:, 1],
cmap=cmap, c=plabels,
edgecolor='k', cmap=cmap,
marker='D', edgecolor="k",
s=50) marker="D",
s=50,
)
# Paint decision regions # Paint decision regions
x = np.vstack((x_train, protos)) x = np.vstack((x_train, protos))

View File

@@ -0,0 +1,65 @@
"""This example script shows the usage of the new components architecture.
Serialization/deserialization also works as expected.
"""
# DATASET
import torch
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
x_train, y_train = load_iris(return_X_y=True)
x_train = x_train[:, [0, 2]]
scaler.fit(x_train)
x_train = scaler.transform(x_train)
x_train = torch.Tensor(x_train)
y_train = torch.Tensor(y_train)
num_classes = len(torch.unique(y_train))
# CREATE NEW COMPONENTS
from prototorch.components import *
from prototorch.components.initializers import *
unsupervised = Components(6, SelectionInitializer(x_train))
print(unsupervised())
prototypes = LabeledComponents(
(3, 2), StratifiedSelectionInitializer(x_train, y_train))
print(prototypes())
components = ReasoningComponents(
(3, 6), StratifiedSelectionInitializer(x_train, y_train))
print(components())
# TEST SERIALIZATION
import io
save = io.BytesIO()
torch.save(unsupervised, save)
save.seek(0)
serialized_unsupervised = torch.load(save)
assert torch.all(unsupervised.components == serialized_unsupervised.components
), "Serialization of Components failed."
save = io.BytesIO()
torch.save(prototypes, save)
save.seek(0)
serialized_prototypes = torch.load(save)
assert torch.all(prototypes.components == serialized_prototypes.components
), "Serialization of Components failed."
assert torch.all(prototypes.component_labels == serialized_prototypes.
component_labels), "Serialization of Components failed."
save = io.BytesIO()
torch.save(components, save)
save.seek(0)
serialized_components = torch.load(save)
assert torch.all(components.components == serialized_components.components
), "Serialization of Components failed."
assert torch.all(components.reasonings == serialized_components.reasonings
), "Serialization of Components failed."

View File

@@ -1,11 +1,42 @@
"""ProtoTorch package.""" """ProtoTorch package."""
__version__ = '0.2.0' # Core Setup
__version__ = "0.4.2"
from prototorch import datasets, functions, modules __all_core__ = [
"datasets",
__all__ = [ "functions",
'datasets', "modules",
'functions',
'modules',
] ]
from .datasets import *
# Plugin Loader
import pkgutil
import pkg_resources
__path__ = pkgutil.extend_path(__path__, __name__)
def discover_plugins():
return {
entry_point.name: entry_point.load()
for entry_point in pkg_resources.iter_entry_points(
"prototorch.plugins")
}
discovered_plugins = discover_plugins()
locals().update(discovered_plugins)
# Generate combines __version__ and __all__
version_plugins = "\n".join([
"- " + name + ": v" + plugin.__version__
for name, plugin in discovered_plugins.items()
])
if version_plugins != "":
version_plugins = "\nPlugins: \n" + version_plugins
version = "core: v" + __version__ + version_plugins
__all__ = __all_core__ + list(discovered_plugins.keys())

View File

@@ -0,0 +1,2 @@
from prototorch.components.components import *
from prototorch.components.initializers import *

View File

@@ -0,0 +1,151 @@
"""ProtoTorch components modules."""
import warnings
from typing import Tuple
import torch
from prototorch.components.initializers import (ClassAwareInitializer,
ComponentsInitializer,
EqualLabelsInitializer,
UnequalLabelsInitializer,
ZeroReasoningsInitializer)
from prototorch.functions.initializers import get_initializer
from torch.nn.parameter import Parameter
class Components(torch.nn.Module):
"""Components is a set of learnable Tensors."""
def __init__(self,
number_of_components=None,
initializer=None,
*,
initialized_components=None,
dtype=torch.float32):
super().__init__()
# Ignore all initialization settings if initialized_components is given.
if initialized_components is not None:
self._components = Parameter(initialized_components)
if number_of_components is not None or initializer is not None:
wmsg = "Arguments ignored while initializing Components"
warnings.warn(wmsg)
else:
self._initialize_components(number_of_components, initializer)
def _precheck_initializer(self, initializer):
if not isinstance(initializer, ComponentsInitializer):
emsg = f"`initializer` has to be some subtype of " \
f"{ComponentsInitializer}. " \
f"You have provided: {initializer=} instead."
raise TypeError(emsg)
def _initialize_components(self, number_of_components, initializer):
self._precheck_initializer(initializer)
self._components = Parameter(
initializer.generate(number_of_components))
@property
def components(self):
"""Tensor containing the component tensors."""
return self._components.detach().cpu()
def forward(self):
return self._components
def extra_repr(self):
return f"components.shape: {tuple(self._components.shape)}"
class LabeledComponents(Components):
"""LabeledComponents generate a set of components and a set of labels.
Every Component has a label assigned.
"""
def __init__(self,
distribution=None,
initializer=None,
*,
initialized_components=None):
if initialized_components is not None:
super().__init__(initialized_components=initialized_components[0])
self._labels = initialized_components[1]
else:
self._initialize_labels(distribution)
super().__init__(number_of_components=len(self._labels),
initializer=initializer)
def _initialize_components(self, number_of_components, initializer):
if isinstance(initializer, ClassAwareInitializer):
self._precheck_initializer(initializer)
self._components = Parameter(
initializer.generate(number_of_components, self.distribution))
else:
super()._initialize_components(self, number_of_components,
initializer)
def _initialize_labels(self, distribution):
if type(distribution) == tuple:
num_classes, prototypes_per_class = distribution
labels = EqualLabelsInitializer(num_classes, prototypes_per_class)
elif type(distribution) == list:
labels = UnequalLabelsInitializer(distribution)
self.distribution = labels.distribution
self._labels = labels.generate()
@property
def component_labels(self):
"""Tensor containing the component tensors."""
return self._labels.detach().cpu()
def forward(self):
return super().forward(), self._labels
class ReasoningComponents(Components):
"""ReasoningComponents generate a set of components and a set of reasoning matrices.
Every Component has a reasoning matrix assigned.
A reasoning matrix is a Nx2 matrix, where N is the number of Classes. The
first element is called positive reasoning :math:`p`, the second negative
reasoning :math:`n`. A components can reason in favour (positive) of a
class, against (negative) a class or not at all (neutral).
It holds that :math:`0 \leq n \leq 1`, :math:`0 \leq p \leq 1` and :math:`0
\leq n+p \leq 1`. Therefore :math:`n` and :math:`p` are two elements of a
three element probability distribution.
"""
def __init__(self,
reasonings=None,
initializer=None,
*,
initialized_components=None):
if initialized_components is not None:
super().__init__(initialized_components=initialized_components[0])
self._reasonings = initialized_components[1]
else:
self._initialize_reasonings(reasonings)
super().__init__(number_of_components=len(self._reasonings),
initializer=initializer)
def _initialize_reasonings(self, reasonings):
if type(reasonings) == tuple:
num_classes, number_of_components = reasonings
reasonings = ZeroReasoningsInitializer(num_classes,
number_of_components)
self._reasonings = reasonings.generate()
@property
def reasonings(self):
"""Returns Reasoning Matrix.
Dimension NxCx2
"""
return self._reasonings.detach().cpu()
def forward(self):
return super().forward(), self._reasonings

View File

@@ -0,0 +1,197 @@
"""ProtoTroch Initializers."""
import warnings
from collections.abc import Iterable
from itertools import chain
import torch
from torch.utils.data import DataLoader, Dataset
def parse_init_arg(arg):
if isinstance(arg, Dataset):
data, labels = next(iter(DataLoader(arg, batch_size=len(arg))))
# data = data.view(len(arg), -1) # flatten
else:
data, labels = arg
if not isinstance(data, torch.Tensor):
wmsg = f"Converting data to {torch.Tensor}."
warnings.warn(wmsg)
data = torch.Tensor(data)
if not isinstance(labels, torch.Tensor):
wmsg = f"Converting labels to {torch.Tensor}."
warnings.warn(wmsg)
labels = torch.Tensor(labels)
return data, labels
# Components
class ComponentsInitializer(object):
def generate(self, number_of_components):
raise NotImplementedError("Subclasses should implement this!")
class DimensionAwareInitializer(ComponentsInitializer):
def __init__(self, c_dims):
super().__init__()
if isinstance(c_dims, Iterable):
self.components_dims = tuple(c_dims)
else:
self.components_dims = (c_dims, )
class OnesInitializer(DimensionAwareInitializer):
def generate(self, length):
gen_dims = (length, ) + self.components_dims
return torch.ones(gen_dims)
class ZerosInitializer(DimensionAwareInitializer):
def generate(self, length):
gen_dims = (length, ) + self.components_dims
return torch.zeros(gen_dims)
class UniformInitializer(DimensionAwareInitializer):
def __init__(self, c_dims, min=0.0, max=1.0):
super().__init__(c_dims)
self.min = min
self.max = max
def generate(self, length):
gen_dims = (length, ) + self.components_dims
return torch.ones(gen_dims).uniform_(self.min, self.max)
class PositionAwareInitializer(ComponentsInitializer):
def __init__(self, positions):
super().__init__()
self.data = positions
class SelectionInitializer(PositionAwareInitializer):
def generate(self, length):
indices = torch.LongTensor(length).random_(0, len(self.data))
return self.data[indices]
class MeanInitializer(PositionAwareInitializer):
def generate(self, length):
mean = torch.mean(self.data, dim=0)
repeat_dim = [length] + [1] * len(mean.shape)
return mean.repeat(repeat_dim)
class ClassAwareInitializer(ComponentsInitializer):
def __init__(self, arg):
super().__init__()
data, labels = parse_init_arg(arg)
self.data = data
self.labels = labels
self.clabels = torch.unique(self.labels)
self.num_classes = len(self.clabels)
def _get_samples_from_initializer(self, length, dist):
if not dist:
per_class = length // self.num_classes
dist = self.num_classes * [per_class]
samples_list = [
init.generate(n) for init, n in zip(self.initializers, dist)
]
return torch.vstack(samples_list)
class StratifiedMeanInitializer(ClassAwareInitializer):
def __init__(self, arg):
super().__init__(arg)
self.initializers = []
for clabel in self.clabels:
class_data = self.data[self.labels == clabel]
class_initializer = MeanInitializer(class_data)
self.initializers.append(class_initializer)
def generate(self, length, dist=[]):
samples = self._get_samples_from_initializer(length, dist)
return samples
class StratifiedSelectionInitializer(ClassAwareInitializer):
def __init__(self, arg, *, noise=None):
super().__init__(arg)
self.noise = noise
self.initializers = []
for clabel in self.clabels:
class_data = self.data[self.labels == clabel]
class_initializer = SelectionInitializer(class_data)
self.initializers.append(class_initializer)
def add_noise(self, x):
"""Shifts some dimensions of the data randomly."""
n1 = torch.rand_like(x)
n2 = torch.rand_like(x)
mask = torch.bernoulli(n1) - torch.bernoulli(n2)
return x + (self.noise * mask)
def generate(self, length, dist=[]):
samples = self._get_samples_from_initializer(length, dist)
if self.noise is not None:
# samples = self.add_noise(samples)
samples = samples + self.noise
return samples
# Labels
class LabelsInitializer:
def generate(self):
raise NotImplementedError("Subclasses should implement this!")
class UnequalLabelsInitializer(LabelsInitializer):
def __init__(self, dist):
self.dist = dist
@property
def distribution(self):
return self.dist
def generate(self):
clabels = range(len(self.dist))
labels = list(chain(*[[i] * n for i, n in zip(clabels, self.dist)]))
return torch.tensor(labels)
class EqualLabelsInitializer(LabelsInitializer):
def __init__(self, classes, per_class):
self.classes = classes
self.per_class = per_class
@property
def distribution(self):
return self.classes * [self.per_class]
def generate(self):
return torch.arange(self.classes).repeat(self.per_class, 1).T.flatten()
# Reasonings
class ReasoningsInitializer:
def generate(self, length):
raise NotImplementedError("Subclasses should implement this!")
class ZeroReasoningsInitializer(ReasoningsInitializer):
def __init__(self, classes, length):
self.classes = classes
self.length = length
def generate(self):
return torch.zeros((self.length, self.classes, 2))
# Aliases
SSI = StratifiedSampleInitializer = StratifiedSelectionInitializer
SMI = StratifiedMeanInitializer
Random = RandomInitializer = UniformInitializer

View File

@@ -1,7 +1,11 @@
"""ProtoTorch datasets.""" """ProtoTorch datasets."""
from .abstract import NumpyDataset
from .spiral import Spiral
from .tecator import Tecator from .tecator import Tecator
__all__ = [ __all__ = [
'Tecator', "NumpyDataset",
"Spiral",
"Tecator",
] ]

View File

@@ -12,8 +12,16 @@ import os
import torch import torch
class NumpyDataset(torch.utils.data.TensorDataset):
"""Create a PyTorch TensorDataset from NumPy arrays."""
def __init__(self, *arrays):
tensors = [torch.Tensor(arr) for arr in arrays]
super().__init__(*tensors)
class Dataset(torch.utils.data.Dataset): class Dataset(torch.utils.data.Dataset):
"""Abstract dataset class to be inherited.""" """Abstract dataset class to be inherited."""
_repr_indent = 2 _repr_indent = 2
def __init__(self, root): def __init__(self, root):
@@ -30,8 +38,9 @@ class Dataset(torch.utils.data.Dataset):
class ProtoDataset(Dataset): class ProtoDataset(Dataset):
"""Abstract dataset class to be inherited.""" """Abstract dataset class to be inherited."""
training_file = 'training.pt'
test_file = 'test.pt' training_file = "training.pt"
test_file = "test.pt"
def __init__(self, root, train=True, download=True, verbose=True): def __init__(self, root, train=True, download=True, verbose=True):
super().__init__(root) super().__init__(root)
@@ -39,11 +48,11 @@ class ProtoDataset(Dataset):
self.verbose = verbose self.verbose = verbose
if download: if download:
self.download() self._download()
if not self._check_exists(): if not self._check_exists():
raise RuntimeError('Dataset not found. ' raise RuntimeError("Dataset not found. "
'You can use download=True to download it') "You can use download=True to download it")
data_file = self.training_file if self.train else self.test_file data_file = self.training_file if self.train else self.test_file
@@ -52,30 +61,30 @@ class ProtoDataset(Dataset):
@property @property
def raw_folder(self): def raw_folder(self):
return os.path.join(self.root, self.__class__.__name__, 'raw') return os.path.join(self.root, self.__class__.__name__, "raw")
@property @property
def processed_folder(self): def processed_folder(self):
return os.path.join(self.root, self.__class__.__name__, 'processed') return os.path.join(self.root, self.__class__.__name__, "processed")
@property @property
def class_to_idx(self): def class_to_idx(self):
return {_class: i for i, _class in enumerate(self.classes)} return {_class: i for i, _class in enumerate(self.classes)}
def _check_exists(self): def _check_exists(self):
return (os.path.exists( return os.path.exists(
os.path.join(self.processed_folder, self.training_file)) os.path.join(
and os.path.exists( self.processed_folder, self.training_file)) and os.path.exists(
os.path.join(self.processed_folder, self.test_file))) os.path.join(self.processed_folder, self.test_file))
def __repr__(self): def __repr__(self):
head = 'Dataset ' + self.__class__.__name__ head = "Dataset " + self.__class__.__name__
body = ['Number of datapoints: {}'.format(self.__len__())] body = ["Number of datapoints: {}".format(self.__len__())]
if self.root is not None: if self.root is not None:
body.append('Root location: {}'.format(self.root)) body.append("Root location: {}".format(self.root))
body += self.extra_repr().splitlines() body += self.extra_repr().splitlines()
lines = [head] + [' ' * self._repr_indent + line for line in body] lines = [head] + [" " * self._repr_indent + line for line in body]
return '\n'.join(lines) return "\n".join(lines)
def extra_repr(self): def extra_repr(self):
return f"Split: {'Train' if self.train is True else 'Test'}" return f"Split: {'Train' if self.train is True else 'Test'}"
@@ -83,5 +92,5 @@ class ProtoDataset(Dataset):
def __len__(self): def __len__(self):
return len(self.data) return len(self.data)
def download(self): def _download(self):
raise NotImplementedError raise NotImplementedError

View File

@@ -0,0 +1,33 @@
"""Spiral dataset for binary classification."""
import numpy as np
import torch
def make_spiral(n_samples=500, noise=0.3):
def get_samples(n, delta_t):
points = []
for i in range(n):
r = i / n_samples * 5
t = 1.75 * i / n * 2 * np.pi + delta_t
x = r * np.sin(t) + np.random.rand(1) * noise
y = r * np.cos(t) + np.random.rand(1) * noise
points.append([x, y])
return points
n = n_samples // 2
positive = get_samples(n=n, delta_t=0)
negative = get_samples(n=n, delta_t=np.pi)
x = np.concatenate(
[np.array(positive).reshape(n, -1),
np.array(negative).reshape(n, -1)],
axis=0)
y = np.concatenate([np.zeros(n), np.ones(n)])
return x, y
class Spiral(torch.utils.data.TensorDataset):
"""Spiral dataset for binary classification."""
def __init__(self, n_samples=500, noise=0.3):
x, y = make_spiral(n_samples, noise)
super().__init__(torch.Tensor(x), torch.LongTensor(y))

View File

@@ -46,42 +46,46 @@ from prototorch.datasets.abstract import ProtoDataset
class Tecator(ProtoDataset): class Tecator(ProtoDataset):
"""Tecator dataset for classification.""" """
resources = [ `Tecator Dataset <http://lib.stat.cmu.edu/datasets/tecator>`__
('1MMuUK8V41IgNpnPDbg3E-QAL6wlErTk0', for classification.
'ba5607c580d0f91bb27dc29d13c2f8df'), """
_resources = [
("1P9WIYnyxFPh6f1vqAbnKfK8oYmUgyV83",
"ba5607c580d0f91bb27dc29d13c2f8df"),
] # (google_storage_id, md5hash) ] # (google_storage_id, md5hash)
classes = ['0 - low_fat', '1 - high_fat'] classes = ["0 - low_fat", "1 - high_fat"]
def __getitem__(self, index): def __getitem__(self, index):
img, target = self.data[index], int(self.targets[index]) img, target = self.data[index], int(self.targets[index])
return img, target return img, target
def download(self): def _download(self):
"""Download the data if it doesn't exist in already.""" """Download the data if it doesn't exist in already."""
if self._check_exists(): if self._check_exists():
return return
if self.verbose: if self.verbose:
print('Making directories...') print("Making directories...")
os.makedirs(self.raw_folder, exist_ok=True) os.makedirs(self.raw_folder, exist_ok=True)
os.makedirs(self.processed_folder, exist_ok=True) os.makedirs(self.processed_folder, exist_ok=True)
if self.verbose: if self.verbose:
print('Downloading...') print("Downloading...")
for fileid, md5 in self.resources: for fileid, md5 in self._resources:
filename = 'tecator.npz' filename = "tecator.npz"
download_file_from_google_drive(fileid, download_file_from_google_drive(fileid,
root=self.raw_folder, root=self.raw_folder,
filename=filename, filename=filename,
md5=md5) md5=md5)
if self.verbose: if self.verbose:
print('Processing...') print("Processing...")
with np.load(os.path.join(self.raw_folder, 'tecator.npz'), with np.load(os.path.join(self.raw_folder, "tecator.npz"),
allow_pickle=False) as f: allow_pickle=False) as f:
x_train, y_train = f['x_train'], f['y_train'] x_train, y_train = f["x_train"], f["y_train"]
x_test, y_test = f['x_test'], f['y_test'] x_test, y_test = f["x_test"], f["y_test"]
training_set = [ training_set = [
torch.tensor(x_train, dtype=torch.float32), torch.tensor(x_train, dtype=torch.float32),
torch.tensor(y_train), torch.tensor(y_train),
@@ -92,11 +96,11 @@ class Tecator(ProtoDataset):
] ]
with open(os.path.join(self.processed_folder, self.training_file), with open(os.path.join(self.processed_folder, self.training_file),
'wb') as f: "wb") as f:
torch.save(training_set, f) torch.save(training_set, f)
with open(os.path.join(self.processed_folder, self.test_file), with open(os.path.join(self.processed_folder, self.test_file),
'wb') as f: "wb") as f:
torch.save(test_set, f) torch.save(test_set, f)
if self.verbose: if self.verbose:
print('Done!') print("Done!")

View File

@@ -4,9 +4,9 @@ from .activations import identity, sigmoid_beta, swish_beta
from .competitions import knnc, wtac from .competitions import knnc, wtac
__all__ = [ __all__ = [
'identity', "identity",
'sigmoid_beta', "sigmoid_beta",
'swish_beta', "swish_beta",
'knnc', "knnc",
'wtac', "wtac",
] ]

View File

@@ -16,40 +16,43 @@ def register_activation(function):
@register_activation @register_activation
# @torch.jit.script # @torch.jit.script
def identity(x, beta=torch.tensor(0)): def identity(x, beta=0.0):
"""Identity activation function. """Identity activation function.
Definition: Definition:
:math:`f(x) = x` :math:`f(x) = x`
Keyword Arguments:
beta (`float`): Ignored.
""" """
return x return x
@register_activation @register_activation
# @torch.jit.script # @torch.jit.script
def sigmoid_beta(x, beta=torch.tensor(10)): def sigmoid_beta(x, beta=10.0):
r"""Sigmoid activation function with scaling. r"""Sigmoid activation function with scaling.
Definition: Definition:
:math:`f(x) = \frac{1}{1 + e^{-\beta x}}` :math:`f(x) = \frac{1}{1 + e^{-\beta x}}`
Keyword Arguments: Keyword Arguments:
beta (`torch.tensor`): Scaling parameter :math:`\beta` beta (`float`): Scaling parameter :math:`\beta`
""" """
out = torch.reciprocal(1.0 + torch.exp(-int(beta.item()) * x)) out = 1.0 / (1.0 + torch.exp(-1.0 * beta * x))
return out return out
@register_activation @register_activation
# @torch.jit.script # @torch.jit.script
def swish_beta(x, beta=torch.tensor(10)): def swish_beta(x, beta=10.0):
r"""Swish activation function with scaling. r"""Swish activation function with scaling.
Definition: Definition:
:math:`f(x) = \frac{x}{1 + e^{-\beta x}}` :math:`f(x) = \frac{x}{1 + e^{-\beta x}}`
Keyword Arguments: Keyword Arguments:
beta (`torch.tensor`): Scaling parameter :math:`\beta` beta (`float`): Scaling parameter :math:`\beta`
""" """
out = x * sigmoid_beta(x, beta=beta) out = x * sigmoid_beta(x, beta=beta)
return out return out
@@ -61,4 +64,4 @@ def get_activation(funcname):
return funcname return funcname
if funcname in ACTIVATIONS: if funcname in ACTIVATIONS:
return ACTIVATIONS.get(funcname) return ACTIVATIONS.get(funcname)
raise NameError(f'Activation {funcname} was not found.') raise NameError(f"Activation {funcname} was not found.")

View File

@@ -12,7 +12,7 @@ def stratified_min(distances, labels):
return distances return distances
batch_size = distances.size()[0] batch_size = distances.size()[0]
winning_distances = torch.zeros(nclasses, batch_size) winning_distances = torch.zeros(nclasses, batch_size)
inf = torch.full_like(distances.T, fill_value=float('inf')) inf = torch.full_like(distances.T, fill_value=float("inf"))
# distances_to_wpluses = torch.where(matcher, distances, inf) # distances_to_wpluses = torch.where(matcher, distances, inf)
for i, cl in enumerate(clabels): for i, cl in enumerate(clabels):
# cdists = distances.T[labels == cl] # cdists = distances.T[labels == cl]

View File

@@ -1,15 +1,25 @@
"""ProtoTorch distance functions.""" """ProtoTorch distance functions."""
import torch
from prototorch.functions.helper import equal_int_shape, _int_and_mixed_shape, _check_shapes
import numpy as np import numpy as np
import torch
from prototorch.functions.helper import (
_check_shapes,
_int_and_mixed_shape,
equal_int_shape,
)
def squared_euclidean_distance(x, y): def squared_euclidean_distance(x, y):
"""Compute the squared Euclidean distance between :math:`x` and :math:`y`. r"""Compute the squared Euclidean distance between :math:`\bm x` and :math:`\bm y`.
Expected dimension of x is 2. Compute :math:`{\langle \bm x - \bm y \rangle}_2`
Expected dimension of y is 2.
:param `torch.tensor` x: Two dimensional vector
:param `torch.tensor` y: Two dimensional vector
**Alias:**
``prototorch.functions.distances.sed``
""" """
expanded_x = x.unsqueeze(dim=1) expanded_x = x.unsqueeze(dim=1)
batchwise_difference = y - expanded_x batchwise_difference = y - expanded_x
@@ -19,21 +29,45 @@ def squared_euclidean_distance(x, y):
def euclidean_distance(x, y): def euclidean_distance(x, y):
"""Compute the Euclidean distance between :math:`x` and :math:`y`. r"""Compute the Euclidean distance between :math:`x` and :math:`y`.
Expected dimension of x is 2. Compute :math:`\sqrt{{\langle \bm x - \bm y \rangle}_2}`
Expected dimension of y is 2.
:param `torch.tensor` x: Input Tensor of shape :math:`X \times N`
:param `torch.tensor` y: Input Tensor of shape :math:`Y \times N`
:returns: Distance Tensor of shape :math:`X \times Y`
:rtype: `torch.tensor`
""" """
distances_raised = squared_euclidean_distance(x, y) distances_raised = squared_euclidean_distance(x, y)
distances = torch.sqrt(distances_raised) distances = torch.sqrt(distances_raised)
return distances return distances
def lpnorm_distance(x, y, p): def euclidean_distance_v2(x, y):
r"""Compute :math:`{\langle x, y \rangle}_p`. diff = y - x.unsqueeze(1)
pairwise_distances = (diff @ diff.permute((0, 2, 1))).sqrt()
# Passing `dim1=-2` and `dim2=-1` to `diagonal()` takes the
# batch diagonal. See:
# https://pytorch.org/docs/stable/generated/torch.diagonal.html
distances = torch.diagonal(pairwise_distances, dim1=-2, dim2=-1)
# print(f"{diff.shape=}") # (nx, ny, ndim)
# print(f"{pairwise_distances.shape=}") # (nx, ny, ny)
# print(f"{distances.shape=}") # (nx, ny)
return distances
Expected dimension of x is 2.
Expected dimension of y is 2. def lpnorm_distance(x, y, p):
r"""Calculate the lp-norm between :math:`\bm x` and :math:`\bm y`.
Also known as Minkowski distance.
Compute :math:`{\| \bm x - \bm y \|}_p`.
Calls ``torch.cdist``
:param `torch.tensor` x: Two dimensional vector
:param `torch.tensor` y: Two dimensional vector
:param p: p parameter of the lp norm
""" """
distances = torch.cdist(x, y, p=p) distances = torch.cdist(x, y, p=p)
return distances return distances
@@ -42,11 +76,11 @@ def lpnorm_distance(x, y, p):
def omega_distance(x, y, omega): def omega_distance(x, y, omega):
r"""Omega distance. r"""Omega distance.
Compute :math:`{\langle \Omega x, \Omega y \rangle}_p` Compute :math:`{\| \Omega \bm x - \Omega \bm y \|}_p`
Expected dimension of x is 2. :param `torch.tensor` x: Two dimensional vector
Expected dimension of y is 2. :param `torch.tensor` y: Two dimensional vector
Expected dimension of omega is 2. :param `torch.tensor` omega: Two dimensional matrix
""" """
projected_x = x @ omega projected_x = x @ omega
projected_y = y @ omega projected_y = y @ omega
@@ -57,11 +91,11 @@ def omega_distance(x, y, omega):
def lomega_distance(x, y, omegas): def lomega_distance(x, y, omegas):
r"""Localized Omega distance. r"""Localized Omega distance.
Compute :math:`{\langle \Omega_k x, \Omega_k y_k \rangle}_p` Compute :math:`{\| \Omega_k \bm x - \Omega_k \bm y_k \|}_p`
Expected dimension of x is 2. :param `torch.tensor` x: Two dimensional vector
Expected dimension of y is 2. :param `torch.tensor` y: Two dimensional vector
Expected dimension of omegas is 3. :param `torch.tensor` omegas: Three dimensional matrix
""" """
projected_x = x @ omegas projected_x = x @ omegas
projected_y = torch.diagonal(y @ omegas).T projected_y = torch.diagonal(y @ omegas).T
@@ -74,31 +108,30 @@ def lomega_distance(x, y, omegas):
def euclidean_distance_matrix(x, y, squared=False, epsilon=1e-10): def euclidean_distance_matrix(x, y, squared=False, epsilon=1e-10):
r""" Computes an euclidean distanes matrix given two distinct vectors. r"""Computes an euclidean distances matrix given two distinct vectors.
last dimension must be the vector dimension! last dimension must be the vector dimension!
compute the distance via the identity of the dot product. This avoids the memory overhead due to the subtraction! compute the distance via the identity of the dot product. This avoids the memory overhead due to the subtraction!
x.shape = (number_of_x_vectors, vector_dim) - ``x.shape = (number_of_x_vectors, vector_dim)``
y.shape = (number_of_y_vectors, vector_dim) - ``y.shape = (number_of_y_vectors, vector_dim)``
output: matrix of distances (number_of_x_vectors, number_of_y_vectors) output: matrix of distances (number_of_x_vectors, number_of_y_vectors)
""" """
for tensor in [x, y]: for tensor in [x, y]:
if tensor.ndim != 2: if tensor.ndim != 2:
raise ValueError( raise ValueError(
'The tensor dimension must be two. You provide: tensor.ndim=' + "The tensor dimension must be two. You provide: tensor.ndim=" +
str(tensor.ndim) + '.') str(tensor.ndim) + ".")
if not equal_int_shape([tuple(x.shape)[1]], [tuple(y.shape)[1]]): if not equal_int_shape([tuple(x.shape)[1]], [tuple(y.shape)[1]]):
raise ValueError( raise ValueError(
'The vector shape must be equivalent in both tensors. You provide: tuple(y.shape)[1]=' "The vector shape must be equivalent in both tensors. You provide: tuple(y.shape)[1]="
+ str(tuple(x.shape)[1]) + ' and tuple(y.shape)(y)[1]=' + + str(tuple(x.shape)[1]) + " and tuple(y.shape)(y)[1]=" +
str(tuple(y.shape)[1]) + '.') str(tuple(y.shape)[1]) + ".")
y = torch.transpose(y) y = torch.transpose(y)
diss = torch.sum(x**2, axis=1, diss = (torch.sum(x**2, axis=1, keepdims=True) - 2 * torch.dot(x, y) +
keepdims=True) - 2 * torch.dot(x, y) + torch.sum( torch.sum(y**2, axis=0, keepdims=True))
y**2, axis=0, keepdims=True)
if not squared: if not squared:
if epsilon == 0: if epsilon == 0:
@@ -110,13 +143,19 @@ def euclidean_distance_matrix(x, y, squared=False, epsilon=1e-10):
def tangent_distance(signals, protos, subspaces, squared=False, epsilon=1e-10): def tangent_distance(signals, protos, subspaces, squared=False, epsilon=1e-10):
r""" Tangent distances based on the tensorflow implementation of Sascha Saralajews r"""Tangent distances based on the tensorflow implementation of Sascha Saralajews
For more info about Tangen distances see DOI:10.1109/IJCNN.2016.7727534.
For more info about Tangen distances see
DOI:10.1109/IJCNN.2016.7727534.
The subspaces is always assumed as transposed and must be orthogonal! The subspaces is always assumed as transposed and must be orthogonal!
For local non sparse signals subspaces must be provided! For local non sparse signals subspaces must be provided!
shape(signals): batch x proto_number x channels x dim1 x dim2 x ... x dimN
shape(protos): proto_number x dim1 x dim2 x ... x dimN - shape(signals): batch x proto_number x channels x dim1 x dim2 x ... x dimN
shape(subspaces): (optional [proto_number]) x prod(dim1 * dim2 * ... * dimN) x prod(projected_atom_shape) - shape(protos): proto_number x dim1 x dim2 x ... x dimN
- shape(subspaces): (optional [proto_number]) x prod(dim1 * dim2 * ... * dimN) x prod(projected_atom_shape)
subspace should be orthogonalized subspace should be orthogonalized
Pytorch implementation of Sascha Saralajew's tensorflow code. Pytorch implementation of Sascha Saralajew's tensorflow code.
Translation by Christoph Raab Translation by Christoph Raab
@@ -159,17 +198,17 @@ def tangent_distance(signals, protos, subspaces, squared=False, epsilon=1e-10):
# no solution without map possible --> memory efficient but slow! # no solution without map possible --> memory efficient but slow!
projectors = torch.eye(subspace_int_shape[-2]) - torch.bmm( projectors = torch.eye(subspace_int_shape[-2]) - torch.bmm(
subspaces, subspaces,
subspaces) #K.batch_dot(subspaces, subspaces, [2, 2]) subspaces) # K.batch_dot(subspaces, subspaces, [2, 2])
projected_protos = (protos @ subspaces projected_protos = (protos @ subspaces
).T #K.batch_dot(projectors, protos, [1, 1])) ).T # K.batch_dot(projectors, protos, [1, 1]))
def projected_norm(projector): def projected_norm(projector):
return torch.sum(torch.dot(signals, projector)**2, axis=1) return torch.sum(torch.dot(signals, projector)**2, axis=1)
diss = torch.transpose(map(projected_norm, projectors)) \ diss = (torch.transpose(map(projected_norm, projectors)) -
- 2 * torch.dot(signals, projected_protos) \ 2 * torch.dot(signals, projected_protos) +
+ torch.sum(projected_protos**2, axis=0, keepdims=True) torch.sum(projected_protos**2, axis=0, keepdims=True))
if not squared: if not squared:
if epsilon == 0: if epsilon == 0:
@@ -189,17 +228,18 @@ def tangent_distance(signals, protos, subspaces, squared=False, epsilon=1e-10):
# global tangent space # global tangent space
if subspaces.ndim == 2: if subspaces.ndim == 2:
#Scope Projectors # Scope Projectors
projectors = subspaces # projectors = subspaces #
#Scope: Tangentspace Projections # Scope: Tangentspace Projections
diff = torch.reshape( diff = torch.reshape(
diff, (signal_shape[0] * signal_shape[2], signal_shape[1], -1)) diff, (signal_shape[0] * signal_shape[2], signal_shape[1], -1))
projected_diff = diff @ projectors projected_diff = diff @ projectors
projected_diff = torch.reshape( projected_diff = torch.reshape(
projected_diff, projected_diff,
(signal_shape[0], signal_shape[2], signal_shape[1]) + (signal_shape[0], signal_shape[2], signal_shape[1]) +
signal_shape[3:]) signal_shape[3:],
)
diss = torch.norm(projected_diff, 2, dim=-1) diss = torch.norm(projected_diff, 2, dim=-1)
return diss.permute([0, 2, 1]) return diss.permute([0, 2, 1])
@@ -217,11 +257,93 @@ def tangent_distance(signals, protos, subspaces, squared=False, epsilon=1e-10):
projected_diff = torch.reshape( projected_diff = torch.reshape(
projected_diff, projected_diff,
(signal_shape[1], signal_shape[0], signal_shape[2]) + (signal_shape[1], signal_shape[0], signal_shape[2]) +
signal_shape[3:]) signal_shape[3:],
)
diss = torch.norm(projected_diff, 2, dim=-1) diss = torch.norm(projected_diff, 2, dim=-1)
return diss.permute([1, 0, 2]).squeeze(-1) return diss.permute([1, 0, 2]).squeeze(-1)
class KernelDistance:
r"""Kernel Distance
Distance based on a kernel function.
"""
def __init__(self, kernel_fn):
self.kernel_fn = kernel_fn
def __call__(self, x_batch: torch.Tensor, y_batch: torch.Tensor):
return self._single_call(x_batch, y_batch)
def _single_call(self, x, y):
remove_dims = []
if len(x.shape) == 1:
x = x.unsqueeze(0)
remove_dims.append(0)
if len(y.shape) == 1:
y = y.unsqueeze(0)
remove_dims.append(-1)
output = self.kernel_fn(x, x).diag().unsqueeze(1) - 2 * self.kernel_fn(
x, y) + self.kernel_fn(y, y).diag()
for dim in remove_dims:
output.squeeze_(dim)
return torch.sqrt(output)
class BatchKernelDistance:
r"""Kernel Distance
Distance based on a kernel function.
"""
def __init__(self, kernel_fn):
self.kernel_fn = kernel_fn
def __call__(self, x_batch: torch.Tensor, y_batch: torch.Tensor):
remove_dims = 0
# Extend Single inputs
if len(x_batch.shape) == 1:
x_batch = x_batch.unsqueeze(0)
remove_dims += 1
if len(y_batch.shape) == 1:
y_batch = y_batch.unsqueeze(0)
remove_dims += 1
# Loop over batches
output = torch.FloatTensor(len(x_batch), len(y_batch))
for i, x in enumerate(x_batch):
for j, y in enumerate(y_batch):
output[i][j] = self._single_call(x, y)
for _ in range(remove_dims):
output.squeeze_(0)
return output
def _single_call(self, x, y):
kappa_xx = self.kernel_fn(x, x)
kappa_xy = self.kernel_fn(x, y)
kappa_yy = self.kernel_fn(y, y)
squared_distance = kappa_xx - 2 * kappa_xy + kappa_yy
return torch.sqrt(squared_distance)
class SquaredKernelDistance(KernelDistance):
r"""Squared Kernel Distance
Kernel distance without final squareroot.
"""
def single_call(self, x, y):
kappa_xx = self.kernel_fn(x, x)
kappa_xy = self.kernel_fn(x, y)
kappa_yy = self.kernel_fn(y, y)
return kappa_xx - 2 * kappa_xy + kappa_yy
# Aliases # Aliases
sed = squared_euclidean_distance sed = squared_euclidean_distance

View File

@@ -23,7 +23,7 @@ def predict_label(y_pred, plabels):
def mixed_shape(inputs): def mixed_shape(inputs):
if not torch.is_tensor(inputs): if not torch.is_tensor(inputs):
raise ValueError('Input must be a tensor.') raise ValueError("Input must be a tensor.")
else: else:
int_shape = list(inputs.shape) int_shape = list(inputs.shape)
# sometimes int_shape returns mixed integer types # sometimes int_shape returns mixed integer types
@@ -39,11 +39,11 @@ def mixed_shape(inputs):
def equal_int_shape(shape_1, shape_2): def equal_int_shape(shape_1, shape_2):
if not isinstance(shape_1, if not isinstance(shape_1,
(tuple, list)) or not isinstance(shape_2, (tuple, list)): (tuple, list)) or not isinstance(shape_2, (tuple, list)):
raise ValueError('Input shapes must list or tuple.') raise ValueError("Input shapes must list or tuple.")
for shape in [shape_1, shape_2]: for shape in [shape_1, shape_2]:
if not all([isinstance(x, int) or x is None for x in shape]): if not all([isinstance(x, int) or x is None for x in shape]):
raise ValueError( raise ValueError(
'Input shapes must be list or tuple of int and None values.') "Input shapes must be list or tuple of int and None values.")
if len(shape_1) != len(shape_2): if len(shape_1) != len(shape_2):
return False return False

View File

@@ -104,4 +104,4 @@ def get_initializer(funcname):
return funcname return funcname
if funcname in INITIALIZERS: if funcname in INITIALIZERS:
return INITIALIZERS.get(funcname) return INITIALIZERS.get(funcname)
raise NameError(f'Initializer {funcname} was not found.') raise NameError(f"Initializer {funcname} was not found.")

View File

@@ -0,0 +1,28 @@
"""
Experimental Kernels
"""
import torch
class ExplicitKernel:
def __init__(self, projection=torch.nn.Identity()):
self.projection = projection
def __call__(self, x, y):
return self.projection(x) @ self.projection(y).T
class RadialBasisFunctionKernel:
def __init__(self, sigma) -> None:
self.s2 = sigma * sigma
def __call__(self, x, y):
remove_dim = False
if len(x.shape) > 1:
x = x.unsqueeze(1)
remove_dim = True
output = torch.exp(-torch.sum((x - y)**2, dim=-1) / (2 * self.s2))
if remove_dim:
output = output.squeeze(1)
return output

View File

@@ -3,15 +3,22 @@
import torch import torch
def _get_dp_dm(distances, targets, plabels): def _get_matcher(targets, labels):
matcher = torch.eq(targets.unsqueeze(dim=1), plabels) """Returns a boolean tensor."""
if plabels.ndim == 2: matcher = torch.eq(targets.unsqueeze(dim=1), labels)
if labels.ndim == 2:
# if the labels are one-hot vectors # if the labels are one-hot vectors
nclasses = targets.size()[1] nclasses = targets.size()[1]
matcher = torch.eq(torch.sum(matcher, dim=-1), nclasses) matcher = torch.eq(torch.sum(matcher, dim=-1), nclasses)
return matcher
def _get_dp_dm(distances, targets, plabels):
"""Returns the d+ and d- values for a batch of distances."""
matcher = _get_matcher(targets, plabels)
not_matcher = torch.bitwise_not(matcher) not_matcher = torch.bitwise_not(matcher)
inf = torch.full_like(distances, fill_value=float('inf')) inf = torch.full_like(distances, fill_value=float("inf"))
d_matching = torch.where(matcher, distances, inf) d_matching = torch.where(matcher, distances, inf)
d_unmatching = torch.where(not_matcher, distances, inf) d_unmatching = torch.where(not_matcher, distances, inf)
dp = torch.min(d_matching, dim=1, keepdim=True).values dp = torch.min(d_matching, dim=1, keepdim=True).values
@@ -24,3 +31,26 @@ def glvq_loss(distances, target_labels, prototype_labels):
dp, dm = _get_dp_dm(distances, target_labels, prototype_labels) dp, dm = _get_dp_dm(distances, target_labels, prototype_labels)
mu = (dp - dm) / (dp + dm) mu = (dp - dm) / (dp + dm)
return mu return mu
def lvq1_loss(distances, target_labels, prototype_labels):
"""LVQ1 loss function with support for one-hot labels.
See Section 4 [Sado&Yamada]
https://papers.nips.cc/paper/1995/file/9c3b1830513cc3b8fc4b76635d32e692-Paper.pdf
"""
dp, dm = _get_dp_dm(distances, target_labels, prototype_labels)
mu = dp
mu[dp > dm] = -dm[dp > dm]
return mu
def lvq21_loss(distances, target_labels, prototype_labels):
"""LVQ2.1 loss function with support for one-hot labels.
See Section 4 [Sado&Yamada]
https://papers.nips.cc/paper/1995/file/9c3b1830513cc3b8fc4b76635d32e692-Paper.pdf
"""
dp, dm = _get_dp_dm(distances, target_labels, prototype_labels)
mu = dp - dm
return mu

View File

@@ -1,7 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import print_function from __future__ import absolute_import, division, print_function
from __future__ import absolute_import
from __future__ import division
import torch import torch

View File

@@ -0,0 +1,18 @@
"""ProtoTorch similarity functions."""
import torch
def cosine_similarity(x, y):
"""Compute the cosine similarity between :math:`x` and :math:`y`.
Expected dimension of x is 2.
Expected dimension of y is 2.
"""
norm_x = x.pow(2).sum(1).sqrt()
norm_y = y.pow(2).sum(1).sqrt()
norm_mat = norm_x.unsqueeze(-1) @ norm_y.unsqueeze(-1).T
epsilon = torch.finfo(norm_mat.dtype).eps
norm_mat.clamp_(min=epsilon)
similarities = (x @ y.T) / norm_mat
return similarities

View File

@@ -3,5 +3,5 @@
from .prototypes import Prototypes1D from .prototypes import Prototypes1D
__all__ = [ __all__ = [
'Prototypes1D', "Prototypes1D",
] ]

View File

@@ -7,7 +7,7 @@ from prototorch.functions.losses import glvq_loss
class GLVQLoss(torch.nn.Module): class GLVQLoss(torch.nn.Module):
def __init__(self, margin=0.0, squashing='identity', beta=10, **kwargs): def __init__(self, margin=0.0, squashing="identity", beta=10, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
self.margin = margin self.margin = margin
self.squashing = get_activation(squashing) self.squashing = get_activation(squashing)
@@ -18,3 +18,23 @@ class GLVQLoss(torch.nn.Module):
mu = glvq_loss(distances, targets, prototype_labels=plabels) mu = glvq_loss(distances, targets, prototype_labels=plabels)
batch_loss = self.squashing(mu + self.margin, beta=self.beta) batch_loss = self.squashing(mu + self.margin, beta=self.beta)
return torch.sum(batch_loss, dim=0) return torch.sum(batch_loss, dim=0)
class NeuralGasEnergy(torch.nn.Module):
def __init__(self, lm):
super().__init__()
self.lm = lm
def forward(self, d):
order = torch.argsort(d, dim=1)
ranks = torch.argsort(order, dim=1)
cost = torch.sum(self._nghood_fn(ranks, self.lm) * d)
return cost, order
def extra_repr(self):
return f"lambda: {self.lm}"
@staticmethod
def _nghood_fn(rankings, lm):
return torch.exp(-rankings / lm)

View File

@@ -1,9 +1,10 @@
from torch import nn
import torch import torch
from prototorch.modules.prototypes import Prototypes1D from torch import nn
from prototorch.functions.distances import tangent_distance, euclidean_distance_matrix
from prototorch.functions.normalization import orthogonalization from prototorch.functions.distances import euclidean_distance_matrix, tangent_distance
from prototorch.functions.helper import _check_shapes, _int_and_mixed_shape from prototorch.functions.helper import _check_shapes, _int_and_mixed_shape
from prototorch.functions.normalization import orthogonalization
from prototorch.modules.prototypes import Prototypes1D
class GTLVQ(nn.Module): class GTLVQ(nn.Module):
@@ -71,7 +72,7 @@ class GTLVQ(nn.Module):
subspace_data=None, subspace_data=None,
prototype_data=None, prototype_data=None,
subspace_size=256, subspace_size=256,
tangent_projection_type='local', tangent_projection_type="local",
prototypes_per_class=2, prototypes_per_class=2,
feature_dim=256, feature_dim=256,
): ):
@@ -82,37 +83,39 @@ class GTLVQ(nn.Module):
self.feature_dim = feature_dim self.feature_dim = feature_dim
if subspace_data is None: if subspace_data is None:
raise ValueError('Init Data must be specified!') raise ValueError("Init Data must be specified!")
self.tpt = tangent_projection_type self.tpt = tangent_projection_type
with torch.no_grad(): with torch.no_grad():
if self.tpt == 'local' or self.tpt == 'local_proj': if self.tpt == "local" or self.tpt == "local_proj":
self.init_local_subspace(subspace_data) self.init_local_subspace(subspace_data)
elif self.tpt == 'global': elif self.tpt == "global":
self.init_gobal_subspace(subspace_data, subspace_size) self.init_gobal_subspace(subspace_data, subspace_size)
else: else:
self.subspaces = None self.subspaces = None
# Hypothesis-Margin-Classifier # Hypothesis-Margin-Classifier
self.cls = Prototypes1D(input_dim=feature_dim, self.cls = Prototypes1D(
prototypes_per_class=prototypes_per_class, input_dim=feature_dim,
nclasses=num_classes, prototypes_per_class=prototypes_per_class,
prototype_initializer='stratified_mean', nclasses=num_classes,
data=prototype_data) prototype_initializer="stratified_mean",
data=prototype_data,
)
def forward(self, x): def forward(self, x):
# Tangent Projection # Tangent Projection
if self.tpt == 'local_proj': if self.tpt == "local_proj":
x_conform = x.unsqueeze(1).repeat_interleave(self.num_protos, x_conform = (x.unsqueeze(1).repeat_interleave(self.num_protos,
1).unsqueeze(2) 1).unsqueeze(2))
dis, proj_x = self.local_tangent_projection(x_conform) dis, proj_x = self.local_tangent_projection(x_conform)
proj_x = proj_x.reshape(x.shape[0] * self.num_protos, proj_x = proj_x.reshape(x.shape[0] * self.num_protos,
self.feature_dim) self.feature_dim)
return proj_x, dis return proj_x, dis
elif self.tpt == "local": elif self.tpt == "local":
x_conform = x.unsqueeze(1).repeat_interleave(self.num_protos, x_conform = (x.unsqueeze(1).repeat_interleave(self.num_protos,
1).unsqueeze(2) 1).unsqueeze(2))
dis = tangent_distance(x_conform, self.cls.prototypes, dis = tangent_distance(x_conform, self.cls.prototypes,
self.subspaces) self.subspaces)
elif self.tpt == "gloabl": elif self.tpt == "gloabl":
@@ -127,25 +130,27 @@ class GTLVQ(nn.Module):
_, _, v = torch.svd(data) _, _, v = torch.svd(data)
subspace = (torch.eye(v.shape[0]) - (v @ v.T)).T subspace = (torch.eye(v.shape[0]) - (v @ v.T)).T
subspaces = subspace[:, :num_subspaces] subspaces = subspace[:, :num_subspaces]
self.subspaces = torch.nn.Parameter( self.subspaces = (torch.nn.Parameter(
subspaces).clone().detach().requires_grad_(True) subspaces).clone().detach().requires_grad_(True))
def init_local_subspace(self, data): def init_local_subspace(self, data):
_, _, v = torch.svd(data) _, _, v = torch.svd(data)
inital_projector = (torch.eye(v.shape[0]) - (v @ v.T)).T inital_projector = (torch.eye(v.shape[0]) - (v @ v.T)).T
subspaces = inital_projector.unsqueeze(0).repeat_interleave( subspaces = inital_projector.unsqueeze(0).repeat_interleave(
self.num_protos, 0) self.num_protos, 0)
self.subspaces = torch.nn.Parameter( self.subspaces = (torch.nn.Parameter(
subspaces).clone().detach().requires_grad_(True) subspaces).clone().detach().requires_grad_(True))
def global_tangent_distances(self, x): def global_tangent_distances(self, x):
# Tangent Projection # Tangent Projection
x, projected_prototypes = x @ self.subspaces, self.cls.prototypes @ self.subspaces x, projected_prototypes = (
x @ self.subspaces,
self.cls.prototypes @ self.subspaces,
)
# Euclidean Distance # Euclidean Distance
return euclidean_distance_matrix(x, projected_prototypes) return euclidean_distance_matrix(x, projected_prototypes)
def local_tangent_projection(self, def local_tangent_projection(self, signals):
signals):
# Note: subspaces is always assumed as transposed and must be orthogonal! # Note: subspaces is always assumed as transposed and must be orthogonal!
# shape(signals): batch x proto_number x channels x dim1 x dim2 x ... x dimN # shape(signals): batch x proto_number x channels x dim1 x dim2 x ... x dimN
# shape(protos): proto_number x dim1 x dim2 x ... x dimN # shape(protos): proto_number x dim1 x dim2 x ... x dimN
@@ -183,8 +188,7 @@ class GTLVQ(nn.Module):
def orthogonalize_subspace(self): def orthogonalize_subspace(self):
if self.subspaces is not None: if self.subspaces is not None:
with torch.no_grad(): with torch.no_grad():
ortho_subpsaces = orthogonalization( ortho_subpsaces = (orthogonalization(self.subspaces)
self.subspaces if self.tpt == "global" else
) if self.tpt == 'global' else torch.nn.init.orthogonal_( torch.nn.init.orthogonal_(self.subspaces))
self.subspaces)
self.subspaces.copy_(ortho_subpsaces) self.subspaces.copy_(ortho_subpsaces)

View File

@@ -14,11 +14,11 @@ class _Prototypes(torch.nn.Module):
def _validate_prototype_distribution(self): def _validate_prototype_distribution(self):
if 0 in self.prototype_distribution: if 0 in self.prototype_distribution:
warnings.warn('Are you sure about the `0` in ' warnings.warn("Are you sure about the `0` in "
'`prototype_distribution`?') "`prototype_distribution`?")
def extra_repr(self): def extra_repr(self):
return f'prototypes.shape: {tuple(self.prototypes.shape)}' return f"prototypes.shape: {tuple(self.prototypes.shape)}"
def forward(self): def forward(self):
return self.prototypes, self.prototype_labels return self.prototypes, self.prototype_labels
@@ -29,14 +29,19 @@ class Prototypes1D(_Prototypes):
TODO Complete this doc-string. TODO Complete this doc-string.
""" """
def __init__(self, def __init__(
prototypes_per_class=1, self,
prototype_initializer='ones', prototypes_per_class=1,
prototype_distribution=None, prototype_initializer="ones",
data=None, prototype_distribution=None,
dtype=torch.float32, data=None,
one_hot_labels=False, dtype=torch.float32,
**kwargs): one_hot_labels=False,
**kwargs,
):
warnings.warn(
PendingDeprecationWarning(
"Prototypes1D will be replaced in future versions."))
# Convert tensors to python lists before processing # Convert tensors to python lists before processing
if prototype_distribution is not None: if prototype_distribution is not None:
@@ -44,25 +49,25 @@ class Prototypes1D(_Prototypes):
prototype_distribution = prototype_distribution.tolist() prototype_distribution = prototype_distribution.tolist()
if data is None: if data is None:
if 'input_dim' not in kwargs: if "input_dim" not in kwargs:
raise NameError('`input_dim` required if ' raise NameError("`input_dim` required if "
'no `data` is provided.') "no `data` is provided.")
if prototype_distribution: if prototype_distribution:
kwargs_nclasses = sum(prototype_distribution) kwargs_nclasses = sum(prototype_distribution)
else: else:
if 'nclasses' not in kwargs: if "nclasses" not in kwargs:
raise NameError('`prototype_distribution` required if ' raise NameError("`prototype_distribution` required if "
'both `data` and `nclasses` are not ' "both `data` and `nclasses` are not "
'provided.') "provided.")
kwargs_nclasses = kwargs.pop('nclasses') kwargs_nclasses = kwargs.pop("nclasses")
input_dim = kwargs.pop('input_dim') input_dim = kwargs.pop("input_dim")
if prototype_initializer in [ if prototype_initializer in [
'stratified_mean', 'stratified_random' "stratified_mean", "stratified_random"
]: ]:
warnings.warn( warnings.warn(
f'`prototype_initializer`: `{prototype_initializer}` ' f"`prototype_initializer`: `{prototype_initializer}` "
'requires `data`, but `data` is not provided. ' "requires `data`, but `data` is not provided. "
'Using randomly generated data instead.') "Using randomly generated data instead.")
x_train = torch.rand(kwargs_nclasses, input_dim) x_train = torch.rand(kwargs_nclasses, input_dim)
y_train = torch.arange(kwargs_nclasses) y_train = torch.arange(kwargs_nclasses)
if one_hot_labels: if one_hot_labels:
@@ -75,39 +80,39 @@ class Prototypes1D(_Prototypes):
nclasses = torch.unique(y_train, dim=-1).shape[-1] nclasses = torch.unique(y_train, dim=-1).shape[-1]
if nclasses == 1: if nclasses == 1:
warnings.warn('Are you sure about having one class only?') warnings.warn("Are you sure about having one class only?")
if x_train.ndim != 2: if x_train.ndim != 2:
raise ValueError('`data[0].ndim != 2`.') raise ValueError("`data[0].ndim != 2`.")
if y_train.ndim == 2: if y_train.ndim == 2:
if y_train.shape[1] == 1 and one_hot_labels: if y_train.shape[1] == 1 and one_hot_labels:
raise ValueError('`one_hot_labels` is set to `True` ' raise ValueError("`one_hot_labels` is set to `True` "
'but target labels are not one-hot-encoded.') "but target labels are not one-hot-encoded.")
if y_train.shape[1] != 1 and not one_hot_labels: if y_train.shape[1] != 1 and not one_hot_labels:
raise ValueError('`one_hot_labels` is set to `False` ' raise ValueError("`one_hot_labels` is set to `False` "
'but target labels in `data` ' "but target labels in `data` "
'are one-hot-encoded.') "are one-hot-encoded.")
if y_train.ndim == 1 and one_hot_labels: if y_train.ndim == 1 and one_hot_labels:
raise ValueError('`one_hot_labels` is set to `True` ' raise ValueError("`one_hot_labels` is set to `True` "
'but target labels are not one-hot-encoded.') "but target labels are not one-hot-encoded.")
# Verify input dimension if `input_dim` is provided # Verify input dimension if `input_dim` is provided
if 'input_dim' in kwargs: if "input_dim" in kwargs:
input_dim = kwargs.pop('input_dim') input_dim = kwargs.pop("input_dim")
if input_dim != x_train.shape[1]: if input_dim != x_train.shape[1]:
raise ValueError(f'Provided `input_dim`={input_dim} does ' raise ValueError(f"Provided `input_dim`={input_dim} does "
'not match data dimension ' "not match data dimension "
f'`data[0].shape[1]`={x_train.shape[1]}') f"`data[0].shape[1]`={x_train.shape[1]}")
# Verify the number of classes if `nclasses` is provided # Verify the number of classes if `nclasses` is provided
if 'nclasses' in kwargs: if "nclasses" in kwargs:
kwargs_nclasses = kwargs.pop('nclasses') kwargs_nclasses = kwargs.pop("nclasses")
if kwargs_nclasses != nclasses: if kwargs_nclasses != nclasses:
raise ValueError(f'Provided `nclasses={kwargs_nclasses}` does ' raise ValueError(f"Provided `nclasses={kwargs_nclasses}` does "
'not match data labels ' "not match data labels "
'`torch.unique(data[1]).shape[0]`' "`torch.unique(data[1]).shape[0]`"
f'={nclasses}') f"={nclasses}")
super().__init__(**kwargs) super().__init__(**kwargs)

View File

@@ -1 +0,0 @@
from .colors import color_scheme, get_legend_handles

View File

@@ -0,0 +1,46 @@
"""Easy matplotlib animation. From https://github.com/jwkvam/celluloid."""
from collections import defaultdict
from typing import Dict, List
from matplotlib.animation import ArtistAnimation
from matplotlib.artist import Artist
from matplotlib.figure import Figure
__version__ = "0.2.0"
class Camera:
"""Make animations easier."""
def __init__(self, figure: Figure) -> None:
"""Create camera from matplotlib figure."""
self._figure = figure
# need to keep track off artists for each axis
self._offsets: Dict[str, Dict[int, int]] = {
k: defaultdict(int)
for k in
["collections", "patches", "lines", "texts", "artists", "images"]
}
self._photos: List[List[Artist]] = []
def snap(self) -> List[Artist]:
"""Capture current state of the figure."""
frame_artists: List[Artist] = []
for i, axis in enumerate(self._figure.axes):
if axis.legend_ is not None:
axis.add_artist(axis.legend_)
for name in self._offsets:
new_artists = getattr(axis, name)[self._offsets[name][i]:]
frame_artists += new_artists
self._offsets[name][i] += len(new_artists)
self._photos.append(frame_artists)
return frame_artists
def animate(self, *args, **kwargs) -> ArtistAnimation:
"""Animate the snapshots taken.
Uses matplotlib.animation.ArtistAnimation
Returns
-------
ArtistAnimation
"""
return ArtistAnimation(self._figure, self._photos, *args, **kwargs)

View File

@@ -1,13 +1,14 @@
"""ProtoFlow color utilities.""" """ProtoFlow color utilities."""
from matplotlib import cm
from matplotlib.colors import Normalize
from matplotlib.colors import to_hex
from matplotlib.colors import to_rgb
import matplotlib.lines as mlines import matplotlib.lines as mlines
from matplotlib import cm
from matplotlib.colors import Normalize, to_hex, to_rgb
def color_scheme(n, cmap="viridis", form="hex", tikz=False, def color_scheme(n,
cmap="viridis",
form="hex",
tikz=False,
zero_indexed=False): zero_indexed=False):
"""Return *n* colors from the color scheme. """Return *n* colors from the color scheme.
@@ -57,13 +58,16 @@ def get_legend_handles(labels, marker="dots", zero_indexed=False):
zero_indexed=zero_indexed) zero_indexed=zero_indexed)
for label, color in zip(labels, colors.values()): for label, color in zip(labels, colors.values()):
if marker == "dots": if marker == "dots":
handle = mlines.Line2D([], [], handle = mlines.Line2D(
color="white", [],
markerfacecolor=color, [],
marker="o", color="white",
markersize=10, markerfacecolor=color,
markeredgecolor="k", marker="o",
label=label) markersize=10,
markeredgecolor="k",
label=label,
)
else: else:
handle = mlines.Line2D([], [], handle = mlines.Line2D([], [],
color=color, color=color,

243
prototorch/utils/utils.py Normal file
View File

@@ -0,0 +1,243 @@
"""Utilities that provide various small functionalities."""
import os
import pickle
import sys
from time import time
import matplotlib.pyplot as plt
import numpy as np
def progressbar(title, value, end, bar_width=20):
percent = float(value) / end
arrow = "=" * int(round(percent * bar_width) - 1) + ">"
spaces = "." * (bar_width - len(arrow))
sys.stdout.write("\r{}: [{}] {}%".format(title, arrow + spaces,
int(round(percent * 100))))
sys.stdout.flush()
if percent == 1.0:
print()
def prettify_string(inputs, start="", sep=" ", end="\n"):
outputs = start + " ".join(inputs.split()) + end
return outputs
def pretty_print(inputs):
print(prettify_string(inputs))
def writelog(self, *logs, logdir="./logs", logfile="run.txt"):
f = os.path.join(logdir, logfile)
with open(f, "a+") as fh:
for log in logs:
fh.write(log)
fh.write("\n")
def start_tensorboard(self, logdir="./logs"):
cmd = f"tensorboard --logdir={logdir} --port=6006"
os.system(cmd)
def make_directory(save_dir):
if not os.path.exists(save_dir):
print(f"Making directory {save_dir}.")
os.mkdir(save_dir)
def make_gif(filenames, duration, output_file=None):
try:
import imageio
except ModuleNotFoundError as e:
print("Please install Protoflow with [other] extra requirements.")
raise (e)
images = list()
for filename in filenames:
images.append(imageio.imread(filename))
if not output_file:
output_file = f"makegif.gif"
if images:
imageio.mimwrite(output_file, images, duration=duration)
def gif_from_dir(directory,
duration,
prefix="",
output_file=None,
verbose=True):
images = os.listdir(directory)
if verbose:
print(f"Making gif from {len(images)} images under {directory}.")
filenames = list()
# Sort images
images = sorted(
images,
key=lambda img: int(os.path.splitext(img)[0].replace(prefix, "")))
for image in images:
fname = os.path.join(directory, image)
filenames.append(fname)
if not output_file:
output_file = os.path.join(directory, "makegif.gif")
make_gif(filenames=filenames, duration=duration, output_file=output_file)
def accuracy_score(y_true, y_pred):
accuracy = np.sum(y_true == y_pred)
normalized_acc = accuracy / float(len(y_true))
return normalized_acc
def predict_and_score(clf,
x_test,
y_test,
verbose=False,
title="Test accuracy"):
y_pred = clf.predict(x_test)
accuracy = np.sum(y_test == y_pred)
normalized_acc = accuracy / float(len(y_test))
if verbose:
print(f"{title}: {normalized_acc * 100:06.04f}%")
return normalized_acc
def remove_nan_rows(arr):
"""Remove all rows with `nan` values in `arr`."""
mask = np.isnan(arr).any(axis=1)
return arr[~mask]
def remove_nan_cols(arr):
"""Remove all columns with `nan` values in `arr`."""
mask = np.isnan(arr).any(axis=0)
return arr[~mask]
def replace_in(arr, replacement_dict, inplace=False):
"""Replace the keys found in `arr` with the values from
the `replacement_dict`.
"""
if inplace:
new_arr = arr
else:
import copy
new_arr = copy.deepcopy(arr)
for k, v in replacement_dict.items():
new_arr[arr == k] = v
return new_arr
def train_test_split(data, train=0.7, val=0.15, shuffle=None, return_xy=False):
"""Split a classification dataset in such a way so as to
preserve the class distribution in subsamples of the dataset.
"""
if train + val > 1.0:
raise ValueError("Invalid split values for train and val.")
Y = data[:, -1]
labels = set(Y)
hist = dict()
for l in labels:
data_l = data[Y == l]
nl = len(data_l)
nl_train = int(nl * train)
nl_val = int(nl * val)
nl_test = nl - (nl_train + nl_val)
hist[l] = (nl_train, nl_val, nl_test)
train_data = list()
val_data = list()
test_data = list()
for l, (nl_train, nl_val, nl_test) in hist.items():
data_l = data[Y == l]
if shuffle:
np.random.shuffle(data_l)
train_l = data_l[:nl_train]
val_l = data_l[nl_train:nl_train + nl_val]
test_l = data_l[nl_train + nl_val:nl_train + nl_val + nl_test]
train_data.append(train_l)
val_data.append(val_l)
test_data.append(test_l)
def _squash(data_list):
data = np.array(data_list[0])
for item in data_list[1:]:
data = np.vstack((data, np.array(item)))
return data
train_data = _squash(train_data)
if val_data:
val_data = _squash(val_data)
if test_data:
test_data = _squash(test_data)
if return_xy:
x_train = train_data[:, :-1]
y_train = train_data[:, -1]
x_val = val_data[:, :-1]
y_val = val_data[:, -1]
x_test = test_data[:, :-1]
y_test = test_data[:, -1]
return (x_train, y_train), (x_val, y_val), (x_test, y_test)
return train_data, val_data, test_data
def class_histogram(data, title="Untitled"):
plt.figure(title)
plt.clf()
plt.title(title)
dist, counts = np.unique(data[:, -1], return_counts=True)
plt.bar(dist, counts)
plt.xticks(dist)
print("Call matplotlib.pyplot.show() to see the plot.")
def ntimer(n=10):
"""Wraps a function which wraps another function to time it."""
if n < 1:
raise (Exception(f"Invalid n = {n} given."))
def timer(func):
"""Wraps `func` with a timer and returns the wrapped `func`."""
def wrapper(*args, **kwargs):
rv = None
before = time()
for _ in range(n):
rv = func(*args, **kwargs)
after = time()
elapsed = after - before
print(f"Elapsed: {elapsed*1e3:02.02f} ms")
return rv
return wrapper
return timer
def memoize(verbose=True):
"""Wraps a function which wraps another function that memoizes."""
def memoizer(func):
"""Memoize (cache) return values of `func`.
Wraps `func` and returns the wrapped `func` so that `func`
is executed when the results are not available in the cache.
"""
cache = {}
def wrapper(*args, **kwargs):
t = (pickle.dumps(args), pickle.dumps(kwargs))
if t not in cache:
if verbose:
print(f"Adding NEW rv {func.__name__}{args}{kwargs} "
"to cache.")
cache[t] = func(*args, **kwargs)
else:
if verbose:
print(f"Using OLD rv {func.__name__}{args}{kwargs} "
"from cache.")
return cache[t]
return wrapper
return memoizer

View File

@@ -1,5 +0,0 @@
matplotlib==3.1.2
pytest==5.3.4
requests==2.22.0
codecov==2.0.22
tqdm==4.44.1

104
setup.py
View File

@@ -1,7 +1,14 @@
"""Install ProtoTorch.""" """
_____ _ _______ _
| __ \ | | |__ __| | |
| |__) | __ ___ | |_ ___ | | ___ _ __ ___| |__
| ___/ '__/ _ \| __/ _ \| |/ _ \| '__/ __| '_ \
| | | | | (_) | || (_) | | (_) | | | (__| | | |
|_| |_| \___/ \__\___/|_|\___/|_| \___|_| |_|
from setuptools import setup ProtoTorch Core Package
from setuptools import find_packages """
from setuptools import find_packages, setup
PROJECT_URL = "https://github.com/si-cim/prototorch" PROJECT_URL = "https://github.com/si-cim/prototorch"
DOWNLOAD_URL = "https://github.com/si-cim/prototorch.git" DOWNLOAD_URL = "https://github.com/si-cim/prototorch.git"
@@ -14,58 +21,63 @@ INSTALL_REQUIRES = [
"torchvision>=0.5.0", "torchvision>=0.5.0",
"numpy>=1.9.1", "numpy>=1.9.1",
] ]
DATASETS = [
"requests",
"tqdm",
]
DEV = ["bumpversion"]
DOCS = [ DOCS = [
"recommonmark", "recommonmark",
"sphinx", "sphinx",
"sphinx_rtd_theme", "sphinx_rtd_theme",
"sphinxcontrib-katex", "sphinxcontrib-katex",
] ]
DATASETS = [
"requests",
"tqdm",
]
EXAMPLES = [ EXAMPLES = [
"sklearn", "sklearn",
"matplotlib", "matplotlib",
"torchinfo", "torchinfo",
] ]
TESTS = ["pytest"] TESTS = ["codecov", "pytest"]
ALL = DOCS + DATASETS + EXAMPLES + TESTS ALL = DATASETS + DEV + DOCS + EXAMPLES + TESTS
setup(name="prototorch", setup(
version="0.2.0", name="prototorch",
description="Highly extensible, GPU-supported " version="0.4.2",
"Learning Vector Quantization (LVQ) toolbox " description="Highly extensible, GPU-supported "
"built using PyTorch and its nn API.", "Learning Vector Quantization (LVQ) toolbox "
long_description=long_description, "built using PyTorch and its nn API.",
long_description_content_type="text/markdown", long_description=long_description,
author="Jensun Ravichandran", long_description_content_type="text/markdown",
author_email="jjensun@gmail.com", author="Jensun Ravichandran",
url=PROJECT_URL, author_email="jjensun@gmail.com",
download_url=DOWNLOAD_URL, url=PROJECT_URL,
license="MIT", download_url=DOWNLOAD_URL,
install_requires=INSTALL_REQUIRES, license="MIT",
extras_require={ install_requires=INSTALL_REQUIRES,
"docs": DOCS, extras_require={
"datasets": DATASETS, "docs": DOCS,
"examples": EXAMPLES, "datasets": DATASETS,
"tests": TESTS, "examples": EXAMPLES,
"all": ALL, "tests": TESTS,
}, "all": ALL,
classifiers=[ },
"Development Status :: 2 - Pre-Alpha", classifiers=[
"Environment :: Console", "Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Developers", "Environment :: Console",
"Intended Audience :: Education", "Intended Audience :: Developers",
"Intended Audience :: Science/Research", "Intended Audience :: Education",
"License :: OSI Approved :: MIT License", "Intended Audience :: Science/Research",
"Natural Language :: English", "License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.6", "Natural Language :: English",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.7",
"Operating System :: OS Independent", "Programming Language :: Python :: 3.8",
"Topic :: Scientific/Engineering :: Artificial Intelligence", "Programming Language :: Python :: 3.9",
"Topic :: Software Development :: Libraries", "Operating System :: OS Independent",
"Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Scientific/Engineering :: Artificial Intelligence",
], "Topic :: Software Development :: Libraries",
packages=find_packages()) "Topic :: Software Development :: Libraries :: Python Modules",
],
packages=find_packages(),
zip_safe=False,
)

View File

@@ -12,26 +12,26 @@ from prototorch.datasets import abstract, tecator
class TestAbstract(unittest.TestCase): class TestAbstract(unittest.TestCase):
def test_getitem(self): def test_getitem(self):
with self.assertRaises(NotImplementedError): with self.assertRaises(NotImplementedError):
abstract.Dataset('./artifacts')[0] abstract.Dataset("./artifacts")[0]
def test_len(self): def test_len(self):
with self.assertRaises(NotImplementedError): with self.assertRaises(NotImplementedError):
len(abstract.Dataset('./artifacts')) len(abstract.Dataset("./artifacts"))
class TestProtoDataset(unittest.TestCase): class TestProtoDataset(unittest.TestCase):
def test_getitem(self): def test_getitem(self):
with self.assertRaises(NotImplementedError): with self.assertRaises(NotImplementedError):
abstract.ProtoDataset('./artifacts')[0] abstract.ProtoDataset("./artifacts")[0]
def test_download(self): def test_download(self):
with self.assertRaises(NotImplementedError): with self.assertRaises(NotImplementedError):
abstract.ProtoDataset('./artifacts').download() abstract.ProtoDataset("./artifacts").download()
class TestTecator(unittest.TestCase): class TestTecator(unittest.TestCase):
def setUp(self): def setUp(self):
self.artifacts_dir = './artifacts/Tecator' self.artifacts_dir = "./artifacts/Tecator"
self._remove_artifacts() self._remove_artifacts()
def _remove_artifacts(self): def _remove_artifacts(self):
@@ -39,23 +39,23 @@ class TestTecator(unittest.TestCase):
shutil.rmtree(self.artifacts_dir) shutil.rmtree(self.artifacts_dir)
def test_download_false(self): def test_download_false(self):
rootdir = self.artifacts_dir.rpartition('/')[0] rootdir = self.artifacts_dir.rpartition("/")[0]
self._remove_artifacts() self._remove_artifacts()
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
_ = tecator.Tecator(rootdir, download=False) _ = tecator.Tecator(rootdir, download=False)
def test_download_caching(self): def test_download_caching(self):
rootdir = self.artifacts_dir.rpartition('/')[0] rootdir = self.artifacts_dir.rpartition("/")[0]
_ = tecator.Tecator(rootdir, download=True, verbose=False) _ = tecator.Tecator(rootdir, download=True, verbose=False)
_ = tecator.Tecator(rootdir, download=False, verbose=False) _ = tecator.Tecator(rootdir, download=False, verbose=False)
def test_repr(self): def test_repr(self):
rootdir = self.artifacts_dir.rpartition('/')[0] rootdir = self.artifacts_dir.rpartition("/")[0]
train = tecator.Tecator(rootdir, download=True, verbose=True) train = tecator.Tecator(rootdir, download=True, verbose=True)
self.assertTrue('Split: Train' in train.__repr__()) self.assertTrue("Split: Train" in train.__repr__())
def test_download_train(self): def test_download_train(self):
rootdir = self.artifacts_dir.rpartition('/')[0] rootdir = self.artifacts_dir.rpartition("/")[0]
train = tecator.Tecator(root=rootdir, train = tecator.Tecator(root=rootdir,
train=True, train=True,
download=True, download=True,
@@ -67,7 +67,7 @@ class TestTecator(unittest.TestCase):
self.assertEqual(x_train.shape[1], 100) self.assertEqual(x_train.shape[1], 100)
def test_download_test(self): def test_download_test(self):
rootdir = self.artifacts_dir.rpartition('/')[0] rootdir = self.artifacts_dir.rpartition("/")[0]
test = tecator.Tecator(root=rootdir, train=False, verbose=False) test = tecator.Tecator(root=rootdir, train=False, verbose=False)
x_test, y_test = test.data, test.targets x_test, y_test = test.data, test.targets
self.assertEqual(x_test.shape[0], 71) self.assertEqual(x_test.shape[0], 71)
@@ -75,19 +75,19 @@ class TestTecator(unittest.TestCase):
self.assertEqual(x_test.shape[1], 100) self.assertEqual(x_test.shape[1], 100)
def test_class_to_idx(self): def test_class_to_idx(self):
rootdir = self.artifacts_dir.rpartition('/')[0] rootdir = self.artifacts_dir.rpartition("/")[0]
test = tecator.Tecator(root=rootdir, train=False, verbose=False) test = tecator.Tecator(root=rootdir, train=False, verbose=False)
_ = test.class_to_idx _ = test.class_to_idx
def test_getitem(self): def test_getitem(self):
rootdir = self.artifacts_dir.rpartition('/')[0] rootdir = self.artifacts_dir.rpartition("/")[0]
test = tecator.Tecator(root=rootdir, train=False, verbose=False) test = tecator.Tecator(root=rootdir, train=False, verbose=False)
x, y = test[0] x, y = test[0]
self.assertEqual(x.shape[0], 100) self.assertEqual(x.shape[0], 100)
self.assertIsInstance(y, int) self.assertIsInstance(y, int)
def test_loadable_with_dataloader(self): def test_loadable_with_dataloader(self):
rootdir = self.artifacts_dir.rpartition('/')[0] rootdir = self.artifacts_dir.rpartition("/")[0]
test = tecator.Tecator(root=rootdir, train=False, verbose=False) test = tecator.Tecator(root=rootdir, train=False, verbose=False)
_ = torch.utils.data.DataLoader(test, batch_size=64, shuffle=True) _ = torch.utils.data.DataLoader(test, batch_size=64, shuffle=True)

View File

@@ -5,13 +5,18 @@ import unittest
import numpy as np import numpy as np
import torch import torch
from prototorch.functions import (activations, competitions, distances, from prototorch.functions import (
initializers, losses) activations,
competitions,
distances,
initializers,
losses,
)
class TestActivations(unittest.TestCase): class TestActivations(unittest.TestCase):
def setUp(self): def setUp(self):
self.flist = ['identity', 'sigmoid_beta', 'swish_beta'] self.flist = ["identity", "sigmoid_beta", "swish_beta"]
self.x = torch.randn(1024, 1) self.x = torch.randn(1024, 1)
def test_registry(self): def test_registry(self):
@@ -39,7 +44,7 @@ class TestActivations(unittest.TestCase):
self.assertEqual(1, f(1)) self.assertEqual(1, f(1))
def test_unknown_deserialization(self): def test_unknown_deserialization(self):
for funcname in ['blubb', 'foobar']: for funcname in ["blubb", "foobar"]:
with self.assertRaises(NameError): with self.assertRaises(NameError):
_ = activations.get_activation(funcname) _ = activations.get_activation(funcname)
@@ -52,7 +57,7 @@ class TestActivations(unittest.TestCase):
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_sigmoid_beta1(self): def test_sigmoid_beta1(self):
actual = activations.sigmoid_beta(self.x, beta=torch.tensor(1)) actual = activations.sigmoid_beta(self.x, beta=1.0)
desired = torch.sigmoid(self.x) desired = torch.sigmoid(self.x)
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
@@ -60,7 +65,7 @@ class TestActivations(unittest.TestCase):
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_swish_beta1(self): def test_swish_beta1(self):
actual = activations.swish_beta(self.x, beta=torch.tensor(1)) actual = activations.swish_beta(self.x, beta=1.0)
desired = self.x * torch.sigmoid(self.x) desired = self.x * torch.sigmoid(self.x)
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
@@ -76,7 +81,7 @@ class TestCompetitions(unittest.TestCase):
pass pass
def test_wtac(self): def test_wtac(self):
d = torch.tensor([[2., 3., 1.99, 3.01], [2., 3., 2.01, 3.]]) d = torch.tensor([[2.0, 3.0, 1.99, 3.01], [2.0, 3.0, 2.01, 3.0]])
labels = torch.tensor([0, 1, 2, 3]) labels = torch.tensor([0, 1, 2, 3])
actual = competitions.wtac(d, labels) actual = competitions.wtac(d, labels)
desired = torch.tensor([2, 0]) desired = torch.tensor([2, 0])
@@ -86,7 +91,7 @@ class TestCompetitions(unittest.TestCase):
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_wtac_unequal_dist(self): def test_wtac_unequal_dist(self):
d = torch.tensor([[2., 3., 4.], [2., 3., 1.]]) d = torch.tensor([[2.0, 3.0, 4.0], [2.0, 3.0, 1.0]])
labels = torch.tensor([0, 1, 1]) labels = torch.tensor([0, 1, 1])
actual = competitions.wtac(d, labels) actual = competitions.wtac(d, labels)
desired = torch.tensor([0, 1]) desired = torch.tensor([0, 1])
@@ -96,7 +101,7 @@ class TestCompetitions(unittest.TestCase):
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_wtac_one_hot(self): def test_wtac_one_hot(self):
d = torch.tensor([[1.99, 3.01], [3., 2.01]]) d = torch.tensor([[1.99, 3.01], [3.0, 2.01]])
labels = torch.tensor([[0, 1], [1, 0]]) labels = torch.tensor([[0, 1], [1, 0]])
actual = competitions.wtac(d, labels) actual = competitions.wtac(d, labels)
desired = torch.tensor([[0, 1], [1, 0]]) desired = torch.tensor([[0, 1], [1, 0]])
@@ -106,38 +111,38 @@ class TestCompetitions(unittest.TestCase):
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_stratified_min(self): def test_stratified_min(self):
d = torch.tensor([[1., 0., 2., 3.], [9., 8., 0, 1]]) d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
labels = torch.tensor([0, 0, 1, 2]) labels = torch.tensor([0, 0, 1, 2])
actual = competitions.stratified_min(d, labels) actual = competitions.stratified_min(d, labels)
desired = torch.tensor([[0., 2., 3.], [8., 0., 1.]]) desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_stratified_min_one_hot(self): def test_stratified_min_one_hot(self):
d = torch.tensor([[1., 0., 2., 3.], [9., 8., 0, 1]]) d = torch.tensor([[1.0, 0.0, 2.0, 3.0], [9.0, 8.0, 0, 1]])
labels = torch.tensor([0, 0, 1, 2]) labels = torch.tensor([0, 0, 1, 2])
labels = torch.eye(3)[labels] labels = torch.eye(3)[labels]
actual = competitions.stratified_min(d, labels) actual = competitions.stratified_min(d, labels)
desired = torch.tensor([[0., 2., 3.], [8., 0., 1.]]) desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_stratified_min_simple(self): def test_stratified_min_simple(self):
d = torch.tensor([[0., 2., 3.], [8., 0, 1]]) d = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0, 1]])
labels = torch.tensor([0, 1, 2]) labels = torch.tensor([0, 1, 2])
actual = competitions.stratified_min(d, labels) actual = competitions.stratified_min(d, labels)
desired = torch.tensor([[0., 2., 3.], [8., 0., 1.]]) desired = torch.tensor([[0.0, 2.0, 3.0], [8.0, 0.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_knnc_k1(self): def test_knnc_k1(self):
d = torch.tensor([[2., 3., 1.99, 3.01], [2., 3., 2.01, 3.]]) d = torch.tensor([[2.0, 3.0, 1.99, 3.01], [2.0, 3.0, 2.01, 3.0]])
labels = torch.tensor([0, 1, 2, 3]) labels = torch.tensor([0, 1, 2, 3])
actual = competitions.knnc(d, labels, k=torch.tensor([1])) actual = competitions.knnc(d, labels, k=torch.tensor([1]))
desired = torch.tensor([2, 0]) desired = torch.tensor([2, 0])
@@ -194,12 +199,12 @@ class TestDistances(unittest.TestCase):
desired = torch.empty(self.nx, self.ny) desired = torch.empty(self.nx, self.ny)
for i in range(self.nx): for i in range(self.nx):
for j in range(self.ny): for j in range(self.ny):
desired[i][j] = torch.nn.functional.pairwise_distance( desired[i][j] = (torch.nn.functional.pairwise_distance(
self.x[i].reshape(1, -1), self.x[i].reshape(1, -1),
self.y[j].reshape(1, -1), self.y[j].reshape(1, -1),
p=2, p=2,
keepdim=False, keepdim=False,
)**2 )**2)
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=2) decimal=2)
@@ -254,14 +259,14 @@ class TestDistances(unittest.TestCase):
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_lpnorm_pinf(self): def test_lpnorm_pinf(self):
actual = distances.lpnorm_distance(self.x, self.y, p=float('inf')) actual = distances.lpnorm_distance(self.x, self.y, p=float("inf"))
desired = torch.empty(self.nx, self.ny) desired = torch.empty(self.nx, self.ny)
for i in range(self.nx): for i in range(self.nx):
for j in range(self.ny): for j in range(self.ny):
desired[i][j] = torch.nn.functional.pairwise_distance( desired[i][j] = torch.nn.functional.pairwise_distance(
self.x[i].reshape(1, -1), self.x[i].reshape(1, -1),
self.y[j].reshape(1, -1), self.y[j].reshape(1, -1),
p=float('inf'), p=float("inf"),
keepdim=False, keepdim=False,
) )
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
@@ -275,12 +280,12 @@ class TestDistances(unittest.TestCase):
desired = torch.empty(self.nx, self.ny) desired = torch.empty(self.nx, self.ny)
for i in range(self.nx): for i in range(self.nx):
for j in range(self.ny): for j in range(self.ny):
desired[i][j] = torch.nn.functional.pairwise_distance( desired[i][j] = (torch.nn.functional.pairwise_distance(
self.x[i].reshape(1, -1), self.x[i].reshape(1, -1),
self.y[j].reshape(1, -1), self.y[j].reshape(1, -1),
p=2, p=2,
keepdim=False, keepdim=False,
)**2 )**2)
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=2) decimal=2)
@@ -293,12 +298,12 @@ class TestDistances(unittest.TestCase):
desired = torch.empty(self.nx, self.ny) desired = torch.empty(self.nx, self.ny)
for i in range(self.nx): for i in range(self.nx):
for j in range(self.ny): for j in range(self.ny):
desired[i][j] = torch.nn.functional.pairwise_distance( desired[i][j] = (torch.nn.functional.pairwise_distance(
self.x[i].reshape(1, -1), self.x[i].reshape(1, -1),
self.y[j].reshape(1, -1), self.y[j].reshape(1, -1),
p=2, p=2,
keepdim=False, keepdim=False,
)**2 )**2)
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=2) decimal=2)
@@ -311,8 +316,12 @@ class TestDistances(unittest.TestCase):
class TestInitializers(unittest.TestCase): class TestInitializers(unittest.TestCase):
def setUp(self): def setUp(self):
self.flist = [ self.flist = [
'zeros', 'ones', 'rand', 'randn', 'stratified_mean', "zeros",
'stratified_random' "ones",
"rand",
"randn",
"stratified_mean",
"stratified_random",
] ]
self.x = torch.tensor( self.x = torch.tensor(
[[0, -1, -2], [10, 11, 12], [0, 0, 0], [2, 2, 2]], [[0, -1, -2], [10, 11, 12], [0, 0, 0], [2, 2, 2]],
@@ -340,7 +349,7 @@ class TestInitializers(unittest.TestCase):
self.assertEqual(1, f(1)) self.assertEqual(1, f(1))
def test_unknown_deserialization(self): def test_unknown_deserialization(self):
for funcname in ['blubb', 'foobar']: for funcname in ["blubb", "foobar"]:
with self.assertRaises(NameError): with self.assertRaises(NameError):
_ = initializers.get_initializer(funcname) _ = initializers.get_initializer(funcname)
@@ -383,7 +392,7 @@ class TestInitializers(unittest.TestCase):
def test_stratified_mean_equal1(self): def test_stratified_mean_equal1(self):
pdist = torch.tensor([1, 1]) pdist = torch.tensor([1, 1])
actual, _ = initializers.stratified_mean(self.x, self.y, pdist, False) actual, _ = initializers.stratified_mean(self.x, self.y, pdist, False)
desired = torch.tensor([[5., 5., 5.], [1., 1., 1.]]) desired = torch.tensor([[5.0, 5.0, 5.0], [1.0, 1.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
@@ -393,7 +402,7 @@ class TestInitializers(unittest.TestCase):
pdist = torch.tensor([1, 1]) pdist = torch.tensor([1, 1])
actual, _ = initializers.stratified_random(self.x, self.y, pdist, actual, _ = initializers.stratified_random(self.x, self.y, pdist,
False) False)
desired = torch.tensor([[0., -1., -2.], [0., 0., 0.]]) desired = torch.tensor([[0.0, -1.0, -2.0], [0.0, 0.0, 0.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
@@ -402,8 +411,8 @@ class TestInitializers(unittest.TestCase):
def test_stratified_mean_equal2(self): def test_stratified_mean_equal2(self):
pdist = torch.tensor([2, 2]) pdist = torch.tensor([2, 2])
actual, _ = initializers.stratified_mean(self.x, self.y, pdist, False) actual, _ = initializers.stratified_mean(self.x, self.y, pdist, False)
desired = torch.tensor([[5., 5., 5.], [5., 5., 5.], [1., 1., 1.], desired = torch.tensor([[5.0, 5.0, 5.0], [5.0, 5.0, 5.0],
[1., 1., 1.]]) [1.0, 1.0, 1.0], [1.0, 1.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
@@ -413,8 +422,8 @@ class TestInitializers(unittest.TestCase):
pdist = torch.tensor([2, 2]) pdist = torch.tensor([2, 2])
actual, _ = initializers.stratified_random(self.x, self.y, pdist, actual, _ = initializers.stratified_random(self.x, self.y, pdist,
False) False)
desired = torch.tensor([[0., -1., -2.], [0., -1., -2.], [0., 0., 0.], desired = torch.tensor([[0.0, -1.0, -2.0], [0.0, -1.0, -2.0],
[0., 0., 0.]]) [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
@@ -423,8 +432,8 @@ class TestInitializers(unittest.TestCase):
def test_stratified_mean_unequal(self): def test_stratified_mean_unequal(self):
pdist = torch.tensor([1, 3]) pdist = torch.tensor([1, 3])
actual, _ = initializers.stratified_mean(self.x, self.y, pdist, False) actual, _ = initializers.stratified_mean(self.x, self.y, pdist, False)
desired = torch.tensor([[5., 5., 5.], [1., 1., 1.], [1., 1., 1.], desired = torch.tensor([[5.0, 5.0, 5.0], [1.0, 1.0, 1.0],
[1., 1., 1.]]) [1.0, 1.0, 1.0], [1.0, 1.0, 1.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
@@ -434,8 +443,8 @@ class TestInitializers(unittest.TestCase):
pdist = torch.tensor([1, 3]) pdist = torch.tensor([1, 3])
actual, _ = initializers.stratified_random(self.x, self.y, pdist, actual, _ = initializers.stratified_random(self.x, self.y, pdist,
False) False)
desired = torch.tensor([[0., -1., -2.], [0., 0., 0.], [0., 0., 0.], desired = torch.tensor([[0.0, -1.0, -2.0], [0.0, 0.0, 0.0],
[0., 0., 0.]]) [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]])
mismatch = np.testing.assert_array_almost_equal(actual, mismatch = np.testing.assert_array_almost_equal(actual,
desired, desired,
decimal=5) decimal=5)
@@ -444,8 +453,8 @@ class TestInitializers(unittest.TestCase):
def test_stratified_mean_unequal_one_hot(self): def test_stratified_mean_unequal_one_hot(self):
pdist = torch.tensor([1, 3]) pdist = torch.tensor([1, 3])
y = torch.eye(2)[self.y] y = torch.eye(2)[self.y]
desired1 = torch.tensor([[5., 5., 5.], [1., 1., 1.], [1., 1., 1.], desired1 = torch.tensor([[5.0, 5.0, 5.0], [1.0, 1.0, 1.0],
[1., 1., 1.]]) [1.0, 1.0, 1.0], [1.0, 1.0, 1.0]])
actual1, actual2 = initializers.stratified_mean(self.x, y, pdist) actual1, actual2 = initializers.stratified_mean(self.x, y, pdist)
desired2 = torch.tensor([[1, 0], [0, 1], [0, 1], [0, 1]]) desired2 = torch.tensor([[1, 0], [0, 1], [0, 1], [0, 1]])
mismatch = np.testing.assert_array_almost_equal(actual1, mismatch = np.testing.assert_array_almost_equal(actual1,
@@ -460,8 +469,8 @@ class TestInitializers(unittest.TestCase):
pdist = torch.tensor([1, 3]) pdist = torch.tensor([1, 3])
y = torch.eye(2)[self.y] y = torch.eye(2)[self.y]
actual1, actual2 = initializers.stratified_random(self.x, y, pdist) actual1, actual2 = initializers.stratified_random(self.x, y, pdist)
desired1 = torch.tensor([[0., -1., -2.], [0., 0., 0.], [0., 0., 0.], desired1 = torch.tensor([[0.0, -1.0, -2.0], [0.0, 0.0, 0.0],
[0., 0., 0.]]) [0.0, 0.0, 0.0], [0.0, 0.0, 0.0]])
desired2 = torch.tensor([[1, 0], [0, 1], [0, 1], [0, 1]]) desired2 = torch.tensor([[1, 0], [0, 1], [0, 1], [0, 1]])
mismatch = np.testing.assert_array_almost_equal(actual1, mismatch = np.testing.assert_array_almost_equal(actual1,
desired1, desired1,

98
tests/test_kernels.py Normal file
View File

@@ -0,0 +1,98 @@
"""ProtoTorch kernels test suite."""
import unittest
import numpy as np
import torch
from prototorch.functions.distances import KernelDistance
from prototorch.functions.kernels import ExplicitKernel, RadialBasisFunctionKernel
class TestExplicitKernel(unittest.TestCase):
def setUp(self):
self.single_x = torch.randn(1024)
self.single_y = torch.randn(1024)
self.batch_x = torch.randn(32, 1024)
self.batch_y = torch.randn(32, 1024)
def test_single_values(self):
kernel = ExplicitKernel()
self.assertEqual(
kernel(self.single_x, self.single_y).shape, torch.Size([]))
def test_single_batch(self):
kernel = ExplicitKernel()
self.assertEqual(
kernel(self.single_x, self.batch_y).shape, torch.Size([32]))
def test_batch_single(self):
kernel = ExplicitKernel()
self.assertEqual(
kernel(self.batch_x, self.single_y).shape, torch.Size([32]))
def test_batch_values(self):
kernel = ExplicitKernel()
self.assertEqual(
kernel(self.batch_x, self.batch_y).shape, torch.Size([32, 32]))
class TestRadialBasisFunctionKernel(unittest.TestCase):
def setUp(self):
self.single_x = torch.randn(1024)
self.single_y = torch.randn(1024)
self.batch_x = torch.randn(32, 1024)
self.batch_y = torch.randn(32, 1024)
def test_single_values(self):
kernel = RadialBasisFunctionKernel(1)
self.assertEqual(
kernel(self.single_x, self.single_y).shape, torch.Size([]))
def test_single_batch(self):
kernel = RadialBasisFunctionKernel(1)
self.assertEqual(
kernel(self.single_x, self.batch_y).shape, torch.Size([32]))
def test_batch_single(self):
kernel = RadialBasisFunctionKernel(1)
self.assertEqual(
kernel(self.batch_x, self.single_y).shape, torch.Size([32]))
def test_batch_values(self):
kernel = RadialBasisFunctionKernel(1)
self.assertEqual(
kernel(self.batch_x, self.batch_y).shape, torch.Size([32, 32]))
class TestKernelDistance(unittest.TestCase):
def setUp(self):
self.single_x = torch.randn(1024)
self.single_y = torch.randn(1024)
self.batch_x = torch.randn(32, 1024)
self.batch_y = torch.randn(32, 1024)
self.kernel = ExplicitKernel()
def test_single_values(self):
distance = KernelDistance(self.kernel)
self.assertEqual(
distance(self.single_x, self.single_y).shape, torch.Size([]))
def test_single_batch(self):
distance = KernelDistance(self.kernel)
self.assertEqual(
distance(self.single_x, self.batch_y).shape, torch.Size([32]))
def test_batch_single(self):
distance = KernelDistance(self.kernel)
self.assertEqual(
distance(self.batch_x, self.single_y).shape, torch.Size([32]))
def test_batch_values(self):
distance = KernelDistance(self.kernel)
self.assertEqual(
distance(self.batch_x, self.batch_y).shape, torch.Size([32, 32]))

View File

@@ -29,10 +29,12 @@ class TestPrototypes(unittest.TestCase):
_ = prototypes.Prototypes1D(nclasses=1, input_dim=1) _ = prototypes.Prototypes1D(nclasses=1, input_dim=1)
def test_prototypes1d_init_without_pdist(self): def test_prototypes1d_init_without_pdist(self):
p1 = prototypes.Prototypes1D(input_dim=6, p1 = prototypes.Prototypes1D(
nclasses=2, input_dim=6,
prototypes_per_class=4, nclasses=2,
prototype_initializer='ones') prototypes_per_class=4,
prototype_initializer="ones",
)
protos = p1.prototypes protos = p1.prototypes
actual = protos.detach().numpy() actual = protos.detach().numpy()
desired = torch.ones(8, 6) desired = torch.ones(8, 6)
@@ -45,7 +47,7 @@ class TestPrototypes(unittest.TestCase):
pdist = [2, 2] pdist = [2, 2]
p1 = prototypes.Prototypes1D(input_dim=3, p1 = prototypes.Prototypes1D(input_dim=3,
prototype_distribution=pdist, prototype_distribution=pdist,
prototype_initializer='zeros') prototype_initializer="zeros")
protos = p1.prototypes protos = p1.prototypes
actual = protos.detach().numpy() actual = protos.detach().numpy()
desired = torch.zeros(4, 3) desired = torch.zeros(4, 3)
@@ -60,14 +62,15 @@ class TestPrototypes(unittest.TestCase):
input_dim=3, input_dim=3,
nclasses=2, nclasses=2,
prototypes_per_class=1, prototypes_per_class=1,
prototype_initializer='stratified_mean', prototype_initializer="stratified_mean",
data=None) data=None,
)
def test_prototypes1d_init_torch_pdist(self): def test_prototypes1d_init_torch_pdist(self):
pdist = torch.tensor([2, 2]) pdist = torch.tensor([2, 2])
p1 = prototypes.Prototypes1D(input_dim=3, p1 = prototypes.Prototypes1D(input_dim=3,
prototype_distribution=pdist, prototype_distribution=pdist,
prototype_initializer='zeros') prototype_initializer="zeros")
protos = p1.prototypes protos = p1.prototypes
actual = protos.detach().numpy() actual = protos.detach().numpy()
desired = torch.zeros(4, 3) desired = torch.zeros(4, 3)
@@ -77,24 +80,30 @@ class TestPrototypes(unittest.TestCase):
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_prototypes1d_init_without_inputdim_with_data(self): def test_prototypes1d_init_without_inputdim_with_data(self):
_ = prototypes.Prototypes1D(nclasses=2, _ = prototypes.Prototypes1D(
prototypes_per_class=1, nclasses=2,
prototype_initializer='stratified_mean', prototypes_per_class=1,
data=[[[1.], [0.]], [1, 0]]) prototype_initializer="stratified_mean",
data=[[[1.0], [0.0]], [1, 0]],
)
def test_prototypes1d_init_with_int_data(self): def test_prototypes1d_init_with_int_data(self):
_ = prototypes.Prototypes1D(nclasses=2, _ = prototypes.Prototypes1D(
prototypes_per_class=1, nclasses=2,
prototype_initializer='stratified_mean', prototypes_per_class=1,
data=[[[1], [0]], [1, 0]]) prototype_initializer="stratified_mean",
data=[[[1], [0]], [1, 0]],
)
def test_prototypes1d_init_one_hot_without_data(self): def test_prototypes1d_init_one_hot_without_data(self):
_ = prototypes.Prototypes1D(input_dim=1, _ = prototypes.Prototypes1D(
nclasses=2, input_dim=1,
prototypes_per_class=1, nclasses=2,
prototype_initializer='stratified_mean', prototypes_per_class=1,
data=None, prototype_initializer="stratified_mean",
one_hot_labels=True) data=None,
one_hot_labels=True,
)
def test_prototypes1d_init_one_hot_labels_false(self): def test_prototypes1d_init_one_hot_labels_false(self):
"""Test if ValueError is raised when `one_hot_labels` is set to `False` """Test if ValueError is raised when `one_hot_labels` is set to `False`
@@ -105,9 +114,10 @@ class TestPrototypes(unittest.TestCase):
input_dim=1, input_dim=1,
nclasses=2, nclasses=2,
prototypes_per_class=1, prototypes_per_class=1,
prototype_initializer='stratified_mean', prototype_initializer="stratified_mean",
data=([[0.], [1.]], [[0, 1], [1, 0]]), data=([[0.0], [1.0]], [[0, 1], [1, 0]]),
one_hot_labels=False) one_hot_labels=False,
)
def test_prototypes1d_init_1d_y_data_one_hot_labels_true(self): def test_prototypes1d_init_1d_y_data_one_hot_labels_true(self):
"""Test if ValueError is raised when `one_hot_labels` is set to `True` """Test if ValueError is raised when `one_hot_labels` is set to `True`
@@ -118,9 +128,10 @@ class TestPrototypes(unittest.TestCase):
input_dim=1, input_dim=1,
nclasses=2, nclasses=2,
prototypes_per_class=1, prototypes_per_class=1,
prototype_initializer='stratified_mean', prototype_initializer="stratified_mean",
data=([[0.], [1.]], [0, 1]), data=([[0.0], [1.0]], [0, 1]),
one_hot_labels=True) one_hot_labels=True,
)
def test_prototypes1d_init_one_hot_labels_true(self): def test_prototypes1d_init_one_hot_labels_true(self):
"""Test if ValueError is raised when `one_hot_labels` is set to `True` """Test if ValueError is raised when `one_hot_labels` is set to `True`
@@ -132,25 +143,27 @@ class TestPrototypes(unittest.TestCase):
input_dim=1, input_dim=1,
nclasses=2, nclasses=2,
prototypes_per_class=1, prototypes_per_class=1,
prototype_initializer='stratified_mean', prototype_initializer="stratified_mean",
data=([[0.], [1.]], [[0], [1]]), data=([[0.0], [1.0]], [[0], [1]]),
one_hot_labels=True) one_hot_labels=True,
)
def test_prototypes1d_init_with_int_dtype(self): def test_prototypes1d_init_with_int_dtype(self):
with self.assertRaises(RuntimeError): with self.assertRaises(RuntimeError):
_ = prototypes.Prototypes1D( _ = prototypes.Prototypes1D(
nclasses=2, nclasses=2,
prototypes_per_class=1, prototypes_per_class=1,
prototype_initializer='stratified_mean', prototype_initializer="stratified_mean",
data=[[[1], [0]], [1, 0]], data=[[[1], [0]], [1, 0]],
dtype=torch.int32) dtype=torch.int32,
)
def test_prototypes1d_inputndim_with_data(self): def test_prototypes1d_inputndim_with_data(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
_ = prototypes.Prototypes1D(input_dim=1, _ = prototypes.Prototypes1D(input_dim=1,
nclasses=1, nclasses=1,
prototypes_per_class=1, prototypes_per_class=1,
data=[[1.], [1]]) data=[[1.0], [1]])
def test_prototypes1d_inputdim_with_data(self): def test_prototypes1d_inputdim_with_data(self):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
@@ -158,8 +171,9 @@ class TestPrototypes(unittest.TestCase):
input_dim=2, input_dim=2,
nclasses=2, nclasses=2,
prototypes_per_class=1, prototypes_per_class=1,
prototype_initializer='stratified_mean', prototype_initializer="stratified_mean",
data=[[[1.], [0.]], [1, 0]]) data=[[[1.0], [0.0]], [1, 0]],
)
def test_prototypes1d_nclasses_with_data(self): def test_prototypes1d_nclasses_with_data(self):
"""Test ValueError raise if provided `nclasses` is not the same """Test ValueError raise if provided `nclasses` is not the same
@@ -170,13 +184,14 @@ class TestPrototypes(unittest.TestCase):
input_dim=1, input_dim=1,
nclasses=1, nclasses=1,
prototypes_per_class=1, prototypes_per_class=1,
prototype_initializer='stratified_mean', prototype_initializer="stratified_mean",
data=[[[1.], [2.]], [1, 2]]) data=[[[1.0], [2.0]], [1, 2]],
)
def test_prototypes1d_init_with_ppc(self): def test_prototypes1d_init_with_ppc(self):
p1 = prototypes.Prototypes1D(data=[self.x, self.y], p1 = prototypes.Prototypes1D(data=[self.x, self.y],
prototypes_per_class=2, prototypes_per_class=2,
prototype_initializer='zeros') prototype_initializer="zeros")
protos = p1.prototypes protos = p1.prototypes
actual = protos.detach().numpy() actual = protos.detach().numpy()
desired = torch.zeros(4, 3) desired = torch.zeros(4, 3)
@@ -186,9 +201,11 @@ class TestPrototypes(unittest.TestCase):
self.assertIsNone(mismatch) self.assertIsNone(mismatch)
def test_prototypes1d_init_with_pdist(self): def test_prototypes1d_init_with_pdist(self):
p1 = prototypes.Prototypes1D(data=[self.x, self.y], p1 = prototypes.Prototypes1D(
prototype_distribution=[6, 9], data=[self.x, self.y],
prototype_initializer='zeros') prototype_distribution=[6, 9],
prototype_initializer="zeros",
)
protos = p1.prototypes protos = p1.prototypes
actual = protos.detach().numpy() actual = protos.detach().numpy()
desired = torch.zeros(15, 3) desired = torch.zeros(15, 3)
@@ -201,10 +218,12 @@ class TestPrototypes(unittest.TestCase):
def my_initializer(*args, **kwargs): def my_initializer(*args, **kwargs):
return torch.full((2, 99), 99.0), torch.tensor([0, 1]) return torch.full((2, 99), 99.0), torch.tensor([0, 1])
p1 = prototypes.Prototypes1D(input_dim=99, p1 = prototypes.Prototypes1D(
nclasses=2, input_dim=99,
prototypes_per_class=1, nclasses=2,
prototype_initializer=my_initializer) prototypes_per_class=1,
prototype_initializer=my_initializer,
)
protos = p1.prototypes protos = p1.prototypes
actual = protos.detach().numpy() actual = protos.detach().numpy()
desired = 99 * torch.ones(2, 99) desired = 99 * torch.ones(2, 99)
@@ -231,7 +250,7 @@ class TestPrototypes(unittest.TestCase):
def test_prototypes1d_validate_extra_repr_not_empty(self): def test_prototypes1d_validate_extra_repr_not_empty(self):
p1 = prototypes.Prototypes1D(input_dim=0, prototype_distribution=[0]) p1 = prototypes.Prototypes1D(input_dim=0, prototype_distribution=[0])
rep = p1.extra_repr() rep = p1.extra_repr()
self.assertNotEqual(rep, '') self.assertNotEqual(rep, "")
def tearDown(self): def tearDown(self):
del self.x, self.y, self.gen del self.x, self.y, self.gen
@@ -243,11 +262,11 @@ class TestLosses(unittest.TestCase):
pass pass
def test_glvqloss_init(self): def test_glvqloss_init(self):
_ = losses.GLVQLoss(0, 'swish_beta', beta=20) _ = losses.GLVQLoss(0, "swish_beta", beta=20)
def test_glvqloss_forward_1ppc(self): def test_glvqloss_forward_1ppc(self):
criterion = losses.GLVQLoss(margin=0, criterion = losses.GLVQLoss(margin=0,
squashing='sigmoid_beta', squashing="sigmoid_beta",
beta=100) beta=100)
d = torch.stack([torch.ones(100), torch.zeros(100)], dim=1) d = torch.stack([torch.ones(100), torch.zeros(100)], dim=1)
labels = torch.tensor([0, 1]) labels = torch.tensor([0, 1])
@@ -259,7 +278,7 @@ class TestLosses(unittest.TestCase):
def test_glvqloss_forward_2ppc(self): def test_glvqloss_forward_2ppc(self):
criterion = losses.GLVQLoss(margin=0, criterion = losses.GLVQLoss(margin=0,
squashing='sigmoid_beta', squashing="sigmoid_beta",
beta=100) beta=100)
d = torch.stack([ d = torch.stack([
torch.ones(100), torch.ones(100),

15
tox.ini
View File

@@ -1,15 +0,0 @@
# tox (https://tox.readthedocs.io/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py36,py37,py38
[testenv]
deps =
pytest
coverage
commands =
pip install -e .
coverage run -m pytest