Source code for cmdtools.analysis.pcca

import numpy as np
from scipy.linalg import schur, ordqz
from .optimization import Optimizer
from ..utils import get_pi
import warnings

# TODO: find a better solution to this
try:
    import slepc  # noqa: F401
    HAS_SLEPC = True
except ImportError:
    HAS_SLEPC = False

# Class interfaces
# These are mainly wrappers around the functions below


[docs]class KrylovSchur: def __init__(self, onseperation="warn"): self.onseperation = onseperation
[docs] def solve(self, A, n, massmatrix=None): return krylovschur(A, n, massmatrix, self.onseperation)
[docs]class ScipySchur: def __init__(self, onseperation="warn"): self.onseperation = onseperation
[docs] def solve(self, T, n, massmatrix=None): return scipyschur(T, n, massmatrix, self.onseperation)
[docs]class PCCA: def __init__(self, T=None, n=None, pi="uniform", massmatrix=None, eigensolver=ScipySchur(), optimizer=Optimizer()): self.T = T self.n = n self.pi = get_pi(T, pi) self.massmatrix = massmatrix self.eigensolver = eigensolver self.optimizer = optimizer if T is not None: self.solve()
[docs] def solve(self): T, n, pi, massmatrix, eigensolver, optimizer = self.T, self.n, \ self.pi, self.massmatrix, self.eigensolver, self.optimizer chi, X, A, = \ _pcca(T, n, pi, massmatrix, eigensolver, optimizer) self.chi, self.X, self.A = chi, X, A return chi
# Functions # the logic of the above classes following a more functional style # compatibility to old functioncalls / tests
[docs]def pcca(T, n, pi="uniform"): p = PCCA(T, n, pi) return p.solve()
# this will be the new pcca function, the old function is replaced by the Class def _pcca(T, n, pi, massmatrix, eigensolver, optimizer): X = eigensolver.solve(T, n, massmatrix) X = gramschmidt(X, pi) A = optimizer.solve(X, pi) chi = np.dot(X, A) return chi, X, A
[docs]def gramschmidt(X, pi): """Gram Schmidt orthogonalization wrt. scalar product induced by pi""" X = np.copy(X) D = np.diag(pi) for i in range(np.size(X, 1)): if i == 0: if np.isclose(np.dot(X[:, 0], np.full(np.size(X, 0), 1)), 0): raise RuntimeError("First column is orthogonal to 1-Vector, \ try swapping the columns") X[:, 0] = 1 else: for j in range(i): X[:, i] -= X[:, i].dot(D).dot(X[:, j]) * X[:, j] X[:, i] /= np.sqrt(X[:, i].dot(D).dot(X[:, i])) return X
[docs]def scipyschur(T, n, massmatrix=None, onseperation="warn"): e = np.sort(np.linalg.eigvals(T)) v_in = np.real(e[-n]) v_out = np.real(e[-(n + 1)]) # do not seperate conjugate eigenvalues if np.isclose(v_in, v_out): msg = "Invalid number of clusters (splitting conjugate eigenvalues, choose another n)" if onseperation == "warn": warnings.warn(msg, RuntimeWarning) elif onseperation == "continue": pass elif onseperation == "fix": return scipyschur(T, n+1, massmatrix, "error") else: raise RuntimeError(msg) # determine the eigenvalue gap cutoff = (v_in + v_out) / 2 # schur decomposition if massmatrix is None: _, X, _ = schur(T, sort=lambda x: np.real(x) > cutoff) else: _, _, _, _, _, X = \ ordqz(T, massmatrix, sort=lambda a, b: np.real(a / b) > cutoff) return X[:, 0:n] # use only first n vectors
[docs]def krylovschur(A, n, massmatrix=None, onseperation="continue"): if massmatrix is not None: raise NotImplementedError if onseperation != "continue": raise NotImplementedError from petsc4py import PETSc from slepc4py import SLEPc M = petsc_matrix(A) E = SLEPc.EPS().create() E.setOperators(M) E.setDimensions(nev=n) E.setWhichEigenpairs(E.Which.LARGEST_REAL) E.solve() X = np.column_stack([x.array for x in E.getInvariantSubspace()]) return X[:, :n]
[docs]def petsc_matrix(A): from scipy import sparse from petsc4py import PETSc from slepc4py import SLEPc M = PETSc.Mat() if sparse.isspmatrix_csr(A): nrows = np.size(A, 0) M.createAIJWithArrays(nrows, (A.indptr, A.indices, A.data)) else: M.createDense(list(np.shape(A)), array=A) return M