Skip to content

Algorithms API

base_algorithm

This file contains the interface for the base algorithm used in the package.

SingularSubspaceAlgorithm

Bases: Algorithm

Source code in changepoynt/algorithms/base_algorithm.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class SingularSubspaceAlgorithm(Algorithm):

    # some parameters defined in the subclasses
    window_length: int
    n_windows: int
    lag: int
    scoring_step: int

    def covered_regions(self) -> tuple[int, int]:
        """
        This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered
        for the score detection.

        The covered region depends on the window size, the number of windows, and the lag between the two Hankel
        matrices (past and future Hankel matrices).

        The function return two different sizes:

        1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples
        that a time series must have before the subspace method can run.

        2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for
        subsequences that are used to extract the characteristics

        :return: total_region: int, matrix_region: int
        """

        # the number of samples one Hankel matrix covers
        matrix_region = self.window_length + self.n_windows - 1

        # we have two Hankel matrices that might overlap
        total_region = matrix_region + self.lag

        return total_region, matrix_region

    def estimate_runtime(self, signal: np.ndarray, steps: int = 30, verbose: bool = False) -> [float, float]:
        """
        This function estimates the runtime for a given signal by running the initial steps.

        Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime
        is number of steps times signal length.

        :param signal: The signal that you will run the algorithm on.

        :param steps: The number of estimation steps. Increasing this number increases the runtime
                      but also increases the accuracy of the estimation.

        :param verbose: If true, prints out the runtime of the algorithm.

        :return: Runtime estimation in seconds, standard deviation in seconds
        """

        # get the required signal length
        total_covered_region = self.covered_regions()[0]

        # get the number of steps necessary for the whole time series
        processing_steps = (signal.shape[0] - total_covered_region)//self.scoring_step

        # check whether the signal is long enough
        if total_covered_region > signal.shape[0]:
            raise ValueError(f'Test signal for runtime estimation is not long enough: {signal.shape=} < {total_covered_region}')

        # shorten the signal so we only run one step
        if signal.ndim == 2:
            shortened_signal = signal[:total_covered_region+1, :].copy()
        elif signal.ndim == 1:
            shortened_signal = signal[:total_covered_region + 1].copy()
        else:
            raise ValueError(f'Test signal for runtime estimation has weird shape {signal.shape=}.')

        # trigger the potential JIT compilers
        self.transform(shortened_signal)

        # compute the scoring while measuring the time
        times = np.zeros(steps)
        for idx in range(steps):

            # make the transformation and measure the time
            start = time.perf_counter()
            self.transform(shortened_signal)
            timer = time.perf_counter() - start

            # fill the time
            times[idx] = timer


        # compute the time per step
        timer = np.mean(times)
        std = np.std(times)

        # multiply with the amounts of samples that we will be processing
        timer = timer * processing_steps
        std = std * processing_steps

        # check whether we want to print the result
        if verbose:
            print(f"For {signal.shape=} and the current parameters, the runtime will be around {timer:.3f} seconds (+/- {std:.3f} seconds).")
        return timer, std

    @property
    def first_score_position(self):
        return self.covered_regions()[0] - self.compute_offset() - self.scoring_step//2

    @abstractmethod
    def compute_offset(self) -> int:
        raise NotImplementedError

    @abstractmethod
    def transform(self, time_series: np.ndarray):
        raise NotImplementedError

covered_regions()

This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered for the score detection.

The covered region depends on the window size, the number of windows, and the lag between the two Hankel matrices (past and future Hankel matrices).

The function return two different sizes:

1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples that a time series must have before the subspace method can run.

2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for subsequences that are used to extract the characteristics

Returns:

Type Description
tuple[int, int]

total_region: int, matrix_region: int

Source code in changepoynt/algorithms/base_algorithm.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def covered_regions(self) -> tuple[int, int]:
    """
    This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered
    for the score detection.

    The covered region depends on the window size, the number of windows, and the lag between the two Hankel
    matrices (past and future Hankel matrices).

    The function return two different sizes:

    1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples
    that a time series must have before the subspace method can run.

    2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for
    subsequences that are used to extract the characteristics

    :return: total_region: int, matrix_region: int
    """

    # the number of samples one Hankel matrix covers
    matrix_region = self.window_length + self.n_windows - 1

    # we have two Hankel matrices that might overlap
    total_region = matrix_region + self.lag

    return total_region, matrix_region

estimate_runtime(signal, steps=30, verbose=False)

This function estimates the runtime for a given signal by running the initial steps.

Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime is number of steps times signal length.

Parameters:

Name Type Description Default
signal ndarray

The signal that you will run the algorithm on.

required
steps int

The number of estimation steps. Increasing this number increases the runtime but also increases the accuracy of the estimation.

30
verbose bool

If true, prints out the runtime of the algorithm.

False

Returns:

Type Description
[float, float]

Runtime estimation in seconds, standard deviation in seconds

Source code in changepoynt/algorithms/base_algorithm.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def estimate_runtime(self, signal: np.ndarray, steps: int = 30, verbose: bool = False) -> [float, float]:
    """
    This function estimates the runtime for a given signal by running the initial steps.

    Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime
    is number of steps times signal length.

    :param signal: The signal that you will run the algorithm on.

    :param steps: The number of estimation steps. Increasing this number increases the runtime
                  but also increases the accuracy of the estimation.

    :param verbose: If true, prints out the runtime of the algorithm.

    :return: Runtime estimation in seconds, standard deviation in seconds
    """

    # get the required signal length
    total_covered_region = self.covered_regions()[0]

    # get the number of steps necessary for the whole time series
    processing_steps = (signal.shape[0] - total_covered_region)//self.scoring_step

    # check whether the signal is long enough
    if total_covered_region > signal.shape[0]:
        raise ValueError(f'Test signal for runtime estimation is not long enough: {signal.shape=} < {total_covered_region}')

    # shorten the signal so we only run one step
    if signal.ndim == 2:
        shortened_signal = signal[:total_covered_region+1, :].copy()
    elif signal.ndim == 1:
        shortened_signal = signal[:total_covered_region + 1].copy()
    else:
        raise ValueError(f'Test signal for runtime estimation has weird shape {signal.shape=}.')

    # trigger the potential JIT compilers
    self.transform(shortened_signal)

    # compute the scoring while measuring the time
    times = np.zeros(steps)
    for idx in range(steps):

        # make the transformation and measure the time
        start = time.perf_counter()
        self.transform(shortened_signal)
        timer = time.perf_counter() - start

        # fill the time
        times[idx] = timer


    # compute the time per step
    timer = np.mean(times)
    std = np.std(times)

    # multiply with the amounts of samples that we will be processing
    timer = timer * processing_steps
    std = std * processing_steps

    # check whether we want to print the result
    if verbose:
        print(f"For {signal.shape=} and the current parameters, the runtime will be around {timer:.3f} seconds (+/- {std:.3f} seconds).")
    return timer, std

baseline

MovingWindow

Bases: Algorithm

This class implements moving window algorithms that are as simple as possible for sanity checking.

This class implements a similar ideas as in: Current Time Series Anomaly Detection Benchmarks are Flawed and are Creating the Illusion of Progress Renjie Wu and Eamonn J. Keogh IEEE TRANSACTIONS ON KNOWLEDGE AND DATA ENGINEERING 2023

Source code in changepoynt/algorithms/baseline.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class MovingWindow(Algorithm):
    """
    This class implements moving window algorithms that are as simple as possible for sanity checking.

    This class implements a similar ideas as in:
    Current Time Series Anomaly Detection Benchmarks are Flawed and are Creating the Illusion of Progress
    Renjie Wu and Eamonn J. Keogh
    IEEE TRANSACTIONS ON KNOWLEDGE AND DATA ENGINEERING
    2023
    """

    def __init__(self, window_length: int, method: str = 'mean') -> None:
        super().__init__()

        # define the possible methods
        possible_methods = {"mean", "var", "meanvar"}

        # save the input parameters
        self.__fit = False
        assert window_length > 0, 'Window length must be greater than zero.'
        self.window_length = window_length
        assert method in possible_methods, f'Method must be one of the following: {possible_methods}.'
        self.method = method


    def fit(self, time_series: np.ndarray) -> None:
        """
        Check for the properties of the input.
        :param time_series:
        :return:
        """
        # check the dimensions of the input array
        assert time_series.ndim == 1, "Time series needs to be an 1D array."

        # check that we have at least two windows
        assert time_series.shape[0] > 2*self.window_length, 'Time series needs to be longer than 2x window length.'

        self.__fit = True

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        if not self.__fit:
            self.fit(time_series)

        # use numpy tricks to create the sliding window
        sliding_window = np.lib.stride_tricks.sliding_window_view(time_series, self.window_length)
        sliding_window_var = np.var(sliding_window, axis=-1)
        sliding_window_mean = np.mean(sliding_window, axis=-1)

        # create a time series for the scores
        score = np.zeros_like(time_series)
        if self.method.startswith('mean'):
            score[self.window_length:-self.window_length+1] += np.abs(sliding_window_mean[:-self.window_length]
                                                                      - sliding_window_mean[self.window_length:])
        if self.method.endswith('var'):
            score[self.window_length:-self.window_length+1] += np.abs(sliding_window_var[:-self.window_length]
                                                                      - sliding_window_var[self.window_length:])
        return score

fit(time_series)

Check for the properties of the input.

Parameters:

Name Type Description Default
time_series ndarray
required

Returns:

Type Description
None
Source code in changepoynt/algorithms/baseline.py
67
68
69
70
71
72
73
74
75
76
77
78
79
def fit(self, time_series: np.ndarray) -> None:
    """
    Check for the properties of the input.
    :param time_series:
    :return:
    """
    # check the dimensions of the input array
    assert time_series.ndim == 1, "Time series needs to be an 1D array."

    # check that we have at least two windows
    assert time_series.shape[0] > 2*self.window_length, 'Time series needs to be longer than 2x window length.'

    self.__fit = True

ZERO

Bases: Algorithm

This class implements the ZERO baseline algorithm.

"An Evaluation of Change Point Detection Algorithms" Gerrit J. J. van den Burg and Christopher K. I. Williams arXiv preprint arXiv:2003.06222, 2020

Source code in changepoynt/algorithms/baseline.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ZERO(Algorithm):
    """
    This class implements the ZERO baseline algorithm.

    "An Evaluation of Change Point Detection Algorithms"
    Gerrit J. J. van den Burg and Christopher K. I. Williams
    arXiv preprint arXiv:2003.06222, 2020
    """

    def __init__(self) -> None:
        super().__init__()

    def fit(self, time_series: np.ndarray) -> None:
        pass

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        return np.zeros_like(time_series)

sst

SST

Bases: SingularSubspaceAlgorithm

This class implements all the utility and functionality necessary to compute the SST change point detection algorithm as described in:

[1] Idé, Tsuyoshi, and Keisuke Inoue. "Knowledge discovery from heterogeneous dynamic systems using change-point correlations." Proceedings of the 2005 SIAM international conference on data mining. Society for Industrial and Applied Mathematics, 2005.

[2] Idé, Tsuyoshi, and Koji Tsuda. "Change-point detection using krylov subspace learning." Proceedings of the 2007 SIAM International Conference on Data Mining. Society for Industrial and Applied Mathematics, 2007.

It also uses a technique called randomized singular value decomposition which is surveyed and described in

[3] Halko, Nathan, Per-Gunnar Martinsson, and Joel A. Tropp. "Finding structure with randomness: Probabilistic algorithms for constructing approximate matrix decompositions." SIAM review 53.2 (2011): 217-288.

For the option fast_hankel=True it uses and algorithm based on

[4] L. Weber and R. Lenz. "Accelerating Singular Spectrum Transformation for Scalable Change Point Detection," in IEEE Access, Volume 11, 2025. doi: 10.1109/ACCESS.2025.3640386.

There will be a parameter specifying, whether to use the implicit krylov approximation from [2]. This significantly speeds up the computation but can reduce accuracy as the "eigensignals" are only approximated indirectly using a krylov subspace with the possible change point signal as seed. This method also deploys a feedback of the dominant "eigensignal" as seed into the power method (dominant eigenvector estimation) with additive gaussian noise.

!NOTE!: Most computational heavy functions are implemented as standalone functions, even if they require instance variables as this enables us to use jit compiled code provided by the numba compiler for faster calculations.

Also, we decided to do type hinting but not type checking as it requires too much boilerplate code. We recommend to input the specified types as the program will break in unforeseen ways otherwise.

