cpdbench.control.DatasetExecutor
import time
from concurrent.futures import ThreadPoolExecutor

from cpdbench.control.CPDDatasetResult import CPDDatasetResult, ErrorType
from cpdbench.exception.AlgorithmExecutionException import AlgorithmExecutionException
from cpdbench.exception.DatasetFetchException import CPDDatasetCreationException, SignalLoadingException
from cpdbench.exception.MetricExecutionException import MetricExecutionException


class DatasetExecutor:
    """Helper class for the TestrunController for the execution of all algorithm and metric tasks
    of one dataset for better structure.
    This executor runs on subprocesses in multiprocessing mode."""

    def __init__(self, dataset_task, algorithm_tasks, metric_tasks, logger):
        """
        :param dataset_task: task object creating the dataset to run on
        :param algorithm_tasks: list of algorithm tasks to execute on the dataset
        :param metric_tasks: list of metric tasks evaluating each algorithm result
        :param logger: logger used for all progress and error output
        """
        self._result: CPDDatasetResult = None  # Created lazily in execute()
        self._dataset_task = dataset_task
        self._algorithm_tasks = algorithm_tasks
        self._metric_tasks = metric_tasks
        self.logger = logger

    def execute(self):
        """Executes the entered algorithm and metric tasks.

        :return: the CPDDatasetResult collecting runtimes, scores and errors
        """
        self._result = CPDDatasetResult(self._dataset_task, self._algorithm_tasks, self._metric_tasks)
        try:
            self._execute_dataset()
        except Exception as e:
            # Anything escaping _execute_dataset means the dataset itself could
            # not be prepared, so the whole run is recorded as a dataset error.
            self.logger.exception(e)
            self._result.add_error(e, ErrorType.DATASET_ERROR)
        return self._result

    def _execute_dataset(self):
        """Creates the dataset, loads its signal and fans the algorithm tasks
        out onto a thread pool.

        :raises CPDDatasetCreationException: if the dataset task itself fails
        :raises SignalLoadingException: if the signal cannot be loaded
        """
        try:
            self.logger.info(f"Start running tasks for dataset {self._dataset_task.get_task_name()}")
            self.logger.debug(f"Executing dataset task {self._dataset_task.get_task_name()}")
            runtime = time.perf_counter()
            dataset = self._dataset_task.execute()
            runtime = time.perf_counter() - runtime
            self._result.add_dataset_runtime(runtime)
            self.logger.debug(f"Finished dataset task {self._dataset_task.get_task_name()}. "
                              f"Took {runtime} seconds.")
        except Exception as e:
            raise CPDDatasetCreationException(self._dataset_task.get_task_name()) from e
        futures = []
        with ThreadPoolExecutor(max_workers=None) as executor:
            self.logger.debug("Getting signal")
            try:
                data, ground_truth = dataset.get_signal()
                self.logger.debug("Got signal.")
            except Exception as e:
                raise SignalLoadingException(self._dataset_task.get_task_name()) from e
            else:
                self.logger.debug("Starting threads for executing algorithms")
                for algorithm in self._algorithm_tasks:
                    futures.append(executor.submit(self._execute_algorithm_and_metric, data,
                                                   algorithm, ground_truth))
                    self.logger.debug(f"Started thread for algorithm {algorithm.get_task_name()}")
                # zip pairs each future with the task it was submitted for, so
                # an error can be attributed to the correct algorithm task.
                for future, algorithm_task in zip(futures, self._algorithm_tasks):
                    try:
                        future.result()
                    except Exception as e:
                        self.logger.exception(e)
                        self._result.add_error(e, ErrorType.ALGORITHM_ERROR, algorithm_task.get_task_name())
                self.logger.debug("All algorithm threads are finished")
        self.logger.debug("Finished!")

    def _execute_algorithm_and_metric(self, dataset, algorithm, ground_truth):
        """Runs one algorithm task on the dataset signal, then fans all metric
        tasks out onto a thread pool for its result.

        :raises AlgorithmExecutionException: if the algorithm task fails
        """
        logger = self.logger.getChild(algorithm.get_task_name())
        try:
            logger.debug(f"Executing algorithm task {algorithm.get_task_name()}")
            runtime = time.perf_counter()
            indexes, scores = algorithm.execute(dataset)
            runtime = time.perf_counter() - runtime
            logger.debug(f"Finished algorithm task {algorithm.get_task_name()}. "
                         f"Took {runtime} seconds")
        except Exception as e:
            raise AlgorithmExecutionException(algorithm.get_task_name(),
                                              self._dataset_task.get_task_name()) from e
        self._result.add_algorithm_result(indexes, scores, algorithm.get_task_name(), runtime)
        futures = []
        logger.debug("Starting threads for executing metrics ")
        with ThreadPoolExecutor(max_workers=None) as executor:
            for metric in self._metric_tasks:
                futures.append(executor.submit(self._calculate_metric, indexes, scores,
                                               metric, ground_truth, algorithm))
                logger.debug(f"Started thread for metric {metric.get_task_name()}")
            # Pair each future with its metric task for error attribution.
            for future, metric_task in zip(futures, self._metric_tasks):
                try:
                    future.result()
                except Exception as e:
                    logger.exception(e)
                    self._result.add_error(e, ErrorType.METRIC_ERROR, algorithm.get_task_name(),
                                           metric_task.get_task_name())
            logger.debug("All metric threads are finished")
        logger.debug("Finished")

    def _calculate_metric(self, indexes, scores, metric_task, ground_truth, algorithm):
        """Runs one metric task on one algorithm result and stores its score.

        :raises MetricExecutionException: if the metric task fails
        """
        logger = self.logger.getChild(algorithm.get_task_name()).getChild(metric_task.get_task_name())
        logger.debug(f"Executing metric task {metric_task.get_task_name()}")
        try:
            runtime = time.perf_counter()
            metric_result = metric_task.execute(indexes, scores, ground_truth)
            runtime = time.perf_counter() - runtime
            logger.debug(f"Finished metric task {metric_task.get_task_name()}. Took {runtime} seconds")
        except Exception as e:
            raise MetricExecutionException(metric_task.get_task_name(), algorithm.get_task_name(),
                                           self._dataset_task.get_task_name()) from e
        self._result.add_metric_score(metric_result, algorithm.get_task_name(), metric_task.get_task_name(), runtime)
        logger.debug("Finished")
