cpdbench.control.TestrunController

 1import logging
 2import logging.handlers
 3import multiprocessing
 4import threading
 5from concurrent.futures import ProcessPoolExecutor
 6
 7from cpdbench.control.CPDFullResult import CPDFullResult
 8from cpdbench.control.CPDResult import CPDResult
 9from cpdbench.control.DatasetExecutor import DatasetExecutor
10from cpdbench.control.ExecutionController import ExecutionController
11from cpdbench.utils import Logger, BenchConfig
12from tqdm import tqdm
13
14
15def _logger_thread(queue, logger):
16    while True:
17        record = queue.get()
18        if record is None:
19            break
20        logger.handle(record)
21
22
def _create_ds_executor_and_run(dataset, algorithms, metrics, queue):
    """Worker-process entry point: run all algorithms and metrics on one dataset.

    Builds a per-dataset logger whose records are shipped to the main process
    via *queue*, then delegates the actual work to a DatasetExecutor.

    :param dataset: the dataset task to execute
    :param algorithms: the algorithm tasks to run on the dataset
    :param metrics: the metric tasks to evaluate the algorithm results
    :param queue: multiprocessing queue receiving this worker's log records
    :return: the result object produced by ``DatasetExecutor.execute()``
    :raises Exception: re-raises whatever the executor raised, after logging it
    """
    logger_name = 'cpdbench.' + dataset.get_task_name()
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    # Pool workers are reused and getLogger() returns a cached logger, so guard
    # against stacking a second QueueHandler (which would duplicate records).
    if not any(isinstance(h, logging.handlers.QueueHandler) for h in logger.handlers):
        logger.addHandler(logging.handlers.QueueHandler(queue))

    ds_executor = DatasetExecutor(dataset, algorithms, metrics, logger)
    try:
        return ds_executor.execute()
    except Exception as e:
        logger.exception(e)
        raise  # bare raise preserves the original traceback for the parent
35
36
class TestrunController(ExecutionController):
    """An ExecutionController implementation for the standard run configuration.
    As described in the paper, all given datasets, algorithms and metrics are
    completely executed and all results are stored in a CPDFullResult.
    """

    def __init__(self):
        # Application-wide logger; also the sink of the queue logging thread.
        self._logger = Logger.get_application_logger()
        super().__init__(self._logger)

    def execute_run(self, methods: dict) -> CPDResult:
        """Execute the complete testrun.

        Each dataset is submitted to a process pool; worker log records are
        funnelled back through a queue into the application logger.

        :param methods: dictionary with all given input functions, grouped by
            function type ('datasets', 'algorithms', 'metrics')
        :return: a CPDFullResult which can be handed to the user
        """
        # Local import so the correct futures timeout class is caught on
        # Python < 3.11, where it is not an alias of the builtin TimeoutError.
        from concurrent.futures import TimeoutError as FuturesTimeoutError

        self._logger.info('Creating tasks...')
        tasks = self._create_tasks(methods)
        self._logger.info(f"{len(tasks['datasets']) + len(tasks['algorithms']) + len(tasks['metrics'])} tasks created")

        dataset_results = []
        run_result = CPDFullResult([t.get_task_name() for t in tasks['datasets']],
                                   [t.get_task_name() for t in tasks['algorithms']],
                                   [t.get_task_name() for t in tasks['metrics']])
        # Manager queue: picklable, shared between pool workers and this process.
        q = multiprocessing.Manager().Queue()
        error_list = []
        logging_thread = threading.Thread(target=_logger_thread, args=(q, self._logger))
        logging_thread.start()

        max_workers = None if BenchConfig.multiprocessing_enabled else 1
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            for dataset in tasks["datasets"]:
                dataset_results.append(executor.submit(_create_ds_executor_and_run,
                                                       dataset,
                                                       tasks["algorithms"],
                                                       tasks["metrics"], q))
            # Poll the outstanding futures round-robin with a short timeout so
            # the progress bar advances as soon as ANY dataset finishes. Each
            # pass of the outer loop removes exactly one future.
            for _ in tqdm(range(len(dataset_results)), desc="Processing datasets"):
                j = 0
                while True:
                    ds_res = dataset_results[j]
                    try:
                        res = ds_res.result(2)
                    except FuturesTimeoutError:
                        # Still running: move on to the next outstanding future.
                        j += 1
                        if j >= len(dataset_results):
                            j = 0
                    except Exception as e:
                        # The dataset task itself failed. Remove the future -
                        # a failed future re-raises on every result() call, so
                        # leaving it in the list would poll it forever.
                        error_list.append(e)
                        dataset_results.pop(j)
                        break
                    else:
                        run_result.add_dataset_result(res)
                        dataset_results.pop(j)
                        break

        q.put_nowait(None)  # sentinel terminates _logger_thread
        logging_thread.join()
        for error in error_list:
            self._logger.exception(error)
        self._logger.info("Collected all datasets")
        self._logger.info("Finished testrun. Printing results")
        return run_result