Source code in changepoynt/algorithms/sst.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class SST(SingularSubspaceAlgorithm):
    """
    This class implements all the utility and functionality necessary to compute the SST change point detection
    algorithm as described in:

    [1]
    Idé, Tsuyoshi, and Keisuke Inoue.
    "Knowledge discovery from heterogeneous dynamic systems using change-point correlations."
    Proceedings of the 2005 SIAM international conference on data mining.
    Society for Industrial and Applied Mathematics, 2005.

    [2]
    Idé, Tsuyoshi, and Koji Tsuda.
    "Change-point detection using krylov subspace learning."
    Proceedings of the 2007 SIAM International Conference on Data Mining.
    Society for Industrial and Applied Mathematics, 2007.

    It also uses a technique called randomized singular value decomposition which is surveyed and described in

    [3]
    Halko, Nathan, Per-Gunnar Martinsson, and Joel A. Tropp.
    "Finding structure with randomness: Probabilistic algorithms for constructing approximate matrix decompositions."
    SIAM review 53.2 (2011): 217-288.

    For the option fast_hankel=True it uses and algorithm based on

    [4]
    L. Weber and R. Lenz.
    "Accelerating Singular Spectrum Transformation for Scalable Change Point Detection,"
    in IEEE Access, Volume 11, 2025.
    doi: 10.1109/ACCESS.2025.3640386.

    There will be a parameter specifying, whether to use the implicit krylov approximation from [2]. This
    significantly speeds up the computation but can reduce accuracy as the "eigensignals" are only approximated
    indirectly using a krylov subspace with the possible change point signal as seed. This method also deploys
    a feedback of the dominant "eigensignal" as seed into the power method (dominant eigenvector estimation)
    with additive gaussian noise.

    !NOTE!:
    Most computational heavy functions are implemented as standalone functions, even if they require instance variables
    as this enables us to use jit compiled code provided by the numba compiler for faster calculations.

    Also, we decided to do type hinting but not type checking as it requires too much boilerplate code. We recommend
    to input the specified types as the program will break in unforeseen ways otherwise.
    """

    def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5, scale: bool = True,
                 method: str = 'ika', lanczos_rank: int = None, random_rank: int = None,
                 feedback_noise_level: float = 1e-3, scoring_step: int = 1, use_fast_hankel: bool = False,
                 mitigate_offset: bool = False) -> None:
        """
        Initializing the Singular Spectrum Transformation (SST) requires setting a lot of parameters. See the parameters
        explanation for some intuition into the right choices. Currently, there are two SST methods from [1] and [2]
        available for use.

        :param window_length: This specifies the length of the time series (in samples), which will be used to extract
        the representative "eigensignals" to be compared before and after the lag. The window length should be big
        enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

        :param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
        the given time series. It should be big enough to cover different parts of the target behavior. If one does not
        specify this value, we use the rule of thumb and take as many time windows as you specified the length of the
        window (rule of thumb).

        :param lag: This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the
        algorithms how far it should look into the future to find change in behavior to the current signal. If you do
        not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to
        cover the behavior.

        :param rank: This parameter specifies the amount of "eigensignals" which will be used to measure the
        dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
        "eigensignals" if you do not specify otherwise.

        :param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range.
        Per default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers,
        this could cause problems, as the signal will be squished.

        :param method: Currently, "svd" [1], "ika" [2] and "rsvd" are available. ika corresponds to IKA-SST in [2] which
        will speed up the computation significantly.

        :param lanczos_rank: In order to use the implicit approximation of "eigensignals" by using "ika" [2] method,
        one needs to decide the rank of the implicit approximation of each "eigensignal". As a rule of thumb, we
        determine this value as being twice the amount of specified "eigensignals" to span the subspace
        (parameter rank). This is also recommended by the author of IKA-SST in [2].

        :param random_rank: In order to use the randomized singular value decomposition, one needs to provide a
        randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower
        this value, the faster the computation but the higher the error (as the approximation gets worse).

        :param feedback_noise_level: This specifies the amplitude of additive white gaussian noise added to the dominant
        "eigensignal" of the future behavior when shifting forward. This idea is noted in [2] and initializes
        the seed of the power method for dominant eigenvector estimation with the precious dominant eigenvector
        plus the noise level specified here. The noise level should just be a small fraction of the value range
        of the signal.

        :param scoring_step: the distance between scoring steps in samples (e.g., 2 would half the computation).

        :param use_fast_hankel: Use the O(N*logN) version for the decomposition. This is likely to have large
        speed improvements for window sizes > 150

        :param mitigate_offset: Use a sliding mean window to mitigate the constant offset of time series.
        """

        # save the specified parameters into instance variables
        self.window_length = window_length
        self.n_windows = n_windows
        self.lag = lag
        self.rank = rank
        self.scale = scale
        self.method = method
        self.lanczos_rank = lanczos_rank
        self.random_rank = random_rank
        self.noise = feedback_noise_level
        self.scoring_step = scoring_step
        self.use_fast_hankel = use_fast_hankel
        self.mitigate_offset = mitigate_offset

        # set some default values when they have not been specified
        if self.n_windows is None:
            self.n_windows = self.window_length
        if self.lag is None:
            # rule of thumb
            self.lag = max(self.n_windows // 3, 1)
        if self.lanczos_rank is None:
            # make rank even and multiply by two just as specified in [2]
            self.lanczos_rank = self.rank * 2 - (self.rank & 1)
        if self.random_rank is None:
            # compute the rank as specified in [3] and
            # https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
            self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

        # specify the methods and their corresponding functions as lambda functions expecting only the
        # 1) future hankel matrix,
        # 2) the current hankel matrix and
        # 3) the feedback vector (e.g. for dominant eigenvector feedback)
        # all other parameter should be specified as values in the partial lambda function
        #
        # We recommend using:
        # 1) rsvd
        # 2) ika if speed is a concern
        # 3) weighted if noise is low
        self.methods = {'ika': partial(_implicit_krylov_approximation,
                                       rank=self.rank,
                                       lanczos_rank=self.lanczos_rank),
                        'svd': partial(_rayleigh_singular_value_decomposition,
                                       rank=self.rank),
                        'rsvd': partial(_random_singular_value_decomposition,
                                        rank=self.rank,
                                        randomized_rank=self.random_rank),
                        'fbrsvd': partial(_facebook_random_singular_value_decomposition,
                                          rank=self.rank,
                                          randomized_rank=self.random_rank),
                        'naive': partial(_naive_singular_value_decomposition,
                                          rank=self.rank),
                        'naive updated': partial(_naive_singular_value_decomposition_updated_score,
                                         rank=self.rank),
                        'weighted': partial(_weighted_random_singular_value_decomposition,
                                            rank=self.rank,
                                            randomized_rank=self.random_rank),
                        'symmetric': partial(_symmetric_random_singular_value_decomposition,
                                            rank=self.rank,
                                            randomized_rank=self.random_rank),
                        }
        if self.method not in self.methods:
            raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

        # set up the methods we use for the construction of the hankel matrix (either it is the fft representation
        # of the other one)
        if use_fast_hankel and self.method not in ["rsvd", "ika", "weighted", "symmetric"]:
            raise ValueError(f'{self.method} method is not defined with use_fast_hankel=True')
        self.hankel_construction = {
            False: lg.compile_hankel,
            True: lg.HankelFFTRepresentation
        }

        # We can only use one of the following options: mitigate_offset OR use_fast_hankel (read: exclusive or, XOR)
        # or none of both
        #
        # the reasons is that fast hankel does the matrix multiplication via an FFT convolution over the whole Hankel
        # matrix at once
        #
        # mitigate_offset works by subtracting column wise mean values (one per subsequence that construct the Hankel
        # matrix.
        #
        # When using the FFT convolution, we cannot subtract column wise mean values
        if self.use_fast_hankel and self.mitigate_offset:
            raise ValueError(f'use_fast_hankel={self.use_fast_hankel} is not allowed when mitigate_offset={self.mitigate_offset}. You can only use one or none of them.')

        # check whether the method is correct
        assert self.method in self.methods, f'Specified method {self.method} is not available in {self.methods.keys()}.'

    def compute_offset(self) -> int:
        return self.n_windows // 2 + self.lag

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        """
        This function calculates the anomaly score for each sample within the time series, starting from an initial
        sample being the first sample of fitting the past hankel matrix (window_length + n_windows samples) and the new
        future hankel matrix (+lag). Values before that will be zero

        It also does some assertions regarding the specified parameters and whether they fit the time series.

        This function builds the interface to the jit compiled static function used to iterate over the array.

        :param time_series: 1D array containing the time series to be scored
        :return: anomaly score
        """

        # check the dimensions of the input array
        assert time_series.ndim == 1, "Time series needs to be an 1D array."

        # compute the starting point of the scoring (past and future hankel need to fit)
        starting_point = self.covered_regions()[0]
        assert starting_point < time_series.shape[0], "The time series is too short to score any points."

        # scale the time series (or just copy it if already scaled)
        if self.scale:
            time_series = normalization.min_max_scaling(time_series, min_val=1.0, max_val=2.0, inplace=False)
        else:
            time_series = time_series.copy()

        # get the changepoint scorer from the different methods
        scoring_function = self.methods[self.method]
        hankel_function = self.hankel_construction[self.use_fast_hankel]

        # start the scaling itself by calling the jit compiled staticmethod and return the result
        score = _transform(time_series=time_series, start_idx=starting_point, offset=self.compute_offset(),
                           window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                           scoring_step=self.scoring_step, scoring_function=scoring_function,
                           hankel_construction_function=hankel_function,
                           mitigate_offset=self.mitigate_offset)
        return score

__init__(window_length, n_windows=None, lag=None, rank=5, scale=True, method='ika', lanczos_rank=None, random_rank=None, feedback_noise_level=0.001, scoring_step=1, use_fast_hankel=False, mitigate_offset=False)

Initializing the Singular Spectrum Transformation (SST) requires setting a lot of parameters. See the parameters explanation for some intuition into the right choices. Currently, there are two SST methods from [1] and [2] available for use.

Parameters:

Name Type Description Default
window_length int

This specifies the length of the time series (in samples), which will be used to extract the representative "eigensignals" to be compared before and after the lag. The window length should be big enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

required
n_windows int

This specifies the number of consecutive time windows used to extract the "eigensignals" from the given time series. It should be big enough to cover different parts of the target behavior. If one does not specify this value, we use the rule of thumb and take as many time windows as you specified the length of the window (rule of thumb).

None
lag int

This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the algorithms how far it should look into the future to find change in behavior to the current signal. If you do not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to cover the behavior.

None
rank int

This parameter specifies the amount of "eigensignals" which will be used to measure the dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant "eigensignals" if you do not specify otherwise.

5
scale bool

Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this could cause problems, as the signal will be squished.

True
method str

Currently, "svd" [1], "ika" [2] and "rsvd" are available. ika corresponds to IKA-SST in [2] which will speed up the computation significantly.

'ika'
lanczos_rank int

In order to use the implicit approximation of "eigensignals" by using "ika" [2] method, one needs to decide the rank of the implicit approximation of each "eigensignal". As a rule of thumb, we determine this value as being twice the amount of specified "eigensignals" to span the subspace (parameter rank). This is also recommended by the author of IKA-SST in [2].

None
random_rank int

In order to use the randomized singular value decomposition, one needs to provide a randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower this value, the faster the computation but the higher the error (as the approximation gets worse).

None
feedback_noise_level float

This specifies the amplitude of additive white gaussian noise added to the dominant "eigensignal" of the future behavior when shifting forward. This idea is noted in [2] and initializes the seed of the power method for dominant eigenvector estimation with the precious dominant eigenvector plus the noise level specified here. The noise level should just be a small fraction of the value range of the signal.

0.001
scoring_step int

the distance between scoring steps in samples (e.g., 2 would half the computation).

1
use_fast_hankel bool

Use the O(N*logN) version for the decomposition. This is likely to have large speed improvements for window sizes > 150

False
mitigate_offset bool

Use a sliding mean window to mitigate the constant offset of time series.

False
Source code in changepoynt/algorithms/sst.py
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5, scale: bool = True,
             method: str = 'ika', lanczos_rank: int = None, random_rank: int = None,
             feedback_noise_level: float = 1e-3, scoring_step: int = 1, use_fast_hankel: bool = False,
             mitigate_offset: bool = False) -> None:
    """
    Initializing the Singular Spectrum Transformation (SST) requires setting a lot of parameters. See the parameters
    explanation for some intuition into the right choices. Currently, there are two SST methods from [1] and [2]
    available for use.

    :param window_length: This specifies the length of the time series (in samples), which will be used to extract
    the representative "eigensignals" to be compared before and after the lag. The window length should be big
    enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

    :param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
    the given time series. It should be big enough to cover different parts of the target behavior. If one does not
    specify this value, we use the rule of thumb and take as many time windows as you specified the length of the
    window (rule of thumb).

    :param lag: This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the
    algorithms how far it should look into the future to find change in behavior to the current signal. If you do
    not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to
    cover the behavior.

    :param rank: This parameter specifies the amount of "eigensignals" which will be used to measure the
    dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
    "eigensignals" if you do not specify otherwise.

    :param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range.
    Per default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers,
    this could cause problems, as the signal will be squished.

    :param method: Currently, "svd" [1], "ika" [2] and "rsvd" are available. ika corresponds to IKA-SST in [2] which
    will speed up the computation significantly.

    :param lanczos_rank: In order to use the implicit approximation of "eigensignals" by using "ika" [2] method,
    one needs to decide the rank of the implicit approximation of each "eigensignal". As a rule of thumb, we
    determine this value as being twice the amount of specified "eigensignals" to span the subspace
    (parameter rank). This is also recommended by the author of IKA-SST in [2].

    :param random_rank: In order to use the randomized singular value decomposition, one needs to provide a
    randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower
    this value, the faster the computation but the higher the error (as the approximation gets worse).

    :param feedback_noise_level: This specifies the amplitude of additive white gaussian noise added to the dominant
    "eigensignal" of the future behavior when shifting forward. This idea is noted in [2] and initializes
    the seed of the power method for dominant eigenvector estimation with the precious dominant eigenvector
    plus the noise level specified here. The noise level should just be a small fraction of the value range
    of the signal.

    :param scoring_step: the distance between scoring steps in samples (e.g., 2 would half the computation).

    :param use_fast_hankel: Use the O(N*logN) version for the decomposition. This is likely to have large
    speed improvements for window sizes > 150

    :param mitigate_offset: Use a sliding mean window to mitigate the constant offset of time series.
    """

    # save the specified parameters into instance variables
    self.window_length = window_length
    self.n_windows = n_windows
    self.lag = lag
    self.rank = rank
    self.scale = scale
    self.method = method
    self.lanczos_rank = lanczos_rank
    self.random_rank = random_rank
    self.noise = feedback_noise_level
    self.scoring_step = scoring_step
    self.use_fast_hankel = use_fast_hankel
    self.mitigate_offset = mitigate_offset

    # set some default values when they have not been specified
    if self.n_windows is None:
        self.n_windows = self.window_length
    if self.lag is None:
        # rule of thumb
        self.lag = max(self.n_windows // 3, 1)
    if self.lanczos_rank is None:
        # make rank even and multiply by two just as specified in [2]
        self.lanczos_rank = self.rank * 2 - (self.rank & 1)
    if self.random_rank is None:
        # compute the rank as specified in [3] and
        # https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
        self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

    # specify the methods and their corresponding functions as lambda functions expecting only the
    # 1) future hankel matrix,
    # 2) the current hankel matrix and
    # 3) the feedback vector (e.g. for dominant eigenvector feedback)
    # all other parameter should be specified as values in the partial lambda function
    #
    # We recommend using:
    # 1) rsvd
    # 2) ika if speed is a concern
    # 3) weighted if noise is low
    self.methods = {'ika': partial(_implicit_krylov_approximation,
                                   rank=self.rank,
                                   lanczos_rank=self.lanczos_rank),
                    'svd': partial(_rayleigh_singular_value_decomposition,
                                   rank=self.rank),
                    'rsvd': partial(_random_singular_value_decomposition,
                                    rank=self.rank,
                                    randomized_rank=self.random_rank),
                    'fbrsvd': partial(_facebook_random_singular_value_decomposition,
                                      rank=self.rank,
                                      randomized_rank=self.random_rank),
                    'naive': partial(_naive_singular_value_decomposition,
                                      rank=self.rank),
                    'naive updated': partial(_naive_singular_value_decomposition_updated_score,
                                     rank=self.rank),
                    'weighted': partial(_weighted_random_singular_value_decomposition,
                                        rank=self.rank,
                                        randomized_rank=self.random_rank),
                    'symmetric': partial(_symmetric_random_singular_value_decomposition,
                                        rank=self.rank,
                                        randomized_rank=self.random_rank),
                    }
    if self.method not in self.methods:
        raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

    # set up the methods we use for the construction of the hankel matrix (either it is the fft representation
    # of the other one)
    if use_fast_hankel and self.method not in ["rsvd", "ika", "weighted", "symmetric"]:
        raise ValueError(f'{self.method} method is not defined with use_fast_hankel=True')
    self.hankel_construction = {
        False: lg.compile_hankel,
        True: lg.HankelFFTRepresentation
    }

    # We can only use one of the following options: mitigate_offset OR use_fast_hankel (read: exclusive or, XOR)
    # or none of both
    #
    # the reasons is that fast hankel does the matrix multiplication via an FFT convolution over the whole Hankel
    # matrix at once
    #
    # mitigate_offset works by subtracting column wise mean values (one per subsequence that construct the Hankel
    # matrix.
    #
    # When using the FFT convolution, we cannot subtract column wise mean values
    if self.use_fast_hankel and self.mitigate_offset:
        raise ValueError(f'use_fast_hankel={self.use_fast_hankel} is not allowed when mitigate_offset={self.mitigate_offset}. You can only use one or none of them.')

    # check whether the method is correct
    assert self.method in self.methods, f'Specified method {self.method} is not available in {self.methods.keys()}.'

covered_regions()

This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered for the score detection.

The covered region depends on the window size, the number of windows, and the lag between the two Hankel matrices (past and future Hankel matrices).

The function return two different sizes:

1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples that a time series must have before the subspace method can run.

2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for subsequences that are used to extract the characteristics

Returns:

Type Description
tuple[int, int]

total_region: int, matrix_region: int

Source code in changepoynt/algorithms/base_algorithm.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def covered_regions(self) -> tuple[int, int]:
    """
    This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered
    for the score detection.

    The covered region depends on the window size, the number of windows, and the lag between the two Hankel
    matrices (past and future Hankel matrices).

    The function return two different sizes:

    1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples
    that a time series must have before the subspace method can run.

    2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for
    subsequences that are used to extract the characteristics

    :return: total_region: int, matrix_region: int
    """

    # the number of samples one Hankel matrix covers
    matrix_region = self.window_length + self.n_windows - 1

    # we have two Hankel matrices that might overlap
    total_region = matrix_region + self.lag

    return total_region, matrix_region

estimate_runtime(signal, steps=30, verbose=False)

This function estimates the runtime for a given signal by running the initial steps.

Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime is number of steps times signal length.

Parameters:

Name Type Description Default
signal ndarray

The signal that you will run the algorithm on.

required
steps int

The number of estimation steps. Increasing this number increases the runtime but also increases the accuracy of the estimation.

30
verbose bool

If true, prints out the runtime of the algorithm.

False

Returns:

Type Description
[float, float]

Runtime estimation in seconds, standard deviation in seconds

Source code in changepoynt/algorithms/base_algorithm.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def estimate_runtime(self, signal: np.ndarray, steps: int = 30, verbose: bool = False) -> [float, float]:
    """
    This function estimates the runtime for a given signal by running the initial steps.

    Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime
    is number of steps times signal length.

    :param signal: The signal that you will run the algorithm on.

    :param steps: The number of estimation steps. Increasing this number increases the runtime
                  but also increases the accuracy of the estimation.

    :param verbose: If true, prints out the runtime of the algorithm.

    :return: Runtime estimation in seconds, standard deviation in seconds
    """

    # get the required signal length
    total_covered_region = self.covered_regions()[0]

    # get the number of steps necessary for the whole time series
    processing_steps = (signal.shape[0] - total_covered_region)//self.scoring_step

    # check whether the signal is long enough
    if total_covered_region > signal.shape[0]:
        raise ValueError(f'Test signal for runtime estimation is not long enough: {signal.shape=} < {total_covered_region}')

    # shorten the signal so we only run one step
    if signal.ndim == 2:
        shortened_signal = signal[:total_covered_region+1, :].copy()
    elif signal.ndim == 1:
        shortened_signal = signal[:total_covered_region + 1].copy()
    else:
        raise ValueError(f'Test signal for runtime estimation has weird shape {signal.shape=}.')

    # trigger the potential JIT compilers
    self.transform(shortened_signal)

    # compute the scoring while measuring the time
    times = np.zeros(steps)
    for idx in range(steps):

        # make the transformation and measure the time
        start = time.perf_counter()
        self.transform(shortened_signal)
        timer = time.perf_counter() - start

        # fill the time
        times[idx] = timer


    # compute the time per step
    timer = np.mean(times)
    std = np.std(times)

    # multiply with the amounts of samples that we will be processing
    timer = timer * processing_steps
    std = std * processing_steps

    # check whether we want to print the result
    if verbose:
        print(f"For {signal.shape=} and the current parameters, the runtime will be around {timer:.3f} seconds (+/- {std:.3f} seconds).")
    return timer, std

transform(time_series)

This function calculates the anomaly score for each sample within the time series, starting from an initial sample being the first sample of fitting the past hankel matrix (window_length + n_windows samples) and the new future hankel matrix (+lag). Values before that will be zero

It also does some assertions regarding the specified parameters and whether they fit the time series.

This function builds the interface to the jit compiled static function used to iterate over the array.

Parameters:

Name Type Description Default
time_series ndarray

1D array containing the time series to be scored

required

Returns:

Type Description
ndarray

anomaly score

Source code in changepoynt/algorithms/sst.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def transform(self, time_series: np.ndarray) -> np.ndarray:
    """
    This function calculates the anomaly score for each sample within the time series, starting from an initial
    sample being the first sample of fitting the past hankel matrix (window_length + n_windows samples) and the new
    future hankel matrix (+lag). Values before that will be zero

    It also does some assertions regarding the specified parameters and whether they fit the time series.

    This function builds the interface to the jit compiled static function used to iterate over the array.

    :param time_series: 1D array containing the time series to be scored
    :return: anomaly score
    """

    # check the dimensions of the input array
    assert time_series.ndim == 1, "Time series needs to be an 1D array."

    # compute the starting point of the scoring (past and future hankel need to fit)
    starting_point = self.covered_regions()[0]
    assert starting_point < time_series.shape[0], "The time series is too short to score any points."

    # scale the time series (or just copy it if already scaled)
    if self.scale:
        time_series = normalization.min_max_scaling(time_series, min_val=1.0, max_val=2.0, inplace=False)
    else:
        time_series = time_series.copy()

    # get the changepoint scorer from the different methods
    scoring_function = self.methods[self.method]
    hankel_function = self.hankel_construction[self.use_fast_hankel]

    # start the scaling itself by calling the jit compiled staticmethod and return the result
    score = _transform(time_series=time_series, start_idx=starting_point, offset=self.compute_offset(),
                       window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                       scoring_step=self.scoring_step, scoring_function=scoring_function,
                       hankel_construction_function=hankel_function,
                       mitigate_offset=self.mitigate_offset)
    return score

main()

This function is not intended for users, but for quick testing during development.

Source code in changepoynt/algorithms/sst.py
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
def main():
    """This function is not intended for users, but for quick testing during development."""
    from time import perf_counter

    # make synthetic step function
    np.random.seed(123)
    length = 300
    x = np.hstack([1 * np.ones(length) + np.random.rand(length) * 1,
                   3 * np.ones(length) + np.random.rand(length) * 2,
                   5 * np.ones(length) + np.random.rand(length) * 1.5])
    x += np.random.rand(x.size)

    # create the sst method
    ika_sst = SST(31, method='ika', use_fast_hankel=True, scoring_step=3)
    svd_sst = SST(31, method='svd')
    rsvd_sst = SST(31, method='rsvd', use_fast_hankel=True)
    fbrsvd_sst = SST(31, method='fbrsvd')

    # make some runtime checks
    ika_sst.estimate_runtime(x, verbose=True)

    # make the scoring
    start = perf_counter()
    ika_sst.transform(x)
    print((perf_counter() - start))
    # print(svd_sst.transform(x))
    rsvd_sst.transform(x)
    fbrsvd_sst.transform(x)

esst

ESST

Bases: SingularSubspaceAlgorithm

This class implements an own idea for change point detection.

It has been published together with Sarah Boelter (UMN) Boelter, Sarah, Lucas Weber, et al. "Fault Prediction in Planetary Drilling Using Subspace Analysis Techniques." International Conference on Intelligent Autonomous Systems (IAS-19). 2025. (* is equal contribution) https://ntrs.nasa.gov/citations/20250002705

It is a research algorithm, please use with caution.

Source code in changepoynt/algorithms/esst.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
class ESST(SingularSubspaceAlgorithm):
    """
    This class implements an own idea for change point detection.

    It has been published together with Sarah Boelter (UMN)
    Boelter, Sarah*, Lucas Weber*, et al. "Fault Prediction in Planetary Drilling Using Subspace Analysis Techniques."
    International Conference on Intelligent Autonomous Systems (IAS-19). 2025.
    (* is equal contribution)
    https://ntrs.nasa.gov/citations/20250002705

    It is a research algorithm, please use with caution.
    """

    def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5,
                 scale: bool = True, method: str = 'fbrsvd', random_rank: int = None, scoring_step: int = 1,
                 use_fast_hankel: bool = False, mitigate_offset: bool = False) -> None:
        """
        Experimental change point detection method evaluation the prevalence of change points within a signal
        by comparing the difference in eigenvectors between to points in time.

        :param window_length: This specifies the length of the time series (in samples), which will be used to extract
        the representative "eigensignals" to be compared before and after the lag. The window length should be big
        enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

        :param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
        the given time series. It should be big enough to cover different parts of the target behavior. If one does not
        specify this value, we use the rule of thumb and take as many time windows as you specified the length of the
        window (rule of thumb).

        :param lag: This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the
        algorithms how far it should look into the future to find change in behavior to the current signal. If you do
        not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to
        cover the behavior.

        :param rank: This parameter specifies the amount of "eigensignals" which will be used to measure the
        dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
        "eigensignals" if you do not specify otherwise.

        :param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per
        default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this
        could cause problems, as the signal will be squished.

        :param random_rank: To use the randomized singular value decomposition, one needs to provide a
        randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower
        this value, the faster the computation but the higher the error (as the approximation gets worse).

        :param scoring_step: The distance between scoring steps in samples (e.g., 2 would half the computation).

        :param use_fast_hankel: Whether to deploy the fast hankel matrix product.

        :param mitigate_offset: Use a sliding mean window to mitigate the constant offset of time series.
        """

        # save the specified parameters into instance variables
        self.window_length = window_length
        self.n_windows = n_windows
        self.rank = rank
        self.scale = scale
        self.random_rank = random_rank
        self.lag = lag
        self.scoring_step = scoring_step
        self.use_fast_hankel = use_fast_hankel
        self.method = method
        self.mitigate_offset = mitigate_offset

        # set some default values when they have not been specified
        if self.n_windows is None:
            self.n_windows = self.window_length//2
        if self.lag is None:
            # rule of thumb
            self.lag = self.n_windows
        if self.random_rank is None:
            # compute the rank as specified in [3] and
            # https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
            self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

        # specify the methods and their corresponding functions as lambda functions expecting only the hankel matrix
        self.methods = {'rsvd': partial(left_entropy, rank=self.rank, random_rank=self.random_rank,
                                        method='rsvd'),
                        'fbrsvd': partial(left_entropy, rank=self.rank, random_rank=self.random_rank,
                                          method='fbrsvd')}
        if self.method not in self.methods:
            raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

        # set up the methods we use for the construction of the hankel matrix (either it is the fft representation
        # of the other one)
        if use_fast_hankel and self.method == 'fbrsvd':
            raise ValueError(f'fbrsvd method is not defined with use_fast_hankel=True')
        self.hankel_construction = {
            False: lg.compile_hankel,
            True: lg.HankelFFTRepresentation
        }

        # We can only use one of the following options: mitigate_offset OR use_fast_hankel (read: exclusive or, XOR)
        # or none of both
        #
        # the reasons is that fast hankel does the matrix multiplication via an FFT convolution over the whole Hankel
        # matrix at once
        #
        # mitigate_offset works by subtracting column wise mean values (one per subsequence that construct the Hankel
        # matrix.
        #
        # When using the FFT convolution, we cannot subtract column wise mean values
        if self.use_fast_hankel and self.mitigate_offset:
            raise ValueError(f'use_fast_hankel={self.use_fast_hankel} is not allowed when mitigate_offset={self.mitigate_offset}. You can only use one or none of them.')

    def compute_offset(self) -> int:
        return self.n_windows + self.lag

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        """
        This function calculates the anomaly score for each sample within the time series.

        It also does some assertion regarding time series length.

        :param time_series: 1D array containing the time series to be scored
        :return: anomaly score
        """

        # check the dimensions of the input array
        assert time_series.ndim == 1, "Time series needs to be an 1D array."

        # check that we have at least two windows
        assert time_series.shape[0] > self.window_length, 'Time series needs to be longer than window length.'

        # compute the starting point of the scoring (past and future hankel need to fit)
        starting_point = self.covered_regions()[0]
        assert starting_point < time_series.shape[0], "The time series is too short to score any points."

        # scale the time series (or just copy it if already scaled)
        if self.scale:
            time_series = normalization.min_max_scaling(time_series, min_val=1.0, max_val=2.0, inplace=False)
        else:
            time_series = time_series.copy()

        # get the different methods
        scoring_function = self.methods[self.method]
        hankel_function = self.hankel_construction[self.use_fast_hankel]
        return _transform(time_series=time_series, start_idx=starting_point, offset=self.compute_offset(),
                          window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                          scoring_step=self.scoring_step, scoring_function=scoring_function,
                          hankel_construction_function=hankel_function, mitigate_offset=self.mitigate_offset)

