Skip to content

Utilities API

normalization

min_max_scaling(time_series, min_val=0.0, max_val=1.0, inplace=False)

This function applied min max scaling for an 1D-array. It is inspired by: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html but lighter and reimplemented in order to not introduce unnecessary dependencies for small functionality.

Min max scaling as implemented here is sensitive to extreme outliers, but guarantees the value range not exceeding min_val and max_val.

Parameters:

Name Type Description Default
time_series ndarray

1D array containing consecutive values for one feature

required
min_val float

the minimum value the scaled time series will reach

0.0
max_val float

the maximum value the scale time series will reach

1.0
inplace bool

boolean to specify whether the input array will be scaled and changed in place

False

Returns:

Type Description
ndarray

the scaled input array.

Source code in changepoynt/utils/normalization.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def min_max_scaling(time_series: np.ndarray, min_val: float = 0.0, max_val: float = 1.0, inplace: bool = False)\
        -> np.ndarray:
    """
    This function applied min max scaling for an 1D-array. It is inspired by:
    https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html
    but lighter and reimplemented in order to not introduce unnecessary dependencies for small functionality.

    Min max scaling as implemented here is sensitive to extreme outliers, but guarantees the value range not
    exceeding min_val and max_val.

    :param time_series: 1D array containing consecutive values for one feature
    :param min_val: the minimum value the scaled time series will reach
    :param max_val: the maximum value the scale time series will reach
    :param inplace: boolean to specify whether the input array will be scaled and changed in place

    :return: the scaled input array.
    """
    # make some assertion checks
    assert time_series.ndim == 1, 'Time series needs to be an 1D array.'

    # copy the time series if specified
    if not inplace: time_series = time_series.copy()

    # get the minimum and maximum
    minimum = np.min(time_series, axis=0)
    maximum = np.max(time_series, axis=0)

    # check whether they are equal to not divide by zero
    if maximum == minimum:
        # only push the time series around zero
        time_series = time_series - minimum
    else:
        # scale the time series to values between zero and one
        time_series = (time_series-minimum) / (maximum-minimum)

    # scale into the wished value range
    time_series = time_series * (max_val - min_val) + min_val
    return time_series

z_scaling(time_series, inplace=False)

This function applie z-normalization to an 1D-array. It is inspired by: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html but lighter and reimplemented in order to not introduce unnecessary dependencies for small functionality.

Parameters:

Name Type Description Default
time_series ndarray

1D array containing consecutive values for one feature

required
inplace bool

boolean to specify whether the input array will be scaled and changed in place

False

Returns:

Type Description
ndarray

the scaled input array.

Source code in changepoynt/utils/normalization.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def z_scaling(time_series: np.ndarray, inplace: bool = False) -> np.ndarray:
    """
    This function applie z-normalization to an 1D-array. It is inspired by:
    https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html
    but lighter and reimplemented in order to not introduce unnecessary dependencies for small functionality.

    :param time_series: 1D array containing consecutive values for one feature
    :param inplace: boolean to specify whether the input array will be scaled and changed in place

    :return: the scaled input array.
    """
    # make some assertion checks
    assert time_series.ndim == 1, 'Time series needs to be an 1D array.'

    # copy the time series if specified
    if not inplace: time_series = time_series.copy()

    # compute sample mean and sample variance
    mean = np.mean(time_series)
    std = np.std(time_series)

    # subtract the mean
    time_series -= mean
    if std:
        time_series /= std
    return time_series

linalg

HankelCorrelationFFTRepresentation

This class represents a matrix -atrix product of a hankel matrix, e.g., H*H.T. It is needed as the multiplication with it needs to go through two iterations instead of one.

Source code in changepoynt/utils/linalg.py
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
class HankelCorrelationFFTRepresentation:
    """
    This class represents a matrix -atrix product of a hankel matrix, e.g., H*H.T.
    It is needed as the multiplication with it needs to go through two iterations instead of one.
    """

    def __init__(self, hankel_matrix: HankelFFTRepresentation):

        # save the parameters that are necessary for later computations
        self.hankel_rfft = hankel_matrix.hankel_rfft
        self.fft_length = hankel_matrix.fft_length
        self.window_length = hankel_matrix.window_length
        self.window_number = hankel_matrix.window_number
        self.lag = hankel_matrix.lag
        self.hankel_matrix = hankel_matrix
        self.shape = (self.window_length, self.window_length)

    def __matmul__(self, other_matrix: np.ndarray) -> np.ndarray:
        if isinstance(other_matrix, np.ndarray):
            return fast_numba_hankel_correlation_matmul(self.hankel_rfft, self.fft_length, self.window_number,
                                                        other_matrix, self.lag)
        else:
            raise NotImplementedError('Matmul only supported for other numpy arrays.')

HankelFFTRepresentation

This matrix represents a Hankel matrix and makes matrix multiplication with it faster:

See our paper for more details: Efficient Hankel Matrix Decomposition for Changepoint Detection Lucas Weber, Richard Lenz 2024

Source code in changepoynt/utils/linalg.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
class HankelFFTRepresentation:
    """
    This matrix represents a Hankel matrix and makes matrix multiplication with it faster:

    See our paper for more details:
    Efficient Hankel Matrix Decomposition for Changepoint Detection
    Lucas Weber, Richard Lenz 2024
    """

    def __init__(self, time_series: np.ndarray, end_index: int, window_length: int, window_number: int, lag: int = 1,
                 const_offset: float = 0.0, _copy_representation: 'HankelFFTRepresentation' = None):

        # create new hankel matrix
        if _copy_representation is None:

            # save the parameters that are necessary for later computations
            self.window_length = window_length
            self.window_number = window_number
            self.lag = lag
            self.const_offset = const_offset

            # create the representation and save it into the class
            hankel_rfft, fft_len, _ = get_fast_hankel_representation(time_series, end_index,
                                                                     window_length, window_number, lag=lag,
                                                                     const_offset=const_offset)
            self.hankel_rfft = hankel_rfft
            self.fft_length = fft_len

        # copy existing hankel matrix
        else:

            # check that nothing else is set
            assert time_series is None and end_index is None and window_length is None and lag is None \
                , 'Using the constructor for copying requires all other options to be set to None.'

            # copy the parameters from the other model
            self.window_length = _copy_representation.window_length
            self.window_number = _copy_representation.window_number
            self.lag = 1
            self.hankel_rfft = _copy_representation.hankel_rfft
            self.fft_length = _copy_representation.fft_length
            self.const_offset = _copy_representation.const_offset

        # set the shape property
        self.shape = (self.window_length, self.window_number)

    def multiply_other_from_right(self, other_matrix: np.ndarray) -> np.ndarray:
        # check the dimensions
        if other_matrix.shape[0] != self.window_number:
            raise ValueError(f'matmul: Right matrix has a mismatch in its core dimension 0 '
                             f'(size {other_matrix.shape[0]} is different from {self.window_number})')

        # make the product
        return fast_numba_hankel_matmul(self.hankel_rfft, self.window_length, self.fft_length, other_matrix,
                                        self.lag)

    def multiply_other_from_left(self, other_matrix: np.ndarray) -> np.ndarray:

        # check the dimensions
        if other_matrix.shape[1] != self.window_length:
            raise ValueError(f'matmul: Left matrix has a mismatch in its core dimension 0 '
                             f'(size {other_matrix.shape[1]} is different from {self.window_length})')

        # make the product
        return fast_numba_hankel_left_matmul(self.hankel_rfft, self.window_number, self.fft_length, other_matrix,
                                             self.lag)

    @property
    def T(self) -> 'HankelFFTRepresentation':

        # create a shallow copy
        new_matrix = self.shallow_copy()

        # check that we have no lag in column direction, as we have not implemented lag in row direction
        # for the multiplications yet
        if new_matrix.lag != 1:
            raise ValueError('As of now we can not calculate with transposed lagged Hankel matrices.')

        # switch the window sizes
        new_matrix.window_number, new_matrix.window_length = new_matrix.window_length, new_matrix.window_number
        return new_matrix

    def __matmul__(self, other):

        # right side matmul
        if isinstance(other, np.ndarray):
            return self.multiply_other_from_right(other)

        # right side matmul with transposed
        elif isinstance(other, self.__class__):
            # check that the fft representation is the same and only transposed
            if self.equal_hankel_transposed(other):
                return HankelCorrelationFFTRepresentation(self)
            else:
                raise ValueError('At this point matmul is only supported for itself with its transposed from left.')

    def __array_ufunc__(self, ufunc, method, *args, **kwargs):
        if method != '__call__' or len(kwargs) > 0 or len(args) != 2:
            return NotImplemented
        if ufunc == np.matmul:
            # left side matmul
            if isinstance(args[0], np.ndarray) and args[1] is self:
                return self.multiply_other_from_left(args[0])
            else:
                return NotImplemented
        else:
            return NotImplemented

    def __array_function__(self, func, types, args, kwargs):
        """
        We currently only support concatenation.
        https://numpy.org/neps/nep-0018-array-function-protocol.html
        """
        if func != np.concatenate:
            return NotImplemented
        if not all(issubclass(t, self.__class__) for t in types):
            return NotImplemented

        # check the keyword arguments
        axis = kwargs.get('axis', 0)
        if axis not in [0, 1]:
            raise ValueError('Concatenation only in the first two dimensions.')

        # check that we only received one arg
        if len(args) > 1:
            raise ValueError('We only accept one non keyword argument.')

        # create the list of matrices
        matrix_list = [(matrix, matrix.shape[0]*idx if axis == 0 else 0, matrix.shape[1]*idx if axis == 1 else 0)
                       for idx, matrix in enumerate(args[0])]
        return MultilevelHankelFFTRepresentation(matrix_list)

    def shallow_copy(self) -> 'HankelFFTRepresentation':

        # type hints don't like this, but I don't want to add None to the hint, as normal users should not use
        # directly use the constructor for copying.
        return HankelFFTRepresentation(time_series=None, end_index=None, window_length=None, window_number=None,
                                       lag=None, _copy_representation=self)

    def equal_hankel(self, other: 'HankelFFTRepresentation') -> bool:
        return (self.hankel_rfft is other.hankel_rfft
                and self.window_number == other.window_number and self.window_length == other.window_length)

    def equal_hankel_transposed(self, other: 'HankelFFTRepresentation') -> bool:
        return (self.hankel_rfft is other.hankel_rfft
                and self.window_number == other.window_length and self.window_length == other.window_number)

__array_function__(func, types, args, kwargs)

We currently only support concatenation. https://numpy.org/neps/nep-0018-array-function-protocol.html

Source code in changepoynt/utils/linalg.py
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
def __array_function__(self, func, types, args, kwargs):
    """
    We currently only support concatenation.
    https://numpy.org/neps/nep-0018-array-function-protocol.html
    """
    if func != np.concatenate:
        return NotImplemented
    if not all(issubclass(t, self.__class__) for t in types):
        return NotImplemented

    # check the keyword arguments
    axis = kwargs.get('axis', 0)
    if axis not in [0, 1]:
        raise ValueError('Concatenation only in the first two dimensions.')

    # check that we only received one arg
    if len(args) > 1:
        raise ValueError('We only accept one non keyword argument.')

    # create the list of matrices
    matrix_list = [(matrix, matrix.shape[0]*idx if axis == 0 else 0, matrix.shape[1]*idx if axis == 1 else 0)
                   for idx, matrix in enumerate(args[0])]
    return MultilevelHankelFFTRepresentation(matrix_list)

MultilevelHankelFFTRepresentation

This class implements multiplication with a multilevel Hankel matrix, i.e., a matrix which can be divided into multiple parts that each have Hankel structure.

We expect to receive a list of: - Hankel matrices as HankelFFTRepresentation objects - The positions of the Hankel matrices in the original matrix (upper left row, upper left col)

TODO: Currently only tested with row/col concatenation [H,H]/[[H],[H]] and not with multiple blocks [[H, H], [H, H]]

