cpdbench.task.TaskFactory
1from typing import Callable 2import inspect 3 4from cpdbench.exception.UserParameterDoesNotExistException import UserParameterDoesNotExistException 5from cpdbench.task.AlgorithmExecutionTask import AlgorithmExecutionTask 6from cpdbench.task.DatasetFetchTask import DatasetFetchTask 7from cpdbench.task.MetricExecutionTask import MetricExecutionTask 8from cpdbench.task.Task import TaskType, Task 9import cpdbench.utils.BenchConfig as BenchConfig 10from cpdbench.utils import Logger 11from cpdbench.utils.Utils import get_name_of_function 12from functools import partial 13 14 15class TaskFactory: 16 """Abstract factory for creating task objects""" 17 18 def __init__(self): 19 self._user_config = BenchConfig.get_user_config() 20 self._logger = Logger.get_application_logger() 21 self._task_counter = 0 22 23 def create_task_from_function(self, function: Callable, task_type: TaskType) -> Task: 24 """Creates one task for an unparametrized function. 25 :param function: the function to be executed as task 26 :param task_type: the type of the task to be created 27 :return: the constructed task object 28 """ 29 return self._generate_task_object(function, {}, task_type) 30 31 def create_tasks_with_parameters(self, function: Callable, task_type: TaskType) -> list[Task]: 32 """Creates correct task objects based on the given task type and the needed parameters defined in the user 33 config. Because of this, this method can potentially output multiple tasks with the same function 34 but different parameters. 35 :param function: the function to be executed as task 36 :param task_type: the type of the task to be created 37 :return: the constructed task objects 38 """ 39 all_params = [param.name for param in inspect.signature(function).parameters.values() if param.kind == 40 param.KEYWORD_ONLY] 41 try: 42 global_params = [param for param in all_params if self._user_config.check_if_global_param(param)] 43 except Exception as e: 44 if "Parameter not found" in str(e): 45 raise UserParameterDoesNotExistException(str(e).split()[-1], get_name_of_function(function)) 46 47 if all_params is None or len(all_params) == 0: 48 # Easy case: no parameter 49 task = self._generate_task_object(function,{}, task_type) 50 self._logger.info(f"Created task {task.get_task_name()}") 51 self._task_counter += 1 52 return [task] 53 global_params.sort() 54 all_params.sort() 55 if global_params == all_params: 56 # Easy case: only global params 57 param_values = [{}] 58 else: 59 param_values = [{} for _ in range(self._user_config.get_number_of_executions(task_type))] 60 if len(param_values) == 0: 61 param_values = [{}] 62 63 for param in all_params: 64 try: 65 vals = self._user_config.get_user_param(param, task_type) 66 except Exception as e: 67 if str(e) == "Parameter not found": 68 raise UserParameterDoesNotExistException(param, get_name_of_function(function)) 69 else: 70 raise e 71 else: 72 for i in range(len(param_values)): 73 if param in global_params: 74 param_values[i].update({param: vals[0]}) # global param 75 else: 76 param_values[i].update({param: vals[i]}) # execution param 77 78 tasks = [] 79 for param_dict in param_values: 80 function_with_params = partial(function, **param_dict) 81 task = self._generate_task_object(function_with_params, param_dict, task_type) 82 self._task_counter += 1 83 self._logger.info(f"Created task {task.get_task_name()} with following parameters: {str(param_dict)}") 84 tasks.append(task) 85 return tasks 86 87 def _generate_task_object(self, function: Callable, param_dict: dict, task_type: TaskType): 88 if task_type == TaskType.DATASET_FETCH: 89 return DatasetFetchTask(function, self._task_counter, param_dict) 90 elif task_type == TaskType.ALGORITHM_EXECUTION: 91 return AlgorithmExecutionTask(function, self._task_counter, param_dict) 92 elif task_type == TaskType.METRIC_EXECUTION: 93 return MetricExecutionTask(function, self._task_counter, param_dict)
class
TaskFactory:
16class TaskFactory: 17 """Abstract factory for creating task objects""" 18 19 def __init__(self): 20 self._user_config = BenchConfig.get_user_config() 21 self._logger = Logger.get_application_logger() 22 self._task_counter = 0 23 24 def create_task_from_function(self, function: Callable, task_type: TaskType) -> Task: 25 """Creates one task for an unparametrized function. 26 :param function: the function to be executed as task 27 :param task_type: the type of the task to be created 28 :return: the constructed task object 29 """ 30 return self._generate_task_object(function, {}, task_type) 31 32 def create_tasks_with_parameters(self, function: Callable, task_type: TaskType) -> list[Task]: 33 """Creates correct task objects based on the given task type and the needed parameters defined in the user 34 config. Because of this, this method can potentially output multiple tasks with the same function 35 but different parameters. 36 :param function: the function to be executed as task 37 :param task_type: the type of the task to be created 38 :return: the constructed task objects 39 """ 40 all_params = [param.name for param in inspect.signature(function).parameters.values() if param.kind == 41 param.KEYWORD_ONLY] 42 try: 43 global_params = [param for param in all_params if self._user_config.check_if_global_param(param)] 44 except Exception as e: 45 if "Parameter not found" in str(e): 46 raise UserParameterDoesNotExistException(str(e).split()[-1], get_name_of_function(function)) 47 48 if all_params is None or len(all_params) == 0: 49 # Easy case: no parameter 50 task = self._generate_task_object(function,{}, task_type) 51 self._logger.info(f"Created task {task.get_task_name()}") 52 self._task_counter += 1 53 return [task] 54 global_params.sort() 55 all_params.sort() 56 if global_params == all_params: 57 # Easy case: only global params 58 param_values = [{}] 59 else: 60 param_values = [{} for _ in range(self._user_config.get_number_of_executions(task_type))] 61 if len(param_values) == 0: 62 param_values = [{}] 63 64 for param in all_params: 65 try: 66 vals = self._user_config.get_user_param(param, task_type) 67 except Exception as e: 68 if str(e) == "Parameter not found": 69 raise UserParameterDoesNotExistException(param, get_name_of_function(function)) 70 else: 71 raise e 72 else: 73 for i in range(len(param_values)): 74 if param in global_params: 75 param_values[i].update({param: vals[0]}) # global param 76 else: 77 param_values[i].update({param: vals[i]}) # execution param 78 79 tasks = [] 80 for param_dict in param_values: 81 function_with_params = partial(function, **param_dict) 82 task = self._generate_task_object(function_with_params, param_dict, task_type) 83 self._task_counter += 1 84 self._logger.info(f"Created task {task.get_task_name()} with following parameters: {str(param_dict)}") 85 tasks.append(task) 86 return tasks 87 88 def _generate_task_object(self, function: Callable, param_dict: dict, task_type: TaskType): 89 if task_type == TaskType.DATASET_FETCH: 90 return DatasetFetchTask(function, self._task_counter, param_dict) 91 elif task_type == TaskType.ALGORITHM_EXECUTION: 92 return AlgorithmExecutionTask(function, self._task_counter, param_dict) 93 elif task_type == TaskType.METRIC_EXECUTION: 94 return MetricExecutionTask(function, self._task_counter, param_dict)
Abstract factory for creating task objects
def
create_task_from_function( self, function: Callable, task_type: cpdbench.task.Task.TaskType) -> cpdbench.task.Task.Task:
24 def create_task_from_function(self, function: Callable, task_type: TaskType) -> Task: 25 """Creates one task for an unparametrized function. 26 :param function: the function to be executed as task 27 :param task_type: the type of the task to be created 28 :return: the constructed task object 29 """ 30 return self._generate_task_object(function, {}, task_type)
Creates one task for an unparametrized function.
Parameters
- function: the function to be executed as task
- task_type: the type of the task to be created
Returns
the constructed task object
def
create_tasks_with_parameters( self, function: Callable, task_type: cpdbench.task.Task.TaskType) -> list[cpdbench.task.Task.Task]:
32 def create_tasks_with_parameters(self, function: Callable, task_type: TaskType) -> list[Task]: 33 """Creates correct task objects based on the given task type and the needed parameters defined in the user 34 config. Because of this, this method can potentially output multiple tasks with the same function 35 but different parameters. 36 :param function: the function to be executed as task 37 :param task_type: the type of the task to be created 38 :return: the constructed task objects 39 """ 40 all_params = [param.name for param in inspect.signature(function).parameters.values() if param.kind == 41 param.KEYWORD_ONLY] 42 try: 43 global_params = [param for param in all_params if self._user_config.check_if_global_param(param)] 44 except Exception as e: 45 if "Parameter not found" in str(e): 46 raise UserParameterDoesNotExistException(str(e).split()[-1], get_name_of_function(function)) 47 48 if all_params is None or len(all_params) == 0: 49 # Easy case: no parameter 50 task = self._generate_task_object(function,{}, task_type) 51 self._logger.info(f"Created task {task.get_task_name()}") 52 self._task_counter += 1 53 return [task] 54 global_params.sort() 55 all_params.sort() 56 if global_params == all_params: 57 # Easy case: only global params 58 param_values = [{}] 59 else: 60 param_values = [{} for _ in range(self._user_config.get_number_of_executions(task_type))] 61 if len(param_values) == 0: 62 param_values = [{}] 63 64 for param in all_params: 65 try: 66 vals = self._user_config.get_user_param(param, task_type) 67 except Exception as e: 68 if str(e) == "Parameter not found": 69 raise UserParameterDoesNotExistException(param, get_name_of_function(function)) 70 else: 71 raise e 72 else: 73 for i in range(len(param_values)): 74 if param in global_params: 75 param_values[i].update({param: vals[0]}) # global param 76 else: 77 param_values[i].update({param: vals[i]}) # execution param 78 79 tasks = [] 80 for param_dict in param_values: 81 function_with_params = partial(function, **param_dict) 82 task = self._generate_task_object(function_with_params, param_dict, task_type) 83 self._task_counter += 1 84 self._logger.info(f"Created task {task.get_task_name()} with following parameters: {str(param_dict)}") 85 tasks.append(task) 86 return tasks
Creates correct task objects based on the given task type and the needed parameters defined in the user config. Because of this, this method can potentially output multiple tasks with the same function but different parameters.
Parameters
- function: the function to be executed as task
- task_type: the type of the task to be created
Returns
the constructed task objects