cpdbench.control.DatasetExecutor

  1import time
  2from concurrent.futures import ThreadPoolExecutor
  3
  4from cpdbench.control.CPDDatasetResult import CPDDatasetResult, ErrorType
  5from cpdbench.exception.AlgorithmExecutionException import AlgorithmExecutionException
  6from cpdbench.exception.DatasetFetchException import CPDDatasetCreationException, SignalLoadingException
  7from cpdbench.exception.MetricExecutionException import MetricExecutionException
  8
  9
 10class DatasetExecutor:
 11    """Helper class for the TestrunController for the execution of all algorithm and metric tasks
 12    of one dataset for better structure.
 13    This executor runs on subprocesses in multiprocessing mode."""
 14
 15    def __init__(self, dataset_task, algorithm_tasks, metric_tasks, logger):
 16        self._result: CPDDatasetResult = None  # Created later
 17        self._dataset_task = dataset_task
 18        self._algorithm_tasks = algorithm_tasks
 19        self._metric_tasks = metric_tasks
 20        self.logger = logger
 21
 22    def execute(self):
 23        """Executes the entered algorithm and metric tasks."""
 24        self._result = CPDDatasetResult(self._dataset_task, self._algorithm_tasks, self._metric_tasks)
 25        try:
 26            self._execute_dataset()
 27        except Exception as e:
 28            self.logger.exception(e)
 29            self._result.add_error(e, ErrorType.DATASET_ERROR)
 30        return self._result
 31
 32    def _execute_dataset(self):
 33        try:
 34            self.logger.info(f"Start running tasks for dataset {self._dataset_task.get_task_name()}")
 35            self.logger.debug(f"Executing dataset task {self._dataset_task.get_task_name()}")
 36            runtime = time.perf_counter()
 37            dataset = self._dataset_task.execute()
 38            runtime = time.perf_counter() - runtime
 39            self._result.add_dataset_runtime(runtime)
 40            self.logger.debug(f"Finished dataset task {self._dataset_task.get_task_name()}. Took {runtime} seconds.")
 41        except Exception as e:
 42            raise CPDDatasetCreationException(self._dataset_task.get_task_name()) from e
 43        algorithms = []
 44        with ThreadPoolExecutor(max_workers=None) as executor:
 45            self.logger.debug(f"Getting signal")
 46            try:
 47                data, ground_truth = dataset.get_signal()
 48                self.logger.debug(f"Got signal.")
 49            except Exception as e:
 50                raise SignalLoadingException(self._dataset_task.get_task_name()) from e
 51            else:
 52                self.logger.debug(f"Starting threads for executing algorithms")
 53                for algorithm in self._algorithm_tasks:
 54                    algorithms.append(executor.submit(self._execute_algorithm_and_metric, data,
 55                                                      algorithm, ground_truth))
 56                    self.logger.debug(f"Started thread for algorithm {algorithm.get_task_name()}")
 57        for i in range(0, len(algorithms)):
 58            try:
 59                algorithms[i].result()
 60            except Exception as e:
 61                self.logger.exception(e)
 62                self._result.add_error(e, ErrorType.ALGORITHM_ERROR, self._algorithm_tasks[i].get_task_name())
 63        self.logger.debug(f"All algorithm threads are finished")
 64        self.logger.debug(f"Finished!")
 65
 66    def _execute_algorithm_and_metric(self, dataset, algorithm, ground_truth):
 67        logger = self.logger.getChild(algorithm.get_task_name())
 68        try:
 69            logger.debug(f"Executing algorithm task {algorithm.get_task_name()}")
 70            runtime = time.perf_counter()
 71            indexes, scores = algorithm.execute(dataset)
 72            runtime = time.perf_counter() - runtime
 73            logger.debug(f"Finished algorithm task {algorithm.get_task_name()}. Took {runtime} seconds")
 74        except Exception as e:
 75            raise AlgorithmExecutionException(algorithm.get_task_name(),
 76                                              self._dataset_task.get_task_name()) from e
 77        self._result.add_algorithm_result(indexes, scores, algorithm.get_task_name(), runtime)
 78        metrics = []
 79        logger.debug(f"Starting threads for executing metrics ")
 80        with ThreadPoolExecutor(max_workers=None) as executor:
 81            for metric in self._metric_tasks:
 82                metrics.append(executor.submit(self._calculate_metric, indexes, scores,
 83                                               metric, ground_truth, algorithm))
 84                logger.debug(f"Started thread for metric {metric.get_task_name()}")
 85        for i in range(0, len(metrics)):
 86            try:
 87                metrics[i].result()
 88            except Exception as e:
 89                logger.exception(e)
 90                self._result.add_error(e, ErrorType.METRIC_ERROR, algorithm.get_task_name(),
 91                                       self._metric_tasks[i].get_task_name())
 92        logger.debug(f"All metric threads are finished")
 93        logger.debug(f"Finished")
 94
 95    def _calculate_metric(self, indexes, scores, metric_task, ground_truth, algorithm):
 96        logger = self.logger.getChild(algorithm.get_task_name()).getChild(metric_task.get_task_name())
 97        logger.debug(f"Executing metric task {metric_task.get_task_name()}")
 98        try:
 99            runtime = time.perf_counter()