Source code in changepoynt/utils/linalg.py
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
class MultilevelHankelFFTRepresentation:
    """
    This class implements multiplication with a multilevel Hankel matrix, i.e., a matrix which can be divided into
    multiple parts that each have Hankel structure.

    We expect to receive a list of:
    - Hankel matrices as HankelFFTRepresentation objects
    - The positions of the Hankel matrices in the original matrix (upper left row, upper left col)

    TODO: Currently only tested with row/col concatenation [H,H]/[[H],[H]] and not with multiple blocks [[H, H], [H, H]]
    """

    def __init__(self, matrix_tuples: list[tuple[HankelFFTRepresentation, int, int]]):

        # check the input
        max_row, max_col, rows, cols, fft_length = self.check_input(matrix_tuples)

        # extract the matrices
        self.matrices = [matrix_tuple[0] for matrix_tuple in matrix_tuples]

        # extract the positions
        self.positions = np.array([(*matrix_tuple[1:], *matrix_tuple[0].shape) for matrix_tuple in matrix_tuples])

        # keep the shape of the matrix
        self.shape = (max_row, max_col)
        self.sub_matrix_shape = (rows, cols)
        self.fft_length = fft_length

        # collect all indices for matrices starting at the same column
        self.row_matrices = collections.defaultdict(list)
        self.column_matrices = collections.defaultdict(list)
        for idx, (rx, cx, _, _) in enumerate(self.positions):
            self.row_matrices[cx].append(idx)
            self.column_matrices[rx].append(idx)

        # IMPORTANT!!: Sort the indices so the matrices are used in the right order (for the matmul later)
        for idx, idces in self.row_matrices.items():
            self.row_matrices[idx] = sorted(idces, key=lambda x: self.positions[x, 1])
        for idx, idces in self.column_matrices.items():
            self.column_matrices[idx] = sorted(idces, key=lambda x: self.positions[x, 0])

    @staticmethod
    def check_input(input_list: list[tuple[HankelFFTRepresentation, int, int]]) -> (int, int, int, int):

        # check whether the list is empty
        if len(input_list) < 1:
            raise ValueError('You did not input any Hankel matrices.')

        # go through the elements and check the format
        for idx, tupled in enumerate(input_list):

            # check the correct size
            if len(tupled) != 3:
                raise ValueError(f'Tuple {idx} does not have the expected length of three (length {len(tupled)}).')

            # check that we received the correct matrix objects
            if not isinstance(tupled[0], HankelFFTRepresentation):
                raise ValueError(f'Tuple {idx} is not a HankelFFTRepresentation object (type {type(tupled[0])}).')

            # check that we received integer positions
            if not isinstance(tupled[1], int) or not isinstance(tupled[2], int):
                raise ValueError(f'Tuple {idx} has non integer positions (type {type(tupled[1])}, {type(tupled[2])}).')

            # check that we only accept lag of one
            if tupled[0].lag != 1:
                raise ValueError(f'Tuple {idx} has lagged hankel matrices (lag {tupled[0].lag}).')

        # transform the matrix positions into a numpy array
        positions = np.array([(*matrix_tuple[1:], *matrix_tuple[0].shape) for matrix_tuple in input_list])
        max_rx, max_cx = np.max(positions[:, :2] + positions[:, 2:], axis=0)

        # check whether the fft-shapes are equal
        fft_shape = set(tupled[0].fft_length for tupled in input_list)
        if len(fft_shape) != 1:
            raise ValueError(f'Not all matrices have the same fft length.')
        fft_shape = fft_shape.pop()

        # check whether widths and heights are equal
        row_number = set(positions[:, 2])
        if len(row_number) != 1:
            raise ValueError(f'Not all matrices have the same number of rows.')
        column_number = set(positions[:, 3])
        if len(column_number) != 1:
            raise ValueError(f'Not all matrices have the same number of columns.')
        row_number = row_number.pop()
        column_number = column_number.pop()

        # create the expected row numbers
        expected_starts = set((rx, cx) for rx in range(0, max_rx, row_number) for cx in range(0, max_cx, column_number))

        # compare the expected positions with the real ones
        for start_row, start_col, _, _ in positions:
            if (start_row, start_col) in expected_starts:
                expected_starts.remove((start_row, start_col))
            else:
                raise ValueError('Some positions are not as expected')
        if len(expected_starts):
            raise ValueError('Some positions are missing.')

        # return the matrix size and the row and column numbers
        return max_rx, max_cx, row_number, column_number, fft_shape

    @property
    def T(self) -> 'MultilevelHankelFFTRepresentation':
        """
        Compute the transposed.
        Only the internal matrices are copied!
        :return:
        """

        # switch the shape
        self.shape = tuple(reversed(self.shape))

        # switch the columns of the positions
        # https://stackoverflow.com/a/4857981
        self.positions[:, [0, 1, 2, 3]] = self.positions[:, [1, 0, 3, 2]]

        # switch the submatrix shapes
        self.sub_matrix_shape = tuple(reversed(self.sub_matrix_shape))

        # go through all of our matrices and transpose them
        self.matrices = [matrix.T for matrix in self.matrices]

        # switch the collections for row-wise and column-wise matrices
        self.row_matrices, self.column_matrices = self.column_matrices, self.row_matrices

        return self

    def __matmul__(self, other):

        # right side matmul
        if isinstance(other, np.ndarray):
            return self.multiply_other_from_right(other)
        else:
            raise NotImplementedError('Currently this class only support matmul with numpy arrays.')

    def __array_ufunc__(self, ufunc, method, *args, **kwargs):
        if method != '__call__' or len(kwargs) > 0 or len(args) != 2:
            return NotImplemented
        if ufunc == np.matmul:
            # left side matmul
            if isinstance(args[0], np.ndarray) and args[1] is self:
                return self.multiply_other_from_left(args[0])
            else:
                return NotImplemented
        else:
            return NotImplemented

    def multiply_other_from_right(self, other: np.ndarray):

        # we need to create a target matrix
        result = np.zeros((self.shape[0], other.shape[1]))
        l_windows, n_windows = self.sub_matrix_shape

        # check the matrix shapes
        if self.shape[1] != other.shape[0]:
            raise ValueError(f'Shape missmatch: {self.shape} multiplied with {other.shape} is not possible.')

        # go through all blocks in row direction and make the computation
        for idx_list in self.row_matrices.values():

            # create the list of ffts
            ffts = tuple(self.matrices[idx].hankel_rfft for idx in idx_list)

            # get the starting column and end column
            start_col = self.positions[idx_list[0], 1]
            end_col = start_col + self.positions[idx_list[0], 3]

            # compute the result
            tmp = fast_numba_blockwise_matmul(ffts, l_windows, self.fft_length, other[start_col: end_col, :])

            # put the result in
            result += tmp
        return result

    def multiply_other_from_left(self, other: np.ndarray):

        # we need to create a target matrix
        result = np.zeros((other.shape[0], self.shape[1]))
        l_windows, n_windows = self.sub_matrix_shape

        # check the matrix shapes
        if self.shape[0] != other.shape[1]:
            raise ValueError(f'Shape missmatch: {other.shape} multiplied with {self.shape} is not possible.')

        # go through all blocks in row direction and make the computation
        for idx_list in self.column_matrices.values():
            # create the list of ffts
            ffts = tuple(self.matrices[idx].hankel_rfft for idx in idx_list)

            # get the starting column and end column
            start_row = self.positions[idx_list[0], 0]
            end_row = start_row + self.positions[idx_list[0], 2]

            # compute the result
            tmp = fast_numba_hankel_blockwise_left_matmul(ffts, n_windows, self.fft_length, other[:, start_row:end_row])

            # put the result in
            result += tmp
        return result

T property

Compute the transposed. Only the internal matrices are copied!

Returns:

Type Description
MultilevelHankelFFTRepresentation

compile_hankel(time_series, end_index, window_size, rank, lag=1, const_offset=None)

This function constructs a hankel matrix from a 1D time series. Please make sure constructing the matrix with the given parameters (end index, window size, etc.) is possible, as this function does no checks due to performance reasons.

Parameters:

Name Type Description Default
time_series ndarray

1D array with float values as the time series

required
end_index int

the index (point in time) where the time series starts

required
window_size int

the size of the windows cut from the time series

required
rank int

the amount of time series in the matrix

required
lag int

the lag between the time series of the different columns

1
const_offset float

an offset subtracted from all values of the time series before filling the hankel matrix

None

Returns:

Type Description
ndarray

The hankel matrix with lag one

Source code in changepoynt/utils/linalg.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
@nb.njit()
def compile_hankel(time_series: np.ndarray, end_index: int, window_size: int, rank: int, lag: int = 1,
                   const_offset: float = None) -> np.ndarray:
    """
    This function constructs a hankel matrix from a 1D time series. Please make sure constructing the matrix with
    the given parameters (end index, window size, etc.) is possible, as this function does no checks due to
    performance reasons.

    :param time_series: 1D array with float values as the time series
    :param end_index: the index (point in time) where the time series starts
    :param window_size: the size of the windows cut from the time series
    :param rank: the amount of time series in the matrix
    :param lag: the lag between the time series of the different columns
    :param const_offset: an offset subtracted from all values of the time series before filling the hankel matrix
    :return: The hankel matrix with lag one
    """

    # make an empty matrix to place the values
    #
    # almost no faster way:
    # https://stackoverflow.com/questions/71410927/vectorized-way-to-construct-a-block-hankel-matrix-in-numpy-or-scipy
    hankel = np.empty((window_size, rank))

    # go through the time series and make the hankel matrix
    for cx in range(rank):
        hankel[:, -cx - 1] = time_series[(end_index - window_size - cx * lag):(end_index - cx * lag)]
    if const_offset is not None:
        hankel = hankel - const_offset
    return hankel

examples()

This function implements some usage examples for quick internal testing. It is not aimed at being used.

Returns:

Type Description

None

