# Source code for openTSNE.affinity

```
import logging
import operator
from typing import Iterable
from functools import reduce
import numpy as np
import scipy.sparse as sp
from openTSNE import _tsne
from openTSNE import nearest_neighbors
from openTSNE import utils
log = logging.getLogger(__name__)
[docs]class Affinities:
"""Compute the affinities between samples.
t-SNE takes as input an affinity matrix :math:`P`, and does not really care
about anything else from the data. This means we can use t-SNE for any data
where we are able to express interactions between samples with an affinity
matrix.
Attributes
----------
P: array_like
The :math:`N \\times N` affinity matrix expressing interactions between
:math:`N` initial data samples.
verbose: bool
"""
def __init__(self, verbose=False):
self.P = None
self.verbose = verbose
[docs] def to_new(self, data, return_distances=False):
"""Compute the affinities of new samples to the initial samples.
This is necessary for embedding new data points into an existing
embedding.
Parameters
----------
data: np.ndarray
The data points to be added to the existing embedding.
return_distances: bool
If needed, the function can return the indices of the nearest
neighbors and their corresponding distances.
Returns
-------
P: array_like
An :math:`N \\times M` affinity matrix expressing interactions
between :math:`N` new data points the initial :math:`M` data
samples.
indices: np.ndarray
Returned if ``return_distances=True``. The indices of the :math:`k`
nearest neighbors in the existing embedding for every new data
point.
distances: np.ndarray
Returned if ``return_distances=True``. The distances to the
:math:`k` nearest neighbors in the existing embedding for every new
data point.
"""
[docs]class PerplexityBasedNN(Affinities):
"""Compute affinities using nearest neighbors.
Please see the :ref:`parameter-guide` for more information.
Parameters
----------
data: np.ndarray
The data matrix.
perplexity: float
Perplexity can be thought of as the continuous :math:`k` number of
nearest neighbors, for which t-SNE will attempt to preserve distances.
method: str
Specifies the nearest neighbor method to use. Can be ``exact``, ``annoy``,
``pynndescent``, ``approx``, or ``auto`` (default). ``approx`` uses Annoy
if the input data matrix is not a sparse object and if Annoy supports
the given metric. Otherwise it uses Pynndescent. ``auto`` uses exact
nearest neighbors for N<1000 and the same heuristic as ``approx`` for N>=1000.
metric: Union[str, Callable]
The metric to be used to compute affinities between points in the
original space.
metric_params: dict
Additional keyword arguments for the metric function.
symmetrize: bool
Symmetrize affinity matrix. Standard t-SNE symmetrizes the interactions
but when embedding new data, symmetrization is not performed.
n_jobs: int
The number of threads to use while running t-SNE. This follows the
scikit-learn convention, ``-1`` meaning all processors, ``-2`` meaning
all but one, etc.
random_state: Union[int, RandomState]
If the value is an int, random_state is the seed used by the random
number generator. If the value is a RandomState instance, then it will
be used as the random number generator. If the value is None, the random
number generator is the RandomState instance used by `np.random`.
verbose: bool
"""
def __init__(
self,
data,
perplexity=30,
method="auto",
metric="euclidean",
metric_params=None,
symmetrize=True,
n_jobs=1,
random_state=None,
verbose=False,
):
self.n_samples = data.shape[0]
self.perplexity = self.check_perplexity(perplexity)
self.verbose = verbose
k_neighbors = min(self.n_samples - 1, int(3 * self.perplexity))
self.knn_index, self.__neighbors, self.__distances = build_knn_index(
data, method, k_neighbors, metric, metric_params, n_jobs, random_state, verbose
)
with utils.Timer("Calculating affinity matrix...", self.verbose):
self.P = joint_probabilities_nn(
self.__neighbors,
self.__distances,
[self.perplexity],
symmetrize=symmetrize,
n_jobs=n_jobs,
)
self.n_jobs = n_jobs
[docs] def set_perplexity(self, new_perplexity):
"""Change the perplexity of the affinity matrix.
Note that we only allow lowering the perplexity or restoring it to its
original value. This restriction exists because setting a higher
perplexity value requires recomputing all the nearest neighbors, which
can take a long time. To avoid potential confusion as to why execution
time is slow, this is not allowed. If you would like to increase the
perplexity above the initial value, simply create a new instance.
Parameters
----------
new_perplexity: float
The new perplexity.
"""
# If the value hasn't changed, there's nothing to do
if new_perplexity == self.perplexity:
return
# Verify that the perplexity isn't too large
new_perplexity = self.check_perplexity(new_perplexity)
# Recompute the affinity matrix
k_neighbors = min(self.n_samples - 1, int(3 * new_perplexity))
if k_neighbors > self.__neighbors.shape[1]:
raise RuntimeError(
"The desired perplexity `%.2f` is larger than the initial one "
"used. This would need to recompute the nearest neighbors, "
"which is not efficient. Please create a new `%s` instance "
"with the increased perplexity."
% (new_perplexity, self.__class__.__name__)
)
self.perplexity = new_perplexity
with utils.Timer(
"Perplexity changed. Recomputing affinity matrix...", self.verbose
):
self.P = joint_probabilities_nn(
self.__neighbors[:, :k_neighbors],
self.__distances[:, :k_neighbors],
[self.perplexity],
symmetrize=True,
n_jobs=self.n_jobs,
)
[docs] def to_new(self, data, perplexity=None, return_distances=False):
"""Compute the affinities of new samples to the initial samples.
This is necessary for embedding new data points into an existing
embedding.
Please see the :ref:`parameter-guide` for more information.
Parameters
----------
data: np.ndarray
The data points to be added to the existing embedding.
perplexity: float
Perplexity can be thought of as the continuous :math:`k` number of
nearest neighbors, for which t-SNE will attempt to preserve
distances.
return_distances: bool
If needed, the function can return the indices of the nearest
neighbors and their corresponding distances.
Returns
-------
P: array_like
An :math:`N \\times M` affinity matrix expressing interactions
between :math:`N` new data points the initial :math:`M` data
samples.
indices: np.ndarray
Returned if ``return_distances=True``. The indices of the :math:`k`
nearest neighbors in the existing embedding for every new data
point.
distances: np.ndarray
Returned if ``return_distances=True``. The distances to the
:math:`k` nearest neighbors in the existing embedding for every new
data point.
"""
perplexity = perplexity if perplexity is not None else self.perplexity
perplexity = self.check_perplexity(perplexity)
k_neighbors = min(self.n_samples - 1, int(3 * perplexity))
neighbors, distances = self.knn_index.query(data, k_neighbors)
with utils.Timer("Calculating affinity matrix...", self.verbose):
P = joint_probabilities_nn(
neighbors,
distances,
[perplexity],
symmetrize=False,
normalization="point-wise",
n_reference_samples=self.n_samples,
n_jobs=self.n_jobs,
)
if return_distances:
return P, neighbors, distances
return P
[docs] def check_perplexity(self, perplexity):
if perplexity <= 0:
raise ValueError("Perplexity must be >=0. %.2f given" % perplexity)
if self.n_samples - 1 < 3 * perplexity:
old_perplexity, perplexity = perplexity, (self.n_samples - 1) / 3
log.warning(
"Perplexity value %d is too high. Using perplexity %.2f instead"
% (old_perplexity, perplexity)
)
return perplexity
def build_knn_index(
data, method, k, metric, metric_params=None, n_jobs=1, random_state=None, verbose=False
):
if not sp.issparse(data) and metric in [
"cosine",
"euclidean",
"manhattan",
"hamming",
"dot",
"l1",
"l2",
"taxicab",
]:
preferred_approx_method = nearest_neighbors.Annoy
else:
preferred_approx_method = nearest_neighbors.NNDescent
if data.shape[0] < 1000:
preferred_method = nearest_neighbors.BallTree
else:
preferred_method = preferred_approx_method
methods = {
"exact": nearest_neighbors.BallTree,
"auto": preferred_method,
"approx": preferred_approx_method,
"annoy": nearest_neighbors.Annoy,
"pynndescent": nearest_neighbors.NNDescent,
}
if isinstance(method, nearest_neighbors.KNNIndex):
knn_index = method
elif method not in methods:
raise ValueError(
"Unrecognized nearest neighbor algorithm `%s`. Please choose one "
"of the supported methods or provide a valid `KNNIndex` instance." % method
)
else:
knn_index = methods[method](
metric=metric,
metric_params=metric_params,
n_jobs=n_jobs,
random_state=random_state,
verbose=verbose,
)
neighbors, distances = knn_index.build(data, k=k)
return knn_index, neighbors, distances
def joint_probabilities_nn(
neighbors,
distances,
perplexities,
symmetrize=True,
normalization="pair-wise",
n_reference_samples=None,
n_jobs=1,
):
"""Compute the conditional probability matrix P_{j|i}.
This method computes an approximation to P using the nearest neighbors.
Parameters
----------
neighbors: np.ndarray
A `n_samples * k_neighbors` matrix containing the indices to each
points" nearest neighbors in descending order.
distances: np.ndarray
A `n_samples * k_neighbors` matrix containing the distances to the
neighbors at indices defined in the neighbors parameter.
perplexities: double
The desired perplexity of the probability distribution.
symmetrize: bool
Whether to symmetrize the probability matrix or not. Symmetrizing is
used for typical t-SNE, but does not make sense when embedding new data
into an existing embedding.
normalization: str
The normalization scheme to use for the affinities. Standard t-SNE
considers interactions between all the data points, therefore the entire
affinity matrix is regarded as a probability distribution, and must sum
to 1. When embedding new points, we only consider interactions to
existing points, and treat each point separately. In this case, we
row-normalize the affinity matrix, meaning each point gets its own
probability distribution.
n_reference_samples: int
The number of samples in the existing (reference) embedding. Needed to
properly construct the sparse P matrix.
n_jobs: int
Number of threads.
Returns
-------
csr_matrix
A `n_samples * n_reference_samples` matrix containing the probabilities
that a new sample would appear as a neighbor of a reference point.
"""
assert normalization in (
"pair-wise",
"point-wise",
), f"Unrecognized normalization scheme `{normalization}`."
n_samples, k_neighbors = distances.shape
if n_reference_samples is None:
n_reference_samples = n_samples
# Compute asymmetric pairwise input similarities
conditional_P = _tsne.compute_gaussian_perplexity(
np.array(distances, dtype=float),
np.array(perplexities, dtype=float),
num_threads=n_jobs,
)
conditional_P = np.asarray(conditional_P)
P = sp.csr_matrix(
(
conditional_P.ravel(),
neighbors.ravel(),
range(0, n_samples * k_neighbors + 1, k_neighbors),
),
shape=(n_samples, n_reference_samples),
)
# Symmetrize the probability matrix
if symmetrize:
P = (P + P.T) / 2
if normalization == "pair-wise":
P /= np.sum(P)
elif normalization == "point-wise":
P = sp.diags(np.asarray(1 / P.sum(axis=1)).ravel()) @ P
return P
[docs]class FixedSigmaNN(Affinities):
"""Compute affinities using using nearest neighbors and a fixed bandwidth
for the Gaussians in the ambient space.
Using a fixed Gaussian bandwidth can enable us to find smaller clusters of
data points than we might be able to using the automatically determined
bandwidths using perplexity. Note however that this requires mostly trial
and error.
Parameters
----------
data: np.ndarray
The data matrix.
sigma: float
The bandwidth to use for the Gaussian kernels in the ambient space.
k: int
The number of nearest neighbors to consider for each kernel.
method: str
Specifies the nearest neighbor method to use. Can be ``exact``, ``annoy``,
``pynndescent``, ``approx``, or ``auto`` (default). ``approx`` uses Annoy
if the input data matrix is not a sparse object and if Annoy supports
the given metric. Otherwise it uses Pynndescent. ``auto`` uses exact
nearest neighbors for N<1000 and the same heuristic as ``approx`` for N>=1000.
metric: Union[str, Callable]
The metric to be used to compute affinities between points in the
original space.
metric_params: dict
Additional keyword arguments for the metric function.
symmetrize: bool
Symmetrize affinity matrix. Standard t-SNE symmetrizes the interactions
but when embedding new data, symmetrization is not performed.
n_jobs: int
The number of threads to use while running t-SNE. This follows the
scikit-learn convention, ``-1`` meaning all processors, ``-2`` meaning
all but one, etc.
random_state: Union[int, RandomState]
If the value is an int, random_state is the seed used by the random
number generator. If the value is a RandomState instance, then it will
be used as the random number generator. If the value is None, the random
number generator is the RandomState instance used by `np.random`.
verbose: bool
"""
def __init__(
self,
data,
sigma,
k=30,
method="auto",
metric="euclidean",
metric_params=None,
symmetrize=True,
n_jobs=1,
random_state=None,
verbose=False,
):
self.n_samples = n_samples = data.shape[0]
self.verbose = verbose
if k >= self.n_samples:
raise ValueError(
"`k` (%d) cannot be larger than N-1 (%d)." % (k, self.n_samples)
)
knn_index, neighbors, distances = build_knn_index(
data, method, k, metric, metric_params, n_jobs, random_state, self.verbose
)
self.knn_index = knn_index
with utils.Timer("Calculating affinity matrix...", self.verbose):
# Compute asymmetric pairwise input similarities
conditional_P = np.exp(-(distances ** 2) / (2 * sigma ** 2))
conditional_P /= np.sum(conditional_P, axis=1)[:, np.newaxis]
P = sp.csr_matrix(
(
conditional_P.ravel(),
neighbors.ravel(),
range(0, n_samples * k + 1, k),
),
shape=(n_samples, n_samples),
)
# Symmetrize the probability matrix
if symmetrize:
P = (P + P.T) / 2
# Convert weights to probabilities
P /= np.sum(P)
self.sigma = sigma
self.k = k
self.P = P
self.n_jobs = n_jobs
[docs] def to_new(self, data, k=None, sigma=None, return_distances=False):
"""Compute the affinities of new samples to the initial samples.
This is necessary for embedding new data points into an existing
embedding.
Parameters
----------
data: np.ndarray
The data points to be added to the existing embedding.
k: int
The number of nearest neighbors to consider for each kernel.
sigma: float
The bandwidth to use for the Gaussian kernels in the ambient space.
return_distances: bool
If needed, the function can return the indices of the nearest
neighbors and their corresponding distances.
Returns
-------
P: array_like
An :math:`N \\times M` affinity matrix expressing interactions
between :math:`N` new data points the initial :math:`M` data
samples.
indices: np.ndarray
Returned if ``return_distances=True``. The indices of the :math:`k`
nearest neighbors in the existing embedding for every new data
point.
distances: np.ndarray
Returned if ``return_distances=True``. The distances to the
:math:`k` nearest neighbors in the existing embedding for every new
data point.
"""
n_samples = data.shape[0]
n_reference_samples = self.n_samples
if k is None:
k = self.k
elif k >= n_reference_samples:
raise ValueError(
"`k` (%d) cannot be larger than the number of reference "
"samples (%d)." % (k, self.n_samples)
)
if sigma is None:
sigma = self.sigma
# Find nearest neighbors and the distances to the new points
neighbors, distances = self.knn_index.query(data, k)
with utils.Timer("Calculating affinity matrix...", self.verbose):
# Compute asymmetric pairwise input similarities
conditional_P = np.exp(-(distances ** 2) / (2 * sigma ** 2))
# Convert weights to probabilities
conditional_P /= np.sum(conditional_P, axis=1)[:, np.newaxis]
P = sp.csr_matrix(
(
conditional_P.ravel(),
neighbors.ravel(),
range(0, n_samples * k + 1, k),
),
shape=(n_samples, n_reference_samples),
)
if return_distances:
return P, neighbors, distances
return P
[docs]class MultiscaleMixture(Affinities):
"""Calculate affinities using a Gaussian mixture kernel.
Instead of using a single perplexity to compute the affinities between data
points, we can use a multiscale Gaussian kernel instead. This allows us to
incorporate long range interactions.
Please see the :ref:`parameter-guide` for more information.
Parameters
----------
data: np.ndarray
The data matrix.
perplexities: List[float]
A list of perplexity values, which will be used in the multiscale
Gaussian kernel. Perplexity can be thought of as the continuous
:math:`k` number of nearest neighbors, for which t-SNE will attempt to
preserve distances.
method: str
Specifies the nearest neighbor method to use. Can be ``exact``, ``annoy``,
``pynndescent``, ``approx``, or ``auto`` (default). ``approx`` uses Annoy
if the input data matrix is not a sparse object and if Annoy supports
the given metric. Otherwise it uses Pynndescent. ``auto`` uses exact
nearest neighbors for N<1000 and the same heuristic as ``approx`` for N>=1000.
metric: Union[str, Callable]
The metric to be used to compute affinities between points in the
original space.
metric_params: dict
Additional keyword arguments for the metric function.
symmetrize: bool
Symmetrize affinity matrix. Standard t-SNE symmetrizes the interactions
but when embedding new data, symmetrization is not performed.
n_jobs: int
The number of threads to use while running t-SNE. This follows the
scikit-learn convention, ``-1`` meaning all processors, ``-2`` meaning
all but one, etc.
random_state: Union[int, RandomState]
If the value is an int, random_state is the seed used by the random
number generator. If the value is a RandomState instance, then it will
be used as the random number generator. If the value is None, the random
number generator is the RandomState instance used by `np.random`.
verbose: bool
"""
def __init__(
self,
data,
perplexities,
method="auto",
metric="euclidean",
metric_params=None,
symmetrize=True,
n_jobs=1,
random_state=None,
verbose=False,
):
self.n_samples = data.shape[0]
self.verbose = verbose
# We will compute the nearest neighbors to the max value of perplexity,
# smaller values can just use indexing to truncate unneeded neighbors
perplexities = self.check_perplexities(perplexities)
max_perplexity = np.max(perplexities)
k_neighbors = min(self.n_samples - 1, int(3 * max_perplexity))
self.knn_index, self.__neighbors, self.__distances = build_knn_index(
data, method, k_neighbors, metric, metric_params, n_jobs, random_state, verbose
)
with utils.Timer("Calculating affinity matrix...", self.verbose):
self.P = self._calculate_P(
self.__neighbors,
self.__distances,
perplexities,
symmetrize=symmetrize,
n_jobs=n_jobs,
)
self.perplexities = perplexities
self.n_jobs = n_jobs
@staticmethod
def _calculate_P(
neighbors,
distances,
perplexities,
symmetrize=True,
normalization="pair-wise",
n_reference_samples=None,
n_jobs=1,
):
return joint_probabilities_nn(
neighbors,
distances,
perplexities,
symmetrize=symmetrize,
normalization=normalization,
n_reference_samples=n_reference_samples,
n_jobs=n_jobs,
)
[docs] def set_perplexities(self, new_perplexities):
"""Change the perplexities of the affinity matrix.
Note that we only allow lowering the perplexities or restoring them to
their original maximum value. This restriction exists because setting a
higher perplexity value requires recomputing all the nearest neighbors,
which can take a long time. To avoid potential confusion as to why
execution time is slow, this is not allowed. If you would like to
increase the perplexity above the initial value, simply create a new
instance.
Parameters
----------
new_perplexities: List[float]
The new list of perplexities.
"""
if np.array_equal(self.perplexities, new_perplexities):
return
new_perplexities = self.check_perplexities(new_perplexities)
max_perplexity = np.max(new_perplexities)
k_neighbors = min(self.n_samples - 1, int(3 * max_perplexity))
if k_neighbors > self.__neighbors.shape[1]:
raise RuntimeError(
"The largest perplexity `%.2f` is larger than the initial one "
"used. This would need to recompute the nearest neighbors, "
"which is not efficient. Please create a new `%s` instance "
"with the increased perplexity."
% (max_perplexity, self.__class__.__name__)
)
self.perplexities = new_perplexities
with utils.Timer(
"Perplexity changed. Recomputing affinity matrix...", self.verbose
):
self.P = self._calculate_P(
self.__neighbors[:, :k_neighbors],
self.__distances[:, :k_neighbors],
self.perplexities,
symmetrize=True,
n_jobs=self.n_jobs,
)
[docs] def to_new(self, data, perplexities=None, return_distances=False):
"""Compute the affinities of new samples to the initial samples.
This is necessary for embedding new data points into an existing
embedding.
Please see the :ref:`parameter-guide` for more information.
Parameters
----------
data: np.ndarray
The data points to be added to the existing embedding.
perplexities: List[float]
A list of perplexity values, which will be used in the multiscale
Gaussian kernel. Perplexity can be thought of as the continuous
:math:`k` number of nearest neighbors, for which t-SNE will attempt
to preserve distances.
return_distances: bool
If needed, the function can return the indices of the nearest
neighbors and their corresponding distances.
Returns
-------
P: array_like
An :math:`N \\times M` affinity matrix expressing interactions
between :math:`N` new data points the initial :math:`M` data
samples.
indices: np.ndarray
Returned if ``return_distances=True``. The indices of the :math:`k`
nearest neighbors in the existing embedding for every new data
point.
distances: np.ndarray
Returned if ``return_distances=True``. The distances to the
:math:`k` nearest neighbors in the existing embedding for every new
data point.
"""
perplexities = perplexities if perplexities is not None else self.perplexities
perplexities = self.check_perplexities(perplexities)
max_perplexity = np.max(perplexities)
k_neighbors = min(self.n_samples - 1, int(3 * max_perplexity))
neighbors, distances = self.knn_index.query(data, k_neighbors)
with utils.Timer("Calculating affinity matrix...", self.verbose):
P = self._calculate_P(
neighbors,
distances,
perplexities,
symmetrize=False,
normalization="point-wise",
n_reference_samples=self.n_samples,
n_jobs=self.n_jobs,
)
if return_distances:
return P, neighbors, distances
return P
[docs] def check_perplexities(self, perplexities: Iterable[float]) -> Iterable[float]:
"""Check and correct/truncate perplexities.
If a perplexity is too large, it is corrected to the largest allowed
value. It is then inserted into the list of perplexities only if that
value doesn't already exist in the list.
"""
usable_perplexities = []
for perplexity in sorted(perplexities):
if perplexity <= 0:
raise ValueError("Perplexity must be >=0. %.2f given" % perplexity)
if 3 * perplexity > self.n_samples - 1:
new_perplexity = (self.n_samples - 1) / 3
if new_perplexity in usable_perplexities:
log.warning(
"Perplexity value %d is too high. Dropping "
"because the max perplexity is already in the "
"list." % perplexity
)
else:
usable_perplexities.append(new_perplexity)
log.warning(
"Perplexity value %d is too high. Using "
"perplexity %.2f instead" % (perplexity, new_perplexity)
)
else:
usable_perplexities.append(perplexity)
return usable_perplexities
[docs]class Multiscale(MultiscaleMixture):
"""Calculate affinities using averaged Gaussian perplexities.
In contrast to :class:`MultiscaleMixture`, which uses a Gaussian mixture
kernel, here, we first compute single scale Gaussian kernels, convert them
to probability distributions, then average them out between scales.
Please see the :ref:`parameter-guide` for more information.
Parameters
----------
data: np.ndarray
The data matrix.
perplexities: List[float]
A list of perplexity values, which will be used in the multiscale
Gaussian kernel. Perplexity can be thought of as the continuous
:math:`k` number of nearest neighbors, for which t-SNE will attempt to
preserve distances.
method: str
Specifies the nearest neighbor method to use. Can be ``exact``, ``annoy``,
``pynndescent``, ``approx``, or ``auto`` (default). ``approx`` uses Annoy
if the input data matrix is not a sparse object and if Annoy supports
the given metric. Otherwise it uses Pynndescent. ``auto`` uses exact
nearest neighbors for N<1000 and the same heuristic as ``approx`` for N>=1000.
metric: Union[str, Callable]
The metric to be used to compute affinities between points in the
original space.
metric_params: dict
Additional keyword arguments for the metric function.
symmetrize: bool
Symmetrize affinity matrix. Standard t-SNE symmetrizes the interactions
but when embedding new data, symmetrization is not performed.
n_jobs: int
The number of threads to use while running t-SNE. This follows the
scikit-learn convention, ``-1`` meaning all processors, ``-2`` meaning
all but one, etc.
random_state: Union[int, RandomState]
If the value is an int, random_state is the seed used by the random
number generator. If the value is a RandomState instance, then it will
be used as the random number generator. If the value is None, the random
number generator is the RandomState instance used by `np.random`.
verbose: bool
"""
@staticmethod
def _calculate_P(
neighbors,
distances,
perplexities,
symmetrize=True,
normalization="pair-wise",
n_reference_samples=None,
n_jobs=1,
):
# Compute normalized probabilities for each perplexity
partial_Ps = [
joint_probabilities_nn(
neighbors,
distances,
[perplexity],
symmetrize=symmetrize,
normalization=normalization,
n_reference_samples=n_reference_samples,
n_jobs=n_jobs,
)
for perplexity in perplexities
]
# Sum them together, then normalize
P = reduce(operator.add, partial_Ps, 0)
# Take care to properly normalize the affinity matrix
if normalization == "pair-wise":
P /= np.sum(P)
elif normalization == "point-wise":
P = sp.diags(np.asarray(1 / P.sum(axis=1)).ravel()) @ P
return P
class Uniform(Affinities):
"""Compute affinities using using nearest neighbors and uniform kernel in
the ambient space.
Parameters
----------
data: np.ndarray
The data matrix.
k_neighbors: int
method: str
Specifies the nearest neighbor method to use. Can be ``exact``, ``annoy``,
``pynndescent``, ``approx``, or ``auto`` (default). ``approx`` uses Annoy
if the input data matrix is not a sparse object and if Annoy supports
the given metric. Otherwise it uses Pynndescent. ``auto`` uses exact
nearest neighbors for N<1000 and the same heuristic as ``approx`` for N>=1000.
metric: Union[str, Callable]
The metric to be used to compute affinities between points in the
original space.
metric_params: dict
Additional keyword arguments for the metric function.
symmetrize: bool
Symmetrize affinity matrix. Standard t-SNE symmetrizes the interactions
but when embedding new data, symmetrization is not performed.
n_jobs: int
The number of threads to use while running t-SNE. This follows the
scikit-learn convention, ``-1`` meaning all processors, ``-2`` meaning
all but one, etc.
random_state: Union[int, RandomState]
If the value is an int, random_state is the seed used by the random
number generator. If the value is a RandomState instance, then it will
be used as the random number generator. If the value is None, the random
number generator is the RandomState instance used by `np.random`.
verbose: bool
"""
def __init__(
self,
data,
k_neighbors=30,
method="auto",
metric="euclidean",
metric_params=None,
symmetrize=True,
n_jobs=1,
random_state=None,
verbose=False,
):
self.n_samples = data.shape[0]
self.k_neighbors = k_neighbors
self.verbose = verbose
self.n_jobs = n_jobs
if k_neighbors >= self.n_samples:
raise ValueError(
"`k_neighbors` (%d) cannot be larger than N-1 (%d)." %
(k_neighbors, self.n_samples)
)
self.knn_index, neighbors, distances = build_knn_index(
data, method, k_neighbors, metric, metric_params, n_jobs, random_state, verbose
)
P = sp.csr_matrix(
(
np.ones_like(distances).ravel(),
neighbors.ravel(),
range(0, self.n_samples * self.k_neighbors + 1, self.k_neighbors),
),
shape=(self.n_samples, self.n_samples),
)
# Symmetrize the probability matrix
if symmetrize:
P = (P + P.T) / 2
# Convert weights to probabilities
P /= np.sum(P)
self.P = P
def to_new(self, data, k_neighbors=None, return_distances=False):
"""Compute the affinities of new samples to the initial samples.
This is necessary for embedding new data points into an existing
embedding.
Parameters
----------
data: np.ndarray
The data points to be added to the existing embedding.
k_neighbors: int
The number of nearest neighbors to consider.
return_distances: bool
If needed, the function can return the indices of the nearest
neighbors and their corresponding distances.
Returns
-------
P: array_like
An :math:`N \\times M` affinity matrix expressing interactions
between :math:`N` new data points the initial :math:`M` data
samples.
indices: np.ndarray
Returned if ``return_distances=True``. The indices of the :math:`k`
nearest neighbors in the existing embedding for every new data
point.
distances: np.ndarray
Returned if ``return_distances=True``. The distances to the
:math:`k` nearest neighbors in the existing embedding for every new
data point.
"""
n_samples = data.shape[0]
n_reference_samples = self.n_samples
if k_neighbors is None:
k_neighbors = self.k_neighbors
elif k_neighbors >= n_reference_samples:
raise ValueError(
"`k` (%d) cannot be larger than the number of reference "
"samples (%d)." % (k_neighbors, self.n_samples)
)
# Find nearest neighbors and the distances to the new points
neighbors, distances = self.knn_index.query(data, k_neighbors)
values = np.ones_like(distances)
values /= np.sum(values, axis=1)[:, np.newaxis]
P = sp.csr_matrix(
(
values.ravel(),
neighbors.ravel(),
range(0, n_samples * k_neighbors + 1, k_neighbors),
),
shape=(n_samples, n_reference_samples),
)
if return_distances:
return P, neighbors, distances
return P
```