100            metric_result = metric_task.execute(indexes, scores, ground_truth)
101            runtime = time.perf_counter() - runtime
102            logger.debug(f"Finished metric task {metric_task.get_task_name()}. Took {runtime} seconds")
103        except Exception as e:
104            raise MetricExecutionException(metric_task.get_task_name(), algorithm.get_task_name(),
105                                           self._dataset_task.get_task_name()) from e
106        self._result.add_metric_score(metric_result, algorithm.get_task_name(), metric_task.get_task_name(), runtime)
107        logger.debug("Finished")
class DatasetExecutor:
 11class DatasetExecutor:
 12    """Helper class for the TestrunController for the execution of all algorithm and metric tasks
 13    of one dataset for better structure.
 14    This executor runs on subprocesses in multiprocessing mode."""
 15
 16    def __init__(self, dataset_task, algorithm_tasks, metric_tasks, logger):
 17        self._result: CPDDatasetResult = None  # Created later
 18        self._dataset_task = dataset_task
 19        self._algorithm_tasks = algorithm_tasks
 20        self._metric_tasks = metric_tasks
 21        self.logger = logger
 22
 23    def execute(self):
 24        """Executes the entered algorithm and metric tasks."""
 25        self._result = CPDDatasetResult(self._dataset_task, self._algorithm_tasks, self._metric_tasks)
 26        try:
 27            self._execute_dataset()
 28        except Exception as e:
 29            self.logger.exception(e)
 30            self._result.add_error(e, ErrorType.DATASET_ERROR)
 31        return self._result
 32
 33    def _execute_dataset(self):
 34        try:
 35            self.logger.info(f"Start running tasks for dataset {self._dataset_task.get_task_name()}")
 36            self.logger.debug(f"Executing dataset task {self._dataset_task.get_task_name()}")
 37            runtime = time.perf_counter()
 38            dataset = self._dataset_task.execute()
 39            runtime = time.perf_counter() - runtime
 40            self._result.add_dataset_runtime(runtime)
 41            self.logger.debug(f"Finished dataset task {self._dataset_task.get_task_name()}. Took {runtime} seconds.")
 42        except Exception as e:
 43            raise CPDDatasetCreationException(self._dataset_task.get_task_name()) from e
 44        algorithms = []
 45        with ThreadPoolExecutor(max_workers=None) as executor:
 46            self.logger.debug(f"Getting signal")
 47            try:
 48                data, ground_truth = dataset.get_signal()
 49                self.logger.debug(f"Got signal.")
 50            except Exception as e:
 51                raise SignalLoadingException(self._dataset_task.get_task_name()) from e
 52            else:
 53                self.logger.debug(f"Starting threads for executing algorithms")
 54                for algorithm in self._algorithm_tasks:
 55                    algorithms.append(executor.submit(self._execute_algorithm_and_metric, data,
 56                                                      algorithm, ground_truth))
 57                    self.logger.debug(f"Started thread for algorithm {algorithm.get_task_name()}")
 58        for i in range(0, len(algorithms)):
 59            try:
 60                algorithms[i].result()
 61            except Exception as e:
 62                self.logger.exception(e)
 63                self._result.add_error(e, ErrorType.ALGORITHM_ERROR, self._algorithm_tasks[i].get_task_name())
 64        self.logger.debug(f"All algorithm threads are finished")
 65        self.logger.debug(f"Finished!")
 66
 67    def _execute_algorithm_and_metric(self, dataset, algorithm, ground_truth):
 68        logger = self.logger.getChild(algorithm.get_task_name())
 69        try:
 70            logger.debug(f"Executing algorithm task {algorithm.get_task_name()}")
 71            runtime = time.perf_counter()
 72            indexes, scores = algorithm.execute(dataset)
 73            runtime = time.perf_counter() - runtime
 74            logger.debug(f"Finished algorithm task {algorithm.get_task_name()}. Took {runtime} seconds")
 75        except Exception as e:
 76            raise AlgorithmExecutionException(algorithm.get_task_name(),
 77                                              self._dataset_task.get_task_name()) from e
 78        self._result.add_algorithm_result(indexes, scores, algorithm.get_task_name(), runtime)
 79        metrics = []
 80        logger.debug(f"Starting threads for executing metrics ")
 81        with ThreadPoolExecutor(max_workers=None) as executor:
 82            for metric in self._metric_tasks:
 83                metrics.append(executor.submit(self._calculate_metric, indexes, scores,
 84                                               metric, ground_truth, algorithm))
 85                logger.debug(f"Started thread for metric {metric.get_task_name()}")
 86        for i in range(0, len(metrics)):
 87            try:
 88                metrics[i].result()
 89            except Exception as e:
 90                logger.exception(e)
 91                self._result.add_error(e, ErrorType.METRIC_ERROR, algorithm.get_task_name(),
 92                                       self._metric_tasks[i].get_task_name())
 93        logger.debug(f"All metric threads are finished")
 94        logger.debug(f"Finished")
 95
 96    def _calculate_metric(self, indexes, scores, metric_task, ground_truth, algorithm):
 97        logger = self.logger.getChild(algorithm.get_task_name()).getChild(metric_task.get_task_name())
 98        logger.debug(f"Executing metric task {metric_task.get_task_name()}")
 99        try:
