cpdbench.task.TaskFactory

 1from typing import Callable
 2import inspect
 3
 4from cpdbench.exception.UserParameterDoesNotExistException import UserParameterDoesNotExistException
 5from cpdbench.task.AlgorithmExecutionTask import AlgorithmExecutionTask
 6from cpdbench.task.DatasetFetchTask import DatasetFetchTask
 7from cpdbench.task.MetricExecutionTask import MetricExecutionTask
 8from cpdbench.task.Task import TaskType, Task
 9import cpdbench.utils.BenchConfig as BenchConfig
10from cpdbench.utils import Logger
11from cpdbench.utils.Utils import get_name_of_function
12from functools import partial
13
14
class TaskFactory:
    """Factory for creating task objects from user-supplied functions.

    On construction the factory fetches the user config and the application
    logger, and starts a task counter at zero. The current counter value is
    passed to every task constructor.
    """

    def __init__(self):
        self._user_config = BenchConfig.get_user_config()
        self._logger = Logger.get_application_logger()
        # Passed to each task constructor; advanced by create_tasks_with_parameters.
        self._task_counter = 0

    def create_task_from_function(self, function: Callable, task_type: TaskType) -> Task:
        """Creates one task for an unparametrized function.
        :param function: the function to be executed as task
        :param task_type: the type of the task to be created
        :return: the constructed task object
        """
        return self._generate_task_object(function, {}, task_type)

    def create_tasks_with_parameters(self, function: Callable, task_type: TaskType) -> list[Task]:
        """Creates task objects for a function whose keyword-only arguments are filled from the user config.
        A function without keyword-only arguments yields exactly one task. If every keyword-only argument is
        a global parameter, one task is created; otherwise one task is created per configured execution
        (at least one), so the same function can be returned several times with different parameters.
        :param function: the function to be executed as task
        :param task_type: the type of the task to be created
        :return: the constructed task objects
        :raises UserParameterDoesNotExistException: if the user config reports "Parameter not found" for
            one of the keyword-only arguments; any other config error is propagated unchanged
        """
        all_params = [param.name for param in inspect.signature(function).parameters.values()
                      if param.kind == param.KEYWORD_ONLY]

        if not all_params:
            # Easy case: no parameter
            task = self._generate_task_object(function, {}, task_type)
            self._logger.info(f"Created task {task.get_task_name()}")
            self._task_counter += 1
            return [task]

        try:
            global_params = [param for param in all_params if self._user_config.check_if_global_param(param)]
        except Exception as e:
            if "Parameter not found" in str(e):
                # The last word of the message is used as the name of the missing parameter.
                raise UserParameterDoesNotExistException(str(e).split()[-1], get_name_of_function(function)) from e
            # Unrelated failures must not be swallowed: continuing would leave
            # global_params undefined and fail later with an UnboundLocalError.
            raise

        global_params.sort()
        all_params.sort()
        if global_params == all_params:
            # Easy case: only global params
            param_values = [{}]
        else:
            param_values = [{} for _ in range(self._user_config.get_number_of_executions(task_type))]
        if not param_values:
            # No executions configured: still create a single task.
            param_values = [{}]

        for param in all_params:
            try:
                vals = self._user_config.get_user_param(param, task_type)
            except Exception as e:
                if str(e) == "Parameter not found":
                    raise UserParameterDoesNotExistException(param, get_name_of_function(function)) from e
                raise
            is_global = param in global_params
            for i, values in enumerate(param_values):
                # Global params use the first value everywhere; execution params use the i-th value.
                values[param] = vals[0] if is_global else vals[i]

        tasks = []
        for param_dict in param_values:
            function_with_params = partial(function, **param_dict)
            task = self._generate_task_object(function_with_params, param_dict, task_type)
            self._task_counter += 1
            self._logger.info(f"Created task {task.get_task_name()} with following parameters: {str(param_dict)}")
            tasks.append(task)
        return tasks

    def _generate_task_object(self, function: Callable, param_dict: dict, task_type: TaskType):
        """Instantiates the task class matching the given task type.
        :param function: the function to be executed as task
        :param param_dict: the parameters the function was bound with
        :param task_type: the type of the task to be created
        :return: the constructed task object, or None if the task type is not one of the three handled types
        """
        if task_type == TaskType.DATASET_FETCH:
            return DatasetFetchTask(function, self._task_counter, param_dict)
        elif task_type == TaskType.ALGORITHM_EXECUTION:
            return AlgorithmExecutionTask(function, self._task_counter, param_dict)
        elif task_type == TaskType.METRIC_EXECUTION:
            return MetricExecutionTask(function, self._task_counter, param_dict)
        return None