__init__(window_length, n_windows=None, lag=None, rank=5, scale=True, method='fbrsvd', random_rank=None, scoring_step=1, use_fast_hankel=False, mitigate_offset=False)

Experimental change point detection method evaluation the prevalence of change points within a signal by comparing the difference in eigenvectors between to points in time.

Parameters:

Name Type Description Default
window_length int

This specifies the length of the time series (in samples), which will be used to extract the representative "eigensignals" to be compared before and after the lag. The window length should be big enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

required
n_windows int

This specifies the number of consecutive time windows used to extract the "eigensignals" from the given time series. It should be big enough to cover different parts of the target behavior. If one does not specify this value, we use the rule of thumb and take as many time windows as you specified the length of the window (rule of thumb).

None
lag int

This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the algorithms how far it should look into the future to find change in behavior to the current signal. If you do not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to cover the behavior.

None
rank int

This parameter specifies the amount of "eigensignals" which will be used to measure the dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant "eigensignals" if you do not specify otherwise.

5
scale bool

Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this could cause problems, as the signal will be squished.

True
random_rank int

To use the randomized singular value decomposition, one needs to provide a randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower this value, the faster the computation but the higher the error (as the approximation gets worse).

None
scoring_step int

The distance between scoring steps in samples (e.g., 2 would half the computation).

1
use_fast_hankel bool

Whether to deploy the fast hankel matrix product.

False
mitigate_offset bool

Use a sliding mean window to mitigate the constant offset of time series.

False
Source code in changepoynt/algorithms/esst.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5,
             scale: bool = True, method: str = 'fbrsvd', random_rank: int = None, scoring_step: int = 1,
             use_fast_hankel: bool = False, mitigate_offset: bool = False) -> None:
    """
    Experimental change point detection method evaluation the prevalence of change points within a signal
    by comparing the difference in eigenvectors between to points in time.

    :param window_length: This specifies the length of the time series (in samples), which will be used to extract
    the representative "eigensignals" to be compared before and after the lag. The window length should be big
    enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

    :param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
    the given time series. It should be big enough to cover different parts of the target behavior. If one does not
    specify this value, we use the rule of thumb and take as many time windows as you specified the length of the
    window (rule of thumb).

    :param lag: This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the
    algorithms how far it should look into the future to find change in behavior to the current signal. If you do
    not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to
    cover the behavior.

    :param rank: This parameter specifies the amount of "eigensignals" which will be used to measure the
    dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
    "eigensignals" if you do not specify otherwise.

    :param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per
    default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this
    could cause problems, as the signal will be squished.

    :param random_rank: To use the randomized singular value decomposition, one needs to provide a
    randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower
    this value, the faster the computation but the higher the error (as the approximation gets worse).

    :param scoring_step: The distance between scoring steps in samples (e.g., 2 would half the computation).

    :param use_fast_hankel: Whether to deploy the fast hankel matrix product.

    :param mitigate_offset: Use a sliding mean window to mitigate the constant offset of time series.
    """

    # save the specified parameters into instance variables
    self.window_length = window_length
    self.n_windows = n_windows
    self.rank = rank
    self.scale = scale
    self.random_rank = random_rank
    self.lag = lag
    self.scoring_step = scoring_step
    self.use_fast_hankel = use_fast_hankel
    self.method = method
    self.mitigate_offset = mitigate_offset

    # set some default values when they have not been specified
    if self.n_windows is None:
        self.n_windows = self.window_length//2
    if self.lag is None:
        # rule of thumb
        self.lag = self.n_windows
    if self.random_rank is None:
        # compute the rank as specified in [3] and
        # https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
        self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

    # specify the methods and their corresponding functions as lambda functions expecting only the hankel matrix
    self.methods = {'rsvd': partial(left_entropy, rank=self.rank, random_rank=self.random_rank,
                                    method='rsvd'),
                    'fbrsvd': partial(left_entropy, rank=self.rank, random_rank=self.random_rank,
                                      method='fbrsvd')}
    if self.method not in self.methods:
        raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

    # set up the methods we use for the construction of the hankel matrix (either it is the fft representation
    # of the other one)
    if use_fast_hankel and self.method == 'fbrsvd':
        raise ValueError(f'fbrsvd method is not defined with use_fast_hankel=True')
    self.hankel_construction = {
        False: lg.compile_hankel,
        True: lg.HankelFFTRepresentation
    }

    # We can only use one of the following options: mitigate_offset OR use_fast_hankel (read: exclusive or, XOR)
    # or none of both
    #
    # the reasons is that fast hankel does the matrix multiplication via an FFT convolution over the whole Hankel
    # matrix at once
    #
    # mitigate_offset works by subtracting column wise mean values (one per subsequence that construct the Hankel
    # matrix.
    #
    # When using the FFT convolution, we cannot subtract column wise mean values
    if self.use_fast_hankel and self.mitigate_offset:
        raise ValueError(f'use_fast_hankel={self.use_fast_hankel} is not allowed when mitigate_offset={self.mitigate_offset}. You can only use one or none of them.')