class DatasetExecutor:
    """Helper class for the TestrunController for the execution of all algorithm and metric tasks
    of one dataset for better structure.
    This executor runs on subprocesses in multiprocessing mode."""

    def __init__(self, dataset_task, algorithm_tasks, metric_tasks, logger):
        # The result container is only built once execute() is called.
        self._result: CPDDatasetResult = None  # Created later
        self._dataset_task = dataset_task
        self._algorithm_tasks = algorithm_tasks
        self._metric_tasks = metric_tasks
        self.logger = logger

    def execute(self):
        """Executes the entered algorithm and metric tasks."""
        self._result = CPDDatasetResult(self._dataset_task, self._algorithm_tasks, self._metric_tasks)
        try:
            self._execute_dataset()
        except Exception as error:
            self.logger.exception(error)
            self._result.add_error(error, ErrorType.DATASET_ERROR)
        return self._result

    def _execute_dataset(self):
        # Phase 1: build the dataset and record how long that took.
        dataset_name = self._dataset_task.get_task_name()
        try:
            self.logger.info(f"Start running tasks for dataset {dataset_name}")
            self.logger.debug(f"Executing dataset task {dataset_name}")
            started = time.perf_counter()
            dataset = self._dataset_task.execute()
            elapsed = time.perf_counter() - started
            self._result.add_dataset_runtime(elapsed)
            self.logger.debug(f"Finished dataset task {dataset_name}. "
                              f"Took {elapsed} seconds.")
        except Exception as error:
            raise CPDDatasetCreationException(dataset_name) from error
        # Phase 2: load the signal and run every algorithm on a worker thread.
        pending = []
        with ThreadPoolExecutor(max_workers=None) as pool:
            self.logger.debug("Getting signal")
            try:
                data, ground_truth = dataset.get_signal()
                self.logger.debug("Got signal.")
            except Exception as error:
                raise SignalLoadingException(dataset_name) from error
            else:
                self.logger.debug("Starting threads for executing algorithms")
                for algorithm_task in self._algorithm_tasks:
                    pending.append(pool.submit(self._execute_algorithm_and_metric, data,
                                               algorithm_task, ground_truth))
                    self.logger.debug(f"Started thread for algorithm {algorithm_task.get_task_name()}")
                for position, future in enumerate(pending):
                    try:
                        future.result()
                    except Exception as error:
                        self.logger.exception(error)
                        self._result.add_error(error, ErrorType.ALGORITHM_ERROR,
                                               self._algorithm_tasks[position].get_task_name())
                self.logger.debug("All algorithm threads are finished")
        self.logger.debug("Finished!")

    def _execute_algorithm_and_metric(self, dataset, algorithm, ground_truth):
        # Run one algorithm; its failures are wrapped for attribution.
        algorithm_name = algorithm.get_task_name()
        logger = self.logger.getChild(algorithm_name)
        try:
            logger.debug(f"Executing algorithm task {algorithm_name}")
            started = time.perf_counter()
            indexes, scores = algorithm.execute(dataset)
            elapsed = time.perf_counter() - started
            logger.debug(f"Finished algorithm task {algorithm_name}. "
                         f"Took {elapsed} seconds")
        except Exception as error:
            raise AlgorithmExecutionException(algorithm_name,
                                              self._dataset_task.get_task_name()) from error
        self._result.add_algorithm_result(indexes, scores, algorithm_name, elapsed)
        # Then evaluate every metric for this result on a worker thread.
        pending = []
        logger.debug("Starting threads for executing metrics ")
        with ThreadPoolExecutor(max_workers=None) as pool:
            for metric_task in self._metric_tasks:
                pending.append(pool.submit(self._calculate_metric, indexes, scores,
                                           metric_task, ground_truth, algorithm))
                logger.debug(f"Started thread for metric {metric_task.get_task_name()}")
            for position, future in enumerate(pending):
                try:
                    future.result()
                except Exception as error:
                    logger.exception(error)
                    self._result.add_error(error, ErrorType.METRIC_ERROR, algorithm_name,
                                           self._metric_tasks[position].get_task_name())
            logger.debug("All metric threads are finished")
        logger.debug("Finished")

    def _calculate_metric(self, indexes, scores, metric_task, ground_truth, algorithm):
        # Score one (algorithm result, metric) pair and store the score.
        metric_name = metric_task.get_task_name()
        algorithm_name = algorithm.get_task_name()
        logger = self.logger.getChild(algorithm_name).getChild(metric_name)
        logger.debug(f"Executing metric task {metric_name}")
        try:
            started = time.perf_counter()
            metric_result = metric_task.execute(indexes, scores, ground_truth)
            elapsed = time.perf_counter() - started
            logger.debug(f"Finished metric task {metric_name}. Took {elapsed} seconds")
        except Exception as error:
            raise MetricExecutionException(metric_name, algorithm_name,
                                           self._dataset_task.get_task_name()) from error
        self._result.add_metric_score(metric_result, algorithm_name, metric_name, elapsed)
        logger.debug("Finished")
Helper class that gives the TestrunController better structure: it executes all algorithm and metric tasks of a single dataset. In multiprocessing mode this executor runs on subprocesses.
def execute(self):
    """Executes the entered algorithm and metric tasks."""
    # Fresh result container for this run; errors escaping the dataset
    # execution are recorded on it as dataset-level errors.
    self._result = CPDDatasetResult(self._dataset_task, self._algorithm_tasks, self._metric_tasks)
    try:
        self._execute_dataset()
    except Exception as error:
        self.logger.exception(error)
        self._result.add_error(error, ErrorType.DATASET_ERROR)
    return self._result
Executes the configured algorithm and metric tasks.