class TaskFactory:
class TaskFactory:
    """Factory for creating task objects from user-supplied functions.

    On construction the factory fetches the user config and the application
    logger, and starts a task counter at zero. The current counter value is
    passed to every task constructor.
    """

    def __init__(self):
        self._user_config = BenchConfig.get_user_config()
        self._logger = Logger.get_application_logger()
        # Passed to each task constructor; advanced by create_tasks_with_parameters.
        self._task_counter = 0

    def create_task_from_function(self, function: Callable, task_type: TaskType) -> Task:
        """Creates one task for an unparametrized function.
        :param function: the function to be executed as task
        :param task_type: the type of the task to be created
        :return: the constructed task object
        """
        return self._generate_task_object(function, {}, task_type)

    def create_tasks_with_parameters(self, function: Callable, task_type: TaskType) -> list[Task]:
        """Creates task objects for a function whose keyword-only arguments are filled from the user config.
        A function without keyword-only arguments yields exactly one task. If every keyword-only argument is
        a global parameter, one task is created; otherwise one task is created per configured execution
        (at least one), so the same function can be returned several times with different parameters.
        :param function: the function to be executed as task
        :param task_type: the type of the task to be created
        :return: the constructed task objects
        :raises UserParameterDoesNotExistException: if the user config reports "Parameter not found" for
            one of the keyword-only arguments; any other config error is propagated unchanged
        """
        all_params = [param.name for param in inspect.signature(function).parameters.values()
                      if param.kind == param.KEYWORD_ONLY]

        if not all_params:
            # Easy case: no parameter
            task = self._generate_task_object(function, {}, task_type)
            self._logger.info(f"Created task {task.get_task_name()}")
            self._task_counter += 1
            return [task]

        try:
            global_params = [param for param in all_params if self._user_config.check_if_global_param(param)]
        except Exception as e:
            if "Parameter not found" in str(e):
                # The last word of the message is used as the name of the missing parameter.
                raise UserParameterDoesNotExistException(str(e).split()[-1], get_name_of_function(function)) from e
            # Unrelated failures must not be swallowed: continuing would leave
            # global_params undefined and fail later with an UnboundLocalError.
            raise

        global_params.sort()
        all_params.sort()
        if global_params == all_params:
            # Easy case: only global params
            param_values = [{}]
        else:
            param_values = [{} for _ in range(self._user_config.get_number_of_executions(task_type))]
        if not param_values:
            # No executions configured: still create a single task.
            param_values = [{}]

        for param in all_params:
            try:
                vals = self._user_config.get_user_param(param, task_type)
            except Exception as e:
                if str(e) == "Parameter not found":
                    raise UserParameterDoesNotExistException(param, get_name_of_function(function)) from e
                raise
            is_global = param in global_params
            for i, values in enumerate(param_values):
                # Global params use the first value everywhere; execution params use the i-th value.
                values[param] = vals[0] if is_global else vals[i]

        tasks = []
        for param_dict in param_values:
            function_with_params = partial(function, **param_dict)
            task = self._generate_task_object(function_with_params, param_dict, task_type)
            self._task_counter += 1
            self._logger.info(f"Created task {task.get_task_name()} with following parameters: {str(param_dict)}")
            tasks.append(task)
        return tasks

    def _generate_task_object(self, function: Callable, param_dict: dict, task_type: TaskType):
        """Instantiates the task class matching the given task type.
        :param function: the function to be executed as task
        :param param_dict: the parameters the function was bound with
        :param task_type: the type of the task to be created
        :return: the constructed task object, or None if the task type is not one of the three handled types
        """
        if task_type == TaskType.DATASET_FETCH:
            return DatasetFetchTask(function, self._task_counter, param_dict)
        elif task_type == TaskType.ALGORITHM_EXECUTION:
            return AlgorithmExecutionTask(function, self._task_counter, param_dict)
        elif task_type == TaskType.METRIC_EXECUTION:
            return MetricExecutionTask(function, self._task_counter, param_dict)
        return None