covered_regions()

This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered for the score detection.

The covered region depends on the window size, the number of windows, and the lag between the two Hankel matrices (past and future Hankel matrices).

The function return two different sizes:

1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples that a time series must have before the subspace method can run.

2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for subsequences that are used to extract the characteristics

Returns:

Type Description
tuple[int, int]

total_region: int, matrix_region: int

Source code in changepoynt/algorithms/base_algorithm.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def covered_regions(self) -> tuple[int, int]:
    """
    This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered
    for the score detection.

    The covered region depends on the window size, the number of windows, and the lag between the two Hankel
    matrices (past and future Hankel matrices).

    The function return two different sizes:

    1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples
    that a time series must have before the subspace method can run.

    2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for
    subsequences that are used to extract the characteristics

    :return: total_region: int, matrix_region: int
    """

    # the number of samples one Hankel matrix covers
    matrix_region = self.window_length + self.n_windows - 1

    # we have two Hankel matrices that might overlap
    total_region = matrix_region + self.lag

    return total_region, matrix_region

estimate_runtime(signal, steps=30, verbose=False)

This function estimates the runtime for a given signal by running the initial steps.

Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime is number of steps times signal length.

Parameters:

Name Type Description Default
signal ndarray

The signal that you will run the algorithm on.

required
steps int

The number of estimation steps. Increasing this number increases the runtime but also increases the accuracy of the estimation.

30
verbose bool

If true, prints out the runtime of the algorithm.

False

Returns:

Type Description
[float, float]

Runtime estimation in seconds, standard deviation in seconds

Source code in changepoynt/algorithms/base_algorithm.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def estimate_runtime(self, signal: np.ndarray, steps: int = 30, verbose: bool = False) -> [float, float]:
    """
    This function estimates the runtime for a given signal by running the initial steps.

    Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime
    is number of steps times signal length.

    :param signal: The signal that you will run the algorithm on.

    :param steps: The number of estimation steps. Increasing this number increases the runtime
                  but also increases the accuracy of the estimation.

    :param verbose: If true, prints out the runtime of the algorithm.

    :return: Runtime estimation in seconds, standard deviation in seconds
    """

    # get the required signal length
    total_covered_region = self.covered_regions()[0]

    # get the number of steps necessary for the whole time series
    processing_steps = (signal.shape[0] - total_covered_region)//self.scoring_step

    # check whether the signal is long enough
    if total_covered_region > signal.shape[0]:
        raise ValueError(f'Test signal for runtime estimation is not long enough: {signal.shape=} < {total_covered_region}')

    # shorten the signal so we only run one step
    if signal.ndim == 2:
        shortened_signal = signal[:total_covered_region+1, :].copy()
    elif signal.ndim == 1:
        shortened_signal = signal[:total_covered_region + 1].copy()
    else:
        raise ValueError(f'Test signal for runtime estimation has weird shape {signal.shape=}.')

    # trigger the potential JIT compilers
    self.transform(shortened_signal)

    # compute the scoring while measuring the time
    times = np.zeros(steps)
    for idx in range(steps):

        # make the transformation and measure the time
        start = time.perf_counter()
        self.transform(shortened_signal)
        timer = time.perf_counter() - start

        # fill the time
        times[idx] = timer


    # compute the time per step
    timer = np.mean(times)
    std = np.std(times)

    # multiply with the amounts of samples that we will be processing
    timer = timer * processing_steps
    std = std * processing_steps

    # check whether we want to print the result
    if verbose:
        print(f"For {signal.shape=} and the current parameters, the runtime will be around {timer:.3f} seconds (+/- {std:.3f} seconds).")
    return timer, std

transform(time_series)

This function calculates the anomaly score for each sample within the time series.

It also does some assertion regarding time series length.

Parameters:

Name Type Description Default
time_series ndarray

1D array containing the time series to be scored

required

Returns:

Type Description
ndarray

anomaly score

Source code in changepoynt/algorithms/esst.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def transform(self, time_series: np.ndarray) -> np.ndarray:
    """
    This function calculates the anomaly score for each sample within the time series.

    It also does some assertion regarding time series length.

    :param time_series: 1D array containing the time series to be scored
    :return: anomaly score
    """

    # check the dimensions of the input array
    assert time_series.ndim == 1, "Time series needs to be an 1D array."

    # check that we have at least two windows
    assert time_series.shape[0] > self.window_length, 'Time series needs to be longer than window length.'

    # compute the starting point of the scoring (past and future hankel need to fit)
    starting_point = self.covered_regions()[0]
    assert starting_point < time_series.shape[0], "The time series is too short to score any points."

    # scale the time series (or just copy it if already scaled)
    if self.scale:
        time_series = normalization.min_max_scaling(time_series, min_val=1.0, max_val=2.0, inplace=False)
    else:
        time_series = time_series.copy()

    # get the different methods
    scoring_function = self.methods[self.method]
    hankel_function = self.hankel_construction[self.use_fast_hankel]
    return _transform(time_series=time_series, start_idx=starting_point, offset=self.compute_offset(),
                      window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                      scoring_step=self.scoring_step, scoring_function=scoring_function,
                      hankel_construction_function=hankel_function, mitigate_offset=self.mitigate_offset)

left_entropy(hankel, rank, random_rank, method)

Entropy Singular Spectrum Transformation.

Parameters:

Name Type Description Default
hankel ndarray

the hankel matrix of the signal

required
rank int

the number of (approximated) eigenvectors as subspace of H1

required
random_rank int

the sampling parameter for the randomized svd

required
method str

which rsvd method to use

required

Returns:

Type Description
float

the change point score, the input vector x0

Source code in changepoynt/algorithms/esst.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def left_entropy(hankel: np.ndarray, rank: int, random_rank: int, method: str) -> float:
    """
    Entropy Singular Spectrum Transformation.

    :param hankel: the hankel matrix of the signal
    :param rank: the number of (approximated) eigenvectors as subspace of H1
    :param random_rank: the sampling parameter for the randomized svd
    :param method: which rsvd method to use
    :return: the change point score, the input vector x0
    """
    # compute the left and right eigenvectors using the randomized svd for fast computation
    if method == 'fbrsvd':
        right_eigenvectors, eigenvalues, left_eigenvectors = fbpca.pca(hankel, rank, True)
    elif method == 'rsvd':
        right_eigenvectors, eigenvalues, left_eigenvectors = lg.randomized_hankel_svd(hankel, rank,
                                                                                      oversampling_p=random_rank - rank)
    else:
        raise NotImplementedError(f'Method {method} is not available.')

    # shift the left eigenvectors up
    left_eigenvectors = left_eigenvectors - np.min(left_eigenvectors, axis=1)[:, None] + 1

    # make the eigenvectors into "probability distributions" so their sum of elements is one
    left_eigenvectors = left_eigenvectors/np.sum(left_eigenvectors, axis=1)[:, None]

    # compute the normalized entropy of the eigenvectors
    """entropy = (-1 *
               np.sum(np.log2(left_eigenvectors, out=np.zeros_like(left_eigenvectors), where=(left_eigenvectors != 0))
                      * left_eigenvectors, axis=1))/np.log2(hankel.shape[1])"""
    # skew = np.abs(skewness(left_eigenvectors, np.tile(np.linspace(0, hankel.shape[1]-1, hankel.shape[1]), (rank, 1))))
    skew = np.abs(left_right_diff(left_eigenvectors))

    # compute the weighted mean of the entropy
    # weighted_entropy = (eigenvalues @ ((1-entropy)*skew))/np.sum(eigenvalues)
    weighted_entropy = (eigenvalues @ skew) / np.sum(eigenvalues)

    return weighted_entropy

skewness(pdf_matrix, x)

Compute the skewness of a matrix of probability density functions given as a numpy array.

Args: pdf_matrix (numpy array): Matrix of probability density functions, where each row is a probability density function x (numpy array): Array of values for which the pdfs are defined

Returns: Skewness of each probability density function as a numpy array

Source code in changepoynt/algorithms/esst.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def skewness(pdf_matrix: np.ndarray, x: np.ndarray) -> np.ndarray:
    """
    Compute the skewness of a matrix of probability density functions given as a numpy array.

    Args:
        pdf_matrix (numpy array): Matrix of probability density functions, where each row is a probability density function
        x (numpy array): Array of values for which the pdfs are defined

    Returns:
        Skewness of each probability density function as a numpy array
    """
    mean_vector = np.sum(x * pdf_matrix, axis=1)
    variances_vector = np.sum((x - mean_vector[:, np.newaxis]) ** 2 * pdf_matrix, axis=1)
    std_vector = np.sqrt(variances_vector)
    skewness_vector = np.sum(((x - mean_vector[:, np.newaxis]) / std_vector[:, np.newaxis]) ** 3 * pdf_matrix, axis=1)
    return skewness_vector

msst

MSST

Bases: SingularSubspaceAlgorithm

This class implements all the utility and functionality necessary to compute the Multivariate Singular Spectrum Transformation (MSST).

This algorithm is an extension of the SST change point detection algorithm as described in:

[1] Idé, Tsuyoshi, and Keisuke Inoue. "Knowledge discovery from heterogeneous dynamic systems using change-point correlations." Proceedings of the 2005 SIAM international conference on data mining. Society for Industrial and Applied Mathematics, 2005.

[2] Idé, Tsuyoshi, and Koji Tsuda. "Change-point detection using krylov subspace learning." Proceedings of the 2007 SIAM International Conference on Data Mining. Society for Industrial and Applied Mathematics, 2007.

It is basically a merger of techniques from [1, 2] and: [3] Alanqary, Arwa, Abdullah Alomar, and Devavrat Shah. "Change point detection via multivariate singular spectrum analysis." Advances in Neural Information Processing Systems 34 (2021): 23218-23230.

It also uses a technique called randomized singular value decomposition which is surveyed and described in

[4] Halko, Nathan, Per-Gunnar Martinsson, and Joel A. Tropp. "Finding structure with randomness: Probabilistic algorithms for constructing approximate matrix decompositions." SIAM review 53.2 (2011): 217-288.

For the option fast_hankel=True it uses and algorithm based on

[5] L. Weber and R. Lenz. "Accelerating Singular Spectrum Transformation for Scalable Change Point Detection," in IEEE Access, Volume 11, 2025. doi: 10.1109/ACCESS.2025.3640386.

There will be a parameter specifying, whether to use the implicit krylov approximation from [2]. This significantly speeds up the computation but can reduce accuracy as the "eigensignals" are only approximated indirectly using a krylov subspace with the possible change point signal as seed. This method also deploys a feedback of the dominant "eigensignal" as seed into the power method (dominant eigenvector estimation) with additive gaussian noise.

!NOTE!: Most computational heavy functions are implemented as standalone functions, even if they require instance variables as this enables us to use jit compiled code provided by the numba compiler for faster calculations.

Also, we decided to do type hinting but not type checking as it requires too much boilerplate code. We recommend to input the specified types as the program will break in unforeseen ways otherwise.

