import functools
import importlib
import copy
import pygmtools
NOT_IMPLEMENTED_MSG = \
'The backend function for {} is not implemented. ' \
'If you are a user, please use other backends as workarounds.' \
'If you are a developer, it will be truly appreciated if you could develop and share your' \
' implementation with the community! See our Github: https://github.com/Thinklab-SJTU/pygmtools'
[docs]def build_aff_mat(node_feat1, edge_feat1, connectivity1, node_feat2, edge_feat2, connectivity2,
n1=None, ne1=None, n2=None, ne2=None,
node_aff_fn=None, edge_aff_fn=None,
backend=None):
r"""
Build affinity matrix for graph matching from input node/edge features. The affinity matrix encodes both node-wise
and edge-wise affinities and formulates the Quadratic Assignment Problem (QAP), which is the mathematical form of
graph matching.
:param node_feat1: :math:`(b\times n_1 \times f_{node})` the node feature of graph1
:param edge_feat1: :math:`(b\times ne_1 \times f_{edge})` the edge feature of graph1
:param connectivity1: :math:`(b\times ne_1 \times 2)` sparse connectivity information of graph 1.
``connectivity1[i, j, 0]`` is the starting node index of edge ``j`` at batch ``i``, and
``connectivity1[i, j, 1]`` is the ending node index of edge ``j`` at batch ``i``
:param node_feat2: :math:`(b\times n_2 \times f_{node})` the node feature of graph2
:param edge_feat2: :math:`(b\times ne_2 \times f_{edge})` the edge feature of graph2
:param connectivity2: :math:`(b\times ne_2 \times 2)` sparse connectivity information of graph 2.
``connectivity2[i, j, 0]`` is the starting node index of edge ``j`` at batch ``i``, and
``connectivity2[i, j, 1]`` is the ending node index of edge ``j`` at batch ``i``
:param n1: :math:`(b)` number of nodes in graph1. If not given, it will be inferred based on the shape of
``node_feat1`` or the values in ``connectivity1``
:param ne1: :math:`(b)` number of edges in graph1. If not given, it will be inferred based on the shape of
``edge_feat1``
:param n2: :math:`(b)` number of nodes in graph2. If not given, it will be inferred based on the shape of
``node_feat2`` or the values in ``connectivity2``
:param ne2: :math:`(b)` number of edges in graph2. If not given, it will be inferred based on the shape of
``edge_feat2``
:param node_aff_fn: (default: inner_prod_aff_fn) the node affinity function with the characteristic
``node_aff_fn(2D Tensor, 2D Tensor) -> 2D Tensor``, which accepts two node feature tensors and
outputs the node-wise affinity tensor. See :func:`~pygmtools.utils.inner_prod_aff_fn` as an
example.
:param edge_aff_fn: (default: inner_prod_aff_fn) the edge affinity function with the characteristic
``edge_aff_fn(2D Tensor, 2D Tensor) -> 2D Tensor``, which accepts two edge feature tensors and
outputs the edge-wise affinity tensor. See :func:`~pygmtools.utils.inner_prod_aff_fn` as an
example.
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: :math:`(b\times n_1n_2 \times n_1n_2)` the affinity matrix
Example for numpy backend::
>>> import numpy as np
>>> import pygmtools as pygm
>>> pygm.BACKEND = 'numpy'
# Generate a batch of graphs
>>> batch_size = 10
>>> A1 = np.random.rand(batch_size, 4, 4)
>>> A2 = np.random.rand(batch_size, 4, 4)
>>> n1 = n2 = np.repeat([4], batch_size)
# Build affinity matrix by the default inner-product function
>>> conn1, edge1, ne1 = pygm.utils.dense_to_sparse(A1)
>>> conn2, edge2, ne2 = pygm.utils.dense_to_sparse(A2)
>>> K = pygm.utils.build_aff_mat(None, edge1, conn1, None, edge2, conn2, n1, ne1, n2, ne2)
# Build affinity matrix by gaussian kernel
>>> import functools
>>> gaussian_aff = functools.partial(pygm.utils.gaussian_aff_fn, sigma=1.)
>>> K2 = pygm.utils.build_aff_mat(None, edge1, conn1, None, edge2, conn2, n1, ne1, n2, ne2, edge_aff_fn=gaussian_aff)
# Build affinity matrix based on node features
>>> F1 = np.random.rand(batch_size, 4, 10)
>>> F2 = np.random.rand(batch_size, 4, 10)
>>> K3 = pygm.utils.build_aff_mat(F1, edge1, conn1, F2, edge2, conn2, n1, ne1, n2, ne2, edge_aff_fn=gaussian_aff)
# The affinity matrices K, K2, K3 can be further processed by GM solvers
Example for Pytorch backend::
>>> import torch
>>> import pygmtools as pygm
>>> pygm.BACKEND = 'pytorch'
# Generate a batch of graphs
>>> batch_size = 10
>>> A1 = torch.rand(batch_size, 4, 4)
>>> A2 = torch.rand(batch_size, 4, 4)
>>> n1 = n2 = torch.tensor([4] * batch_size)
# Build affinity matrix by the default inner-product function
>>> conn1, edge1, ne1 = pygm.utils.dense_to_sparse(A1)
>>> conn2, edge2, ne2 = pygm.utils.dense_to_sparse(A2)
>>> K = pygm.utils.build_aff_mat(None, edge1, conn1, None, edge2, conn2, n1, ne1, n2, ne2)
# Build affinity matrix by gaussian kernel
>>> import functools
>>> gaussian_aff = functools.partial(pygm.utils.gaussian_aff_fn, sigma=1.)
>>> K2 = pygm.utils.build_aff_mat(None, edge1, conn1, None, edge2, conn2, n1, ne1, n2, ne2, edge_aff_fn=gaussian_aff)
# Build affinity matrix based on node features
>>> F1 = torch.rand(batch_size, 4, 10)
>>> F2 = torch.rand(batch_size, 4, 10)
>>> K3 = pygm.utils.build_aff_mat(F1, edge1, conn1, F2, edge2, conn2, n1, ne1, n2, ne2, edge_aff_fn=gaussian_aff)
# The affinity matrices K, K2, K3 can be further processed by GM solvers
"""
if backend is None:
backend = pygmtools.BACKEND
# check the correctness of input
batch_size = None
if node_feat1 is not None or node_feat2 is not None:
assert all([_ is not None for _ in (node_feat1, node_feat2)]), \
'The following arguments must all be given if you want to compute node-wise affinity: ' \
'node_feat1, node_feat2'
_check_data_type(node_feat1, backend)
_check_data_type(node_feat2, backend)
assert all([_check_shape(_, 3, backend) for _ in (node_feat1, node_feat2)]), \
f'The shape of the following tensors are illegal, expected 3-dimensional, ' \
f'got node_feat1={len(_get_shape(node_feat1))}d; node_feat2={len(_get_shape(node_feat2))}d!'
if batch_size is None:
batch_size = _get_shape(node_feat1)[0]
assert _get_shape(node_feat1)[0] == _get_shape(node_feat2)[0] == batch_size, 'batch size mismatch'
if edge_feat1 is not None or edge_feat2 is not None:
assert all([_ is not None for _ in (edge_feat1, edge_feat2, connectivity1, connectivity2)]), \
'The following arguments must all be given if you want to compute edge-wise affinity: ' \
'edge_feat1, edge_feat2, connectivity1, connectivity2'
assert all([_check_shape(_, 3, backend) for _ in (edge_feat1, edge_feat2, connectivity1, connectivity2)]), \
f'The shape of the following tensors are illegal, expected 3-dimensional, ' \
f'got edge_feat1:{len(_get_shape(edge_feat1))}d; edge_feat2:{len(_get_shape(edge_feat2))}d; ' \
f'connectivity1:{len(_get_shape(connectivity1))}d; connectivity2:{len(_get_shape(connectivity2))}d!'
assert _get_shape(connectivity1)[2] == _get_shape(connectivity1)[2] == 2, \
'the 3rd dimension of connectivity1, connectivity2 must be 2-dimensional'
if batch_size is None:
batch_size = _get_shape(edge_feat1)[0]
assert _get_shape(edge_feat1)[0] == _get_shape(edge_feat2)[0] == _get_shape(connectivity1)[0] == \
_get_shape(connectivity2)[0] == batch_size, 'batch size mismatch'
# assign the default affinity functions if not given
if node_aff_fn is None:
node_aff_fn = functools.partial(inner_prod_aff_fn, backend=backend)
if edge_aff_fn is None:
edge_aff_fn = functools.partial(inner_prod_aff_fn, backend=backend)
node_aff = node_aff_fn(node_feat1, node_feat2) if node_feat1 is not None else None
edge_aff = edge_aff_fn(edge_feat1, edge_feat2) if edge_feat1 is not None else None
return _aff_mat_from_node_edge_aff(node_aff, edge_aff, connectivity1, connectivity2, n1, n2, ne1, ne2, backend=backend)
[docs]def inner_prod_aff_fn(feat1, feat2, backend=None):
r"""
Inner product affinity function. The affinity is defined as
.. math::
\mathbf{f}_1^\top \cdot \mathbf{f}_2
:param feat1: :math:`(b\times n_1 \times f)` the feature vectors :math:`\mathbf{f}_1`
:param feat2: :math:`(b\times n_2 \times f)` the feature vectors :math:`\mathbf{f}_2`
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: :math:`(b\times n_1\times n_2)` element-wise inner product affinity matrix
"""
if backend is None:
backend = pygmtools.BACKEND
_check_data_type(feat1, backend)
_check_data_type(feat2, backend)
args = (feat1, feat2)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod.inner_prod_aff_fn
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
[docs]def gaussian_aff_fn(feat1, feat2, sigma=1., backend=None):
r"""
Gaussian kernel affinity function. The affinity is defined as
.. math::
\exp(-\frac{(\mathbf{f}_1 - \mathbf{f}_2)^2}{\sigma})
:param feat1: :math:`(b\times n_1 \times f)` the feature vectors :math:`\mathbf{f}_1`
:param feat2: :math:`(b\times n_2 \times f)` the feature vectors :math:`\mathbf{f}_2`
:param sigma: (default: 1) the parameter :math:`\sigma` in Gaussian kernel
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: :math:`(b\times n_1\times n_2)` element-wise Gaussian affinity matrix
"""
if backend is None:
backend = pygmtools.BACKEND
_check_data_type(feat1, backend)
_check_data_type(feat2, backend)
args = (feat1, feat2, sigma)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod.gaussian_aff_fn
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
[docs]def build_batch(input, return_ori_dim=False, backend=None):
r"""
Build a batched tensor from a list of tensors. If the list of tensors are with different sizes of dimensions, it
will be padded to the largest dimension.
The batched tensor and the number of original dimensions will be returned.
:param input: list of input tensors
:param return_ori_dim: (default: False) return the original dimension
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: batched tensor, (if ``return_ori_dim=True``) a list of the original dimensions
Example for numpy backend::
>>> import numpy as np
>>> import pygmtools as pygm
>>> pygm.BACKEND = 'numpy'
# batched adjacency matrices
>>> A1 = np.random.rand(4, 4)
>>> A2 = np.random.rand(5, 5)
>>> A3 = np.random.rand(3, 3)
>>> batched_A, ori_shape = pygm.utils.build_batch([A1, A2, A3], return_ori_dim=True)
>>> batched_A.shape
(3, 5, 5)
>>> ori_shape
([4, 5, 3], [4, 5, 3])
# batched node features (feature dimension=10)
>>> F1 = np.random.rand(4, 10)
>>> F2 = np.random.rand(5, 10)
>>> F3 = np.random.rand(3, 10)
>>> batched_F = pygm.utils.build_batch([F1, F2, F3])
>>> batched_F.shape
(3, 5, 10)
Example for Pytorch backend::
>>> import torch
>>> import pygmtools as pygm
>>> pygm.BACKEND = 'pytorch'
# batched adjacency matrices
>>> A1 = torch.rand(4, 4)
>>> A2 = torch.rand(5, 5)
>>> A3 = torch.rand(3, 3)
>>> batched_A, ori_shape = pygm.utils.build_batch([A1, A2, A3], return_ori_dim=True)
>>> batched_A.shape
torch.Size([3, 5, 5])
>>> ori_shape
(tensor([4, 5, 3]), tensor([4, 5, 3]))
# batched node features (feature dimension=10)
>>> F1 = torch.rand(4, 10)
>>> F2 = torch.rand(5, 10)
>>> F3 = torch.rand(3, 10)
>>> batched_F = pygm.utils.build_batch([F1, F2, F3])
>>> batched_F.shape
torch.Size([3, 5, 10])
"""
if backend is None:
backend = pygmtools.BACKEND
for item in input:
_check_data_type(item, backend)
args = (input, return_ori_dim)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod.build_batch
except ImportError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
[docs]def dense_to_sparse(dense_adj, backend=None):
r"""
Convert a dense connectivity/adjacency matrix to a sparse connectivity/adjacency matrix and an edge weight tensor.
:param dense_adj: :math:`(b\times n\times n)` the dense adjacency matrix. This function also supports non-batched
input where the batch dimension ``b`` is ignored
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: :math:`(b\times ne\times 2)` sparse connectivity matrix, :math:`(b\times ne\times 1)` edge weight tensor,
:math:`(b)` number of edges
Example for numpy backend::
>>> import numpy as np
>>> import pygmtools as pygm
>>> pygm.BACKEND = 'numpy'
>>> np.random.seed(0)
>>> batch_size = 10
>>> A = np.random.rand(batch_size, 4, 4)
>>> A[:, np.arange(4), np.arange(4)] = 0 # remove the diagonal elements
>>> A.shape
(10, 4, 4)
>>> conn, edge, ne = pygm.utils.dense_to_sparse(A)
>>> conn.shape # connectivity: (batch x num_edge x 2)
(10, 12, 2)
>>> edge.shape # edge feature (batch x num_edge x feature_dim)
(10, 12, 1)
>>> ne
[12, 12, 12, 12, 12, 12, 12, 12, 12, 12]
Example for Pytorch backend::
>>> import torch
>>> import pygmtools as pygm
>>> pygm.BACKEND = 'pytorch'
>>> _ = torch.manual_seed(0)
>>> batch_size = 10
>>> A = torch.rand(batch_size, 4, 4)
>>> torch.diagonal(A, dim1=1, dim2=2)[:] = 0 # remove the diagonal elements
>>> A.shape
torch.Size([10, 4, 4])
>>> conn, edge, ne = pygm.utils.dense_to_sparse(A)
>>> conn.shape # connectivity: (batch x num_edge x 2)
torch.Size([10, 12, 2])
>>> edge.shape # edge feature (batch x num_edge x feature_dim)
torch.Size([10, 12, 1])
>>> ne
tensor([12, 12, 12, 12, 12, 12, 12, 12, 12, 12])
"""
if backend is None:
backend = pygmtools.BACKEND
_check_data_type(dense_adj, backend)
if _check_shape(dense_adj, 2, backend):
dense_adj = _unsqueeze(dense_adj, 0, backend)
non_batched_input = True
elif _check_shape(dense_adj, 3, backend):
non_batched_input = False
else:
raise ValueError(f'the input argument dense_adj is expected to be 2-dimensional or 3-dimensional, got '
f'dense_adj:{len(_get_shape(dense_adj))}!')
args = (dense_adj,)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod.dense_to_sparse
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
result = fn(*args)
if non_batched_input:
return _squeeze(result[0], 0, backend), _squeeze(result[1], 0, backend)
else:
return result
[docs]def compute_affinity_score(X, K, backend=None):
r"""
Compute the affinity score of graph matching. It is the objective score of the corresponding Quadratic Assignment
Problem.
.. math::
\texttt{vec}(\mathbf{X})^\top \mathbf{K} \texttt{vec}(\mathbf{X})
here :math:`\texttt{vec}` means column-wise vectorization.
:param X: :math:`(b\times n_1 \times n_2)` the permutation matrix that represents the matching result
:param K: :math:`(b\times n_1n_2 \times n_1n_2)` the affinity matrix
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: :math:`(b)` the objective score
.. note::
This function also supports non-batched input if the first dimension of ``X, K`` is removed.
"""
if backend is None:
backend = pygmtools.BACKEND
_check_data_type(X, backend)
_check_data_type(K, backend)
if _check_shape(X, 2, backend) and _check_shape(K, 2, backend):
X = _unsqueeze(X, 0, backend)
K = _unsqueeze(K, 0, backend)
non_batched_input = True
elif _check_shape(X, 3, backend) and _check_shape(X, 3, backend):
non_batched_input = False
else:
raise ValueError(f'the input argument K, X are expected to have the same number of dimensions (=2 or 3), got'
f'X:{len(_get_shape(X))} and K:{len(_get_shape(K))}!')
args = (X, K)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod.compute_affinity_score
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
result = fn(*args)
if non_batched_input:
return _squeeze(result, 0, backend)
else:
return result
[docs]def to_numpy(input, backend=None):
r"""
Convert a tensor to a numpy ndarray.
This is the helper function to convert tensors across different backends via numpy.
:param input: input tensor/:mod:`~pygmtools.utils.MultiMatchingResult`
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: numpy ndarray
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input,)
# pygmtools built-in types
if type(input) is MultiMatchingResult:
fn = MultiMatchingResult.to_numpy
# tf/torch/.. tensor types
else:
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod.to_numpy
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
[docs]def from_numpy(input, device=None, backend=None):
r"""
Convert a numpy ndarray to a tensor.
This is the helper function to convert tensors across different backends via numpy.
:param input: input ndarray/:mod:`~pygmtools.utils.MultiMatchingResult`
:param device: (default: None) the target device
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: tensor for the backend
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input, device)
# pygmtools built-in types
if type(input) is MultiMatchingResult:
fn = functools.partial(MultiMatchingResult.from_numpy, new_backend=backend)
# tf/torch/.. tensor types
else:
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod.from_numpy
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
[docs]def generate_isomorphic_graphs(node_num, graph_num=2, node_feat_dim=0, backend=None):
r"""
Generate a set of isomorphic graphs, for testing purposes and examples.
:param node_num: number of nodes in each graph
:param graph_num: (default: 2) number of graphs
:param node_feat_dim: (default: 0) number of node feature dimensions
:param backend: (default: ``pygmtools.BACKEND`` variable) the backend for computation.
:return: if ``graph_num==2``, this function returns :math:`(m\times n \times n)` the adjacency matrix, and
:math:`(n \times n)` the permutation matrix;
else, this function returns :math:`(m\times n \times n)` the adjacency matrix, and
:math:`(m\times m\times n \times n)` the multi-matching permutation matrix
"""
if backend is None:
backend = pygmtools.BACKEND
args = (node_num, graph_num, node_feat_dim)
assert node_num > 0 and graph_num >= 2, "input data not understood."
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod.generate_isomorphic_graphs
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
if node_feat_dim > 0:
As, X_gt, Fs = fn(*args)
if graph_num == 2:
return As, X_gt[0, 1], Fs
else:
return As, X_gt, Fs
else:
As, X_gt = fn(*args)
if graph_num == 2:
return As, X_gt[0, 1]
else:
return As, X_gt
[docs]class MultiMatchingResult:
r"""
A memory-efficient class for multi-graph matching results. For non-cycle consistent results, the dense storage
for :math:`m` graphs with :math:`n` nodes requires a size of :math:`(m\times m \times n \times n)`, and this
implementation requires :math:`((m-1)\times m \times n \times n / 2)`. For cycle consistent result, this
implementation requires only :math:`(m\times n\times n)`.
Numpy Example:
>>> import numpy as np
>>> import pygmtools as pygm
>>> np.random.seed(0)
>>> X = pygm.utils.MultiMatchingResult(backend='numpy')
>>> X[0, 1] = np.zeros((4, 4))
>>> X[0, 1][np.arange(0, 4, dtype=np.int64), np.random.permutation(4)] = 1
>>> X
MultiMatchingResult:
{'0,1': array([[0., 0., 1., 0.],
[0., 0., 0., 1.],
[0., 1., 0., 0.],
[1., 0., 0., 0.]])}
>>> X[1, 0]
array([[0., 0., 0., 1.],
[0., 0., 1., 0.],
[1., 0., 0., 0.],
[0., 1., 0., 0.]])
"""
def __init__(self, cycle_consistent=False, backend=None):
self.match_dict = {}
self._cycle_consistent = cycle_consistent
if backend is None:
self.backend = pygmtools.BACKEND
else:
self.backend = backend
def __getitem__(self, item):
assert len(item) == 2, "key should be the indices of two graphs, e.g. (0, 1)"
idx1, idx2 = item
if self._cycle_consistent:
return _mm(self.match_dict[idx1], _transpose(self.match_dict[idx2], 0, 1, self.backend), self.backend)
else:
if idx1 < idx2:
return self.match_dict[f'{idx1},{idx2}']
else:
return _transpose(self.match_dict[f'{idx2},{idx1}'], 0, 1, self.backend)
def __setitem__(self, key, value):
if self._cycle_consistent:
assert type(key) is int, "key should be the index of one graph, and value should be the matching to universe"
self.match_dict[key] = value
else:
assert len(key) == 2, "key should be the indices of two graphs, e.g. (0, 1)"
idx1, idx2 = key
if idx1 < idx2:
self.match_dict[f'{idx1},{idx2}'] = value
else:
self.match_dict[f'{idx2},{idx1}'] = _transpose(value, 0, 1, self.backend)
def __str__(self):
return 'MultiMatchingResult:\n' + self.match_dict.__str__()
def __repr__(self):
return 'MultiMatchingResult:\n' + self.match_dict.__repr__()
[docs] @staticmethod
def from_numpy(data, new_backend):
r"""
Convert a numpy-backend MultiMatchingResult data to another backend.
:param data: the numpy-backend data
:param new_backend: the target backend
:return: a new MultiMatchingResult instance for ``new_backend``
"""
new_data = copy.deepcopy(data)
new_data.from_numpy_(new_backend)
return new_data
[docs] @staticmethod
def to_numpy(data):
r"""
Convert an any-type MultiMatchingResult to numpy backend.
:param data: the any-type data
:return: a new MultiMatchingResult instance for numpy
"""
new_data = copy.deepcopy(data)
new_data.to_numpy_()
return new_data
[docs] def from_numpy_(self, new_backend):
"""
In-place operation for :func:`~pygmtools.utils.MultiMatchingResult.from_numpy`.
"""
if self.backend != 'numpy':
raise ValueError('Attempting to convert from non-numpy data.')
self.backend = new_backend
for k, v in self.match_dict.items():
self.match_dict[k] = from_numpy(v, self.backend)
[docs] def to_numpy_(self):
"""
In-place operation for :func:`~pygmtools.utils.MultiMatchingResult.to_numpy`.
"""
self.backend = 'numpy'
for k, v in self.match_dict.items():
self.match_dict[k] = to_numpy(v, self.backend)
###################################################
# Private Functions that Unseeable from Users #
###################################################
def _aff_mat_from_node_edge_aff(node_aff, edge_aff, connectivity1, connectivity2,
n1, n2, ne1, ne2,
backend=None):
r"""
Build affinity matrix K from node and edge affinity matrices.
:param node_aff: :math:`(b\times n_1 \times n_2)` the node affinity matrix
:param edge_aff: :math:`(b\times ne_1 \times ne_2)` the edge affinity matrix
:param connectivity1: :math:`(b\times ne_1 \times 2)` sparse connectivity information of graph 1
:param connectivity2: :math:`(b\times ne_2 \times 2)` sparse connectivity information of graph 2
:param n1: :math:`(b)` number of nodes in graph1. If not given, it will be inferred based on the shape of
``node_feat1`` or the values in ``connectivity1``
:param ne1: :math:`(b)` number of edges in graph1. If not given, it will be inferred based on the shape of
``edge_feat1``
:param n2: :math:`(b)` number of nodes in graph2. If not given, it will be inferred based on the shape of
``node_feat2`` or the values in ``connectivity2``
:param ne2: :math:`(b)` number of edges in graph2. If not given, it will be inferred based on the shape of
``edge_feat2``
:return: :math:`(b\times n_1n_2 \times n_1n_2)` the affinity matrix
"""
if backend is None:
backend = pygmtools.BACKEND
args = (node_aff, edge_aff, connectivity1, connectivity2, n1, n2, ne1, ne2)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod._aff_mat_from_node_edge_aff
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
def _check_data_type(input, backend=None):
r"""
Check whether the input data meets the backend. If not met, it will raise an ValueError
:param input: input data (must be Tensor/ndarray)
:return: None
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input, )
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod._check_data_type
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
def _check_shape(input, num_dim, backend=None):
r"""
Check the shape of the input tensor
:param input: the input tensor
:param num_dim: number of dimensions
:return: True or False
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input, num_dim)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod._check_shape
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
def _get_shape(input, backend=None):
r"""
Get the shape of the input tensor
:param input: the input tensor
:return: a list of ints indicating the shape
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input,)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod._get_shape
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
def _squeeze(input, dim, backend=None):
r"""
Squeeze the input tensor at the given dimension. This function is expected to behave the same as torch.squeeze
:param input: input tensor
:param dim: squeezed dimension
:return: squeezed tensor
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input, dim)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod._squeeze
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
def _unsqueeze(input, dim, backend=None):
r"""
Unsqueeze the input tensor at the given dimension. This function is expected to behave the same as torch.unsqueeze
:param input: input tensor
:param dim: unsqueezed dimension
:return: unsqueezed tensor
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input, dim)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod._unsqueeze
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
def _transpose(input, dim1, dim2, backend=None):
r"""
Swap the dim1 and dim2 dimensions of the input tensor.
:param input: input tensor
:param dim1: swapped dimension 1
:param dim2: swapped dimension 2
:return: transposed tensor
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input, dim1, dim2)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod._transpose
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)
def _mm(input1, input2, backend=None):
r"""
Matrix multiplication.
:param input1: input tensor 1
:param input2: input tensor 2
:return: multiplication result
"""
if backend is None:
backend = pygmtools.BACKEND
args = (input1, input2)
try:
mod = importlib.import_module(f'pygmtools.{backend}_backend')
fn = mod._mm
except ModuleNotFoundError and AttributeError:
raise NotImplementedError(
NOT_IMPLEMENTED_MSG.format(backend)
)
return fn(*args)