From f455502582d6f58dc5ab8167a44f2f0c7ddb5b97 Mon Sep 17 00:00:00 2001 From: zxdawn Date: Thu, 7 Sep 2023 12:23:06 +0200 Subject: [PATCH 1/2] Add kernel matched filter --- spectral/algorithms/detectors.py | 233 +++++++++++++++++++++++++++++- spectral/algorithms/transforms.py | 162 +++++++++++++++++++++ 2 files changed, 393 insertions(+), 2 deletions(-) diff --git a/spectral/algorithms/detectors.py b/spectral/algorithms/detectors.py index d6e1427..52c63d9 100644 --- a/spectral/algorithms/detectors.py +++ b/spectral/algorithms/detectors.py @@ -3,14 +3,15 @@ ''' from __future__ import absolute_import, division, print_function, unicode_literals +from sklearn.decomposition import KernelPCA -__all__ = ['MatchedFilter', 'matched_filter', 'RX', 'rx', 'ace'] +__all__ = ['MatchedFilter', 'matched_filter', 'KernelMatchedFilter', 'kernel_matched_filter', 'RX', 'rx', 'ace'] import math import numpy as np from .algorithms import calc_stats -from .transforms import LinearTransform +from .transforms import LinearTransform, NonLinearTransform, kernel_func, center_matrix from .spatial import map_outer_window_stats @@ -174,6 +175,234 @@ def mf_wrapper(bg, x): return MatchedFilter(background, target)(X) +class KernelMatchedFilter(NonLinearTransform): + r'''A callable kernel matched filter. + + Given kernel function and target/background matrix, the kernel matched + filter response is given by: + + .. math:: + y(\hat{k}_r) = \frac{\hat{k}_t^T \hat{K}^{-2} \hat{k}_x}{\hat{k}_t^T \hat{K}^{-2} \hat{k}_t} + + where: + + .. math:: + $\hat{k}t = k_t - \frac{1}{N}\sum{i=1}^N k(x_i, t)1$ + $\hat{k}x = k_x - \frac{1}{N}\sum{i=1}^N k(x_i, x)1$ + + :math:`\hat{K}` is the centered kernel matrix, + :math:`\hat{k}` is a kernel, such as linear kernel, polynomial kernel and Gaussian Radial Bases Function kernel (RBF). + + The inverse :math:`\hat{K}^{-2}` may not be numerically stable if the background spectral samples are not independent. + Therefore, the pseudo-inverse of K is used, which is based on eigenvalue decomposition + where eigenvectors with eigenvalues larger than `eigval_min` are used. + ''' + + def __init__(self, background, target, kernel, X, eigval_min, **kwargs): + '''Creates the filter, given background/target means and covariance. + + Arguments: + + `background` (`GaussianStats`): + + The Gaussian statistics for the background (e.g., the result + of calling :func:`calc_stats`). + + `target` (ndarray): + + Length-K target mean + + `X` (numpy.ndarray): + + `X` can be an image with shape (R, C, B) or an ndarray of shape (R * C, B) + + `kernel` (str): + + The kernel function: 'linear', 'poly', or 'rbf'. + + `eigval_min` (str): + + The minimum of eigenvalues when calculating :math:`\hat{K}^{-2}`. + ''' + self.target = target + self.background = background + self._whitening_transform = None + + if len(X.shape) == 3: + X = X.reshape((-1, X.shape[-1])) + + # compute the empirical kernel map + k_t = kernel_func(X, target.reshape(1, -1), kernel=kernel, **kwargs) + + # Compute centralized kernel matrix + self.k_t = center_matrix(k_t) + + # Compute kernal matrix K and K^-2 + K = kernel_func(X, kernel=kernel, **kwargs) + + # Centering the symmetric NxN kernel matrix. + K = center_matrix(K) + + # compute kernel PCA matrix + kpca = KernelPCA(kernel=kernel, n_components=None, **kwargs) + kpca.fit(X) + eigvals, eigvecs = kpca.eigenvalues_, kpca.eigenvectors_ + + # compute the inverse matrix for large eigvals + eigvecs = eigvecs[:, eigvals > eigval_min] + eigvals = eigvals[eigvals > eigval_min] + + # Reconstruct pseudo-inverse + K_pseudo_inv = eigvecs @ np.diag(1/eigvals) @ eigvecs.T + + self.K_2 = K_pseudo_inv @ K_pseudo_inv + + # Compute normalizing coefficient + coef = (self.k_t.T).dot(self.K_2).dot(self.k_t) + + # Construct nonlinear transform + A = (1/coef) * (self.k_t.T).dot(self.K_2) + NonLinearTransform.__init__(self, A, kernel=kernel, **kwargs) + + + def whiten(self, X): + '''Transforms data to the whitened space of the background. + + Arguments: + + `X` (ndarray): + + Size (M,N,K) or (M*N,K) array of length K vectors to transform. + + Returns an array of same size as `X` but linearly transformed to the + whitened space of the filter. + ''' + if self._whitening_transform is None: + A = math.sqrt(self.coef) * self.background.sqrt_inv_cov + self._whitening_transform = LinearTransform(A, pre=-self.u_b) + return self._whitening_transform(X) + + +def kernel_matched_filter(X, target, kernel, eigval_min=1e-5, background=None, window=None, cov=None, **kwargs): + r'''Computes a non-linear matched filter target detector score. + + Usage: + + y = kernel_matched_filter(X, target, background) + + y = kernel_matched_filter(X, target, window= [, cov=]) + + Given kernel function and target/background matrix, the kernel matched + filter response is given by: + + .. math:: + y(\hat{k}_r) = \frac{\hat{k}_t^T \hat{K}^{-2} \hat{k}_x}{\hat{k}_t^T \hat{K}^{-2} \hat{k}_t} + + where: + + .. math:: + $\hat{k}t = k_t - \frac{1}{N}\sum{i=1}^N k(x_i, t)1$ + $\hat{k}x = k_x - \frac{1}{N}\sum{i=1}^N k(x_i, x)1$ + + :math:`\hat{K}` is the centered kernel matrix, + :math:`\hat{k}` is a kernel, such as linear kernel, polynomial kernel and Gaussian Radial Bases Function kernel (RBF). + + The inverse :math:`\hat{K}^{-2}` may not be numerically stable if the background spectral samples are not independent. + Therefore, the pseudo-inverse of K is used, which is based on eigenvalue decomposition + where eigenvectors with eigenvalues larger than `eigval_min` are used. + + Arguments: + + `X` (numpy.ndarray): + + For the first calling method shown, `X` can be an image with + shape (R, C, B) or an ndarray of shape (R * C, B). If the + `background` keyword is given, it will be used for the image + background statistics; otherwise, background statistics will be + computed from `X`. + + If the `window` keyword is given, `X` must be a 3-dimensional + array and background statistics will be computed for each point + in the image using a local window defined by the keyword. + + `target` (ndarray): + + Length-K vector specifying the target to be detected. + + `kernel` (str): + + The kernel function: 'linear', 'poly', or 'rbf'. + + `eigval_min` (str): + + The minimum of eigenvalues when calculating :math:`\hat{K}^{-2}`. + + `background` (`GaussianStats`): + + The Gaussian statistics for the background (e.g., the result + of calling :func:`calc_stats` for an image). This argument is not + required if `window` is given. + + `window` (2-tuple of odd integers): + + Must have the form (`inner`, `outer`), where the two values + specify the widths (in pixels) of inner and outer windows centered + about the pixel being evaulated. Both values must be odd integers. + The background mean and covariance will be estimated from pixels + in the outer window, excluding pixels within the inner window. For + example, if (`inner`, `outer`) = (5, 21), then the number of + pixels used to estimate background statistics will be + :math:`21^2 - 5^2 = 416`. If this argument is given, `background` + is not required (and will be ignored, if given). + + The window is modified near image borders, where full, centered + windows cannot be created. The outer window will be shifted, as + needed, to ensure that the outer window still has height and width + `outer` (in this situation, the pixel being evaluated will not be + at the center of the outer window). The inner window will be + clipped, as needed, near image borders. For example, assume an + image with 145 rows and columns. If the window used is + (5, 21), then for the image pixel at (0, 0) (upper left corner), + the the inner window will cover `image[:3, :3]` and the outer + window will cover `image[:21, :21]`. For the pixel at (50, 1), the + inner window will cover `image[48:53, :4]` and the outer window + will cover `image[40:51, :21]`. + + `cov` (ndarray): + + An optional covariance to use. If this parameter is given, `cov` + will be used for all matched filter calculations (background + covariance will not be recomputed in each window) and only the + background mean will be recomputed in each window. If the + `window` argument is specified, providing `cov` will allow the + result to be computed *much* faster. + + Returns numpy.ndarray: + + The return value will be the matched filter scores distance) for each + pixel given. If `X` has shape (R, C, K), the returned ndarray will + have shape (R, C). + + References: + + Kwon, H. and Nasrabadi, N.M., 2007. Kernel spectral matched filter for hyperspectral imagery. + International Journal of Computer Vision, 71, pp.127-141. + ''' + if background is not None and window is not None: + raise ValueError('`background` and `window` are mutually ' \ + 'exclusive arguments.') + + if window is not None: + def mf_wrapper(bg, x): + return KernelMatchedFilter(bg, target, kernel, x, eigval_min, **kwargs)(x) + return map_outer_window_stats(mf_wrapper, X, window[0], window[1], + dim_out=1, cov=cov) + else: + if background is None: + background = calc_stats(X) + return KernelMatchedFilter(background, target, kernel, X, eigval_min, **kwargs)(X) + + class RX(): r'''An implementation of the RX anomaly detector. Given the mean and covariance of the background, this detector returns the squared Mahalanobis diff --git a/spectral/algorithms/transforms.py b/spectral/algorithms/transforms.py index fb068a6..3e01d11 100644 --- a/spectral/algorithms/transforms.py +++ b/spectral/algorithms/transforms.py @@ -9,6 +9,7 @@ except: from collections import Callable import numpy as np +from sklearn.metrics.pairwise import linear_kernel, polynomial_kernel, rbf_kernel class LinearTransform: @@ -163,3 +164,164 @@ def chain(self, transform): post = np.array(post) A = np.dot(self._A, transform._A) return LinearTransform(A, pre=pre, post=post) + + + +class NonLinearTransform: + '''A callable non-linear transform object. + + In addition to the __call__ method, which applies the transform to given, + data, a LinearTransform object also has the following members: + + `dim_in` (int): + + The expected length of input vectors. This will be `None` if the + input dimension is unknown (e.g., if the transform is a scalar). + + `dim_out` (int): + + The length of output vectors (after linear transformation). This + will be `None` if the input dimension is unknown (e.g., if + the transform is a scalar). + + `dtype` (numpy dtype): + + The numpy dtype for the output ndarray data. + ''' + def __init__(self, A, **kwargs): + '''Arguments: + + `A` (:class:`~numpy.ndarrray`): + + An (J,K) array to be applied to length-K targets. + + Keyword Argments: + + `pre` (scalar or length-K sequence): + + Additive offset to be applied prior to linear transformation. + + `post` (scalar or length-J sequence): + + An additive offset to be applied after linear transformation. + + `dtype` (numpy dtype): + + Explicit type for transformed data. + ''' + + self._pre = kwargs.get('pre', None) + self._post = kwargs.get('post', None) + self._kernel = kwargs.get('kernel', None) + + # set kernel parameters + if self._kernel == 'linear': + self._kernel_kwargs = {'kernel': self._kernel} + elif self._kernel == 'poly': + self._kernel_kwargs = {'kernel': self._kernel, + 'degree': kwargs.get('degree', 3), + 'gamma': kwargs.get('gamma', None), + 'coef0': kwargs.get('coef0', 1), + } + elif self._kernel == 'rbf': + self._kernel_kwargs = {'kernel': self._kernel, + 'gamma': kwargs.get('gamma', None), + } + + A = np.array(A, copy=True) + if A.ndim == 0: + # Do not know input/ouput dimensions + self._A = A + (self.dim_out, self.dim_in) = (None, None) + else: + if len(A.shape) == 1: + self._A = A.reshape(((1,) + A.shape)) + else: + self._A = A + (self.dim_out, self.dim_in) = self._A.shape + self.dtype = kwargs.get('dtype', self._A.dtype) + + def __call__(self, X): + '''Applies the non-linear transformation to the given data. + + Arguments: + + `X` (:class:`~numpy.ndarray` or object with `transform` method): + + If `X` is an ndarray, it is either an (M,N,K) array containing + M*N length-K vectors to be transformed or it is an (R,K) array + of length-K vectors to be transformed. If `X` is an object with + a method named `transform` the result of passing the + `LinearTransform` object to the `transform` method will be + returned. + + Returns an (M,N,J) or (R,J) array, depending on shape of `X`, where J + is the length of the first dimension of the array `A` passed to + __init__. + ''' + if not isinstance(X, np.ndarray): + if hasattr(X, 'transform') and isinstance(X.transform, Callable): + return X.transform(self) + else: + raise TypeError('Unable to apply transform to object.') + + shape = X.shape + if len(shape) == 3: + X = X.reshape((-1, shape[-1])) + if self._pre is not None: + X = X + self._pre + + # apply kernel and centerlize the result + X = kernel_func(X, **self._kernel_kwargs) + X = center_matrix(X) + + Y = np.dot(self._A, X.T).T + if self._post is not None: + Y += self._post + return Y.reshape((shape[:2] + (-1,))).squeeze().astype(self.dtype) + else: + if self._pre is not None: + X = X + self._pre + + # apply kernel and centerlize the result + X = kernel_func(X, **self._kernel_kwargs) + X = center_matrix(X) + + Y = np.dot(self._A, X.T).T + if self._post is not None: + Y += self._post + return Y.astype(self.dtype) + + +def norm_data(data): + '''Normalize data''' + data -= np.amin(data, axis=0, keepdims=True) + data /= np.amax(data, axis=0, keepdims=True) + + return data + + +def center_matrix(matrix): + '''Compute centralized kernel matrix''' + N = matrix.shape[0] + + if matrix.shape[1] == 1: + return matrix - np.mean(matrix) + else: + C = np.eye(N) - np.ones((N,N))/N + return C @ matrix @ C + + +def kernel_func(x, y=None, **kwargs): + '''Apply kernel function''' + kernel = kwargs.get('kernel', 'linear') + kwargs.pop('kernel', None) + + if kernel == 'linear': + return linear_kernel(x, y) + elif kernel == 'poly': + return polynomial_kernel(x, y, **kwargs) + elif kernel == 'rbf': + return rbf_kernel(x, y, **kwargs) + else: + raise ValueError("Invalid kernel") \ No newline at end of file From 08c603174e24fe02ac327b58184d707e2032f0f5 Mon Sep 17 00:00:00 2001 From: zxdawn Date: Thu, 7 Sep 2023 13:27:44 +0200 Subject: [PATCH 2/2] cleanup equations --- spectral/algorithms/detectors.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spectral/algorithms/detectors.py b/spectral/algorithms/detectors.py index 52c63d9..f2c9baf 100644 --- a/spectral/algorithms/detectors.py +++ b/spectral/algorithms/detectors.py @@ -187,8 +187,8 @@ class KernelMatchedFilter(NonLinearTransform): where: .. math:: - $\hat{k}t = k_t - \frac{1}{N}\sum{i=1}^N k(x_i, t)1$ - $\hat{k}x = k_x - \frac{1}{N}\sum{i=1}^N k(x_i, x)1$ + \hat{k}_{t}^T=k_{t}^T-\frac{1}{N} \sum_{i=1}^N k\left(x_i, t\right) \overrightarrow{1} + \hat{k}_{x}^T=k_{x}^T-\frac{1}{N} \sum_{i=1}^N k\left(x_i, x\right) \overrightarrow{1} :math:`\hat{K}` is the centered kernel matrix, :math:`\hat{k}` is a kernel, such as linear kernel, polynomial kernel and Gaussian Radial Bases Function kernel (RBF). @@ -301,8 +301,8 @@ def kernel_matched_filter(X, target, kernel, eigval_min=1e-5, background=None, w where: .. math:: - $\hat{k}t = k_t - \frac{1}{N}\sum{i=1}^N k(x_i, t)1$ - $\hat{k}x = k_x - \frac{1}{N}\sum{i=1}^N k(x_i, x)1$ + \hat{k}_{t}^T=k_{t}^T-\frac{1}{N} \sum_{i=1}^N k\left(x_i, t\right) \overrightarrow{1} + \hat{k}_{x}^T=k_{x}^T-\frac{1}{N} \sum_{i=1}^N k\left(x_i, x\right) \overrightarrow{1} :math:`\hat{K}` is the centered kernel matrix, :math:`\hat{k}` is a kernel, such as linear kernel, polynomial kernel and Gaussian Radial Bases Function kernel (RBF).