Source code in changepoynt/algorithms/msst.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class MSST(SingularSubspaceAlgorithm):
    """
    This class implements all the utility and functionality necessary to compute the Multivariate Singular Spectrum
    Transformation (MSST).

    This algorithm is an extension of the SST change point detection algorithm as described in:

    [1]
    Idé, Tsuyoshi, and Keisuke Inoue.
    "Knowledge discovery from heterogeneous dynamic systems using change-point correlations."
    Proceedings of the 2005 SIAM international conference on data mining.
    Society for Industrial and Applied Mathematics, 2005.

    [2]
    Idé, Tsuyoshi, and Koji Tsuda.
    "Change-point detection using krylov subspace learning."
    Proceedings of the 2007 SIAM International Conference on Data Mining.
    Society for Industrial and Applied Mathematics, 2007.

    It is basically a merger of techniques from [1, 2] and:
    [3]
    Alanqary, Arwa, Abdullah Alomar, and Devavrat Shah.
    "Change point detection via multivariate singular spectrum analysis."
    Advances in Neural Information Processing Systems 34 (2021): 23218-23230.

    It also uses a technique called randomized singular value decomposition which is surveyed and described in

    [4]
    Halko, Nathan, Per-Gunnar Martinsson, and Joel A. Tropp.
    "Finding structure with randomness: Probabilistic algorithms for constructing approximate matrix decompositions."
    SIAM review 53.2 (2011): 217-288.

    For the option fast_hankel=True it uses and algorithm based on

    [5]
    L. Weber and R. Lenz.
    "Accelerating Singular Spectrum Transformation for Scalable Change Point Detection,"
    in IEEE Access, Volume 11, 2025.
    doi: 10.1109/ACCESS.2025.3640386.

    There will be a parameter specifying, whether to use the implicit krylov approximation from [2]. This
    significantly speeds up the computation but can reduce accuracy as the "eigensignals" are only approximated
    indirectly using a krylov subspace with the possible change point signal as seed. This method also deploys
    a feedback of the dominant "eigensignal" as seed into the power method (dominant eigenvector estimation)
    with additive gaussian noise.

    !NOTE!:
    Most computational heavy functions are implemented as standalone functions, even if they require instance variables
    as this enables us to use jit compiled code provided by the numba compiler for faster calculations.

    Also, we decided to do type hinting but not type checking as it requires too much boilerplate code. We recommend
    to input the specified types as the program will break in unforeseen ways otherwise.
    """

    def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5, scale: bool = True,
                 method: str = 'ika', lanczos_rank: int = None, random_rank: int = None,
                 feedback_noise_level: float = 1e-3, scoring_step: int = 1, use_fast_hankel: bool = False) -> None:
        """
        Initializing the Singular Spectrum Transformation (SST) requires setting a lot of parameters. See the parameters
        explanation for some intuition into the right choices. Currently, there are two SST methods from [1] and [2]
        available for use.

        :param window_length: This specifies the length of the time series (in samples), which will be used to extract
        the representative "eigensignals" to be compared before and after the lag. The window length should be big
        enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

        :param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
        the given time series. It should be big enough to cover different parts of the target behavior. If one does not
        specify this value, we use the rule of thumb and take as many time windows as you specified the length of the
        window (rule of thumb).

        :param lag: This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the
        algorithms how far it should look into the future to find change in behavior to the current signal. If you do
        not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to
        cover the behavior.

        :param rank: This parameter specifies the amount of "eigensignals" which will be used to measure the
        dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
        "eigensignals" if you do not specify otherwise.

        :param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range.
        Per default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers,
        this could cause problems, as the signal will be squished.

        :param method: Currently, "svd" [1], "ika" [2] and "rsvd" are available. ika corresponds to IKA-SST in [2] which
        will speed up the computation significantly.

        :param lanczos_rank: In order to use the implicit approximation of "eigensignals" by using "ika" [2] method,
        one needs to decide the rank of the implicit approximation of each "eigensignal". As a rule of thumb, we
        determine this value as being twice the amount of specified "eigensignals" to span the subspace
        (parameter rank). This is also recommended by the author of IKA-SST in [2].

        :param random_rank: In order to use the randomized singular value decomposition, one needs to provide a
        randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower
        this value, the faster the computation but the higher the error (as the approximation gets worse).

        :param feedback_noise_level: This specifies the amplitude of additive white gaussian noise added to the dominant
        "eigensignal" of the future behavior when shifting forward. This idea is noted in [2] and initializes
        the seed of the power method for dominant eigenvector estimation with the precious dominant eigenvector
        plus the noise level specified here. The noise level should just be a small fraction of the value range
        of the signal.

        :param scoring_step: the distance between scoring steps in samples (e.g., 2 would half the computation).

        :param use_fast_hankel: Use the O(N*logN) version for the decomposition. This is likely to have large
        speed improvements for window sizes > 150
        """

        # save the specified parameters into instance variables
        self.window_length = window_length
        self.n_windows = n_windows
        self.lag = lag
        self.rank = rank
        self.scale = scale
        self.method = method
        self.lanczos_rank = lanczos_rank
        self.random_rank = random_rank
        self.noise = feedback_noise_level
        self.scoring_step = scoring_step
        self.use_fast_hankel = use_fast_hankel

        # set some default values when they have not been specified
        if self.n_windows is None:
            self.n_windows = self.window_length
        if self.lag is None:
            # rule of thumb
            self.lag = max(self.n_windows // 3, 1)
        if self.lanczos_rank is None:
            # make rank even and multiply by two just as specified in [2]
            self.lanczos_rank = self.rank * 2 - (self.rank & 1)
        if self.random_rank is None:
            # compute the rank as specified in [3] and
            # https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
            self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

        # specify the methods and their corresponding functions as lambda functions expecting only the
        # 1) future hankel matrix,
        # 2) the current hankel matrix and
        # 3) the feedback vector (e.g. for dominant eigenvector feedback)
        # all other parameter should be specified as values in the partial lambda function
        self.methods = {'ika': partial(cpsst._implicit_krylov_approximation,
                                       rank=self.rank,
                                       lanczos_rank=self.lanczos_rank),
                        'rsvd': partial(cpsst._random_singular_value_decomposition,
                                        rank=self.rank,
                                        randomized_rank=self.random_rank),
                        'weighted': partial(cpsst._weighted_random_singular_value_decomposition,
                                            rank=self.rank,
                                            randomized_rank=self.random_rank),
                        'symmetric': partial(cpsst._symmetric_random_singular_value_decomposition,
                                             rank=self.rank,
                                             randomized_rank=self.random_rank),
                        }
        if self.method not in self.methods:
            raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

        # set up the methods we use for the construction of the hankel matrix (either it is the fft representation
        # of the other one)
        if use_fast_hankel and self.method not in ["rsvd", "ika", "weighted", "symmetric"]:
            raise ValueError(f'{self.method} method is not defined with use_fast_hankel=True')

    def compute_offset(self) -> int:
        return self.n_windows // 2 + self.lag

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        """
        This function calculates the anomaly score for each sample within the time series, starting from an initial
        sample being the first sample of fitting the past hankel matrix (window_length + n_windows samples) and the new
        future hankel matrix (+lag). Values before that will be zero

        It also does some assertions regarding the specified parameters and whether they fit the time series.

        This function builds the interface to the jit compiled static function used to iterate over the array.

        :param time_series: 1D array containing the time series to be scored
        :return: anomaly score
        """

        # check the dimensions of the input array
        assert time_series.ndim > 1, "Time series needs to be an N-D array. Currently it is 1-D."

        # compute the starting point of the scoring (past and future hankel need to fit)
        starting_point = self.covered_regions()[0]
        assert starting_point < time_series.shape[0], "The time series is too short to score any points."

        # scale the time series (or just copy it if already scaled)
        time_series = time_series.copy()
        if self.scale:
            for idx in range(time_series.shape[1]):
                time_series[:, idx] = normalization.min_max_scaling(time_series[:, idx], 1, 2, inplace=True)

        # get the changepoint scorer from the different methods
        scoring_function = self.methods[self.method]

        # start the scaling itself by calling the jit compiled staticmethod and return the result
        score = _transform(time_series=time_series, start_idx=starting_point, offset=self.compute_offset(),
                           window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                           scoring_step=self.scoring_step, scoring_function=scoring_function, use_fast_hankel=True)
        return score

__init__(window_length, n_windows=None, lag=None, rank=5, scale=True, method='ika', lanczos_rank=None, random_rank=None, feedback_noise_level=0.001, scoring_step=1, use_fast_hankel=False)

Initializing the Singular Spectrum Transformation (SST) requires setting a lot of parameters. See the parameters explanation for some intuition into the right choices. Currently, there are two SST methods from [1] and [2] available for use.

Parameters:

Name Type Description Default
window_length int

This specifies the length of the time series (in samples), which will be used to extract the representative "eigensignals" to be compared before and after the lag. The window length should be big enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

required
n_windows int

This specifies the number of consecutive time windows used to extract the "eigensignals" from the given time series. It should be big enough to cover different parts of the target behavior. If one does not specify this value, we use the rule of thumb and take as many time windows as you specified the length of the window (rule of thumb).

None
lag int

This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the algorithms how far it should look into the future to find change in behavior to the current signal. If you do not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to cover the behavior.

None
rank int

This parameter specifies the amount of "eigensignals" which will be used to measure the dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant "eigensignals" if you do not specify otherwise.

5
scale bool

Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this could cause problems, as the signal will be squished.

True
method str

Currently, "svd" [1], "ika" [2] and "rsvd" are available. ika corresponds to IKA-SST in [2] which will speed up the computation significantly.

'ika'
lanczos_rank int

In order to use the implicit approximation of "eigensignals" by using "ika" [2] method, one needs to decide the rank of the implicit approximation of each "eigensignal". As a rule of thumb, we determine this value as being twice the amount of specified "eigensignals" to span the subspace (parameter rank). This is also recommended by the author of IKA-SST in [2].

None
random_rank int

In order to use the randomized singular value decomposition, one needs to provide a randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower this value, the faster the computation but the higher the error (as the approximation gets worse).

None
feedback_noise_level float

This specifies the amplitude of additive white gaussian noise added to the dominant "eigensignal" of the future behavior when shifting forward. This idea is noted in [2] and initializes the seed of the power method for dominant eigenvector estimation with the precious dominant eigenvector plus the noise level specified here. The noise level should just be a small fraction of the value range of the signal.

0.001
scoring_step int

the distance between scoring steps in samples (e.g., 2 would half the computation).

1
use_fast_hankel bool

Use the O(N*logN) version for the decomposition. This is likely to have large speed improvements for window sizes > 150

False
Source code in changepoynt/algorithms/msst.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5, scale: bool = True,
             method: str = 'ika', lanczos_rank: int = None, random_rank: int = None,
             feedback_noise_level: float = 1e-3, scoring_step: int = 1, use_fast_hankel: bool = False) -> None:
    """
    Initializing the Singular Spectrum Transformation (SST) requires setting a lot of parameters. See the parameters
    explanation for some intuition into the right choices. Currently, there are two SST methods from [1] and [2]
    available for use.

    :param window_length: This specifies the length of the time series (in samples), which will be used to extract
    the representative "eigensignals" to be compared before and after the lag. The window length should be big
    enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

    :param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
    the given time series. It should be big enough to cover different parts of the target behavior. If one does not
    specify this value, we use the rule of thumb and take as many time windows as you specified the length of the
    window (rule of thumb).

    :param lag: This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the
    algorithms how far it should look into the future to find change in behavior to the current signal. If you do
    not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to
    cover the behavior.

    :param rank: This parameter specifies the amount of "eigensignals" which will be used to measure the
    dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
    "eigensignals" if you do not specify otherwise.

    :param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range.
    Per default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers,
    this could cause problems, as the signal will be squished.

    :param method: Currently, "svd" [1], "ika" [2] and "rsvd" are available. ika corresponds to IKA-SST in [2] which
    will speed up the computation significantly.

    :param lanczos_rank: In order to use the implicit approximation of "eigensignals" by using "ika" [2] method,
    one needs to decide the rank of the implicit approximation of each "eigensignal". As a rule of thumb, we
    determine this value as being twice the amount of specified "eigensignals" to span the subspace
    (parameter rank). This is also recommended by the author of IKA-SST in [2].

    :param random_rank: In order to use the randomized singular value decomposition, one needs to provide a
    randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower
    this value, the faster the computation but the higher the error (as the approximation gets worse).

    :param feedback_noise_level: This specifies the amplitude of additive white gaussian noise added to the dominant
    "eigensignal" of the future behavior when shifting forward. This idea is noted in [2] and initializes
    the seed of the power method for dominant eigenvector estimation with the precious dominant eigenvector
    plus the noise level specified here. The noise level should just be a small fraction of the value range
    of the signal.

    :param scoring_step: the distance between scoring steps in samples (e.g., 2 would half the computation).

    :param use_fast_hankel: Use the O(N*logN) version for the decomposition. This is likely to have large
    speed improvements for window sizes > 150
    """

    # save the specified parameters into instance variables
    self.window_length = window_length
    self.n_windows = n_windows
    self.lag = lag
    self.rank = rank
    self.scale = scale
    self.method = method
    self.lanczos_rank = lanczos_rank
    self.random_rank = random_rank
    self.noise = feedback_noise_level
    self.scoring_step = scoring_step
    self.use_fast_hankel = use_fast_hankel

    # set some default values when they have not been specified
    if self.n_windows is None:
        self.n_windows = self.window_length
    if self.lag is None:
        # rule of thumb
        self.lag = max(self.n_windows // 3, 1)
    if self.lanczos_rank is None:
        # make rank even and multiply by two just as specified in [2]
        self.lanczos_rank = self.rank * 2 - (self.rank & 1)
    if self.random_rank is None:
        # compute the rank as specified in [3] and
        # https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
        self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

    # specify the methods and their corresponding functions as lambda functions expecting only the
    # 1) future hankel matrix,
    # 2) the current hankel matrix and
    # 3) the feedback vector (e.g. for dominant eigenvector feedback)
    # all other parameter should be specified as values in the partial lambda function
    self.methods = {'ika': partial(cpsst._implicit_krylov_approximation,
                                   rank=self.rank,
                                   lanczos_rank=self.lanczos_rank),
                    'rsvd': partial(cpsst._random_singular_value_decomposition,
                                    rank=self.rank,
                                    randomized_rank=self.random_rank),
                    'weighted': partial(cpsst._weighted_random_singular_value_decomposition,
                                        rank=self.rank,
                                        randomized_rank=self.random_rank),
                    'symmetric': partial(cpsst._symmetric_random_singular_value_decomposition,
                                         rank=self.rank,
                                         randomized_rank=self.random_rank),
                    }
    if self.method not in self.methods:
        raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

    # set up the methods we use for the construction of the hankel matrix (either it is the fft representation
    # of the other one)
    if use_fast_hankel and self.method not in ["rsvd", "ika", "weighted", "symmetric"]:
        raise ValueError(f'{self.method} method is not defined with use_fast_hankel=True')

covered_regions()

This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered for the score detection.

The covered region depends on the window size, the number of windows, and the lag between the two Hankel matrices (past and future Hankel matrices).

The function return two different sizes:

1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples that a time series must have before the subspace method can run.

2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for subsequences that are used to extract the characteristics

Returns:

Type Description
tuple[int, int]

total_region: int, matrix_region: int

Source code in changepoynt/algorithms/base_algorithm.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def covered_regions(self) -> tuple[int, int]:
    """
    This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered
    for the score detection.

    The covered region depends on the window size, the number of windows, and the lag between the two Hankel
    matrices (past and future Hankel matrices).

    The function return two different sizes:

    1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples
    that a time series must have before the subspace method can run.

    2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for
    subsequences that are used to extract the characteristics

    :return: total_region: int, matrix_region: int
    """

    # the number of samples one Hankel matrix covers
    matrix_region = self.window_length + self.n_windows - 1

    # we have two Hankel matrices that might overlap
    total_region = matrix_region + self.lag

    return total_region, matrix_region

estimate_runtime(signal, steps=30, verbose=False)

This function estimates the runtime for a given signal by running the initial steps.

Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime is number of steps times signal length.

Parameters:

Name Type Description Default
signal ndarray

The signal that you will run the algorithm on.

required
steps int

The number of estimation steps. Increasing this number increases the runtime but also increases the accuracy of the estimation.

30
verbose bool

If true, prints out the runtime of the algorithm.

False

Returns:

Type Description
[float, float]

Runtime estimation in seconds, standard deviation in seconds

Source code in changepoynt/algorithms/base_algorithm.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def estimate_runtime(self, signal: np.ndarray, steps: int = 30, verbose: bool = False) -> [float, float]:
    """
    This function estimates the runtime for a given signal by running the initial steps.

    Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime
    is number of steps times signal length.

    :param signal: The signal that you will run the algorithm on.

    :param steps: The number of estimation steps. Increasing this number increases the runtime
                  but also increases the accuracy of the estimation.

    :param verbose: If true, prints out the runtime of the algorithm.

    :return: Runtime estimation in seconds, standard deviation in seconds
    """

    # get the required signal length
    total_covered_region = self.covered_regions()[0]

    # get the number of steps necessary for the whole time series
    processing_steps = (signal.shape[0] - total_covered_region)//self.scoring_step

    # check whether the signal is long enough
    if total_covered_region > signal.shape[0]:
        raise ValueError(f'Test signal for runtime estimation is not long enough: {signal.shape=} < {total_covered_region}')

    # shorten the signal so we only run one step
    if signal.ndim == 2:
        shortened_signal = signal[:total_covered_region+1, :].copy()
    elif signal.ndim == 1:
        shortened_signal = signal[:total_covered_region + 1].copy()
    else:
        raise ValueError(f'Test signal for runtime estimation has weird shape {signal.shape=}.')

    # trigger the potential JIT compilers
    self.transform(shortened_signal)

    # compute the scoring while measuring the time
    times = np.zeros(steps)
    for idx in range(steps):

        # make the transformation and measure the time
        start = time.perf_counter()
        self.transform(shortened_signal)
        timer = time.perf_counter() - start

        # fill the time
        times[idx] = timer


    # compute the time per step
    timer = np.mean(times)
    std = np.std(times)

    # multiply with the amounts of samples that we will be processing
    timer = timer * processing_steps
    std = std * processing_steps

    # check whether we want to print the result
    if verbose:
        print(f"For {signal.shape=} and the current parameters, the runtime will be around {timer:.3f} seconds (+/- {std:.3f} seconds).")
    return timer, std

transform(time_series)

This function calculates the anomaly score for each sample within the time series, starting from an initial sample being the first sample of fitting the past hankel matrix (window_length + n_windows samples) and the new future hankel matrix (+lag). Values before that will be zero

It also does some assertions regarding the specified parameters and whether they fit the time series.

This function builds the interface to the jit compiled static function used to iterate over the array.

Parameters:

Name Type Description Default
time_series ndarray

1D array containing the time series to be scored

required

Returns:

Type Description
ndarray

anomaly score

Source code in changepoynt/algorithms/msst.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def transform(self, time_series: np.ndarray) -> np.ndarray:
    """
    This function calculates the anomaly score for each sample within the time series, starting from an initial
    sample being the first sample of fitting the past hankel matrix (window_length + n_windows samples) and the new
    future hankel matrix (+lag). Values before that will be zero

    It also does some assertions regarding the specified parameters and whether they fit the time series.

    This function builds the interface to the jit compiled static function used to iterate over the array.

    :param time_series: 1D array containing the time series to be scored
    :return: anomaly score
    """

    # check the dimensions of the input array
    assert time_series.ndim > 1, "Time series needs to be an N-D array. Currently it is 1-D."

    # compute the starting point of the scoring (past and future hankel need to fit)
    starting_point = self.covered_regions()[0]
    assert starting_point < time_series.shape[0], "The time series is too short to score any points."

    # scale the time series (or just copy it if already scaled)
    time_series = time_series.copy()
    if self.scale:
        for idx in range(time_series.shape[1]):
            time_series[:, idx] = normalization.min_max_scaling(time_series[:, idx], 1, 2, inplace=True)

    # get the changepoint scorer from the different methods
    scoring_function = self.methods[self.method]

    # start the scaling itself by calling the jit compiled staticmethod and return the result
    score = _transform(time_series=time_series, start_idx=starting_point, offset=self.compute_offset(),
                       window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                       scoring_step=self.scoring_step, scoring_function=scoring_function, use_fast_hankel=True)
    return score

main()

This function is not intended for users, but for quick testing during development.

Source code in changepoynt/algorithms/msst.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
def main():
    """This function is not intended for users, but for quick testing during development."""
    from time import time

    # make synthetic step function
    np.random.seed(123)
    length = 300
    x = np.hstack([1 * np.ones(length) + np.random.rand(length) * 1,
                   3 * np.ones(length) + np.random.rand(length) * 2,
                   5 * np.ones(length) + np.random.rand(length) * 1.5])
    x += np.random.rand(x.size)
    x = x[..., None]

    # create the sst method
    ika_sst = MSST(31, method='ika', use_fast_hankel=True)
    rsvd_sst = MSST(31, method='rsvd', use_fast_hankel=True)
    weighted_sst = MSST(31, method='weighted', use_fast_hankel=True)

    # compare with original sst
    orig_ika = cpsst.SST(31, method='ika', use_fast_hankel=True).transform(x[:, 0])
    print('Comparison to original score', np.corrcoef(orig_ika, ika_sst.transform(x))[0, 1])

    # make the scoring
    start = time()
    ika_sst.transform(x)
    print(time() - start)
    rsvd_sst.transform(x)
    weighted_sst.transform(x)

messt

MESST

Bases: SingularSubspaceAlgorithm

This class implements the Multivariate Enhanced Singular Spectrum Transformation (MESST)

The univariate algorithm ESST has been published together with Sarah Boelter (UMN) Boelter, Sarah, Lucas Weber, et al. "Fault Prediction in Planetary Drilling Using Subspace Analysis Techniques." International Conference on Intelligent Autonomous Systems (IAS-19). 2025. (* is equal contribution) https://ntrs.nasa.gov/citations/20250002705

It is a research algorithm, please use with caution.

Source code in changepoynt/algorithms/messt.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class MESST(SingularSubspaceAlgorithm):
    """
    This class implements the Multivariate Enhanced Singular Spectrum Transformation (MESST)

    The univariate algorithm ESST has been published together with Sarah Boelter (UMN)
    Boelter, Sarah*, Lucas Weber*, et al. "Fault Prediction in Planetary Drilling Using Subspace Analysis Techniques."
    International Conference on Intelligent Autonomous Systems (IAS-19). 2025.
    (* is equal contribution)
    https://ntrs.nasa.gov/citations/20250002705

    It is a research algorithm, please use with caution.
    """

    def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5,
                 scale: bool = True, method: str = 'rsvd', random_rank: int = None, scoring_step: int = 1,
                 use_fast_hankel: bool = False) -> None:
        """
        Experimental change point detection method evaluation the prevalence of change points within a signal
        by comparing the difference in eigenvectors between to points in time.

        :param window_length: This specifies the length of the time series (in samples), which will be used to extract
        the representative "eigensignals" to be compared before and after the lag. The window length should be big
        enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

        :param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
        the given time series. It should be big enough to cover different parts of the target behavior. If one does not
        specify this value, we use the rule of thumb and take as many time windows as you specified the length of the
        window (rule of thumb).

        :param lag: This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the
        algorithms how far it should look into the future to find change in behavior to the current signal. If you do
        not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to
        cover the behavior.

        :param rank: This parameter specifies the amount of "eigensignals" which will be used to measure the
        dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
        "eigensignals" if you do not specify otherwise.

        :param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per
        default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this
        could cause problems, as the signal will be squished.

        :param random_rank: To use the randomized singular value decomposition, one needs to provide a
        randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower
        this value, the faster the computation but the higher the error (as the approximation gets worse).

        :param scoring_step: The distance between scoring steps in samples (e.g., 2 would half the computation).

        :param use_fast_hankel: Whether to deploy the fast hankel matrix product.

        """

        # save the specified parameters into instance variables
        self.window_length = window_length
        self.n_windows = n_windows
        self.rank = rank
        self.scale = scale
        self.random_rank = random_rank
        self.lag = lag
        self.scoring_step = scoring_step
        self.use_fast_hankel = use_fast_hankel
        self.method = method

        # set some default values when they have not been specified
        if self.n_windows is None:
            self.n_windows = self.window_length//2
        if self.lag is None:
            # rule of thumb
            self.lag = self.n_windows
        if self.random_rank is None:
            # compute the rank as specified in [3] and
            # https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
            self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

        # specify the methods and their corresponding functions as lambda functions expecting only the hankel matrix
        self.methods = {'rsvd': partial(esst.left_entropy, rank=self.rank, random_rank=self.random_rank,
                                        method=self.method)}
        if self.method not in self.methods:
            raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

        # set up the methods we use for the construction of the hankel matrix (either it is the fft representation
        # of the other one)
        if use_fast_hankel and self.method != 'rsvd':
            raise ValueError(f'method {self.method} is not defined with use_fast_hankel=True')

    def compute_offset(self) -> int:
        return self.n_windows + self.lag

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        """
        This function calculates the anomaly score for each sample within the time series.

        It also does some assertion regarding time series length.

        :param time_series: 1D array containing the time series to be scored
        :return: anomaly score
        """

        # check the dimensions of the input array
        assert time_series.ndim > 1, "Time series needs to be an N-D array. Currently it is 1-D."

        # compute the starting point of the scoring (past and future hankel need to fit)
        starting_point = self.covered_regions()[0]
        assert starting_point < time_series.shape[0], "The time series is too short to score any points."

        # scale the time series (or just copy it if already scaled)
        time_series = time_series.copy()
        if self.scale:
            for idx in range(time_series.shape[1]):
                time_series[:, idx] = normalization.min_max_scaling(time_series[:, idx], 1, 2, inplace=True)

        # get the different methods
        scoring_function = self.methods[self.method]
        return _transform(time_series=time_series, start_idx=starting_point, offset=self.compute_offset(),
                          window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                          scoring_step=self.scoring_step, scoring_function=scoring_function,
                          use_fast_hankel=self.use_fast_hankel)