class TestrunController(cpdbench.control.ExecutionController.ExecutionController):
class TestrunController(ExecutionController):
    """An ExecutionController implementation for the standard run configuration.
    As described in the paper, all given datasets, algorithms and metrics are
    completely executed and all results are stored in a CPDFullResult.
    """

    def __init__(self):
        # Application-wide logger; also the sink of the queue logging thread.
        self._logger = Logger.get_application_logger()
        super().__init__(self._logger)

    def execute_run(self, methods: dict) -> CPDResult:
        """Execute the complete testrun.

        Each dataset is submitted to a process pool; worker log records are
        funnelled back through a queue into the application logger.

        :param methods: dictionary with all given input functions, grouped by
            function type ('datasets', 'algorithms', 'metrics')
        :return: a CPDFullResult which can be handed to the user
        """
        # Local import so the correct futures timeout class is caught on
        # Python < 3.11, where it is not an alias of the builtin TimeoutError.
        from concurrent.futures import TimeoutError as FuturesTimeoutError

        self._logger.info('Creating tasks...')
        tasks = self._create_tasks(methods)
        self._logger.info(f"{len(tasks['datasets']) + len(tasks['algorithms']) + len(tasks['metrics'])} tasks created")

        dataset_results = []
        run_result = CPDFullResult([t.get_task_name() for t in tasks['datasets']],
                                   [t.get_task_name() for t in tasks['algorithms']],
                                   [t.get_task_name() for t in tasks['metrics']])
        # Manager queue: picklable, shared between pool workers and this process.
        q = multiprocessing.Manager().Queue()
        error_list = []
        logging_thread = threading.Thread(target=_logger_thread, args=(q, self._logger))
        logging_thread.start()

        max_workers = None if BenchConfig.multiprocessing_enabled else 1
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            for dataset in tasks["datasets"]:
                dataset_results.append(executor.submit(_create_ds_executor_and_run,
                                                       dataset,
                                                       tasks["algorithms"],
                                                       tasks["metrics"], q))
            # Poll the outstanding futures round-robin with a short timeout so
            # the progress bar advances as soon as ANY dataset finishes. Each
            # pass of the outer loop removes exactly one future.
            for _ in tqdm(range(len(dataset_results)), desc="Processing datasets"):
                j = 0
                while True:
                    ds_res = dataset_results[j]
                    try:
                        res = ds_res.result(2)
                    except FuturesTimeoutError:
                        # Still running: move on to the next outstanding future.
                        j += 1
                        if j >= len(dataset_results):
                            j = 0
                    except Exception as e:
                        # The dataset task itself failed. Remove the future -
                        # a failed future re-raises on every result() call, so
                        # leaving it in the list would poll it forever.
                        error_list.append(e)
                        dataset_results.pop(j)
                        break
                    else:
                        run_result.add_dataset_result(res)
                        dataset_results.pop(j)
                        break

        q.put_nowait(None)  # sentinel terminates _logger_thread
        logging_thread.join()
        for error in error_list:
            self._logger.exception(error)
        self._logger.info("Collected all datasets")
        self._logger.info("Finished testrun. Printing results")
        return run_result

An ExecutionController implementation for the standard run configuration. As described in the paper, all given datasets, algorithms and metrics are completely executed and all results are stored in a CPDFullResult.

def execute_run(self, methods: dict) -> cpdbench.control.CPDResult.CPDResult:
48    def execute_run(self, methods: dict) -> CPDResult:
49        self._logger.info('Creating tasks...')
50        tasks = self._create_tasks(methods)
51        self._logger.info(f"{len(tasks['datasets']) + len(tasks['algorithms']) + len(tasks['metrics'])} tasks created")
52
53        dataset_results = []
54        run_result = CPDFullResult(list(map(lambda x: x.get_task_name(), tasks['datasets'])),
55                                   list(map(lambda x: x.get_task_name(), tasks['algorithms'])),
56                                   list(map(lambda x: x.get_task_name(), tasks['metrics'])))
57        q = multiprocessing.Manager().Queue()
58        error_list = []
59        logging_thread = threading.Thread(target=_logger_thread, args=(q, self._logger))
60        logging_thread.start()
61
62        max_workers = None if BenchConfig.multiprocessing_enabled else 1
63        with ProcessPoolExecutor(max_workers=max_workers) as executor:
64            for dataset in tasks["datasets"]:
65                dataset_results.append(executor.submit(_create_ds_executor_and_run,
66                                                       dataset,
67                                                       tasks["algorithms"],
68                                                       tasks["metrics"], q))
69            for i in tqdm(range(len(dataset_results)), desc="Processing datasets"):
70                j = 0
71                while True:
72                    ds_res = dataset_results[j]
73                    try:
74                        res = ds_res.result(2)
75                    except Exception as e:
76                        if e is TimeoutError:
77                            error_list.append(e)
78                            dataset_results.pop(j)
79                            i += 1
80                            break
81                    else:
82                        run_result.add_dataset_result(res)
83                        dataset_results.pop(j)
84                        i += 1
85                        break
86                    j += 1
87                    if j == len(dataset_results):
88                        j = 0
89
90        q.put_nowait(None)
91        logging_thread.join()
92        for error in error_list:
93            self._logger.exception(error)
94        self._logger.info("Collected all datasets")
95        self._logger.info("Finished testrun. Printing results")
96        return run_result

Executes the run implemented by this class.

Parameters
  • methods: dictionary with all given input functions, grouped by function type.
Returns

A `CPDResult` object containing all collected results, which can be handed to the user.