Factory for creating task objects (dataset fetch, algorithm execution, and metric execution tasks)

def create_task_from_function( self, function: Callable, task_type: cpdbench.task.Task.TaskType) -> cpdbench.task.Task.Task:
24    def create_task_from_function(self, function: Callable, task_type: TaskType) -> Task:
25        """Creates one task for an unparametrized function.
26        :param function: the function to be executed as task
27        :param task_type: the type of the task to be created
28        :return: the constructed task object
29        """
30        return self._generate_task_object(function, {}, task_type)

Creates one task for an unparametrized function.

Parameters
  • function: the function to be executed as task
  • task_type: the type of the task to be created
Returns

the constructed task object

def create_tasks_with_parameters( self, function: Callable, task_type: cpdbench.task.Task.TaskType) -> list[cpdbench.task.Task.Task]:
32    def create_tasks_with_parameters(self, function: Callable, task_type: TaskType) -> list[Task]:
33        """Creates correct task objects based on the given task type and the needed parameters defined in the user
34        config. Because of this, this method can potentially output multiple tasks with the same function
35        but different parameters.
36        :param function: the function to be executed as task
37        :param task_type: the type of the task to be created
38        :return: the constructed task objects
39        """
40        all_params = [param.name for param in inspect.signature(function).parameters.values() if param.kind ==
41                      param.KEYWORD_ONLY]
42        try:
43            global_params = [param for param in all_params if self._user_config.check_if_global_param(param)]
44        except Exception as e:
45            if "Parameter not found" in str(e):
46                raise UserParameterDoesNotExistException(str(e).split()[-1], get_name_of_function(function))
47
48        if all_params is None or len(all_params) == 0:
49            # Easy case: no parameter
50            task = self._generate_task_object(function,{}, task_type)
51            self._logger.info(f"Created task {task.get_task_name()}")
52            self._task_counter += 1
53            return [task]
54        global_params.sort()
55        all_params.sort()
56        if global_params == all_params:
57            # Easy case: only global params
58            param_values = [{}]
59        else:
60            param_values = [{} for _ in range(self._user_config.get_number_of_executions(task_type))]
61        if len(param_values) == 0:
62            param_values = [{}]
63
64        for param in all_params:
65            try:
66                vals = self._user_config.get_user_param(param, task_type)
67            except Exception as e:
68                if str(e) == "Parameter not found":
69                    raise UserParameterDoesNotExistException(param, get_name_of_function(function))
70                else:
71                    raise e
72            else:
73                for i in range(len(param_values)):
74                    if param in global_params:
75                        param_values[i].update({param: vals[0]})  # global param
76                    else:
77                        param_values[i].update({param: vals[i]})  # execution param
78
79        tasks = []
80        for param_dict in param_values:
81            function_with_params = partial(function, **param_dict)
82            task = self._generate_task_object(function_with_params, param_dict, task_type)
83            self._task_counter += 1
84            self._logger.info(f"Created task {task.get_task_name()} with following parameters: {str(param_dict)}")
85            tasks.append(task)
86        return tasks

Creates correct task objects based on the given task type and the needed parameters defined in the user config. Because of this, this method can potentially output multiple tasks with the same function but different parameters.

Parameters
  • function: the function to be executed as task
  • task_type: the type of the task to be created
Returns

the constructed task objects