__init__(window_length, n_windows=None, lag=None, rank=5, scale=True, method='rsvd', random_rank=None, scoring_step=1, use_fast_hankel=False)

Experimental change point detection method evaluation the prevalence of change points within a signal by comparing the difference in eigenvectors between to points in time.

Parameters:

Name Type Description Default
window_length int

This specifies the length of the time series (in samples), which will be used to extract the representative "eigensignals" to be compared before and after the lag. The window length should be big enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

required
n_windows int

This specifies the number of consecutive time windows used to extract the "eigensignals" from the given time series. It should be big enough to cover different parts of the target behavior. If one does not specify this value, we use the rule of thumb and take as many time windows as you specified the length of the window (rule of thumb).

None
lag int

This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the algorithms how far it should look into the future to find change in behavior to the current signal. If you do not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to cover the behavior.

None
rank int

This parameter specifies the amount of "eigensignals" which will be used to measure the dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant "eigensignals" if you do not specify otherwise.

5
scale bool

Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this could cause problems, as the signal will be squished.

True
random_rank int

To use the randomized singular value decomposition, one needs to provide a randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower this value, the faster the computation but the higher the error (as the approximation gets worse).

None
scoring_step int

The distance between scoring steps in samples (e.g., 2 would half the computation).

1
use_fast_hankel bool

Whether to deploy the fast hankel matrix product.

False
Source code in changepoynt/algorithms/messt.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def __init__(self, window_length: int, n_windows: int = None, lag: int = None, rank: int = 5,
             scale: bool = True, method: str = 'rsvd', random_rank: int = None, scoring_step: int = 1,
             use_fast_hankel: bool = False) -> None:
    """
    Experimental change point detection method evaluation the prevalence of change points within a signal
    by comparing the difference in eigenvectors between to points in time.

    :param window_length: This specifies the length of the time series (in samples), which will be used to extract
    the representative "eigensignals" to be compared before and after the lag. The window length should be big
    enough to cover any wanted patterns (e.g., bigger than the periodicity of periodic signals).

    :param n_windows: This specifies the number of consecutive time windows used to extract the "eigensignals" from
    the given time series. It should be big enough to cover different parts of the target behavior. If one does not
    specify this value, we use the rule of thumb and take as many time windows as you specified the length of the
    window (rule of thumb).

    :param lag: This parameter specifies the distance of the comparison for behaviors. In easy terms it tells the
    algorithms how far it should look into the future to find change in behavior to the current signal. If you do
    not specify this parameter, we use a rule of thumb and look ahead half of the window length you specified to
    cover the behavior.

    :param rank: This parameter specifies the amount of "eigensignals" which will be used to measure the
    dissimilarity of the signal in the future behavior. As a rule of thumb, we take the five most dominant
    "eigensignals" if you do not specify otherwise.

    :param scale: Due to numeric stability, we REALLY RECOMMEND scaling the signal into a restricted value range. Per
    default, we use a min max scaling to ensure a restricted value range. In the presence of extreme outliers, this
    could cause problems, as the signal will be squished.

    :param random_rank: To use the randomized singular value decomposition, one needs to provide a
    randomized rank (size of second matrix dimension for the randomized matrix) as specified in [3]. The lower
    this value, the faster the computation but the higher the error (as the approximation gets worse).

    :param scoring_step: The distance between scoring steps in samples (e.g., 2 would half the computation).

    :param use_fast_hankel: Whether to deploy the fast hankel matrix product.

    """

    # save the specified parameters into instance variables
    self.window_length = window_length
    self.n_windows = n_windows
    self.rank = rank
    self.scale = scale
    self.random_rank = random_rank
    self.lag = lag
    self.scoring_step = scoring_step
    self.use_fast_hankel = use_fast_hankel
    self.method = method

    # set some default values when they have not been specified
    if self.n_windows is None:
        self.n_windows = self.window_length//2
    if self.lag is None:
        # rule of thumb
        self.lag = self.n_windows
    if self.random_rank is None:
        # compute the rank as specified in [3] and
        # https://scikit-learn.org/stable/modules/generated/sklearn.utils.extmath.randomized_svd.html
        self.random_rank = min(self.rank + 10, self.window_length, self.n_windows)

    # specify the methods and their corresponding functions as lambda functions expecting only the hankel matrix
    self.methods = {'rsvd': partial(esst.left_entropy, rank=self.rank, random_rank=self.random_rank,
                                    method=self.method)}
    if self.method not in self.methods:
        raise ValueError(f'Method {self.method} not defined. Possible methods: {list(self.methods.keys())}.')

    # set up the methods we use for the construction of the hankel matrix (either it is the fft representation
    # of the other one)
    if use_fast_hankel and self.method != 'rsvd':
        raise ValueError(f'method {self.method} is not defined with use_fast_hankel=True')

covered_regions()

This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered for the score detection.

The covered region depends on the window size, the number of windows, and the lag between the two Hankel matrices (past and future Hankel matrices).

The function return two different sizes:

1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples that a time series must have before the subspace method can run.

2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for subsequences that are used to extract the characteristics

Returns:

Type Description
tuple[int, int]

total_region: int, matrix_region: int

Source code in changepoynt/algorithms/base_algorithm.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def covered_regions(self) -> tuple[int, int]:
    """
    This function returns the size of the region of interest (ROI). The ROI is the total amount of samples covered
    for the score detection.

    The covered region depends on the window size, the number of windows, and the lag between the two Hankel
    matrices (past and future Hankel matrices).

    The function return two different sizes:

    1) total_region: The total region covered by both matrices with lag. This is also the minimum number of samples
    that a time series must have before the subspace method can run.

    2) matrix_region: The region covered by one Hankel matrix. Essentially the region that is sampled for
    subsequences that are used to extract the characteristics

    :return: total_region: int, matrix_region: int
    """

    # the number of samples one Hankel matrix covers
    matrix_region = self.window_length + self.n_windows - 1

    # we have two Hankel matrices that might overlap
    total_region = matrix_region + self.lag

    return total_region, matrix_region

estimate_runtime(signal, steps=30, verbose=False)

This function estimates the runtime for a given signal by running the initial steps.

Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime is number of steps times signal length.

Parameters:

Name Type Description Default
signal ndarray

The signal that you will run the algorithm on.

required
steps int

The number of estimation steps. Increasing this number increases the runtime but also increases the accuracy of the estimation.

30
verbose bool

If true, prints out the runtime of the algorithm.

False

Returns:

Type Description
[float, float]

Runtime estimation in seconds, standard deviation in seconds

Source code in changepoynt/algorithms/base_algorithm.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def estimate_runtime(self, signal: np.ndarray, steps: int = 30, verbose: bool = False) -> [float, float]:
    """
    This function estimates the runtime for a given signal by running the initial steps.

    Singular subspace algorithms grow linearly with the signal length of the time series, the overall runtime
    is number of steps times signal length.

    :param signal: The signal that you will run the algorithm on.

    :param steps: The number of estimation steps. Increasing this number increases the runtime
                  but also increases the accuracy of the estimation.

    :param verbose: If true, prints out the runtime of the algorithm.

    :return: Runtime estimation in seconds, standard deviation in seconds
    """

    # get the required signal length
    total_covered_region = self.covered_regions()[0]

    # get the number of steps necessary for the whole time series
    processing_steps = (signal.shape[0] - total_covered_region)//self.scoring_step

    # check whether the signal is long enough
    if total_covered_region > signal.shape[0]:
        raise ValueError(f'Test signal for runtime estimation is not long enough: {signal.shape=} < {total_covered_region}')

    # shorten the signal so we only run one step
    if signal.ndim == 2:
        shortened_signal = signal[:total_covered_region+1, :].copy()
    elif signal.ndim == 1:
        shortened_signal = signal[:total_covered_region + 1].copy()
    else:
        raise ValueError(f'Test signal for runtime estimation has weird shape {signal.shape=}.')

    # trigger the potential JIT compilers
    self.transform(shortened_signal)

    # compute the scoring while measuring the time
    times = np.zeros(steps)
    for idx in range(steps):

        # make the transformation and measure the time
        start = time.perf_counter()
        self.transform(shortened_signal)
        timer = time.perf_counter() - start

        # fill the time
        times[idx] = timer


    # compute the time per step
    timer = np.mean(times)
    std = np.std(times)

    # multiply with the amounts of samples that we will be processing
    timer = timer * processing_steps
    std = std * processing_steps

    # check whether we want to print the result
    if verbose:
        print(f"For {signal.shape=} and the current parameters, the runtime will be around {timer:.3f} seconds (+/- {std:.3f} seconds).")
    return timer, std