Source code in changepoynt/utils/linalg.py
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
def examples():
    """
    This function implements some usage examples for quick internal testing. It is not aimed at being used.
    :return: None
    """

    import time

    # create a random signal
    signal = np.random.rand(200) * 1000

    # create the two different hankel representations
    hankel_matrix_fft = HankelFFTRepresentation(signal, 100, 50, 20, lag=1)
    hankel_matrix_fft2 = HankelFFTRepresentation(signal, 200, 50, 20, lag=1)
    hankel_matrix = compile_hankel(signal, end_index=100, window_size=50, rank=20, lag=1)
    hankel_matrix2 = compile_hankel(signal, end_index=200, window_size=50, rank=20, lag=1)

    # create test matrices to multiply with
    other_matrix = np.random.rand(20, 65)
    other_matrix2 = np.random.rand(50, 15)

    # create multilevel matrices by concatenation
    multi_hankel_fft = np.concatenate((hankel_matrix_fft, hankel_matrix_fft2, hankel_matrix_fft), axis=1)
    multi_hankel = np.concatenate((hankel_matrix, hankel_matrix2, hankel_matrix), axis=1)
    print(multi_hankel_fft.shape)

    # create additional test matrices to multiply with
    other_multi_matrix = np.random.rand(60, 65)
    other_multi_matrix_left = np.random.rand(200, 50)

    # test the multilevel hankel matrix right product
    res = multi_hankel_fft @ other_multi_matrix
    res2 = multi_hankel @ other_multi_matrix
    print('Test multilevel right multiplication:', np.allclose(res, res2))

    # test the multilevel hankel matrix left product
    res = other_multi_matrix_left @ multi_hankel_fft
    res2 = other_multi_matrix_left @ multi_hankel
    print('Test multilevel left multiplication:', np.allclose(res, res2))

    # test the multilevel hankel matrix left product transposed
    multi_hankel_fft = multi_hankel_fft.T
    multi_hankel = multi_hankel.T
    other_multi_matrix_left = np.random.rand(200, multi_hankel_fft.shape[0])
    res = other_multi_matrix_left @ multi_hankel_fft
    res2 = other_multi_matrix_left @ multi_hankel
    print('Test multilevel left multiplication:', np.allclose(res, res2))

    # test the right product
    res = hankel_matrix_fft @ other_matrix
    res2 = hankel_matrix @ other_matrix
    print('Test right:', np.allclose(res, res2))

    # test the left product
    res = other_matrix2.T @ hankel_matrix_fft
    res2 = other_matrix2.T @ hankel_matrix
    print('Test left:', np.allclose(res, res2))

    # test the transposed right product
    res = hankel_matrix_fft.T @ other_matrix2
    res2 = hankel_matrix.T @ other_matrix2
    print('Test transposed right:', np.allclose(res, res2))

    # test the transposed left product
    res = other_matrix.T @ hankel_matrix_fft.T
    res2 = other_matrix.T @ hankel_matrix.T
    print('Test transposed left:', np.allclose(res, res2))

    # check the correlation matrix multiplication
    hankel_corr = hankel_matrix @ hankel_matrix.T
    hankel_fft_corr = hankel_matrix_fft @ hankel_matrix_fft.T
    res = hankel_fft_corr @ other_matrix2
    res2 = hankel_corr @ other_matrix2
    print('Test correlation multiplication:', np.allclose(res, res2))

    # set a random seed
    np.random.seed(1234)

    # create the random starting vector
    x = np.random.rand(100)
    x /= np.linalg.norm(x)
    A = np.random.rand(100, 100)

    # test the power method
    singval, singvec = power_method(A, x, n_iterations=100)
    singvecs, singvals, _ = np.linalg.svd(A)
    print(singval, singvals[0])

    # test the tridiagonalization method for speed
    size = 1000
    d = 3 * np.random.rand(size)
    e = -1 * np.random.rand(size - 1)
    start = time.time()
    tri_eigvals, tri_eigvecs = tridiagonal_eigenvalues(d, e, size // 2)
    print(f'Specialized tridiagonal SVD took: {time.time() - start} s.')
    T = np.diag(d) + np.diag(e, k=1) + np.diag(e, k=-1)
    start = time.time()
    singvecs, singvals, _ = np.linalg.svd(T)
    print(f'Normal SVD took: {time.time() - start} s.')

    # test randomized svd
    singval, singvec = facebook_randomized_svd(A, 20)
    singvecs, singvals, _ = np.linalg.svd(A)
    print(singval, singvals[0])

facebook_randomized_svd(a_matrix, randomized_rank)

This function implements randomized singular vector decomposition of a matrix as surveyed and described in

Halko, Nathan, Per-Gunnar Martinsson, and Joel A. Tropp. "Finding structure with randomness: Probabilistic algorithms for constructing approximate matrix decompositions." SIAM review 53.2 (2011): 217-288. on page 4 chapter 1.3 and further.

Parameters:

Name Type Description Default
a_matrix ndarray

2D-Matrix filled with floats for which we want to find the left eigenvectors

required
randomized_rank int

the rank of the noise matrix used for randomized svd. the higher the rank, the better the approximation but the lower the precision of the eigenvectors

required

Returns:

Type Description
(ndarray, ndarray)
Source code in changepoynt/utils/linalg.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def facebook_randomized_svd(a_matrix: np.ndarray, randomized_rank: int) -> (np.ndarray, np.ndarray):
    """
    This function implements randomized singular vector decomposition of a matrix as surveyed and described in

    Halko, Nathan, Per-Gunnar Martinsson, and Joel A. Tropp.
    "Finding structure with randomness: Probabilistic algorithms for constructing approximate matrix decompositions."
    SIAM review 53.2 (2011): 217-288.
    on page 4 chapter 1.3 and further.

    :param a_matrix: 2D-Matrix filled with floats for which we want to find the left eigenvectors
    :param randomized_rank: the rank of the noise matrix used for randomized svd. the higher the rank, the better the
    approximation but the lower the precision of the eigenvectors
    :return:
    """
    singular_vectors, singular_values, _ = fbpca.pca(a_matrix, randomized_rank, True)
    return singular_values, singular_vectors

lanczos(a_matrix, r_0, k)

This function computes the tri-diagonalization matrix from the square matrix C which is the result of the lanczos algorithm.

The algorithm has been described and proven in: Idé, Tsuyoshi, and Koji Tsuda. "Change-point detection using krylov subspace learning." Proceedings of the 2007 SIAM International Conference on Data Mining. Society for Industrial and Applied Mathematics, 2007.

Parameters:

Name Type Description Default
a_matrix ndarray

2D-Matrix of size NxN filled with floats where we want to find the krylov subspace approx. for

required
r_0 ndarray

intial starting vector for the subspace approximation

required
k int

size of the approximation

required

Returns:

Type Description
(ndarray, ndarray)

Returns the alpha and beta values for the tridiagonal, symmetric matrix T. alphas are the values from the main diagonal and beta from the off diagonal as described in https://docs.scipy.org/doc/scipy/reference/generated/scipy.linalg.eigh_tridiagonal.html (alpha = d, beta = e)

Source code in changepoynt/utils/linalg.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def lanczos(a_matrix: np.ndarray, r_0: np.ndarray, k: int) -> (np.ndarray, np.ndarray):
    """
    This function computes the tri-diagonalization matrix from the square matrix C which is the result of the lanczos
    algorithm.

    The algorithm has been described and proven in:
    Idé, Tsuyoshi, and Koji Tsuda.
    "Change-point detection using krylov subspace learning."
    Proceedings of the 2007 SIAM International Conference on Data Mining.
    Society for Industrial and Applied Mathematics, 2007.

    :param a_matrix: 2D-Matrix of size NxN filled with floats where we want to find the krylov subspace approx. for
    :param r_0: intial starting vector for the subspace approximation
    :param k: size of the approximation
    :return: Returns the alpha and beta values for the tridiagonal, symmetric matrix T. alphas are the values from the
    main diagonal and beta from the off diagonal as described in
    https://docs.scipy.org/doc/scipy/reference/generated/scipy.linalg.eigh_tridiagonal.html (alpha = d, beta = e)
    """

    # save the initial vector
    r_i = r_0
    q_i = np.zeros_like(r_i)

    # initialization of the diagonal elements
    alphas = np.zeros(shape=(k + 1,), dtype=np.float64)
    betas = np.ones(shape=(k + 1,), dtype=np.float64)

    # Subroutine 1 of the paper
    for j in range(k):
        # compute r_(j+1)
        new_q = r_i / betas[j]

        # compute the new alpha
        intermediate_amatrix_newq = a_matrix @ new_q
        # Comment: (new_q.T @ intermediate_amatrix_newq).shape = [1,1], but with numpy 2.0 we have to explicitly
        # extract the scalar
        alphas[j + 1] = (new_q.T @ intermediate_amatrix_newq)[0, 0]

        # compute the new r
        r_i = intermediate_amatrix_newq - alphas[j + 1] * new_q - betas[j] * q_i

        # compute the next beta
        betas[j + 1] = np.linalg.norm(r_i)

        # update the previous q
        q_i = new_q

    return alphas[1:], betas[1:-1]

power_method(a_matrix, x_vector, n_iterations)

This function searches the largest (dominant) eigenvalue and corresponding eigenvector by repeated multiplication of the matrix A with an initial vector. It assumes a dominant eigenvalue bigger than the second one, otherwise it won't converge.

For proof and explanation look at: https://pythonnumericalmethods.berkeley.edu/notebooks/chapter15.02-The-Power-Method.html

Parameters:

Name Type Description Default
a_matrix ndarray

2D-Matrix of size NxN filled with floats

required
x_vector ndarray

Vector of size Nx1 filled with floats

required
n_iterations int

the amount of iterations for the approximation

required

Returns:

Type Description
(float, ndarray)

the dominant eigenvalue and corresponding eigenvector

Source code in changepoynt/utils/linalg.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def power_method(a_matrix: np.ndarray, x_vector: np.ndarray, n_iterations: int) -> (float, np.ndarray):
    """
    This function searches the largest (dominant) eigenvalue and corresponding eigenvector by repeated multiplication
    of the matrix A with an initial vector. It assumes a dominant eigenvalue bigger than the second one, otherwise
    it won't converge.

    For proof and explanation look at:
    https://pythonnumericalmethods.berkeley.edu/notebooks/chapter15.02-The-Power-Method.html
    :param a_matrix: 2D-Matrix of size NxN filled with floats
    :param x_vector: Vector of size Nx1 filled with floats
    :param n_iterations: the amount of iterations for the approximation
    :return: the dominant eigenvalue and corresponding eigenvector
    """

    # go through the iterations and continue to scale the returned vector, so we do not reach extreme values
    # during the iteration we scale the vector by its maximum as we can than easily extract the eigenvalue
    # TODO: This only works for our symmetric correlation matrices but not for any arbitrary matrices
    for _ in range(n_iterations):
        # multiplication with a_matrix.T @ a_matrix as can be seen in explanation of
        # https://numpy.org/doc/stable/reference/generated/numpy.linalg.svd.html
        x_vector = a_matrix @ x_vector

        # scale the vector so we keep the values in bound
        x_vector = x_vector / np.max(np.abs(x_vector))

    # get the normed eigenvector
    x_vector = x_vector / np.linalg.norm(x_vector)

    # get the corresponding eigenvalue
    eigenvalue = np.linalg.norm(a_matrix @ x_vector)
    return eigenvalue, (a_matrix @ x_vector) / eigenvalue

randomized_hankel_svd(hankel_matrix, k, subspace_iteration_q=2, oversampling_p=2)

Function for the randomized singular vector decomposition using [1]. Implementation modified from: https://pypi.org/project/fbpca/

Source code in changepoynt/utils/linalg.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def randomized_hankel_svd(hankel_matrix: np.ndarray, k: int, subspace_iteration_q: int = 2, oversampling_p: int = 2):
    """
    Function for the randomized singular vector decomposition using [1].
    Implementation modified from: https://pypi.org/project/fbpca/
    """

    # get the parameter l from the paper
    sample_length_l = k + oversampling_p
    assert 1.25 * sample_length_l < min(hankel_matrix.shape)

    # Apply A to a random matrix, obtaining Q.
    random_matrix_omega = np.random.uniform(low=-1, high=1, size=(hankel_matrix.shape[1], sample_length_l))
    projection_matrix_q = hankel_matrix @ random_matrix_omega

    # Form a matrix Q whose columns constitute a well-conditioned basis for the columns of the earlier Q.
    if subspace_iteration_q == 0:
        (projection_matrix_q, _) = sp.linalg.qr(projection_matrix_q, mode='economic')
    if subspace_iteration_q > 0:
        (projection_matrix_q, _) = sp.linalg.lu(projection_matrix_q, permute_l=True)

    # Conduct normalized power iterations.
    for it in range(subspace_iteration_q):

        # QA
        projection_matrix_q = (projection_matrix_q.T @ hankel_matrix).T

        (projection_matrix_q, _) = splg.lu(projection_matrix_q, permute_l=True)

        # AAQ
        projection_matrix_q = hankel_matrix @ projection_matrix_q

        if it + 1 < subspace_iteration_q:
            (projection_matrix_q, _) = splg.lu(projection_matrix_q, permute_l=True)
        else:
            (projection_matrix_q, _) = splg.qr(projection_matrix_q, mode='economic')

    # SVD Q'*A to obtain approximations to the singular values and right singular vectors of A; adjust the left singular
    # vectors of Q'*A to approximate the left singular vectors of A.
    lower_space_hankel = projection_matrix_q.T @ hankel_matrix
    (R, s, Va) = splg.svd(lower_space_hankel, full_matrices=False)
    U = projection_matrix_q.dot(R)

    # Retain only the leftmost k columns of U, the uppermost k rows of Va, and the first k entries of s.
    return U[:, :k], s[:k], Va[:k, :]

rayleigh_ritz_singular_value_decomposition(a_matrix, k)

This function uses the Rayleigh-Ritz method implemented in ARPACK to compute the k highest eigenvalues and corresponding eigenvectors. It should be faster than a complete svd.

!NOTE!: The order of the k highest eigenvalues is not guaranteed by this method!

Parameters:

Name Type Description Default
a_matrix ndarray

2D-Matrix filled with floats for which we want to find the left eigenvectors

required
k int

the number of highest eigenvectors we want to find

required

Returns:

Type Description
(ndarray, ndarray)

returns the eigenvalues and eigenvectors as numpy arrays

Source code in changepoynt/utils/linalg.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def rayleigh_ritz_singular_value_decomposition(a_matrix: np.ndarray, k: int) -> (np.ndarray, np.ndarray):
    """
    This function uses the Rayleigh-Ritz method implemented in ARPACK to compute the k highest eigenvalues and
    corresponding eigenvectors. It should be faster than a complete svd.

    !NOTE!:
    The order of the k highest eigenvalues is not guaranteed by this method!

    :param a_matrix: 2D-Matrix filled with floats for which we want to find the left eigenvectors
    :param k: the number of highest eigenvectors we want to find
    :return: returns the eigenvalues and eigenvectors as numpy arrays
    """
    singular_vectors, singular_values, _ = scipy.sparse.linalg.svds(a_matrix, k=k)
    return singular_values, singular_vectors

tridiagonal_eigenvalues(alphas, betas, amount=-1)

This function uses a fast approach for symmetric tridiagonal matrices to calculate the [amount] highest eigenvalues and corresponding eigenvectors.

Parameters:

Name Type Description Default
alphas ndarray

main diagonal elements

required
betas ndarray

off diagonal elements

required
amount

The number of eigenvalues you want to compute (from the highest)

-1

Returns:

Type Description

eigenvalues and corresponding eigenvectors

Source code in changepoynt/utils/linalg.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def tridiagonal_eigenvalues(alphas: np.ndarray, betas: np.ndarray, amount=-1):
    """
    This function uses a fast approach for symmetric tridiagonal matrices to calculate the [amount] highest eigenvalues
    and corresponding eigenvectors.

    :param alphas: main diagonal elements
    :param betas: off diagonal elements
    :param amount: The number of eigenvalues you want to compute (from the highest)
    :return: eigenvalues and corresponding eigenvectors
    """

    # check whether we need to use default parameters
    if amount < 0:
        amount = alphas.shape[0]

    # assertions about shape and dimensions as well as number of eigenvectors
    assert 0 < amount <= alphas.shape[0], 'We can only calculate one to size of matrix eigenvalues.'
    assert alphas.ndim == 1, 'The alphas need to be vectors.'
    assert betas.ndim == 1, 'The betas need to be vectors.'
    assert alphas.shape[0] - 1 == betas.shape[0], 'Alpha size needs to be exactly one bigger than beta size.'

    # compute the decomposition
    eigenvalues, eigenvectors = splg.eigh_tridiagonal(d=alphas, e=betas, select='i',
                                                      select_range=(alphas.shape[0] - amount, alphas.shape[0] - 1))

    # return them to be in sinking order
    return eigenvalues[::-1], eigenvectors[:, ::-1]

block_linalg

BlockHankel

This class is the base class for BlockHankel matrices. It mainly serves as the interface to capture numpy matrix multiplications (@) using the functions matmul (if a BlockHankel matrix is on the left side of the @ operator) and array_ufunc (if a BlockHankel is on the right side of the @ operator)

If these functions are called, the input is given to the abstract methods multiply_other_from_left and multiply_other_from_right which we expect the subclasses to implement in detail

Source code in changepoynt/utils/block_linalg.py
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
class BlockHankel:
    """
    This class is the base class for BlockHankel matrices.
    It mainly serves as the interface to capture numpy matrix multiplications (@) using the functions
    __matmul__ (if a BlockHankel matrix is on the left side of the @ operator)
    and __array_ufunc__ (if a BlockHankel is on the right side of the @ operator)

    If these functions are called, the input is given to the abstract methods
    multiply_other_from_left
    and
    multiply_other_from_right
    which we expect the subclasses to implement in detail
    """

    # denote that we will have a shape
    shape: tuple[int, ...]

    @abc.abstractmethod
    def materialize(self) -> np.ndarray:
        raise NotImplementedError

    @abc.abstractmethod
    def multiply_other_from_right(self, other_maxtrix: np.ndarray) -> np.ndarray:
        """Subclasses should implement this method."""
        raise NotImplementedError

    @abc.abstractmethod
    def multiply_other_from_left(self, other_maxtrix: np.ndarray) -> np.ndarray:
        """Subclasses should implement this method."""
        raise NotImplementedError

    @abc.abstractmethod
    def copy(self, deep: bool = False) -> "BlockHankel":
        """Subclasses should implement this method."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def T(self,) -> "BlockHankel":
        """Subclasses should implement this method. The method should transpose the matrix."""
        raise NotImplementedError

    def __matmul__(self, other) -> typing.Union[np.ndarray, "BlockHankel"]:
        """
        This function is called by numpy if we use an instance of the current class on the left side of
        a matrix multiplication operator (@)

        :param other: the other matrix
        :return:
        """
        # right side matmul
        if isinstance(other, np.ndarray):
            return self.multiply_other_from_right(other)
        if isinstance(other, BlockHankel):
            return BlockHankelProductRepresentation(self, other)
        else:
            raise ValueError('At this point matmul is only with other matrices.')

    def __array_ufunc__(self, ufunc, method, *args, **kwargs) -> typing.Union[np.ndarray, "BlockHankel"]:
        """
        This function is called by numpy, if an instance of the current class on the left side of
        an operator where the left side is any numpy object.

        Basically, numpy first checks if the left side of an operator has defined an operation for
        both objects on the right and left. If this fails, it checks the object on the right.

        :param ufunc: the function that is called on the objects
        :param method: what type of invocation it was
        :param args: the arguments of the function
        :param kwargs: the keyword arguments of the function
        :return:
        """
        if method != '__call__' or len(kwargs) > 0 or len(args) != 2:
            return NotImplemented
        if ufunc == np.matmul:
            # left side matmul
            if isinstance(args[0], np.ndarray) and args[1] is self:
                return self.multiply_other_from_left(args[0])
            else:
                return NotImplemented
        else:
            return NotImplemented

    def __array_function__(self, func, types, args, kwargs) -> "MultilevelBlockHankelRepresentation":
        """
        We currently only support concatenation.
        https://numpy.org/neps/nep-0018-array-function-protocol.html
        """
        if func != np.concatenate:
            return NotImplemented
        if not all(issubclass(t, BlockHankel) for t in types) or not issubclass(self.__class__, BlockHankel):
            return NotImplemented

        # check the keyword arguments
        axis = kwargs.get('axis', 0)
        if axis not in [0, 1]:
            raise ValueError('Concatenation only in the first two dimensions.')

        # check that we only received one arg
        if len(args) > 1:
            raise ValueError('We only accept one non keyword argument.')

        # create the list of matrices
        return MultilevelBlockHankelRepresentation(args[0], axis)

T abstractmethod property

Subclasses should implement this method. The method should transpose the matrix.

__array_function__(func, types, args, kwargs)

We currently only support concatenation. https://numpy.org/neps/nep-0018-array-function-protocol.html

Source code in changepoynt/utils/block_linalg.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def __array_function__(self, func, types, args, kwargs) -> "MultilevelBlockHankelRepresentation":
    """
    We currently only support concatenation.
    https://numpy.org/neps/nep-0018-array-function-protocol.html
    """
    if func != np.concatenate:
        return NotImplemented
    if not all(issubclass(t, BlockHankel) for t in types) or not issubclass(self.__class__, BlockHankel):
        return NotImplemented

    # check the keyword arguments
    axis = kwargs.get('axis', 0)
    if axis not in [0, 1]:
        raise ValueError('Concatenation only in the first two dimensions.')

    # check that we only received one arg
    if len(args) > 1:
        raise ValueError('We only accept one non keyword argument.')

    # create the list of matrices
    return MultilevelBlockHankelRepresentation(args[0], axis)

__array_ufunc__(ufunc, method, *args, **kwargs)

This function is called by numpy, if an instance of the current class on the left side of an operator where the left side is any numpy object.

Basically, numpy first checks if the left side of an operator has defined an operation for both objects on the right and left. If this fails, it checks the object on the right.

Parameters:

Name Type Description Default
ufunc

the function that is called on the objects

required
method

what type of invocation it was

required
args

the arguments of the function

()
kwargs

the keyword arguments of the function

{}

Returns:

Type Description
Union[ndarray, BlockHankel]
Source code in changepoynt/utils/block_linalg.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def __array_ufunc__(self, ufunc, method, *args, **kwargs) -> typing.Union[np.ndarray, "BlockHankel"]:
    """
    This function is called by numpy, if an instance of the current class on the left side of
    an operator where the left side is any numpy object.

    Basically, numpy first checks if the left side of an operator has defined an operation for
    both objects on the right and left. If this fails, it checks the object on the right.

    :param ufunc: the function that is called on the objects
    :param method: what type of invocation it was
    :param args: the arguments of the function
    :param kwargs: the keyword arguments of the function
    :return:
    """
    if method != '__call__' or len(kwargs) > 0 or len(args) != 2:
        return NotImplemented
    if ufunc == np.matmul:
        # left side matmul
        if isinstance(args[0], np.ndarray) and args[1] is self:
            return self.multiply_other_from_left(args[0])
        else:
            return NotImplemented
    else:
        return NotImplemented

__matmul__(other)

This function is called by numpy if we use an instance of the current class on the left side of a matrix multiplication operator (@)

Parameters:

Name Type Description Default
other

the other matrix

required

Returns:

Type Description
Union[ndarray, BlockHankel]
Source code in changepoynt/utils/block_linalg.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def __matmul__(self, other) -> typing.Union[np.ndarray, "BlockHankel"]:
    """
    This function is called by numpy if we use an instance of the current class on the left side of
    a matrix multiplication operator (@)

    :param other: the other matrix
    :return:
    """
    # right side matmul
    if isinstance(other, np.ndarray):
        return self.multiply_other_from_right(other)
    if isinstance(other, BlockHankel):
        return BlockHankelProductRepresentation(self, other)
    else:
        raise ValueError('At this point matmul is only with other matrices.')

copy(deep=False) abstractmethod

Subclasses should implement this method.

Source code in changepoynt/utils/block_linalg.py
415
416
417
418
@abc.abstractmethod
def copy(self, deep: bool = False) -> "BlockHankel":
    """Subclasses should implement this method."""
    raise NotImplementedError

multiply_other_from_left(other_maxtrix) abstractmethod

Subclasses should implement this method.

Source code in changepoynt/utils/block_linalg.py
410
411
412
413
@abc.abstractmethod
def multiply_other_from_left(self, other_maxtrix: np.ndarray) -> np.ndarray:
    """Subclasses should implement this method."""
    raise NotImplementedError

multiply_other_from_right(other_maxtrix) abstractmethod

Subclasses should implement this method.

Source code in changepoynt/utils/block_linalg.py
405
406
407
408
@abc.abstractmethod
def multiply_other_from_right(self, other_maxtrix: np.ndarray) -> np.ndarray:
    """Subclasses should implement this method."""
    raise NotImplementedError

BlockHankelProductRepresentation

Bases: BlockHankel

This class represents a matrix-matrix product of two hankel matrices. H @ H Instead of computing the product and materializing it, we keep both matrices so we have hankel property for products.

See Section III-G in: L. Weber and R. Lenz, "Accelerating Singular Spectrum Transformation for Scalable Change Point Detection," in IEEE Access, vol. 13, pp. 213556-213577, 2025, doi: 10.1109/ACCESS.2025.3640386. for details.

Source code in changepoynt/utils/block_linalg.py
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
class BlockHankelProductRepresentation(BlockHankel):
    """
    This class represents a matrix-matrix product of two hankel matrices.
    H @ H
    Instead of computing the product and materializing it, we keep both matrices so we have hankel property for
    products.

    See Section III-G in:
    L. Weber and R. Lenz,
    "Accelerating Singular Spectrum Transformation for Scalable Change Point Detection,"
    in IEEE Access, vol. 13, pp. 213556-213577, 2025, doi: 10.1109/ACCESS.2025.3640386.
    for details.
    """

    def __init__(self, first_hankel: BlockHankel, second_hankel: BlockHankel):

        # save the matrices for keeping
        self.first_hankel = first_hankel
        self.second_hankel = second_hankel

        # check that the dimensions match
        if self.first_hankel.shape[1] != self.second_hankel.shape[0]:
            raise ValueError(f'matmul: Operant shape missmatch: {self.first_hankel.shape} @ {self.second_hankel.shape} missmatch in inner dimensions.')

        # save the shape
        self.shape = (self.first_hankel.shape[0], self.second_hankel.shape[1])

    def copy(self, deep: bool = False) -> "BlockHankelProductRepresentation":
        new = self.__class__.__new__(self.__class__)

        # copy the matrices
        new.first_hankel = self.first_hankel.copy(deep=deep)
        new.second_hankel = self.second_hankel.copy(deep=deep)

        # copy the shape
        new.shape = self.shape
        return new

    @property
    def T(self):

        # make a copy of myself
        new = self.copy()

        # switch matrix order and transpose the matrices
        new.first_hankel, new.second_hankel = new.second_hankel.T, new.first_hankel.T

        # update the shape
        new.shape = tuple(reversed(new.shape))
        return new

    def multiply_other_from_right(self, other: np.ndarray) -> np.ndarray:
        return self.first_hankel @ (self.second_hankel @ other)

    def multiply_other_from_left(self, other: np.ndarray) -> np.ndarray:
        return (other @ self.first_hankel) @ self.second_hankel

    def materialize(self) -> np.ndarray:
        # we need to materialize the second otherwise, we will again only create a
        # BlockHankelProductRepresentation (the current class)
        #
        # materializing the second makes this a product of a BlockHankel with a np.ndarray
        # instead of BlockHankel with BlockHankel
        return self.first_hankel @ self.second_hankel.materialize()

__array_function__(func, types, args, kwargs)

We currently only support concatenation. https://numpy.org/neps/nep-0018-array-function-protocol.html

Source code in changepoynt/utils/block_linalg.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def __array_function__(self, func, types, args, kwargs) -> "MultilevelBlockHankelRepresentation":
    """
    We currently only support concatenation.
    https://numpy.org/neps/nep-0018-array-function-protocol.html
    """
    if func != np.concatenate:
        return NotImplemented
    if not all(issubclass(t, BlockHankel) for t in types) or not issubclass(self.__class__, BlockHankel):
        return NotImplemented

    # check the keyword arguments
    axis = kwargs.get('axis', 0)
    if axis not in [0, 1]:
        raise ValueError('Concatenation only in the first two dimensions.')

    # check that we only received one arg
    if len(args) > 1:
        raise ValueError('We only accept one non keyword argument.')

    # create the list of matrices
    return MultilevelBlockHankelRepresentation(args[0], axis)

__array_ufunc__(ufunc, method, *args, **kwargs)

This function is called by numpy, if an instance of the current class on the left side of an operator where the left side is any numpy object.

Basically, numpy first checks if the left side of an operator has defined an operation for both objects on the right and left. If this fails, it checks the object on the right.

Parameters:

Name Type Description Default
ufunc

the function that is called on the objects

required
method

what type of invocation it was

required
args

the arguments of the function

()
kwargs

the keyword arguments of the function

{}

Returns:

Type Description
Union[ndarray, BlockHankel]
Source code in changepoynt/utils/block_linalg.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def __array_ufunc__(self, ufunc, method, *args, **kwargs) -> typing.Union[np.ndarray, "BlockHankel"]:
    """
    This function is called by numpy, if an instance of the current class on the left side of
    an operator where the left side is any numpy object.

    Basically, numpy first checks if the left side of an operator has defined an operation for
    both objects on the right and left. If this fails, it checks the object on the right.

    :param ufunc: the function that is called on the objects
    :param method: what type of invocation it was
    :param args: the arguments of the function
    :param kwargs: the keyword arguments of the function
    :return:
    """
    if method != '__call__' or len(kwargs) > 0 or len(args) != 2:
        return NotImplemented
    if ufunc == np.matmul:
        # left side matmul
        if isinstance(args[0], np.ndarray) and args[1] is self:
            return self.multiply_other_from_left(args[0])
        else:
            return NotImplemented
    else:
        return NotImplemented

__matmul__(other)

This function is called by numpy if we use an instance of the current class on the left side of a matrix multiplication operator (@)

Parameters:

Name Type Description Default
other

the other matrix

required

Returns:

Type Description
Union[ndarray, BlockHankel]
Source code in changepoynt/utils/block_linalg.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def __matmul__(self, other) -> typing.Union[np.ndarray, "BlockHankel"]:
    """
    This function is called by numpy if we use an instance of the current class on the left side of
    a matrix multiplication operator (@)

    :param other: the other matrix
    :return:
    """
    # right side matmul
    if isinstance(other, np.ndarray):
        return self.multiply_other_from_right(other)
    if isinstance(other, BlockHankel):
        return BlockHankelProductRepresentation(self, other)
    else:
        raise ValueError('At this point matmul is only with other matrices.')

BlockHankelRepresentation

Bases: BlockHankel

This matrix represents a Block Hankel matrix for large matrices and fast_hankel=True it makes matrix multiplication faster:

See our paper for more details: Efficient Hankel Matrix Decomposition for Changepoint Detection Lucas Weber, Richard Lenz 2024

Source code in changepoynt/utils/block_linalg.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
class BlockHankelRepresentation(BlockHankel):
    """
    This matrix represents a Block Hankel matrix
    for large matrices and fast_hankel=True it makes matrix multiplication faster:

    See our paper for more details:
    Efficient Hankel Matrix Decomposition for Changepoint Detection
    Lucas Weber, Richard Lenz 2024
    """

    def __init__(self, time_series: np.ndarray, end_index: int, window_length: int, window_number: int,
                 fast_hankel: bool):

        # save the parameters that are necessary for later computations
        self.window_length = window_length
        self.window_number = window_number
        self.fast_hankel = fast_hankel

        # get the hankel matrix block representation
        hankel, size = get_block_hankel_representation(time_series, end_index, window_length, window_number)
        self.size = size

        # create the representation and save it into the class
        if self.fast_hankel:

            # get the next fast fft length for efficient fft
            self.fft_length = spfft.next_fast_len(hankel.shape[0], real=True)

            # make the fft along the individual time series axis
            hankel = spfft.rfft(hankel, n=self.fft_length, axis=0)

        # save the hankel matrix and the shape
        self.hankel = hankel
        self.shape = (self.window_length*time_series.shape[1], self.window_number)

    def copy(self, deep: bool = False) -> "BlockHankelRepresentation":
        new = self.__class__.__new__(self.__class__)

        new.window_length = self.window_length
        new.window_number = self.window_number
        new.fast_hankel = self.fast_hankel
        new.shape = self.shape
        new.size = self.size

        if hasattr(self, "fft_length"):
            new.fft_length = self.fft_length

        if deep:
            new.hankel = self.hankel.copy()
        else:
            new.hankel = self.hankel

        return new

    def multiply_other_from_right(self, other_matrix: np.ndarray) -> np.ndarray:
        # check the dimensions
        if other_matrix.shape[0] != self.shape[1]:
            raise ValueError(f'matmul: Right matrix has a mismatch in its core dimension 0 '
                             f'(size {other_matrix.shape[0]=} is different from {self.shape[1]=})')

        # make the product based on whether we use the fast Hankel option
        if self.fast_hankel:
            return block_hankel_right_matmat_fft(self.hankel, other_matrix, self.fft_length, self.window_length, self.window_number)
        else:
            return block_hankel_right_matmat_direct(self.hankel, other_matrix)

    def multiply_other_from_left(self, other_matrix: np.ndarray) -> np.ndarray:

        # check the dimensions
        if other_matrix.shape[1] != self.shape[0]:
            raise ValueError(f'matmul: Left matrix has a mismatch in its core dimension 0 '
                             f'(size {other_matrix.shape[1]=} is different from {self.shape[0]=})')

        # make the product based on whether we use the fast Hankel option
        if self.fast_hankel:
            return block_hankel_left_matmat_fft(self.hankel, other_matrix, self.fft_length, self.window_length, self.window_number)
        else:
            return block_hankel_left_matmat_direct(self.hankel, other_matrix)

    @property
    def T(self) -> 'BlockHankelRepresentation':

        # make a copy of the object
        new = self.copy(deep=False)

        # switch the shape
        new.shape = tuple(reversed(new.shape))

        # switch the number of windows and window length
        new.window_length, new.window_number = new.window_number, new.window_length

        # switch the last dimensions of the hankel matrix
        new.hankel = new.hankel.transpose((0, 2, 1))
        return new

    def materialize(self) -> np.ndarray:
        """
        Explicitly build the full block Hankel matrix H for test multiplications.

        Definitions
        ----------
        p, q : the shape of the Hankel matrix (p rows, q columns)
        m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

        B has shape (p + q - 1, m, n).
        H has shape (p*m, q*n).
        """
        hankel = self.hankel
        if self.fast_hankel:
            hankel = np.real_if_close(spfft.irfft(self.hankel, n=self.fft_length, axis=0))[:self.size,...]
        _, m, n = hankel.shape

        hankel_matrix_full = np.zeros(self.shape)
        for i in range(self.window_length):
            for j in range(self.window_number):
                hankel_matrix_full[i * m:(i + 1) * m, j * n:(j + 1) * n] = hankel[i + j]

        return hankel_matrix_full

__array_function__(func, types, args, kwargs)

We currently only support concatenation. https://numpy.org/neps/nep-0018-array-function-protocol.html

Source code in changepoynt/utils/block_linalg.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def __array_function__(self, func, types, args, kwargs) -> "MultilevelBlockHankelRepresentation":
    """
    We currently only support concatenation.
    https://numpy.org/neps/nep-0018-array-function-protocol.html
    """
    if func != np.concatenate:
        return NotImplemented
    if not all(issubclass(t, BlockHankel) for t in types) or not issubclass(self.__class__, BlockHankel):
        return NotImplemented

    # check the keyword arguments
    axis = kwargs.get('axis', 0)
    if axis not in [0, 1]:
        raise ValueError('Concatenation only in the first two dimensions.')

    # check that we only received one arg
    if len(args) > 1:
        raise ValueError('We only accept one non keyword argument.')

    # create the list of matrices
    return MultilevelBlockHankelRepresentation(args[0], axis)

__array_ufunc__(ufunc, method, *args, **kwargs)

This function is called by numpy, if an instance of the current class on the left side of an operator where the left side is any numpy object.

Basically, numpy first checks if the left side of an operator has defined an operation for both objects on the right and left. If this fails, it checks the object on the right.

Parameters:

Name Type Description Default
ufunc

the function that is called on the objects

required
method

what type of invocation it was

required
args

the arguments of the function

()
kwargs

the keyword arguments of the function

{}

Returns:

Type Description
Union[ndarray, BlockHankel]
Source code in changepoynt/utils/block_linalg.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def __array_ufunc__(self, ufunc, method, *args, **kwargs) -> typing.Union[np.ndarray, "BlockHankel"]:
    """
    This function is called by numpy, if an instance of the current class on the left side of
    an operator where the left side is any numpy object.

    Basically, numpy first checks if the left side of an operator has defined an operation for
    both objects on the right and left. If this fails, it checks the object on the right.

    :param ufunc: the function that is called on the objects
    :param method: what type of invocation it was
    :param args: the arguments of the function
    :param kwargs: the keyword arguments of the function
    :return:
    """
    if method != '__call__' or len(kwargs) > 0 or len(args) != 2:
        return NotImplemented
    if ufunc == np.matmul:
        # left side matmul
        if isinstance(args[0], np.ndarray) and args[1] is self:
            return self.multiply_other_from_left(args[0])
        else:
            return NotImplemented
    else:
        return NotImplemented

__matmul__(other)

This function is called by numpy if we use an instance of the current class on the left side of a matrix multiplication operator (@)

Parameters:

Name Type Description Default
other

the other matrix

required

Returns:

Type Description
Union[ndarray, BlockHankel]
Source code in changepoynt/utils/block_linalg.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def __matmul__(self, other) -> typing.Union[np.ndarray, "BlockHankel"]:
    """
    This function is called by numpy if we use an instance of the current class on the left side of
    a matrix multiplication operator (@)

    :param other: the other matrix
    :return:
    """
    # right side matmul
    if isinstance(other, np.ndarray):
        return self.multiply_other_from_right(other)
    if isinstance(other, BlockHankel):
        return BlockHankelProductRepresentation(self, other)
    else:
        raise ValueError('At this point matmul is only with other matrices.')

materialize()

Explicitly build the full block Hankel matrix H for test multiplications.

Definitions

p, q : the shape of the Hankel matrix (p rows, q columns) m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

B has shape (p + q - 1, m, n). H has shape (pm, qn).

Source code in changepoynt/utils/block_linalg.py
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
def materialize(self) -> np.ndarray:
    """
    Explicitly build the full block Hankel matrix H for test multiplications.

    Definitions
    ----------
    p, q : the shape of the Hankel matrix (p rows, q columns)
    m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

    B has shape (p + q - 1, m, n).
    H has shape (p*m, q*n).
    """
    hankel = self.hankel
    if self.fast_hankel:
        hankel = np.real_if_close(spfft.irfft(self.hankel, n=self.fft_length, axis=0))[:self.size,...]
    _, m, n = hankel.shape

    hankel_matrix_full = np.zeros(self.shape)
    for i in range(self.window_length):
        for j in range(self.window_number):
            hankel_matrix_full[i * m:(i + 1) * m, j * n:(j + 1) * n] = hankel[i + j]

    return hankel_matrix_full

MultilevelBlockHankelRepresentation

Bases: BlockHankel

This class implements multiplication with a multilevel Block Hankel matrix, i.e., a matrix which can be divided into multiple parts that each have Hankel structure.

We expect to receive a list of: - Hankel matrices as BlockHankelRepresentation objects - The concatenation axis (currently only 0 and 1 are supported)

Source code in changepoynt/utils/block_linalg.py
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
class MultilevelBlockHankelRepresentation(BlockHankel):
    """
    This class implements multiplication with a multilevel Block Hankel matrix, i.e., a matrix which can be divided into
    multiple parts that each have Hankel structure.

    We expect to receive a list of:
    - Hankel matrices as BlockHankelRepresentation objects
    - The concatenation axis (currently only 0 and 1 are supported)
    """

    def __init__(self, matrix_tuples: list[BlockHankelRepresentation], axis: int):

        # check the input
        self.shape, self.individual_shapes = self.check_input(matrix_tuples, axis)

        # extract the matrices
        self.matrices = matrix_tuples
        self.axis = axis

    @staticmethod
    def check_input(input_list: list[BlockHankelRepresentation], axis: int) -> (int, int):

        # check whether the list is empty
        if len(input_list) < 1:
            raise ValueError('You did not input any Block Hankel matrices.')

        # check that the axis is either 0 or 1
        if axis not in (0, 1):
            raise ValueError(f'Concatenation only in the first two dimensions. Currently specified {axis=}.')

        # save the shapes
        shapes = [(0, 0)]*len(input_list)

        # check that we received only BlockHankelRepresentation matrices and the shapes match
        for idx, ele in enumerate(input_list):

            # check for the type of the class
            if not isinstance(ele, BlockHankelRepresentation):
                raise TypeError(f'We can only concatenate BlockHankelRepresentations in this class. Got {type(ele)}.')

            # save the dimension that we have to check
            shapes[idx] = ele.shape

        # decide which dimension we have to check for uniqueness
        # e.g., if we concatenate along axis 0, all axis 1 sizes have to match
        checkdim = 0 if axis == 1 else 1
        if len(set(ele[checkdim] for ele in shapes)) != 1:
            raise ValueError(f'We cannot concatenate BlockHankelRepresentations in this class due to different shapes in dimension {checkdim}: {shapes}.')

        if axis == 1:
            shape = (shapes[0][0], sum(ele[1] for ele in shapes))
        elif axis == 0:
            shape = (sum(ele[0] for ele in shapes), shapes[0][1])
        else:
            raise ValueError(f'{axis=} is invalid.')
        return shape, shapes

    def copy(self, deep: bool = False) -> "MultilevelBlockHankelRepresentation":
        new = self.__class__.__new__(self.__class__)

        if deep:
            new.matrices = [ele.copy(deep=deep) for ele in self.matrices]
        else:
            new.matrices = self.matrices
        new.axis = self.axis
        new.shape = self.shape

        if deep:
            new.individual_shapes = [ele.copy() for ele in self.individual_shapes]
        else:
            new.individual_shapes = self.individual_shapes
        return new

    @property
    def T(self) -> 'MultilevelBlockHankelRepresentation':
        """
        Compute the transposed.
        :return:
        """
        new = self.copy()
        # switch the shape
        new.shape = tuple(reversed(new.shape))

        # go through all of our matrices and transpose them
        new.matrices = [matrix.T for matrix in new.matrices]

        # update the shapes
        new.individual_shapes = [ele.shape for ele in new.matrices]

        # switch the axis
        new.axis = 1 if new.axis == 0 else 0

        return new

    def multiply_other_from_right(self, other: np.ndarray):

        # we need to create a target matrix
        result = np.zeros((self.shape[0], other.shape[1]))

        # check the matrix shapes
        if self.shape[1] != other.shape[0]:
            raise ValueError(f'Shape missmatch: {self.shape=} multiplied with {other.shape=} is not possible.')

        # choose depending on the axis of concatenation
        starting_point = 0
        if self.axis == 0:  # matrices are stacked ontop of each other
            for idx, matrix in enumerate(self.matrices):
                end_point = starting_point+self.individual_shapes[idx][0]
                result[starting_point:end_point, :] = matrix @ other
                starting_point = end_point
        if self.axis == 1:  # matrices are stacked next to each other
            for idx, matrix in enumerate(self.matrices):
                end_point = starting_point + starting_point+self.individual_shapes[idx][1]
                result += matrix @ other[starting_point:end_point, :]
                starting_point = end_point

        return result

    def multiply_other_from_left(self, other: np.ndarray):

        # we need to create a target matrix
        result = np.zeros((other.shape[0], self.shape[1]))

        # check the matrix shapes
        if self.shape[0] != other.shape[1]:
            raise ValueError(f'Shape missmatch: {other.shape=} multiplied with {self.shape=} is not possible.')

        # choose depending on the axis of concatenation
        starting_point = 0
        if self.axis == 0:  # matrices are stacked ontop of each other
            for idx, matrix in enumerate(self.matrices):
                end_point = starting_point + starting_point + self.individual_shapes[idx][0]
                result += other[:, starting_point:end_point] @ matrix
                starting_point = end_point
        if self.axis == 1:  # matrices are stacked next to each other
            for idx, matrix in enumerate(self.matrices):
                end_point = starting_point + self.individual_shapes[idx][1]
                result[:, starting_point:end_point] = other @ matrix
                starting_point = end_point
        return result

    def materialize(self) -> np.ndarray:
        """
        Explicitly build the full multilevel block Hankel matrix H for test multiplications.
        """

        return np.concatenate(tuple(mat.materialize()for mat in self.matrices), axis=self.axis)

T property

Compute the transposed.

Returns:

Type Description
MultilevelBlockHankelRepresentation

__array_function__(func, types, args, kwargs)

We currently only support concatenation. https://numpy.org/neps/nep-0018-array-function-protocol.html

Source code in changepoynt/utils/block_linalg.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def __array_function__(self, func, types, args, kwargs) -> "MultilevelBlockHankelRepresentation":
    """
    We currently only support concatenation.
    https://numpy.org/neps/nep-0018-array-function-protocol.html
    """
    if func != np.concatenate:
        return NotImplemented
    if not all(issubclass(t, BlockHankel) for t in types) or not issubclass(self.__class__, BlockHankel):
        return NotImplemented

    # check the keyword arguments
    axis = kwargs.get('axis', 0)
    if axis not in [0, 1]:
        raise ValueError('Concatenation only in the first two dimensions.')

    # check that we only received one arg
    if len(args) > 1:
        raise ValueError('We only accept one non keyword argument.')

    # create the list of matrices
    return MultilevelBlockHankelRepresentation(args[0], axis)

__array_ufunc__(ufunc, method, *args, **kwargs)

This function is called by numpy, if an instance of the current class on the left side of an operator where the left side is any numpy object.

Basically, numpy first checks if the left side of an operator has defined an operation for both objects on the right and left. If this fails, it checks the object on the right.

Parameters:

Name Type Description Default
ufunc

the function that is called on the objects

required
method

what type of invocation it was

required
args

the arguments of the function

()
kwargs

the keyword arguments of the function

{}

Returns:

Type Description
Union[ndarray, BlockHankel]
Source code in changepoynt/utils/block_linalg.py
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def __array_ufunc__(self, ufunc, method, *args, **kwargs) -> typing.Union[np.ndarray, "BlockHankel"]:
    """
    This function is called by numpy, if an instance of the current class on the left side of
    an operator where the left side is any numpy object.

    Basically, numpy first checks if the left side of an operator has defined an operation for
    both objects on the right and left. If this fails, it checks the object on the right.

    :param ufunc: the function that is called on the objects
    :param method: what type of invocation it was
    :param args: the arguments of the function
    :param kwargs: the keyword arguments of the function
    :return:
    """
    if method != '__call__' or len(kwargs) > 0 or len(args) != 2:
        return NotImplemented
    if ufunc == np.matmul:
        # left side matmul
        if isinstance(args[0], np.ndarray) and args[1] is self:
            return self.multiply_other_from_left(args[0])
        else:
            return NotImplemented
    else:
        return NotImplemented

__matmul__(other)

This function is called by numpy if we use an instance of the current class on the left side of a matrix multiplication operator (@)

Parameters:

Name Type Description Default
other

the other matrix

required

Returns:

Type Description
Union[ndarray, BlockHankel]
Source code in changepoynt/utils/block_linalg.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def __matmul__(self, other) -> typing.Union[np.ndarray, "BlockHankel"]:
    """
    This function is called by numpy if we use an instance of the current class on the left side of
    a matrix multiplication operator (@)

    :param other: the other matrix
    :return:
    """
    # right side matmul
    if isinstance(other, np.ndarray):
        return self.multiply_other_from_right(other)
    if isinstance(other, BlockHankel):
        return BlockHankelProductRepresentation(self, other)
    else:
        raise ValueError('At this point matmul is only with other matrices.')

materialize()

Explicitly build the full multilevel block Hankel matrix H for test multiplications.

Source code in changepoynt/utils/block_linalg.py
751
752
753
754
755
756
def materialize(self) -> np.ndarray:
    """
    Explicitly build the full multilevel block Hankel matrix H for test multiplications.
    """

    return np.concatenate(tuple(mat.materialize()for mat in self.matrices), axis=self.axis)

block_hankel_left_matmat_direct(hankel, other_matrix)

Compute A @ H without FFT, using a vectorized einsum.

H has block structure:

H[i, j] = B[i + j]
Definitions

p, q : the shape of the Hankel matrix (p rows, q columns) m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

Parameters

hankel : ndarray, shape (p + q - 1, m, n) Block sequence. hankel[k] is an m x n matrix block.

other_matrix : ndarray Matrix to multiply by Either: - dense form: shape (s, p*m) - block form: shape (p, s, m)

Returns

Z : ndarray If A was dense, returns dense shape (s, q*n). If A was block-form, returns block shape (q, s, n).

Source code in changepoynt/utils/block_linalg.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def block_hankel_left_matmat_direct(hankel: np.ndarray, other_matrix: np.ndarray):
    """
    Compute A @ H without FFT, using a vectorized einsum.

    H has block structure:

        H[i, j] = B[i + j]

    Definitions
    ----------
    p, q : the shape of the Hankel matrix (p rows, q columns)
    m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

    Parameters
    ----------
    hankel : ndarray, shape (p + q - 1, m, n)
        Block sequence. hankel[k] is an m x n matrix block.

    other_matrix : ndarray
        Matrix to multiply by
        Either:
        - dense form: shape (s, p*m)
        - block form: shape (p, s, m)

    Returns
    -------
    Z : ndarray
        If A was dense, returns dense shape (s, q*n).
        If A was block-form, returns block shape (q, s, n).
    """

    num_blocks, m, n = hankel.shape
    return_dense = False

    if other_matrix.ndim == 2:
        s, cols = other_matrix.shape

        if cols % m != 0:
            raise ValueError("A has incompatible number of columns.")

        p = cols // m
        other_matrix_blocks = other_matrix.reshape(s, p, m).transpose(1, 0, 2)
        return_dense = True

    elif other_matrix.ndim == 3:
        p, s, m_A = other_matrix.shape

        if m_A != m:
            raise ValueError("Block dimensions do not match.")

        other_matrix_blocks = other_matrix

    else:
        raise ValueError("A must have shape (s, p*m) or (p, s, m).")

    q = num_blocks - p + 1

    if q <= 0:
        raise ValueError("Need len(B) >= p.")

    # sliding_window_view gives shape (q, m, n, p)
    hankel_windows = np.lib.stride_tricks.sliding_window_view(hankel, window_shape=p, axis=0)

    # Move the window axis so that:
    # BH[j, i] = B[j + i]
    # BH has shape (q, p, m, n)
    hankel_windows_moved = np.moveaxis(hankel_windows, -1, 1)

    # Z[j, s, n] = sum_{i,m} A_blocks[i,s,m] * BH[j,i,m,n]
    result_matrix_blocks = np.einsum("ism,jimn->jsn", other_matrix_blocks, hankel_windows_moved, optimize=True)

    if return_dense:
        return result_matrix_blocks.transpose(1, 0, 2).reshape(s, q * n)

    return result_matrix_blocks

block_hankel_left_matmat_fft(hankel_fft, other_matrix, fft_length, window_length, window_number)

Compute A @ H using FFT, where H is a block Hankel matrix.

H has block structure:

H[i, j] = B[i + j]
Definitions

p, q : the shape of the Hankel matrix (p rows, q columns) m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

Parameters

hankel_fft : ndarray, shape (p + q - 1, m, n) The fft transformed (along axis 0) of the Block sequence. B[k] is an m x n matrix block.

other_matrix : ndarray The matrix that we multiply from the right (other_matrix @ hankel_fft). Either: - dense form: shape (s, p*m) - block form: shape (p, s, m)

fft_length : int The length of the fft (with padding). We recommend using scipy.fft.next_fast_length

window_length : int The number of blocks per row of the Hankel matrix

window_number : int The number of blocks per column of the Hankel matrix

Returns

result_matrix : ndarray Outcome of other_matrix @ hankel_fft If A was dense, returns dense shape (s, q*n). If A was block-form, returns block shape (q, s, n).

Source code in changepoynt/utils/block_linalg.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def block_hankel_left_matmat_fft(hankel_fft: np.ndarray, other_matrix: np.ndarray, fft_length: int,
                                 window_length: int, window_number: int):
    """
    Compute A @ H using FFT, where H is a block Hankel matrix.

    H has block structure:

        H[i, j] = B[i + j]


    Definitions
    ----------
    p, q : the shape of the Hankel matrix (p rows, q columns)
    m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

    Parameters
    ----------

    hankel_fft : ndarray, shape (p + q - 1, m, n)
        The fft transformed (along axis 0) of the Block sequence. B[k] is an m x n matrix block.

    other_matrix : ndarray
        The matrix that we multiply from the right (other_matrix @ hankel_fft).
        Either:
        - dense form: shape (s, p*m)
        - block form: shape (p, s, m)

    fft_length : int
        The length of the fft (with padding). We recommend using scipy.fft.next_fast_length

    window_length : int
        The number of blocks per row of the Hankel matrix

    window_number : int
        The number of blocks per column of the Hankel matrix

    Returns
    -------
    result_matrix : ndarray
        Outcome of other_matrix @ hankel_fft
        If A was dense, returns dense shape (s, q*n).
        If A was block-form, returns block shape (q, s, n).
    """

    # get the dimensions of the hankel matrix
    _, m, n = hankel_fft.shape
    p = window_length
    q = window_number

    return_dense = False

    if other_matrix.ndim == 2:
        # other_matrix is dense: shape (s, p*m)
        s, cols = other_matrix.shape

        if cols != p*m:
            raise ValueError("A has incompatible number of columns.")

        other_matrix_blocks = other_matrix.reshape(s, p, m).transpose(1, 0, 2)
        return_dense = True

    elif other_matrix.ndim == 3:
        # other_matrix is already block-form: shape (p, s, m)
        p_x, s, m_x = other_matrix.shape

        if m_x != m:
            raise ValueError("Block dimensions do not match.")

        other_matrix_blocks = other_matrix

    else:
        raise ValueError("A must have shape (s, p*m) or (p, s, m).")

    if q <= 0:
        raise ValueError("Need len(B) >= p.")

    # For left multiplication:
    #
    # result_matrix_j = sum_i fft_hankel_i other_matrix_{i+j}
    #
    # This is obtained from convolution of reversed A with B.
    other_matrix_rev = other_matrix_blocks[::-1]

    other_matrix_fft = spfft.rfft(other_matrix_rev, n=fft_length, axis=0)  # shape (fft_len, s, m)

    # Frequency-wise matrix-matrix multiplication:
    #
    # result_matrix_fft[ell] = other_matrix_fft[ell] @ fft_hankel[ell]
    #
    # Shapes:
    #   fft_hankel[ell] : (s, m)
    #   other_matrix_fft[ell] : (m, n)
    #   result_matrix_fft[ell] : (s, n)
    result_matrix_fft = other_matrix_fft @ hankel_fft                                # shape (fft_len, s, n)

    conv = spfft.irfft(result_matrix_fft, n=fft_length, axis=0)

    # Extract the desired Hankel result
    result_matrix = conv[p - 1 : p - 1 + q]          # shape (q, s, n)

    result_matrix = np.real_if_close(result_matrix)

    if return_dense:
        return result_matrix.transpose(1, 0, 2).reshape(s, q * n)

    return result_matrix

block_hankel_right_matmat_direct(hankel, other_matrix)

Compute H @ X without FFT, using a vectorized einsum.

H has block structure:

H[i, j] = B[i + j]
Definitions

p, q : the shape of the Hankel matrix (p rows, q columns) m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

Parameters

hankel : ndarray, shape (p + q - 1, m, n) Block sequence. hankel[k] is an m x n matrix block.

other_matrix : ndarray Either: - dense form: shape (q*n, r) - block form: shape (q, n, r)

Returns

Y : ndarray If X was dense, returns dense shape (p*m, r). If X was block-form, returns block shape (p, m, r).

Source code in changepoynt/utils/block_linalg.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def block_hankel_right_matmat_direct(hankel: np.ndarray, other_matrix: np.ndarray):
    """
    Compute H @ X without FFT, using a vectorized einsum.

    H has block structure:

        H[i, j] = B[i + j]

    Definitions
    ----------
    p, q : the shape of the Hankel matrix (p rows, q columns)
    m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

    Parameters
    ----------
    hankel : ndarray, shape (p + q - 1, m, n)
        Block sequence. hankel[k] is an m x n matrix block.

    other_matrix : ndarray
        Either:
        - dense form: shape (q*n, r)
        - block form: shape (q, n, r)

    Returns
    -------
    Y : ndarray
        If X was dense, returns dense shape (p*m, r).
        If X was block-form, returns block shape (p, m, r).
    """

    num_blocks, m, n = hankel.shape
    return_dense = False

    if other_matrix.ndim == 2:
        rows, r = other_matrix.shape

        if rows % n != 0:
            raise ValueError("other_matrix_blocks has incompatible number of rows.")

        q = rows // n
        other_matrix_blocks = other_matrix.reshape(q, n, r)
        return_dense = True

    elif other_matrix.ndim == 3:
        (q, n_other_matrix, r) = other_matrix.shape

        if n_other_matrix != n:
            raise ValueError("Block dimensions do not match.")

        other_matrix_blocks = other_matrix

    else:
        raise ValueError("other_matrix_blocks must have shape (q*n, r) or (q, n, r).")

    p = num_blocks - q + 1

    if p <= 0:
        raise ValueError("Need len(hankel) >= q.")

    # sliding_window_view gives shape (p, m, n, q)
    hankel_windows = np.lib.stride_tricks.sliding_window_view(hankel, window_shape=q, axis=0)

    # Move the window axis so that:
    # b_moved[i, j] = hankel[i + j]
    # b_moved has shape (p, q, m, n)
    hankel_windows_moved = np.moveaxis(hankel_windows, -1, 1)

    # result_matrix[i, a, c] = sum_{j,b} b_moved[i,j,a,b] * other_matrix_blocks_blocks[j,b,c]
    result_matrix = np.einsum("ijmn,jnr->imr", hankel_windows_moved, other_matrix_blocks, optimize=True)

    if return_dense:
        return result_matrix.reshape(p * m, r)

    return result_matrix

block_hankel_right_matmat_fft(hankel_fft, other_matrix, fft_length, window_length, window_number)

Compute H @ X using FFT, where H is a block Hankel matrix.

H has block structure:

H[i, j] = B[i + j]
Definitions

p, q : the shape of the Hankel matrix (p rows, q columns) m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

Parameters

hankel_fft : ndarray, shape (p + q - 1, m, n) The fft transformed (along axis 0) of the Block sequence. B[k] is an m x n matrix block.

other_matrix : ndarray Either: - dense form: shape (q*n, r) - block form: shape (q, n, r)

fft_length : int The length of the fft (with padding). We recommend using scipy.fft.next_fast_length

window_length : int The number of blocks per row of the Hankel matrix

window_number : int The number of blocks per column of the Hankel matrix

Returns

Y : ndarray If X was dense, returns dense shape (p*m, r). If X was block-form, returns block shape (p, m, r).

Source code in changepoynt/utils/block_linalg.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def block_hankel_right_matmat_fft(hankel_fft: np.ndarray, other_matrix: np.ndarray, fft_length: int,
                                  window_length: int, window_number: int):
    """
    Compute H @ X using FFT, where H is a block Hankel matrix.

    H has block structure:

        H[i, j] = B[i + j]


    Definitions
    ----------
    p, q : the shape of the Hankel matrix (p rows, q columns)
    m, n : the shape of the blocks in the Hankel matrix (each block has m rows, n columns)

    Parameters
    ----------
    hankel_fft : ndarray, shape (p + q - 1, m, n)
        The fft transformed (along axis 0) of the Block sequence. B[k] is an m x n matrix block.

    other_matrix : ndarray
        Either:
        - dense form: shape (q*n, r)
        - block form: shape (q, n, r)

    fft_length : int
        The length of the fft (with padding). We recommend using scipy.fft.next_fast_length

    window_length : int
        The number of blocks per row of the Hankel matrix

    window_number : int
        The number of blocks per column of the Hankel matrix

    Returns
    -------
    Y : ndarray
        If X was dense, returns dense shape (p*m, r).
        If X was block-form, returns block shape (p, m, r).
    """
    _, m, n = hankel_fft.shape
    p = window_length
    q = window_number

    return_dense = False

    if other_matrix.ndim == 2:
        # X is dense: shape (q*n, r)
        rows, r = other_matrix.shape

        if rows != q*n:
            raise ValueError("X has incompatible number of rows.")

        other_matrix_blocks = other_matrix.reshape(q, n, r)
        return_dense = True

    elif other_matrix.ndim == 3:
        # X is already block-form: shape (q, n, r)
        q_x, n_x, r = other_matrix.shape

        if q_x != q or n_x != n:
            raise ValueError("Block dimensions do not match.")

        other_matrix_blocks = other_matrix

    else:
        raise ValueError("X must have shape (q*n, r) or (q, n, r).")

    if p <= 0:
        raise ValueError("Need len(B) >= q.")

    # Hankel multiplication:
    #
    # Y_i = sum_j B_{i+j} X_j
    #
    # This becomes convolution of B with reversed X.
    other_matrix_rev = other_matrix_blocks[::-1]
    other_matrix_fft = spfft.rfft(other_matrix_rev, n=fft_length, axis=0)  # shape (fft_len, n, r)

    # Frequency-wise matrix-matrix multiplication:
    #
    # FY[ell] = FB[ell] @ FX[ell]
    #
    # Shapes:
    #   FB[ell] : (m, n)
    #   FX[ell] : (n, r)
    #   FY[ell] : (m, r)
    result_matrix_fft = hankel_fft @ other_matrix_fft                                # shape (fft_len, m, r)

    conv = spfft.irfft(result_matrix_fft, n=fft_length, axis=0)

    # Extract the desired Hankel result
    result_matrix_blocks = conv[q - 1 : q - 1 + p]          # shape (p, m, r)

    result_matrix_blocks = np.real_if_close(result_matrix_blocks)

    if return_dense:
        return result_matrix_blocks.reshape(p * m, r)

    return result_matrix_blocks

densityratioestimation

Relative Unconstrained Least-Squares Fitting (RuLSIF): A Python Implementation

[1] Liu S , Yamada M , Collier N , et al. Change-Point Detection in Time-Series Data by Relative Density-Ratio Estimation[J]. 2012.

[2] Kawahara Y , Sugiyama M . Sequential change‐point detection based on direct density‐ratio estimation[M]. John Wiley & Sons, Inc. 2012.

[3] Kawahara Y , Yairi T , Machida K . Change-Point Detection in Time-Series Data Based on Subspace Identification[C] Data Mining, 2007. ICDM 2007. Seventh IEEE International Conference on. IEEE, 2007.

Taken from https://github.com/chenxingqiang/rulsif_abrupt-change_detection copied and heavily modified for performance and jit compilation. Also corrected a missing transpose in the Gaussian Kernel class. Additionally, optimized the cross validation to not recompute the gaussian kernel several times as we only change a factor within the exponent (the sigma).

Has been compared to the demo implementation of the original author: https://github.com/anewgithubname/change_detection

Unfortunately, the cross validation can't be JIT compiled, as we need to compute theta_hat, which is way faster using a specialized scipy function for hermitian matrices. Numba can't deal with scipy yet.

AlphaRelativeDensityRatioEstimator

Computes the alpha-relative density ratio estimate of P(X_ref) and P(X_test) The alpha-relative density ratio estimator, r_alpha(X), is given by the following kernel model: g(X; theta) = SUM( (theta_l * K(X, X_centers_l)), l=0, n ) where theta is a vector of parameters [theta_1, theta_2, ..., theta_l]^T to be learned from the data samples. The parameters theta in the model g(X; theta) is calculated by solving the following optimization problem: theta_hat = argmin [ ( (1/2) * theta^T * H_hat * theta) - (h_hat^T * theta) + ( lambda/2 * theta^T * theta) ] where the expression (lambda/2 * theta^T * theta), with lambda >= 0, is a regularization term used to penalize against overfitting Reference: Relative Density-Ratio Estimation for Robust Distribution Comparison. Makoto Yamada, Taiji Suzuki, Takafumi Kanamori, Hirotaka Hachiya, and Masashi Sugiyama. NIPS, page 594-602. (2011)

Source code in changepoynt/utils/densityratioestimation.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
class AlphaRelativeDensityRatioEstimator:
    """
    Computes the alpha-relative density ratio estimate of P(X_ref) and P(X_test)
    The alpha-relative density ratio estimator, r_alpha(X), is given by the
    following kernel model:
    g(X; theta) = SUM( (theta_l * K(X, X_centers_l)), l=0, n )
    where theta is a vector of parameters [theta_1, theta_2, ..., theta_l]^T
    to be learned from the data samples. The parameters theta in the model
    g(X; theta) is calculated by solving the following optimization problem:
      theta_hat = argmin [ ( (1/2) * theta^T * H_hat * theta) - (h_hat^T * theta) + ( lambda/2 * theta^T * theta) ]
    where the expression (lambda/2 * theta^T * theta), with lambda >= 0, is
    a regularization term used to penalize against overfitting
    Reference:
    Relative Density-Ratio Estimation for Robust Distribution Comparison. Makoto Yamada,
    Taiji Suzuki, Takafumi Kanamori, Hirotaka Hachiya, and Masashi Sugiyama. NIPS,
    page 594-602. (2011)
    """

    def __init__(self, alpha_constraint=0.0, sigma_width=1.0, lambda_regularizer=0.0, kernel_basis=1):
        self.alpha_constraint = alpha_constraint
        self.sigma_width = sigma_width
        self.lambda_regularizer = lambda_regularizer
        self.kernel_basis = kernel_basis

    def apply(self, reference_samples: np.ndarray, test_samples: np.ndarray, gaussian_centers: np.ndarray) \
            -> (float, float):
        """
        Computes the alpha-relative density ratio, r_alpha(X), of P(X_ref) and P(X_test)
          r_alpha(X) = P(Xref) / (alpha * P(Xref) + (1 - alpha) * P(X_test)
        Returns density ratio estimate at X_ref, r_alpha_ref, and at X_test, r_alpha_test
        """
        # Apply the kernel function to the reference and test samples
        k_ref = compute_gaussian_kernel(reference_samples, gaussian_centers, self.sigma_width).T
        k_test = compute_gaussian_kernel(test_samples, gaussian_centers, self.sigma_width).T

        # Compute the parameters, theta_hat, of the density ratio estimator
        H_hat = compute_H_hat(self.alpha_constraint, k_ref, k_test)
        h_hat = compute_h_hat(k_ref)
        theta_hat = compute_theta_hat(H_hat, self.lambda_regularizer*np.eye(self.kernel_basis), h_hat)

        # Estimate the density ratio, r_alpha_ref = r_alpha(X_ref)
        r_alpha_ref = g_of_x_theta(k_ref, theta_hat)
        # Estimate the density ratio, r_alpha_test = r_alpha(X_test)
        r_alpha_test = g_of_x_theta(k_test, theta_hat)

        return r_alpha_ref, r_alpha_test

apply(reference_samples, test_samples, gaussian_centers)

Computes the alpha-relative density ratio, r_alpha(X), of P(X_ref) and P(X_test) r_alpha(X) = P(Xref) / (alpha * P(Xref) + (1 - alpha) * P(X_test) Returns density ratio estimate at X_ref, r_alpha_ref, and at X_test, r_alpha_test

Source code in changepoynt/utils/densityratioestimation.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def apply(self, reference_samples: np.ndarray, test_samples: np.ndarray, gaussian_centers: np.ndarray) \
        -> (float, float):
    """
    Computes the alpha-relative density ratio, r_alpha(X), of P(X_ref) and P(X_test)
      r_alpha(X) = P(Xref) / (alpha * P(Xref) + (1 - alpha) * P(X_test)
    Returns density ratio estimate at X_ref, r_alpha_ref, and at X_test, r_alpha_test
    """
    # Apply the kernel function to the reference and test samples
    k_ref = compute_gaussian_kernel(reference_samples, gaussian_centers, self.sigma_width).T
    k_test = compute_gaussian_kernel(test_samples, gaussian_centers, self.sigma_width).T

    # Compute the parameters, theta_hat, of the density ratio estimator
    H_hat = compute_H_hat(self.alpha_constraint, k_ref, k_test)
    h_hat = compute_h_hat(k_ref)
    theta_hat = compute_theta_hat(H_hat, self.lambda_regularizer*np.eye(self.kernel_basis), h_hat)

    # Estimate the density ratio, r_alpha_ref = r_alpha(X_ref)
    r_alpha_ref = g_of_x_theta(k_ref, theta_hat)
    # Estimate the density ratio, r_alpha_test = r_alpha(X_test)
    r_alpha_test = g_of_x_theta(k_test, theta_hat)

    return r_alpha_ref, r_alpha_test

PearsonRelativeDivergenceEstimator

Calculates the alpha-relative Pearson divergence score The alpha-relative Pearson divergence is given by the following expression: PE_alpha = -(alpha/2(n_ref)) * SUM(r_alpha(X_ref_i)^2, i=0, n_ref) - ((1-alpha)/2(n_test)) * SUM(r_alpha(X_test_j)^2, j=0, n_test) + (1/n_ref) * SUM(r_alpha(X_ref_i), i=0, n_ref) - 1/2 where r_alpha(X) is the alpha-relative density ratio estimator and is given by the following kernel model: g(X; theta) = SUM( (theta_l * K(X, X_centers_l)), l=0, n ) Reference: Relative Density-Ratio Estimation for Robust Distribution Comparison. Makoto Yamada, Taiji Suzuki, Takafumi Kanamori, Hirotaka Hachiya, and Masashi Sugiyama. NIPS, page 594-602. (2011)

Source code in changepoynt/utils/densityratioestimation.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
class PearsonRelativeDivergenceEstimator:
    """
    Calculates the alpha-relative Pearson divergence score
    The alpha-relative Pearson divergence is given by the following expression:
      PE_alpha = -(alpha/2(n_ref)) * SUM(r_alpha(X_ref_i)^2, i=0, n_ref)        -
                  ((1-alpha)/2(n_test)) * SUM(r_alpha(X_test_j)^2, j=0, n_test) +
                  (1/n_ref) * SUM(r_alpha(X_ref_i), i=0, n_ref)                 -
                  1/2
    where r_alpha(X) is the alpha-relative density ratio estimator and is given by
    the following kernel model:
      g(X; theta) = SUM( (theta_l * K(X, X_centers_l)), l=0, n )
    Reference:
    Relative Density-Ratio Estimation for Robust Distribution Comparison. Makoto
    Yamada, Taiji Suzuki, Takafumi Kanamori, Hirotaka Hachiya, and Masashi Sugiyama.
    NIPS, page 594-602. (2011)
    """

    def __init__(self, alpha_constraint=0.0, sigma_width=1.0, lambda_regularizer=0.0, kernel_basis=1):
        self.alpha_constraint = alpha_constraint
        self.sigma_width = sigma_width
        self.lambda_regularizer = lambda_regularizer
        self.kernel_basis = kernel_basis

    def apply(self, reference_samples=None, test_samples=None, gaussian_centers=None):
        """
        Calculates the alpha-relative Pearson divergence score
        """
        density_ratio_estimator = AlphaRelativeDensityRatioEstimator(self.alpha_constraint, self.sigma_width,
                                                                     self.lambda_regularizer, self.kernel_basis)

        # Estimate alpha relative density ratio and pearson divergence score
        (r_alpha_Xref, r_alpha_Xtest) = density_ratio_estimator.apply(reference_samples, test_samples, gaussian_centers)

        pe_divergence = (np.mean(r_alpha_Xref) -
                         (0.5 * (self.alpha_constraint * np.mean(r_alpha_Xref ** 2) +
                                 (1.0 - self.alpha_constraint) * np.mean(r_alpha_Xtest ** 2))) - 0.5)

        return pe_divergence, r_alpha_Xtest

apply(reference_samples=None, test_samples=None, gaussian_centers=None)

Calculates the alpha-relative Pearson divergence score

Source code in changepoynt/utils/densityratioestimation.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def apply(self, reference_samples=None, test_samples=None, gaussian_centers=None):
    """
    Calculates the alpha-relative Pearson divergence score
    """
    density_ratio_estimator = AlphaRelativeDensityRatioEstimator(self.alpha_constraint, self.sigma_width,
                                                                 self.lambda_regularizer, self.kernel_basis)

    # Estimate alpha relative density ratio and pearson divergence score
    (r_alpha_Xref, r_alpha_Xtest) = density_ratio_estimator.apply(reference_samples, test_samples, gaussian_centers)

    pe_divergence = (np.mean(r_alpha_Xref) -
                     (0.5 * (self.alpha_constraint * np.mean(r_alpha_Xref ** 2) +
                             (1.0 - self.alpha_constraint) * np.mean(r_alpha_Xtest ** 2))) - 0.5)

    return pe_divergence, r_alpha_Xtest

RuLSIF

Estimates the alpha-relative Pearson Divergence via the least Squares Relative Density Ratio Approximation Reference: Relative Density-Ratio Estimation for Robust Distribution Comparison. Makoto Yamada, Taiji Suzuki, Takafumi Kanamori, Hirotaka Hachiya, and Masashi Sugiyama. NIPS, page 594-602. (2011)

Source code in changepoynt/utils/densityratioestimation.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
class RuLSIF:
    """
    Estimates the alpha-relative Pearson Divergence via the least Squares Relative
    Density Ratio Approximation
    Reference:
    Relative Density-Ratio Estimation for Robust Distribution Comparison. Makoto
    Yamada, Taiji Suzuki, Takafumi Kanamori, Hirotaka Hachiya, and Masashi Sugiyama.
    NIPS, page 594-602. (2011)
    """

    def __init__(self, alpha=0.1, kernel_number=100, cross_folds=5, sigma: float = None, lambda_: float = None):
        self.alpha = alpha
        self.kernel_number = kernel_number
        self.cross_folds = cross_folds
        self.gaussian_centers = None
        self.sigma_width = sigma
        self.lambda_regularizer = lambda_

        # check if we need to set sigma or lambda using cross validation
        self.cv = self.sigma_width is None or self.lambda_regularizer is None

    @staticmethod
    def compute_gaussian_width_candidates(reference_samples: np.ndarray, test_samples: np.ndarray):
        """
        Compute a candidate list of Gaussian kernel widths. The best width will be
        selected via cross-validation

        Jaakkola's heuristic method for setting the width parameter of the Gaussian
        radial basis function kernel is to pick a quantile (usually the median) of
        the distribution of Euclidean distances between points having different
        labels.
        Reference:

        Jaakkola T S, Haussler D. Exploiting Generative Models in Discriminative Classifiers[J].
        Advances in Neural Information Processing Systems, 1998, 11(11):487--493.

        Jaakkola, M. Diekhaus, and D. Haussler. Using the Fisher kernel method to detect
        remote protein homologies. In T. Lengauer, R. Schneider, P. Bork, D. Brutlad, J.
        Glasgow, H.- W. Mewes, and R. Zimmer, editors, Proceedings of the Seventh
        International Conference on Intelligent Systems for Molecular Biology.

        It is the same technique as proposed in [1]
        """
        # create a complete sample array
        samples = np.c_[reference_samples, test_samples]

        # compute the pairwise distances
        distances = scipy.spatial.distance.pdist(samples.T, 'sqeuclidean')

        # apply the same formula as in
        # https://github.com/anewgithubname/change_detection/blob/main/lib/comp_med.m
        median_distance = np.sqrt(0.5 * np.median(distances[distances > 0]))

        return median_distance * np.array([0.6, 0.8, 1, 1.2, 1.4])

    @staticmethod
    def generate_regularization_params():
        """
        Generates a candidate list of regularization parameters to be used
        with the L1 regularizer term of RULSIF optimization problem.  The
        best regularizer parameter will be chosen via cross-validation.
        The values itself are taken from the paper [1].
        """
        return 10.0 ** np.array([-3, -2, -1, 0, 1])

    def generate_gaussian_centers(self, reference_samples=None):
        """
        Choose Gaussian centers randomly from the reference samples.
        """
        numcols = reference_samples.shape[1]
        reference_sample_idxs = np.random.permutation(numcols)

        self.kernel_number = min(self.kernel_number, numcols)
        gaussian_centers = reference_samples[:, reference_sample_idxs[0:self.kernel_number]]

        return gaussian_centers

    @staticmethod
    def cross_validate(reference_samples: np.ndarray, test_samples: np.ndarray, gaussian_centers: np.ndarray,
                       sigma_widths: np.ndarray, lambda_candidates: np.ndarray, alpha: float, kernel_number: int,
                       cross_folds=5):
        (refRows, refCols) = reference_samples.shape
        (testRows, testCols) = test_samples.shape

        # Initialize cross validation scoring matrix
        cross_validation_scores = np.zeros((sigma_widths.shape[0], lambda_candidates.shape[0]))

        # Initialize a cross validation index assignment list
        reference_samples_cv_idxs = np.random.permutation(refCols)
        reference_samples_cv_split = (np.arange(start=0, stop=refCols, step=1) * cross_folds) // refCols

        test_samples_cv_idxs = np.random.permutation(testCols)
        test_samples_cv_split = (np.arange(start=0, stop=testCols, step=1) * cross_folds) // testCols

        # initially calculate the kernel matrix using the candidate sigma width
        K_ref = compute_gaussian_kernel(reference_samples, gaussian_centers, sigma_widths[0]).T
        K_test = compute_gaussian_kernel(test_samples, gaussian_centers, sigma_widths[0]).T
        old_sigma = sigma_widths[0]

        # create an identity matrix which we need for computing theta
        # but as we can already create it here, we save time in our hottest path deep in the cross validation
        identity = np.eye(kernel_number)

        # Initiate k-fold cross-validation procedure. Using variable notation similar to the RuLSIF formulas in [1].
        for sigma_idx, sigma in enumerate(sigma_widths):

            # update the kernel for the new sigma without recomputing all distances
            K_ref = update_sigma_gaussian_kernel(K_ref, old_sigma, sigma)
            K_test = update_sigma_gaussian_kernel(K_test, old_sigma, sigma)

            # keep track of the sigma we used, so we keep updating our kernel
            old_sigma = sigma

            for fold_idx in range(cross_folds):

                # get the gaussian kernels for the current fold
                K_ref_trainingSet = K_ref[:, reference_samples_cv_idxs[reference_samples_cv_split != fold_idx]]
                K_test_trainingSet = K_test[:, test_samples_cv_idxs[test_samples_cv_split != fold_idx]]

                # compute the matrices for the current trainings fold
                H_h_KthFold = compute_H_hat(alpha, K_ref_trainingSet, K_test_trainingSet)
                h_h_KthFold = compute_h_hat(K_ref_trainingSet)

                # Select the subset of the kernel matrix not used in the training set
                # for use as the test set to validate against
                K_ref_testSet = K_ref[:, reference_samples_cv_idxs[reference_samples_cv_split == fold_idx]]
                K_test_testSet = K_test[:, test_samples_cv_idxs[test_samples_cv_split == fold_idx]]

                for lambda_idx, lambda_candidate in enumerate(lambda_candidates):
                    # Note: This is the absolute hot path of the complete function, if we can speed up anything
                    # within this loop, we will save a lot of time!

                    # compute the theta for the given parameters
                    theta_h_KthFold = compute_theta_hat(H_h_KthFold, identity*lambda_candidate, h_h_KthFold)

                    # compute the g_of_theta for the given parameters
                    r_alpha_Xref = g_of_x_theta(K_ref_testSet, theta_h_KthFold)
                    r_alpha_Xtest = g_of_x_theta(K_test_testSet, theta_h_KthFold)

                    # Calculate the objective function J(theta) under the current parameters and update the scores
                    cross_validation_scores[sigma_idx, lambda_idx] += j_of_theta(alpha, r_alpha_Xref, r_alpha_Xtest)

        return cross_validation_scores/cross_folds

    def compute_model_parameters(self, reference_samples=None, test_samples=None, gaussian_centers=None):
        """
        Computes model parameters via k-fold cross validation process
        """

        # get the parameter candidates
        sigma_widths = self.compute_gaussian_width_candidates(reference_samples, test_samples)
        lambda_candidates = self.generate_regularization_params()

        # compute the scores
        cross_validation_scores = self.cross_validate(reference_samples, test_samples, gaussian_centers, sigma_widths,
                                                      lambda_candidates, self.alpha, self.kernel_number,
                                                      self.cross_folds)

        # check for the minimal score within the cross_validation
        cv_min_idx_for_sigma, cv_min_idx_for_lambda = np.unravel_index(cross_validation_scores.argmin(),
                                                                       cross_validation_scores.shape)

        # get the optimal values for both parameters
        optimal_sigma = sigma_widths[cv_min_idx_for_sigma]
        optimal_lambda = lambda_candidates[cv_min_idx_for_lambda]
        return optimal_sigma, optimal_lambda

    def train(self, reference_samples=None, test_samples=None):
        """
        Learn the proper model parameters if we did not specify them already
        """

        self.gaussian_centers = self.generate_gaussian_centers(reference_samples)

        # check whether we need to find the model parameters or if they have been given
        if self.cv:
            optimal_sigma, optimal_lambda = self.compute_model_parameters(reference_samples,
                                                                          test_samples, self.gaussian_centers)

            self.sigma_width = optimal_sigma
            self.lambda_regularizer = optimal_lambda

    def apply(self, reference_samples=None, test_samples=None):
        """
        Estimates the alpha-relative Pearson divergence as determined by the relative
        ratio of probability densities:
           P(reference_samples[x]) / (alpha * P(reference_samples[x]) + (1 - alpha) * P(test_samples[x]))
        from samples:
           reference_samples[x_i] | reference_samples[x_i] in R^{d}, with i=1 to reference_samples{N}
        drawn independently of P(reference_samples[x])
        and from samples:
           test_samples[x_j] | test_samples[x_j] in R^{d}, with j=1 to test_samples{N}
        drawn independently from P(test_samples[x])
        After the model hyperparameters have been learned and chosen by the train()
        method, the RULSIF algorithm can be applied repeatedly on both in-sample and out
        of sample data
        """

        if self.gaussian_centers is None or self.kernel_number is None:
            raise Exception("Missing kernel basis function parameters")

        if self.sigma_width == 0.0 or self.lambda_regularizer == 0.0:
            raise Exception("Missing model selection parameters")

        divergence_estimator = PearsonRelativeDivergenceEstimator(self.alpha, self.sigma_width,
                                                                  self.lambda_regularizer, self.kernel_number)
        (pe_alpha, r_alpha_Xtest) = divergence_estimator.apply(reference_samples, test_samples, self.gaussian_centers)

        return pe_alpha

    def __call__(self, reference_samples: np.ndarray, test_samples: np.ndarray):

        # make the normalization as done in
        # https://github.com/anewgithubname/change_detection/blob/main/change_detection.m
        all_samples = np.c_[reference_samples, test_samples]
        std = np.std(all_samples, axis=1) + np.finfo(float).eps
        reference_samples /= std[:, None]
        test_samples /= std[:, None]

        # get the parameters estimation
        self.train(reference_samples, test_samples)

        # compute the result
        return self.apply(reference_samples, test_samples)

apply(reference_samples=None, test_samples=None)

Estimates the alpha-relative Pearson divergence as determined by the relative ratio of probability densities: P(reference_samples[x]) / (alpha * P(reference_samples[x]) + (1 - alpha) * P(test_samples[x])) from samples: reference_samples[x_i] | reference_samples[x_i] in R^{d}, with i=1 to reference_samples{N} drawn independently of P(reference_samples[x]) and from samples: test_samples[x_j] | test_samples[x_j] in R^{d}, with j=1 to test_samples{N} drawn independently from P(test_samples[x]) After the model hyperparameters have been learned and chosen by the train() method, the RULSIF algorithm can be applied repeatedly on both in-sample and out of sample data

Source code in changepoynt/utils/densityratioestimation.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
def apply(self, reference_samples=None, test_samples=None):
    """
    Estimates the alpha-relative Pearson divergence as determined by the relative
    ratio of probability densities:
       P(reference_samples[x]) / (alpha * P(reference_samples[x]) + (1 - alpha) * P(test_samples[x]))
    from samples:
       reference_samples[x_i] | reference_samples[x_i] in R^{d}, with i=1 to reference_samples{N}
    drawn independently of P(reference_samples[x])
    and from samples:
       test_samples[x_j] | test_samples[x_j] in R^{d}, with j=1 to test_samples{N}
    drawn independently from P(test_samples[x])
    After the model hyperparameters have been learned and chosen by the train()
    method, the RULSIF algorithm can be applied repeatedly on both in-sample and out
    of sample data
    """

    if self.gaussian_centers is None or self.kernel_number is None:
        raise Exception("Missing kernel basis function parameters")

    if self.sigma_width == 0.0 or self.lambda_regularizer == 0.0:
        raise Exception("Missing model selection parameters")

    divergence_estimator = PearsonRelativeDivergenceEstimator(self.alpha, self.sigma_width,
                                                              self.lambda_regularizer, self.kernel_number)
    (pe_alpha, r_alpha_Xtest) = divergence_estimator.apply(reference_samples, test_samples, self.gaussian_centers)

    return pe_alpha

compute_gaussian_width_candidates(reference_samples, test_samples) staticmethod

Compute a candidate list of Gaussian kernel widths. The best width will be selected via cross-validation

Jaakkola's heuristic method for setting the width parameter of the Gaussian radial basis function kernel is to pick a quantile (usually the median) of the distribution of Euclidean distances between points having different labels. Reference:

Jaakkola T S, Haussler D. Exploiting Generative Models in Discriminative Classifiers[J]. Advances in Neural Information Processing Systems, 1998, 11(11):487--493.

Jaakkola, M. Diekhaus, and D. Haussler. Using the Fisher kernel method to detect remote protein homologies. In T. Lengauer, R. Schneider, P. Bork, D. Brutlad, J. Glasgow, H.- W. Mewes, and R. Zimmer, editors, Proceedings of the Seventh International Conference on Intelligent Systems for Molecular Biology.

It is the same technique as proposed in [1]

Source code in changepoynt/utils/densityratioestimation.py
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
@staticmethod
def compute_gaussian_width_candidates(reference_samples: np.ndarray, test_samples: np.ndarray):
    """
    Compute a candidate list of Gaussian kernel widths. The best width will be
    selected via cross-validation

    Jaakkola's heuristic method for setting the width parameter of the Gaussian
    radial basis function kernel is to pick a quantile (usually the median) of
    the distribution of Euclidean distances between points having different
    labels.
    Reference:

    Jaakkola T S, Haussler D. Exploiting Generative Models in Discriminative Classifiers[J].
    Advances in Neural Information Processing Systems, 1998, 11(11):487--493.

    Jaakkola, M. Diekhaus, and D. Haussler. Using the Fisher kernel method to detect
    remote protein homologies. In T. Lengauer, R. Schneider, P. Bork, D. Brutlad, J.
    Glasgow, H.- W. Mewes, and R. Zimmer, editors, Proceedings of the Seventh
    International Conference on Intelligent Systems for Molecular Biology.

    It is the same technique as proposed in [1]
    """
    # create a complete sample array
    samples = np.c_[reference_samples, test_samples]

    # compute the pairwise distances
    distances = scipy.spatial.distance.pdist(samples.T, 'sqeuclidean')

    # apply the same formula as in
    # https://github.com/anewgithubname/change_detection/blob/main/lib/comp_med.m
    median_distance = np.sqrt(0.5 * np.median(distances[distances > 0]))

    return median_distance * np.array([0.6, 0.8, 1, 1.2, 1.4])

compute_model_parameters(reference_samples=None, test_samples=None, gaussian_centers=None)

Computes model parameters via k-fold cross validation process

Source code in changepoynt/utils/densityratioestimation.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def compute_model_parameters(self, reference_samples=None, test_samples=None, gaussian_centers=None):
    """
    Computes model parameters via k-fold cross validation process
    """

    # get the parameter candidates
    sigma_widths = self.compute_gaussian_width_candidates(reference_samples, test_samples)
    lambda_candidates = self.generate_regularization_params()

    # compute the scores
    cross_validation_scores = self.cross_validate(reference_samples, test_samples, gaussian_centers, sigma_widths,
                                                  lambda_candidates, self.alpha, self.kernel_number,
                                                  self.cross_folds)

    # check for the minimal score within the cross_validation
    cv_min_idx_for_sigma, cv_min_idx_for_lambda = np.unravel_index(cross_validation_scores.argmin(),
                                                                   cross_validation_scores.shape)

    # get the optimal values for both parameters
    optimal_sigma = sigma_widths[cv_min_idx_for_sigma]
    optimal_lambda = lambda_candidates[cv_min_idx_for_lambda]
    return optimal_sigma, optimal_lambda

generate_gaussian_centers(reference_samples=None)

Choose Gaussian centers randomly from the reference samples.

Source code in changepoynt/utils/densityratioestimation.py
365
366
367
368
369
370
371
372
373
374
375
def generate_gaussian_centers(self, reference_samples=None):
    """
    Choose Gaussian centers randomly from the reference samples.
    """
    numcols = reference_samples.shape[1]
    reference_sample_idxs = np.random.permutation(numcols)

    self.kernel_number = min(self.kernel_number, numcols)
    gaussian_centers = reference_samples[:, reference_sample_idxs[0:self.kernel_number]]

    return gaussian_centers

generate_regularization_params() staticmethod

Generates a candidate list of regularization parameters to be used with the L1 regularizer term of RULSIF optimization problem. The best regularizer parameter will be chosen via cross-validation. The values itself are taken from the paper [1].

Source code in changepoynt/utils/densityratioestimation.py
355
356
357
358
359
360
361
362
363
@staticmethod
def generate_regularization_params():
    """
    Generates a candidate list of regularization parameters to be used
    with the L1 regularizer term of RULSIF optimization problem.  The
    best regularizer parameter will be chosen via cross-validation.
    The values itself are taken from the paper [1].
    """
    return 10.0 ** np.array([-3, -2, -1, 0, 1])

train(reference_samples=None, test_samples=None)

Learn the proper model parameters if we did not specify them already

Source code in changepoynt/utils/densityratioestimation.py
467
468
469
470
471
472
473
474
475
476
477
478
479
480
def train(self, reference_samples=None, test_samples=None):
    """
    Learn the proper model parameters if we did not specify them already
    """

    self.gaussian_centers = self.generate_gaussian_centers(reference_samples)

    # check whether we need to find the model parameters or if they have been given
    if self.cv:
        optimal_sigma, optimal_lambda = self.compute_model_parameters(reference_samples,
                                                                      test_samples, self.gaussian_centers)

        self.sigma_width = optimal_sigma
        self.lambda_regularizer = optimal_lambda

compute_H_hat(alpha=0.0, kernel_matrix_ref_samples=None, kernel_matrix_test_samples=None)

Calculates the H_hat term of the theta_hat optimization problem

Source code in changepoynt/utils/densityratioestimation.py
146
147
148
149
150
151
152
153
154
155
156
157
@nb.njit()
def compute_H_hat(alpha=0.0, kernel_matrix_ref_samples=None, kernel_matrix_test_samples=None):
    """
    Calculates the H_hat term of the theta_hat optimization problem
    """
    n_ref = kernel_matrix_ref_samples.shape[1]
    n_test = kernel_matrix_test_samples.shape[1]

    H_hat = (alpha / n_ref) * kernel_matrix_ref_samples@kernel_matrix_ref_samples.T + \
            ((1.0 - alpha) / n_test) * kernel_matrix_test_samples@kernel_matrix_test_samples.T

    return H_hat

compute_distance(samples, sample_means)

Compute the distances between points in the sample's feature space to points along the center of the distribution

Source code in changepoynt/utils/densityratioestimation.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
@nb.njit()
def compute_distance(samples: np.ndarray, sample_means: np.ndarray) -> np.ndarray:
    """
    Compute the distances between points in the sample's feature space
    to points along the center of the distribution
    """

    # get the amount of samples
    (sd, sample_cols) = samples.shape
    (md, mean_cols) = sample_means.shape

    # compute the squared sums
    squared_samples = np.sum(samples ** 2, 0)
    squared_means = np.sum(sample_means ** 2, 0)

    # compute the kernel matrix by using (a-b)^2 = a^2 - 2ab + b^2
    """kernel_matrix = np.tile(squared_means, (sample_cols, 1))\
                    + np.tile(squared_samples, (mean_cols, 1)).T\
                    - 2 * np.dot(samples.T, sample_means)"""
    repeated_squared_samples = np.zeros((mean_cols, sample_cols))
    for idx in range(mean_cols):
        repeated_squared_samples[idx, :] = squared_samples
    repeated_squared_means = np.zeros((sample_cols, mean_cols))
    for idx in range(sample_cols):
        repeated_squared_means[idx, :] = squared_means
    kernel_matrix = repeated_squared_means + repeated_squared_samples.T - 2 * samples.T@sample_means

    return kernel_matrix

compute_gaussian_kernel(samples, sample_means, sigma)

Computes an n-dimensional Gaussian/RBF kernel matrix by taking points in the sample's feature space and maps them to kernel coordinates in Hilbert space by calculating the distance to each point in the sample space and taking the Gaussian function of the distances. K(X,Y) = exp( -(|| X - Y ||^2) / (2 * sigma^2) ) where X is the matrix of data points in the sample space, Y is the matrix of gaussian centers in the sample space sigma is the width of the gaussian function being used

Source code in changepoynt/utils/densityratioestimation.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@nb.njit()
def compute_gaussian_kernel(samples: np.ndarray, sample_means: np.ndarray, sigma: float) -> np.ndarray:
    """
    Computes an n-dimensional Gaussian/RBF kernel matrix by taking points
    in the sample's feature space and maps them to kernel coordinates in
    Hilbert space by calculating the distance to each point in the sample
    space and taking the Gaussian function of the distances.
       K(X,Y) = exp( -(|| X - Y ||^2) / (2 * sigma^2) )
    where X is the matrix of data points in the sample space,
          Y is the matrix of gaussian centers in the sample space
         sigma is the width of the gaussian function being used
    """
    squared_distance = compute_distance(samples, sample_means)
    # squared_distance = scipy.spatial.distance_matrix(samples.T, sample_means.T)**2
    return np.exp(-squared_distance / (2 * (sigma ** 2)))

compute_h_hat(kernel_matrix_ref_samples)

Calculates the h_hat term of the theta_hat optimization problem

Source code in changepoynt/utils/densityratioestimation.py
160
161
162
163
164
165
166
167
168
169
170
171
172
@nb.njit()
def compute_h_hat(kernel_matrix_ref_samples):
    """
    Calculates the h_hat term of the theta_hat optimization problem
    """
    h_hat = np.zeros((kernel_matrix_ref_samples.shape[0], 1))
    for idx in range(kernel_matrix_ref_samples.shape[0]):
        h_hat[idx, 0] = kernel_matrix_ref_samples[idx, :].mean()

    # unfortunately this is not numba friendly as axis parameter is not allowed
    # h_hat = np.mean(kernel_matrix_ref_samples, 1, keepdims=True)

    return h_hat

compute_theta_hat(H_hat, lambda_scaled_identity, h_hat)

Calculates theta_hat given H_hat, h_hat, lambda, and the kernel basis function Treat as a system of linear equations and find the exact, optimal solution

Source code in changepoynt/utils/densityratioestimation.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def compute_theta_hat(H_hat: np.ndarray, lambda_scaled_identity, h_hat: np.ndarray):
    """
    Calculates theta_hat given H_hat, h_hat, lambda, and the kernel basis function
    Treat as a system of linear equations and find the exact, optimal
    solution
    """
    # theta_hat = np.linalg.solve(H_hat + (lambda_regularizer * np.eye(kernel_basis)), h_hat)
    # due to the way H_hat is constructed it is symmetric (given some minor numerical errors)
    # see page 11 in [1] for the definition. As it is also real, its is automatically hermitian
    #
    # also we stole and shortened the scipy solve implementation to skip some Checks
    # theta_hat = scipy.linalg.solve(H_hat + (lambda_regularizer * np.eye(kernel_basis)), h_hat, assume_a='her',
    # check_finite=False)
    #
    # creating the identity within this function takes time (as we need to call the numpy function and create
    # the matrix) therefore we do it outside of the function
    return solve(H_hat + lambda_scaled_identity, h_hat)

g_of_x_theta(kernel_matrix_samples, theta_hat)

Calculate the alpha-relative density ratio kernel model

Source code in changepoynt/utils/densityratioestimation.py
204
205
206
207
208
209
@nb.njit()
def g_of_x_theta(kernel_matrix_samples: np.ndarray, theta_hat: np.ndarray) -> np.ndarray:
    """
    Calculate the alpha-relative density ratio kernel model
    """
    return theta_hat.T@kernel_matrix_samples

j_of_theta(alpha, g_xref_theta, g_xtest_theta)

Calculates the squared error criterion, J

Source code in changepoynt/utils/densityratioestimation.py
194
195
196
197
198
199
200
201
@nb.njit()
def j_of_theta(alpha: float, g_xref_theta: np.ndarray, g_xtest_theta: np.ndarray):
    """
    Calculates the squared error criterion, J
    """
    return ((alpha / 2.0) * (np.mean(g_xref_theta ** 2)) +
            ((1 - alpha) / 2.0) * (np.mean(g_xtest_theta ** 2)) -
            np.mean(g_xref_theta))

solve(a, b, lower=False, overwrite_a=False, overwrite_b=False)

DANGER! This implementation is stolen from within the scipy package and is equivalent to scipy.linalg.solve We reduced a lot of checks to tailor it to the given case within this project and make it as fast as possible as it is in the hottest path of RuLSIF and by far the most called function.

This might result in problems, as the checks are really minimal.

Source code in changepoynt/utils/densityratioestimation.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def solve(a, b, lower=False, overwrite_a=False, overwrite_b=False):
    """
    DANGER!
    This implementation is stolen from within the scipy package and is equivalent to scipy.linalg.solve
    We reduced a lot of checks to tailor it to the given case within this project and make it as fast as possible
    as it is in the hottest path of RuLSIF and by far the *most called function*.

    This might result in problems, as the checks are really minimal.
    """

    # calculate matrix norm using LAPACK
    lange = get_lapack_funcs('lange', (a,))
    anorm = lange('1', a)

    # this is the code stolen from scipy.linalg.solve for the option assume_a = "sym"
    sycon, sysv, sysv_lw = get_lapack_funcs(('sycon', 'sysv', 'sysv_lwork'), (a, b))
    lu, ipvt, x, info = sysv(a, b, lwork=_compute_lwork(sysv_lw, a.shape[0], lower),
                             lower=lower, overwrite_a=overwrite_a, overwrite_b=overwrite_b)

    # check whether the solver worked in LAPACK
    _solve_check(info)

    # can't tell what this is, but seems like we need it
    rcond, info = sycon(lu, ipvt, anorm)

    # Get the correct lamch function for doubles (can't use any other!)
    lamch = get_lapack_funcs('lamch', dtype='d')
    _solve_check(info, lamch, rcond)
    return x