100            runtime = time.perf_counter()
101            metric_result = metric_task.execute(indexes, scores, ground_truth)
102            runtime = time.perf_counter() - runtime
103            logger.debug(f"Finished metric task {metric_task.get_task_name()}. Took {runtime} seconds")
104        except Exception as e:
105            raise MetricExecutionException(metric_task.get_task_name(), algorithm.get_task_name(),
106                                           self._dataset_task.get_task_name()) from e
107        self._result.add_metric_score(metric_result, algorithm.get_task_name(), metric_task.get_task_name(), runtime)
108        logger.debug("Finished")

Helper class used by the TestrunController to execute all algorithm and metric tasks of a single dataset, which keeps that logic in one place. In multiprocessing mode, this executor runs in subprocesses.

DatasetExecutor(dataset_task, algorithm_tasks, metric_tasks, logger)
    def __init__(self, dataset_task, algorithm_tasks, metric_tasks, logger):
        """Store the tasks belonging to one dataset and the logger to report to.

        :param dataset_task: the dataset task this executor works on
        :param algorithm_tasks: the algorithm tasks to run for this dataset
        :param metric_tasks: the metric tasks to run for this dataset
        :param logger: logger used for progress and error messages
        """
        self._result: CPDDatasetResult = None  # Created later
        self._dataset_task = dataset_task
        self._algorithm_tasks = algorithm_tasks
        self._metric_tasks = metric_tasks
        self.logger = logger
logger
def execute(self):
23    def execute(self):
24        """Executes the entered algorithm and metric tasks."""
25        self._result = CPDDatasetResult(self._dataset_task, self._algorithm_tasks, self._metric_tasks)
26        try:
27            self._execute_dataset()
28        except Exception as e:
29            self.logger.exception(e)
30            self._result.add_error(e, ErrorType.DATASET_ERROR)
31        return self._result

Executes the dataset task together with the given algorithm and metric tasks and returns the collected result.