transform(time_series)

This function calculates the anomaly score for each sample within the time series.

It also does some assertion regarding time series length.

Parameters:

Name Type Description Default
time_series ndarray

1D array containing the time series to be scored

required

Returns:

Type Description
ndarray

anomaly score

Source code in changepoynt/algorithms/messt.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def transform(self, time_series: np.ndarray) -> np.ndarray:
    """
    This function calculates the anomaly score for each sample within the time series.

    It also does some assertion regarding time series length.

    :param time_series: 1D array containing the time series to be scored
    :return: anomaly score
    """

    # check the dimensions of the input array
    assert time_series.ndim > 1, "Time series needs to be an N-D array. Currently it is 1-D."

    # compute the starting point of the scoring (past and future hankel need to fit)
    starting_point = self.covered_regions()[0]
    assert starting_point < time_series.shape[0], "The time series is too short to score any points."

    # scale the time series (or just copy it if already scaled)
    time_series = time_series.copy()
    if self.scale:
        for idx in range(time_series.shape[1]):
            time_series[:, idx] = normalization.min_max_scaling(time_series[:, idx], 1, 2, inplace=True)

    # get the different methods
    scoring_function = self.methods[self.method]
    return _transform(time_series=time_series, start_idx=starting_point, offset=self.compute_offset(),
                      window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                      scoring_step=self.scoring_step, scoring_function=scoring_function,
                      use_fast_hankel=self.use_fast_hankel)

ulsif

ULSIF

Bases: Algorithm

This class implements change point detection based on density ration estimation optimizing a least squares approach from:

[1] "A least-squares approach to direct importance estimation" T. Kanamori, S. Hido, and M. Sugiyama. Journal of Machine Learning Research, 10:1391–1445, 2009.

and can be implemented as a special version of with alpha = 0

[2] Liu, Song, et al. "Change-point detection in time-series data by relative density-ratio estimation." Neural Networks 43 (2013): 72-83.

which we are doing here.

Source code in changepoynt/algorithms/ulsif.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class ULSIF(Algorithm):
    """
    This class implements change point detection based on density ration estimation optimizing a least squares
    approach from:

    [1]
    "A least-squares approach to direct importance estimation"
    T. Kanamori, S. Hido, and M. Sugiyama.
    Journal of Machine Learning Research, 10:1391–1445, 2009.

    and can be implemented as a special version of with alpha = 0

    [2]
    Liu, Song, et al.
    "Change-point detection in time-series data by relative density-ratio estimation."
    Neural Networks 43 (2013): 72-83.

    which we are doing here.
    """

    def __init__(self, window_length: int = 10, n_windows: int = 50, lag: int = None, estimation_lag: int = None,
                 scoring_step: int = 1, n_kernels: int = 100, symmetric=True) -> None:
        """
        This defines all necessary parameters for the uLSIF to work. As uLSIF is similar to RuLSIF only with a
        zero alpha, we put all parameters through to RuLSIF.

        :param window_length: the length of the windows we want to compare the densities for (k in the paper)
        :param n_windows: the amount of windows (n in the paper)
        :param lag: the difference in time between the past and the future comparison window
        :param estimation_lag: how often we should re-estimate lambda and sigma for the gaussian kernels
        :param scoring_step: the number of samples between each change score value (e.g. 2 would half the computations)
        :param n_kernels: the number of kernels for the density ration estimation
        :param symmetric: specifies whether to use two processes to compute a forward and backward pass simultaneously
        """
        self.window_length = window_length
        self.n_windows = n_windows
        self.lag = lag
        self.estimation_lag = estimation_lag
        self.n_kernels = n_kernels
        self.scoring_step = scoring_step
        self.symmetric = symmetric

        # create the specialized version of RuLSIF
        self.transformer = RuLSIF(window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                                  estimation_lag=self.estimation_lag, n_kernels=self.n_kernels, alpha=0.0,
                                  symmetric=self.symmetric)

    def transform(self, time_series: np.ndarray):
        return self.transformer.transform(time_series)

__init__(window_length=10, n_windows=50, lag=None, estimation_lag=None, scoring_step=1, n_kernels=100, symmetric=True)

This defines all necessary parameters for the uLSIF to work. As uLSIF is similar to RuLSIF only with a zero alpha, we put all parameters through to RuLSIF.

Parameters:

Name Type Description Default
window_length int

the length of the windows we want to compare the densities for (k in the paper)

10
n_windows int

the amount of windows (n in the paper)

50
lag int

the difference in time between the past and the future comparison window

None
estimation_lag int

how often we should re-estimate lambda and sigma for the gaussian kernels

None
scoring_step int

the number of samples between each change score value (e.g. 2 would half the computations)

1
n_kernels int

the number of kernels for the density ration estimation

100
symmetric

specifies whether to use two processes to compute a forward and backward pass simultaneously

True
Source code in changepoynt/algorithms/ulsif.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self, window_length: int = 10, n_windows: int = 50, lag: int = None, estimation_lag: int = None,
             scoring_step: int = 1, n_kernels: int = 100, symmetric=True) -> None:
    """
    This defines all necessary parameters for the uLSIF to work. As uLSIF is similar to RuLSIF only with a
    zero alpha, we put all parameters through to RuLSIF.

    :param window_length: the length of the windows we want to compare the densities for (k in the paper)
    :param n_windows: the amount of windows (n in the paper)
    :param lag: the difference in time between the past and the future comparison window
    :param estimation_lag: how often we should re-estimate lambda and sigma for the gaussian kernels
    :param scoring_step: the number of samples between each change score value (e.g. 2 would half the computations)
    :param n_kernels: the number of kernels for the density ration estimation
    :param symmetric: specifies whether to use two processes to compute a forward and backward pass simultaneously
    """
    self.window_length = window_length
    self.n_windows = n_windows
    self.lag = lag
    self.estimation_lag = estimation_lag
    self.n_kernels = n_kernels
    self.scoring_step = scoring_step
    self.symmetric = symmetric

    # create the specialized version of RuLSIF
    self.transformer = RuLSIF(window_length=self.window_length, n_windows=self.n_windows, lag=self.lag,
                              estimation_lag=self.estimation_lag, n_kernels=self.n_kernels, alpha=0.0,
                              symmetric=self.symmetric)

rulsif

RuLSIF

Bases: Algorithm

This class implements change point detection based on relative density ration estimation optimizing the least squares approach (RuLSIF) from:

[1] Liu, Song, et al. "Change-point detection in time-series data by relative density-ratio estimation." Neural Networks 43 (2013): 72-83.

The code has been created looking at: - https://github.com/hoxo-m/densratio_py - https://pypi.org/project/densratio/ - http://www.makotoyamada-ml.com/RuLSIF.html (Matlab implementation original author) - https://github.com/anewgithubname/change_detection (Matlab implementation co-author)

Source code in changepoynt/algorithms/rulsif.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class RuLSIF(Algorithm):
    """
    This class implements change point detection based on relative density ration estimation optimizing the least
    squares approach (RuLSIF) from:

    [1]
    Liu, Song, et al.
    "Change-point detection in time-series data by relative density-ratio estimation."
    Neural Networks 43 (2013): 72-83.

    The code has been created looking at:
    - https://github.com/hoxo-m/densratio_py
    - https://pypi.org/project/densratio/
    - http://www.makotoyamada-ml.com/RuLSIF.html (Matlab implementation original author)
    - https://github.com/anewgithubname/change_detection (Matlab implementation co-author)
    """

    def __init__(self, window_length: int = 10, n_windows: int = 50, lag: int = None, estimation_lag: int = None,
                 scoring_step: int = 1, n_kernels: int = 100, alpha: float = 0.01, symmetric: bool = True,
                 parallel: bool = False) -> None:
        """
        This defines all necessary parameters for the RuLSIF to work.
        :param window_length: the length of the windows we want to compare the densities for (k in the paper)
        :param n_windows: the amount of windows (n in the paper)
        :param lag: the difference in time between the past and the future comparison window
        :param estimation_lag: how often we should re-estimate lambda and sigma for the gaussian kernels
        :param scoring_step: the number of samples between each change score value (e.g. 2 would half the computations)
        :param n_kernels: the number of kernels for the density ration estimation
        :param alpha: the smoothing parameter for the RELATIVE in RuLSIF
        :param symmetric: specifies whether to use two processes to compute a forward and backward pass simultaneously
        """
        self.window_length = window_length
        self.n_windows = n_windows
        self.lag = lag
        self.estimation_lag = estimation_lag
        self.n_kernels = n_kernels
        self.alpha = alpha
        self.scoring_step = scoring_step
        self.symmetric = symmetric
        self.parallel = parallel

        # check that alpha does not exceed the bounds
        assert 0 <= self.alpha < 1, 'The alpha parameter should be in the interval [0,1).'

        # check for the estimation lag
        assert self.estimation_lag is None or 1 <= self.estimation_lag, \
            'The estimation lag needs to be bigger than zero samples.'

        # set the lag if it is not given
        if not self.lag:
            self.lag = self.n_windows

    def transform(self, time_series: np.ndarray):

        # check the dimensions of the input array
        assert time_series.ndim == 1, "Time series needs to be an 1D array."

        # compute the starting point of the scoring (past and future hankel need to fit)
        starting_point = self.window_length + self.n_windows + self.lag
        assert starting_point < time_series.shape[0], "The time series is too short to score any points."

        # create the estimator
        estimator = dre.RuLSIF(self.alpha)

        # copy the time series to protect it from any modifications within the algorithm
        time_series = time_series.copy()

        # check whether we want to compute the symmetric score
        if self.symmetric:

            # compute the scores with two workers one going forward and the other one backward
            if self.parallel:
                with mp.Pool(2) as workers:
                    result = workers.starmap(_transform,
                                             ((time_series, starting_point, self.window_length, self.n_windows,
                                               self.lag, self.scoring_step, estimator),
                                              (time_series[::-1], starting_point, self.window_length, self.n_windows,
                                               self.lag, self.scoring_step, estimator)))
            else:
                result = [_transform(time_series, starting_point, self.window_length, self.n_windows, self.lag,
                                     self.scoring_step, estimator),
                          _transform(time_series[::-1], starting_point, self.window_length, self.n_windows,
                                     self.lag, self.scoring_step, estimator)]

            return result[0] + result[1][::-1]

        else:
            # call the function to compute the values
            return _transform(time_series, starting_point, self.window_length, self.n_windows, self.lag,
                              self.scoring_step, estimator)

__init__(window_length=10, n_windows=50, lag=None, estimation_lag=None, scoring_step=1, n_kernels=100, alpha=0.01, symmetric=True, parallel=False)

This defines all necessary parameters for the RuLSIF to work.

Parameters:

Name Type Description Default
window_length int

the length of the windows we want to compare the densities for (k in the paper)

10
n_windows int

the amount of windows (n in the paper)

50
lag int

the difference in time between the past and the future comparison window

None
estimation_lag int

how often we should re-estimate lambda and sigma for the gaussian kernels

None
scoring_step int

the number of samples between each change score value (e.g. 2 would half the computations)

1
n_kernels int

the number of kernels for the density ration estimation

100
alpha float

the smoothing parameter for the RELATIVE in RuLSIF

0.01
symmetric bool

specifies whether to use two processes to compute a forward and backward pass simultaneously

True
Source code in changepoynt/algorithms/rulsif.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(self, window_length: int = 10, n_windows: int = 50, lag: int = None, estimation_lag: int = None,
             scoring_step: int = 1, n_kernels: int = 100, alpha: float = 0.01, symmetric: bool = True,
             parallel: bool = False) -> None:
    """
    This defines all necessary parameters for the RuLSIF to work.
    :param window_length: the length of the windows we want to compare the densities for (k in the paper)
    :param n_windows: the amount of windows (n in the paper)
    :param lag: the difference in time between the past and the future comparison window
    :param estimation_lag: how often we should re-estimate lambda and sigma for the gaussian kernels
    :param scoring_step: the number of samples between each change score value (e.g. 2 would half the computations)
    :param n_kernels: the number of kernels for the density ration estimation
    :param alpha: the smoothing parameter for the RELATIVE in RuLSIF
    :param symmetric: specifies whether to use two processes to compute a forward and backward pass simultaneously
    """
    self.window_length = window_length
    self.n_windows = n_windows
    self.lag = lag
    self.estimation_lag = estimation_lag
    self.n_kernels = n_kernels
    self.alpha = alpha
    self.scoring_step = scoring_step
    self.symmetric = symmetric
    self.parallel = parallel

    # check that alpha does not exceed the bounds
    assert 0 <= self.alpha < 1, 'The alpha parameter should be in the interval [0,1).'

    # check for the estimation lag
    assert self.estimation_lag is None or 1 <= self.estimation_lag, \
        'The estimation lag needs to be bigger than zero samples.'

    # set the lag if it is not given
    if not self.lag:
        self.lag = self.n_windows

fluss

FLUSS

Bases: Algorithm

This class uses the FLUSS algorithm described in:

[1] Gharghabi, Shaghayegh, et al. "Matrix profile VIII: domain agnostic online semantic segmentation at superhuman performance levels." 2017 IEEE international conference on data mining (ICDM). IEEE, 2017.

This class essentially wraps the stumpy library which implements a fast approach to calculate the matrix profile. https://stumpy.readthedocs.io/en/latest/index.html

TODO: Implement GPU support and streaming? (FLOSS)

Source code in changepoynt/algorithms/fluss.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class FLUSS(Algorithm):
    """
    This class uses the FLUSS algorithm described in:

    [1]
    Gharghabi, Shaghayegh, et al.
    "Matrix profile VIII: domain agnostic online semantic segmentation at superhuman performance levels."
    2017 IEEE international conference on data mining (ICDM). IEEE, 2017.

    This class essentially wraps the stumpy library which implements a fast approach to calculate the matrix profile.
    https://stumpy.readthedocs.io/en/latest/index.html

    TODO:
    Implement GPU support and streaming? (FLOSS)
    """

    def __init__(self, window_length: int) -> None:
        """
        We initialize the matrix profile and the subsequent floss using only the window length used for comarisons.

        :param window_length: the length of the window used for the distance comparisons in the matrix profile.
        """

        # save the specified parameters into instance variables
        self.window_length = window_length

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        """
        This function calculate the anomaly score for each sample within the time series.

        It also does some assertion regarding time series length.

        :param time_series: 1D array containing the time series to be scored
        :return: anomaly score
        """

        # check the dimensions of the input array
        assert time_series.ndim == 1, "Time series needs to be an 1D array."

        # check that we have at least two windows
        assert time_series.shape[0] > self.window_length, 'Time series needs to be longer than window length.'

        # compute the floss score which is the amount of index pointers over this sample
        # compared to an expected curve. It is capped by one.
        matrix_profile = stumpy.stump(time_series, m=self.window_length)
        corrected_arc_crossings, _ = stumpy.fluss(matrix_profile[:, 1], L=self.window_length, n_regimes=1)
        return 1-corrected_arc_crossings

