cpdbench.control.TestrunController
import logging
import logging.handlers
import multiprocessing
import threading
from concurrent.futures import ProcessPoolExecutor

from cpdbench.control.CPDFullResult import CPDFullResult
from cpdbench.control.CPDResult import CPDResult
from cpdbench.control.DatasetExecutor import DatasetExecutor
from cpdbench.control.ExecutionController import ExecutionController
from cpdbench.utils import Logger, BenchConfig
from tqdm import tqdm


def _logger_thread(queue, logger):
    """Drain log records from *queue* into *logger* until a ``None`` sentinel arrives.

    Runs in a thread of the main process; worker processes push records into
    the queue via a ``QueueHandler``.
    """
    while True:
        record = queue.get()
        if record is None:  # sentinel pushed by the main process on shutdown
            break
        logger.handle(record)


def _create_ds_executor_and_run(dataset, algorithms, metrics, queue):
    """Run one dataset in a worker process, forwarding its log records via *queue*.

    :param dataset: dataset task to execute
    :param algorithms: list of algorithm tasks to run on the dataset
    :param metrics: list of metric tasks to evaluate the algorithm results
    :param queue: multiprocessing queue receiving the worker's log records
    :return: the dataset result produced by DatasetExecutor.execute()
    :raises Exception: re-raises whatever the executor raised, after logging it
    """
    logger_name = 'cpdbench.' + dataset.get_task_name()
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    # Forward all records to the main process instead of handling them here.
    logger.addHandler(logging.handlers.QueueHandler(queue))

    ds_executor = DatasetExecutor(dataset, algorithms, metrics, logger)
    try:
        return ds_executor.execute()
    except Exception as e:
        logger.exception(e)
        raise e


class TestrunController(ExecutionController):
    """An ExecutionController implementation for the standard run configuration.

    As described in the paper, all given datasets, algorithms and metrics are
    completely executed and all results are stored in a CPDFullResult.
    """

    def __init__(self):
        self._logger = Logger.get_application_logger()
        super().__init__(self._logger)

    def execute_run(self, methods: dict) -> CPDResult:
        """Execute the complete test run over all datasets, algorithms and metrics.

        Datasets are fanned out to a process pool (one future per dataset) and
        collected round-robin with a short poll timeout so the progress bar
        advances as soon as any dataset finishes.

        :param methods: dictionary with all given input functions, grouped by function type
        :return: a CPDFullResult which can be handed to the user
        """
        self._logger.info('Creating tasks...')
        tasks = self._create_tasks(methods)
        self._logger.info(f"{len(tasks['datasets']) + len(tasks['algorithms']) + len(tasks['metrics'])} tasks created")

        dataset_results = []
        run_result = CPDFullResult(list(map(lambda x: x.get_task_name(), tasks['datasets'])),
                                   list(map(lambda x: x.get_task_name(), tasks['algorithms'])),
                                   list(map(lambda x: x.get_task_name(), tasks['metrics'])))
        # Manager queue is picklable, so it can be shared with pool workers.
        q = multiprocessing.Manager().Queue()
        error_list = []
        logging_thread = threading.Thread(target=_logger_thread, args=(q, self._logger))
        logging_thread.start()

        # One worker when multiprocessing is disabled, otherwise the pool default.
        max_workers = None if BenchConfig.multiprocessing_enabled else 1
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            for dataset in tasks["datasets"]:
                dataset_results.append(executor.submit(_create_ds_executor_and_run,
                                                       dataset,
                                                       tasks["algorithms"],
                                                       tasks["metrics"], q))
            # Each tqdm step collects exactly one finished (or failed) dataset.
            for _ in tqdm(range(len(dataset_results)), desc="Processing datasets"):
                j = 0
                while True:
                    ds_res = dataset_results[j]
                    try:
                        res = ds_res.result(2)
                    except Exception as e:
                        # BUGFIX: the original tested `e is TimeoutError`, an
                        # identity check of an instance against the class that
                        # is always False - failed futures were never removed
                        # and the loop polled them forever.
                        # NOTE(review): builtin TimeoutError matches the
                        # future's timeout only on Python 3.11+, where
                        # concurrent.futures.TimeoutError is an alias of it.
                        if isinstance(e, TimeoutError):
                            # Not done yet - try the next outstanding future.
                            j += 1
                            if j >= len(dataset_results):
                                j = 0
                        else:
                            # Permanent failure: record it and drop the future.
                            error_list.append(e)
                            dataset_results.pop(j)
                            break
                    else:
                        run_result.add_dataset_result(res)
                        dataset_results.pop(j)
                        break

        q.put_nowait(None)  # stop the logger thread
        logging_thread.join()
        for error in error_list:
            self._logger.exception(error)
        self._logger.info("Collected all datasets")
        self._logger.info("Finished testrun. Printing results")
        return run_result