__init__(window_length)

We initialize the matrix profile and the subsequent floss using only the window length used for comarisons.

Parameters:

Name Type Description Default
window_length int

the length of the window used for the distance comparisons in the matrix profile.

required
Source code in changepoynt/algorithms/fluss.py
22
23
24
25
26
27
28
29
30
def __init__(self, window_length: int) -> None:
    """
    We initialize the matrix profile and the subsequent floss using only the window length used for comarisons.

    :param window_length: the length of the window used for the distance comparisons in the matrix profile.
    """

    # save the specified parameters into instance variables
    self.window_length = window_length

transform(time_series)

This function calculate the anomaly score for each sample within the time series.

It also does some assertion regarding time series length.

Parameters:

Name Type Description Default
time_series ndarray

1D array containing the time series to be scored

required

Returns:

Type Description
ndarray

anomaly score

Source code in changepoynt/algorithms/fluss.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def transform(self, time_series: np.ndarray) -> np.ndarray:
    """
    This function calculate the anomaly score for each sample within the time series.

    It also does some assertion regarding time series length.

    :param time_series: 1D array containing the time series to be scored
    :return: anomaly score
    """

    # check the dimensions of the input array
    assert time_series.ndim == 1, "Time series needs to be an 1D array."

    # check that we have at least two windows
    assert time_series.shape[0] > self.window_length, 'Time series needs to be longer than window length.'

    # compute the floss score which is the amount of index pointers over this sample
    # compared to an expected curve. It is capped by one.
    matrix_profile = stumpy.stump(time_series, m=self.window_length)
    corrected_arc_crossings, _ = stumpy.fluss(matrix_profile[:, 1], L=self.window_length, n_regimes=1)
    return 1-corrected_arc_crossings

floss

FLOSS

Bases: Algorithm

This class uses the FLOSS algorithm described in:

[1] Gharghabi, Shaghayegh, et al. "Matrix profile VIII: domain agnostic online semantic segmentation at superhuman performance levels." 2017 IEEE international conference on data mining (ICDM). IEEE, 2017.

This class essentially wraps the stumpy library which implements a fast approach to calculate the matrix profile. https://stumpy.readthedocs.io/en/latest/index.html

TODO: Implement GPU support and streaming? (FLOSS)

Source code in changepoynt/algorithms/floss.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class FLOSS(Algorithm):
    """
    This class uses the FLOSS algorithm described in:

    [1]
    Gharghabi, Shaghayegh, et al.
    "Matrix profile VIII: domain agnostic online semantic segmentation at superhuman performance levels."
    2017 IEEE international conference on data mining (ICDM). IEEE, 2017.

    This class essentially wraps the stumpy library which implements a fast approach to calculate the matrix profile.
    https://stumpy.readthedocs.io/en/latest/index.html

    TODO:
    Implement GPU support and streaming? (FLOSS)
    """

    def __init__(self, window_length: int, initial_length: int = None) -> None:
        """
        We initialize the matrix profile and the subsequent floss using only the window length used for comarisons.

        :param window_length: the length of the window used for the distance comparisons in the matrix profile.
        :param initial_length: the length for the initial matrix profile length
        """
        raise NotImplementedError('FLOSS is not yet fully functional.')
        # save the specified parameters into instance variables
        self.window_length = window_length
        self.initial_length = initial_length

        # set default initial length if necessary
        if not initial_length:
            self.initial_length = 10*self.window_length

        # check for the size
        assert self.initial_length >= 2*self.window_length, 'The initial_length should be at least twice' \
                                                            'the window_length.'

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        """
        This function calculate the anomaly score for each sample within the time series.

        It also does some assertion regarding time series length.

        :param time_series: 1D array containing the time series to be scored
        :return: anomaly score
        """

        # check the dimensions of the input array
        assert time_series.ndim == 1, "Time series needs to be an 1D array."

        # check that we have at least two windows
        assert time_series.shape[0] > self.initial_length, \
            'Time series needs to be longer than specified initial length.'

        # feed it through the online process
        score = _transform(time_series, self.initial_length, self.window_length)
        return score

__init__(window_length, initial_length=None)

We initialize the matrix profile and the subsequent floss using only the window length used for comarisons.

Parameters:

Name Type Description Default
window_length int

the length of the window used for the distance comparisons in the matrix profile.

required
initial_length int

the length for the initial matrix profile length

None
Source code in changepoynt/algorithms/floss.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(self, window_length: int, initial_length: int = None) -> None:
    """
    We initialize the matrix profile and the subsequent floss using only the window length used for comarisons.

    :param window_length: the length of the window used for the distance comparisons in the matrix profile.
    :param initial_length: the length for the initial matrix profile length
    """
    raise NotImplementedError('FLOSS is not yet fully functional.')
    # save the specified parameters into instance variables
    self.window_length = window_length
    self.initial_length = initial_length

    # set default initial length if necessary
    if not initial_length:
        self.initial_length = 10*self.window_length

    # check for the size
    assert self.initial_length >= 2*self.window_length, 'The initial_length should be at least twice' \
                                                        'the window_length.'

transform(time_series)

This function calculate the anomaly score for each sample within the time series.

It also does some assertion regarding time series length.

Parameters:

Name Type Description Default
time_series ndarray

1D array containing the time series to be scored

required

Returns:

Type Description
ndarray

anomaly score

Source code in changepoynt/algorithms/floss.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def transform(self, time_series: np.ndarray) -> np.ndarray:
    """
    This function calculate the anomaly score for each sample within the time series.

    It also does some assertion regarding time series length.

    :param time_series: 1D array containing the time series to be scored
    :return: anomaly score
    """

    # check the dimensions of the input array
    assert time_series.ndim == 1, "Time series needs to be an 1D array."

    # check that we have at least two windows
    assert time_series.shape[0] > self.initial_length, \
        'Time series needs to be longer than specified initial length.'

    # feed it through the online process
    score = _transform(time_series, self.initial_length, self.window_length)
    return score

bocpd

============================================================================ Author: Gregory Gundersen

Python implementation of Bayesian online changepoint detection for a normal model with unknown mean parameter. For algorithm details, see

Adams & MacKay 2007
"Bayesian Online Changepoint Detection"
https://arxiv.org/abs/0710.3742

For Bayesian inference details about the Gaussian, see:

Murphy 2007
"Conjugate Bayesian analysis of the Gaussian distribution"
https://www.cs.ubc.ca/~murphyk/Papers/bayesGauss.pdf

This code is associated with the following blog posts:

https://gregorygundersen.com/blog/2019/08/13/bocd/
https://gregorygundersen.com/blog/2020/10/20/implementing-bocd/

============================================================================

GaussianUnknownMean

Source code in changepoynt/algorithms/bocpd.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
class GaussianUnknownMean:

    def __init__(self, mean0, var0, varx):
        """Initialize model.

        meanx is unknown; varx is known
        p(meanx) = N(mean0, var0)
        p(x) = N(meanx, varx)
        """
        self.mean0 = mean0
        self.var0 = var0
        self.varx = varx
        self.mean_params = np.array([mean0])
        self.prec_params = np.array([1 / var0])

    def log_pred_prob(self, t, x):
        """Compute predictive probabilities \pi, i.e. the posterior predictive
        for each run length hypothesis.
        """
        # Posterior predictive: see eq. 40 in (Murphy 2007).
        post_means = self.mean_params[:t]
        post_stds = np.sqrt(self.var_params[:t])
        return scipy_norm(post_means, post_stds).logpdf(x)

    def update_params(self, t, x):
        """Upon observing a new datum x at time t, update all run length
        hypotheses.
        """
        # See eq. 19 in (Murphy 2007).
        new_prec_params = self.prec_params + (1 / self.varx)
        self.prec_params = np.append([1 / self.var0], new_prec_params)
        # See eq. 24 in (Murphy 2007).
        new_mean_params = (self.mean_params * self.prec_params[:-1] + (x / self.varx)) / new_prec_params
        self.mean_params = np.append([self.mean0], new_mean_params)

    @property
    def var_params(self):
        """Helper function for computing the posterior variance.
        """
        return 1. / self.prec_params + self.varx

var_params property

Helper function for computing the posterior variance.

__init__(mean0, var0, varx)

Initialize model.

meanx is unknown; varx is known p(meanx) = N(mean0, var0) p(x) = N(meanx, varx)

Source code in changepoynt/algorithms/bocpd.py
190
191
192
193
194
195
196
197
198
199
200
201
def __init__(self, mean0, var0, varx):
    """Initialize model.

    meanx is unknown; varx is known
    p(meanx) = N(mean0, var0)
    p(x) = N(meanx, varx)
    """
    self.mean0 = mean0
    self.var0 = var0
    self.varx = varx
    self.mean_params = np.array([mean0])
    self.prec_params = np.array([1 / var0])

log_pred_prob(t, x)

Compute predictive probabilities \pi, i.e. the posterior predictive for each run length hypothesis.

Source code in changepoynt/algorithms/bocpd.py
203
204
205
206
207
208
209
210
def log_pred_prob(self, t, x):
    """Compute predictive probabilities \pi, i.e. the posterior predictive
    for each run length hypothesis.
    """
    # Posterior predictive: see eq. 40 in (Murphy 2007).
    post_means = self.mean_params[:t]
    post_stds = np.sqrt(self.var_params[:t])
    return scipy_norm(post_means, post_stds).logpdf(x)

update_params(t, x)

Upon observing a new datum x at time t, update all run length hypotheses.

Source code in changepoynt/algorithms/bocpd.py
212
213
214
215
216
217
218
219
220
221
def update_params(self, t, x):
    """Upon observing a new datum x at time t, update all run length
    hypotheses.
    """
    # See eq. 19 in (Murphy 2007).
    new_prec_params = self.prec_params + (1 / self.varx)
    self.prec_params = np.append([1 / self.var0], new_prec_params)
    # See eq. 24 in (Murphy 2007).
    new_mean_params = (self.mean_params * self.prec_params[:-1] + (x / self.varx)) / new_prec_params
    self.mean_params = np.append([self.mean0], new_mean_params)

bocd(data, model, hazard)

Return run length posterior using Algorithm 1 in Adams & MacKay 2007.

Source code in changepoynt/algorithms/bocpd.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def bocd(data, model, hazard):
    """Return run length posterior using Algorithm 1 in Adams & MacKay 2007.
    """
    # 1. Initialize lower triangular matrix representing the posterior as
    #    function of time. Model parameters are initialized in the model class.
    #
    #    When we exponentiate R at the end, exp(-inf) --> 0, which is nice for
    #    visualization.
    #
    T = len(data)
    log_R = -np.inf * np.ones((T + 1, T + 1))
    log_R[0, 0] = 0  # log 0 == 1
    pmean = np.empty(T)  # Model's predictive mean.
    pvar = np.empty(T)  # Model's predictive variance.
    log_message = np.array([0])  # log 0 == 1
    log_H = np.log(hazard)
    log_1mH = np.log(1 - hazard)

    for t in range(1, T + 1):
        # 2. Observe new datum.
        x = data[t - 1]

        # Make model predictions.
        pmean[t - 1] = np.sum(np.exp(log_R[t - 1, :t]) * model.mean_params[:t])
        pvar[t - 1] = np.sum(np.exp(log_R[t - 1, :t]) * model.var_params[:t])

        # 3. Evaluate predictive probabilities.
        log_pis = model.log_pred_prob(t, x)

        # 4. Calculate growth probabilities.
        log_growth_probs = log_pis + log_message + log_1mH

        # 5. Calculate changepoint probabilities.
        log_cp_prob = scipy_logsumexp(log_pis + log_message + log_H)

        # 6. Calculate evidence
        new_log_joint = np.append(log_cp_prob, log_growth_probs)

        # 7. Determine run length distribution.
        log_R[t, :t + 1] = new_log_joint
        log_R[t, :t + 1] -= scipy_logsumexp(new_log_joint)

        # 8. Update sufficient statistics.
        model.update_params(t, x)

        # Pass message.
        log_message = new_log_joint

    R = np.exp(log_R)
    return R, pmean, pvar

generate_data(varx, mean0, var0, T, cp_prob)

Generate partitioned data of T observations according to constant changepoint probability cp_prob with hyperpriors mean0 and prec0.

Source code in changepoynt/algorithms/bocpd.py
232
233
234
235
236
237
238
239
240
241
242
243
244
def generate_data(varx, mean0, var0, T, cp_prob):
    """Generate partitioned data of T observations according to constant
    changepoint probability `cp_prob` with hyperpriors `mean0` and `prec0`.
    """
    data = []
    cps = []
    meanx = mean0
    for t in range(0, T):
        if np.random.random() < cp_prob:
            meanx = np.random.normal(mean0, var0)
            cps.append(t)
        data.append(np.random.normal(meanx, varx))
    return data, cps

clasp

CLASP

Bases: Algorithm

This class uses the ClaSP algorithm published in

[1] Schäfer, Patrick, Arik Ermshaus, and Ulf Leser. "Clasp-time series segmentation." Proceedings of the 30th ACM International Conference on Information & Knowledge Management. 2021.

This class essentially wraps the claspy library https://github.com/ermshaua/claspy https://sites.google.com/view/ts-clasp/

In the current configuration it is recommended to set no parameters, as CLASP is able to extract it from the data. TODO: Own implementation?

Source code in changepoynt/algorithms/clasp.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class CLASP(Algorithm):
    """
    This class uses the ClaSP algorithm published in

    [1]
    Schäfer, Patrick, Arik Ermshaus, and Ulf Leser.
    "Clasp-time series segmentation."
    Proceedings of the 30th ACM International Conference on Information & Knowledge Management. 2021.

    This class essentially wraps the claspy library
    https://github.com/ermshaua/claspy
    https://sites.google.com/view/ts-clasp/

    In the current configuration it is recommended to set no parameters, as CLASP is able to extract it from the data.
    TODO:
    Own implementation?
    """

    def __init__(self, n_segments="learn", n_estimators=10, window_size="suss", k_neighbours=3,
                 distance="znormed_euclidean_distance", score="roc_auc",
                 early_stopping=True, validation="significance_test", threshold=1e-15, excl_radius=5,
                 random_state=2357) -> None:

        # say that we currently not support CLASP due to old packages in the requirements
        raise NotImplementedError('CLASP is currently not available, as it requires outdated package versions.')

        # save the specified parameters into instance variables
        self.n_segments = n_segments
        self.n_estimators = n_estimators
        self.window_size = window_size
        self.k_neighbours = k_neighbours
        self.distance = distance
        self.score = score
        self.early_stopping = early_stopping
        self.validation = validation
        self.threshold = threshold
        self.excl_radius = excl_radius
        self.random_state = random_state

    def transform(self, time_series: np.ndarray) -> np.ndarray:
        # check the dimensions of the input array
        assert time_series.ndim == 1, "Time series needs to be an 1D array."

        # make the clasp segmentation object
        claspy_segmenter = BinaryClaSPSegmentation(n_segments=self.n_segments, n_estimators=self.n_estimators,
                                                   window_size=self.window_size, k_neighbours=self.k_neighbours,
                                                   score=self.score, early_stopping=self.early_stopping,
                                                   validation=self.validation, threshold=self.threshold,
                                                   excl_radius=self.excl_radius, random_state=self.random_state)
        claspy_segmenter.fit(time_series)
        return claspy_segmenter.profile