class TestrunController(ExecutionController):
    """An ExecutionController implementation for the standard run configuration.

    As described in the paper, all given datasets, algorithms and metrics are
    completely executed and all results are stored in a CPDFullResult.
    """

    def __init__(self):
        self._logger = Logger.get_application_logger()
        super().__init__(self._logger)

    def execute_run(self, methods: dict) -> CPDResult:
        """Execute the complete test run over all datasets, algorithms and metrics.

        Datasets are fanned out to a process pool (one future per dataset) and
        collected round-robin with a short poll timeout so the progress bar
        advances as soon as any dataset finishes.

        :param methods: dictionary with all given input functions, grouped by function type
        :return: a CPDFullResult which can be handed to the user
        """
        self._logger.info('Creating tasks...')
        tasks = self._create_tasks(methods)
        self._logger.info(f"{len(tasks['datasets']) + len(tasks['algorithms']) + len(tasks['metrics'])} tasks created")

        dataset_results = []
        run_result = CPDFullResult(list(map(lambda x: x.get_task_name(), tasks['datasets'])),
                                   list(map(lambda x: x.get_task_name(), tasks['algorithms'])),
                                   list(map(lambda x: x.get_task_name(), tasks['metrics'])))
        # Manager queue is picklable, so it can be shared with pool workers.
        q = multiprocessing.Manager().Queue()
        error_list = []
        logging_thread = threading.Thread(target=_logger_thread, args=(q, self._logger))
        logging_thread.start()

        # One worker when multiprocessing is disabled, otherwise the pool default.
        max_workers = None if BenchConfig.multiprocessing_enabled else 1
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            for dataset in tasks["datasets"]:
                dataset_results.append(executor.submit(_create_ds_executor_and_run,
                                                       dataset,
                                                       tasks["algorithms"],
                                                       tasks["metrics"], q))
            # Each tqdm step collects exactly one finished (or failed) dataset.
            for _ in tqdm(range(len(dataset_results)), desc="Processing datasets"):
                j = 0
                while True:
                    ds_res = dataset_results[j]
                    try:
                        res = ds_res.result(2)
                    except Exception as e:
                        # BUGFIX: the original tested `e is TimeoutError`, an
                        # identity check of an instance against the class that
                        # is always False - failed futures were never removed
                        # and the loop polled them forever.
                        # NOTE(review): builtin TimeoutError matches the
                        # future's timeout only on Python 3.11+, where
                        # concurrent.futures.TimeoutError is an alias of it.
                        if isinstance(e, TimeoutError):
                            # Not done yet - try the next outstanding future.
                            j += 1
                            if j >= len(dataset_results):
                                j = 0
                        else:
                            # Permanent failure: record it and drop the future.
                            error_list.append(e)
                            dataset_results.pop(j)
                            break
                    else:
                        run_result.add_dataset_result(res)
                        dataset_results.pop(j)
                        break

        q.put_nowait(None)  # stop the logger thread
        logging_thread.join()
        for error in error_list:
            self._logger.exception(error)
        self._logger.info("Collected all datasets")
        self._logger.info("Finished testrun. Printing results")
        return run_result
An ExecutionController implementation for the standard run configuration. As described in the paper, all given datasets, algorithms and metrics are completely executed and all results are stored in a CPDFullResult.
def execute_run(self, methods: dict) -> CPDResult:
    """Execute the complete test run over all datasets, algorithms and metrics.

    Datasets are fanned out to a process pool (one future per dataset) and
    collected round-robin with a short poll timeout so the progress bar
    advances as soon as any dataset finishes.

    :param methods: dictionary with all given input functions, grouped by function type
    :return: a CPDFullResult which can be handed to the user
    """
    self._logger.info('Creating tasks...')
    tasks = self._create_tasks(methods)
    self._logger.info(f"{len(tasks['datasets']) + len(tasks['algorithms']) + len(tasks['metrics'])} tasks created")

    dataset_results = []
    run_result = CPDFullResult(list(map(lambda x: x.get_task_name(), tasks['datasets'])),
                               list(map(lambda x: x.get_task_name(), tasks['algorithms'])),
                               list(map(lambda x: x.get_task_name(), tasks['metrics'])))
    # Manager queue is picklable, so it can be shared with pool workers.
    q = multiprocessing.Manager().Queue()
    error_list = []
    logging_thread = threading.Thread(target=_logger_thread, args=(q, self._logger))
    logging_thread.start()

    # One worker when multiprocessing is disabled, otherwise the pool default.
    max_workers = None if BenchConfig.multiprocessing_enabled else 1
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        for dataset in tasks["datasets"]:
            dataset_results.append(executor.submit(_create_ds_executor_and_run,
                                                   dataset,
                                                   tasks["algorithms"],
                                                   tasks["metrics"], q))
        # Each tqdm step collects exactly one finished (or failed) dataset.
        for _ in tqdm(range(len(dataset_results)), desc="Processing datasets"):
            j = 0
            while True:
                ds_res = dataset_results[j]
                try:
                    res = ds_res.result(2)
                except Exception as e:
                    # BUGFIX: the original tested `e is TimeoutError`, an
                    # identity check of an instance against the class that
                    # is always False - failed futures were never removed
                    # and the loop polled them forever.
                    # NOTE(review): builtin TimeoutError matches the
                    # future's timeout only on Python 3.11+, where
                    # concurrent.futures.TimeoutError is an alias of it.
                    if isinstance(e, TimeoutError):
                        # Not done yet - try the next outstanding future.
                        j += 1
                        if j >= len(dataset_results):
                            j = 0
                    else:
                        # Permanent failure: record it and drop the future.
                        error_list.append(e)
                        dataset_results.pop(j)
                        break
                else:
                    run_result.add_dataset_result(res)
                    dataset_results.pop(j)
                    break

    q.put_nowait(None)  # stop the logger thread
    logging_thread.join()
    for error in error_list:
        self._logger.exception(error)
    self._logger.info("Collected all datasets")
    self._logger.info("Finished testrun. Printing results")
    return run_result
Executes the run implemented by this class.
Parameters:
- methods: dictionary with all given input functions, grouped by function type.
Returns:
A result object which can be handed to the user.