Hyperpipe

The Hyperpipe is the basic construct in PHOTON with which everything starts. It acts as the designer for your machine learning pipeline: you choose your strategies, such as the cross-validation splitting methods, the performance metrics, and the hyperparameter optimization algorithm, and then you add your pipeline elements, as sketched in the minimal example below.

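A minimal sketch of this workflow follows; the import path, the element names ('StandardScaler', 'SVC') and the PipelineElement call signature are assumptions that may differ between PHOTON versions:

    from sklearn.model_selection import KFold, ShuffleSplit
    from photonai.base import Hyperpipe, PipelineElement  # import path may vary between versions

    my_pipe = Hyperpipe('basic_svm_pipe',
                        inner_cv=KFold(n_splits=5, shuffle=True),
                        outer_cv=ShuffleSplit(n_splits=1, test_size=0.2),
                        optimizer='grid_search',
                        metrics=['accuracy', 'f1_score'],
                        best_config_metric='accuracy')

    # add pipeline elements: a preprocessing step and an estimator
    my_pipe += PipelineElement('StandardScaler')
    my_pipe += PipelineElement('SVC', hyperparameters={'C': [0.1, 1, 10]})

    # my_pipe.fit(X, y)  # X, y: your data and targets as numpy arrays
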
class Hyperpipe

Wrapper class for a machine learning pipeline, holding all pipeline elements and managing the optimization of the hyperparameters

Parameters

  • name [str]: Name of hyperpipe instance

  • inner_cv [BaseCrossValidator]: Cross validation strategy to test hyperparameter configurations, generates the validation set

  • outer_cv [BaseCrossValidator]: Cross validation strategy to use for the hyperparameter search itself, generates the test set

  • optimizer [str or object, default="grid_search"]: Hyperparameter optimization algorithm

    • In case a string literal is given:

      • "grid_search": optimizer that iteratively tests all possible hyperparameter combinations
      • "random_grid_search": a variation of the grid search optimization that randomly picks hyperparameter combinations from all possible hyperparameter combinations
      • "timeboxed_random_grid_search": randomly chooses hyperparameter combinations from the set of all possible hyperparameter combinations and tests until the given time limit is reached
        • limit_in_minutes: int
    • In case an object is given: expects the object to have the following methods (a sketch follows after this list):

      • next_config_generator: returns a hyperparameter configuration in the form of a dictionary containing key->value pairs in the sklearn parameter encoding model_name__parameter_name: parameter_value
      • prepare: takes a list of pipeline elements and their particular hyperparameters to test
      • evaluate_recent_performance: gets a tested config and the respective performance in order to calculate a smart next configuration to process
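
    A minimal sketch of a custom optimizer satisfying this interface. Note that the documented method name is next_config_generator, while the fit routine shown further down iterates over an attribute called next_config; the sketch follows the source. The class name and the element.hyperparameters access are assumptions, not an official PHOTON base class:

    import random

    class MyRandomOptimizer:
        """Illustrative only: proposes a fixed number of random configurations."""

        def __init__(self, n_configs=10):
            self.n_configs = n_configs
            self.param_grid = {}

        def prepare(self, pipeline_elements):
            # collect the hyperparameter space declared by each pipeline element
            for element in pipeline_elements:
                for param_name, values in element.hyperparameters.items():
                    self.param_grid[param_name] = values

        @property
        def next_config(self):
            # generator yielding configurations in the sklearn encoding
            # model_name__parameter_name: parameter_value
            for _ in range(self.n_configs):
                yield {name: random.choice(values) for name, values in self.param_grid.items()}

        def evaluate_recent_performance(self, config, performance):
            # performance arrives as a (train_metric, test_metric) tuple; a smarter
            # optimizer would use it to choose the next configuration
            pass
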
  • metrics [list of metric names as str]: Metrics that should be calculated for the training, validation and test set. Use the pre-imported metrics from sklearn and photonai, or register your own

    • Metrics for classification:
      • accuracy: sklearn.metrics.accuracy_score
      • matthews_corrcoef: sklearn.metrics.matthews_corrcoef
      • confusion_matrix: sklearn.metrics.confusion_matrix
      • f1_score: sklearn.metrics.f1_score
      • hamming_loss: sklearn.metrics.hamming_loss
      • log_loss: sklearn.metrics.log_loss
      • precision: sklearn.metrics.precision_score
      • recall: sklearn.metrics.recall_score
    • Metrics for regression:
      • mean_squared_error: sklearn.metrics.mean_squared_error
      • mean_absolute_error: sklearn.metrics.mean_absolute_error
      • explained_variance: sklearn.metrics.explained_variance_score
      • r2: sklearn.metrics.r2_score
    • Other metrics
      • pearson_correlation: photon_core.framework.Metrics.pearson_correlation
      • variance_explained: photon_core.framework.Metrics.variance_explained_score
      • categorical_accuracy: photon_core.framework.Metrics.categorical_accuracy_score
  • best_config_metric [str]: The metric that should be maximized or minimized in order to choose the best hyperparameter configuration

  • eval_final_performance [bool, default=True]: Whether the metrics should be calculated for the test set; otherwise the test set is separated but not used

  • test_size [float, default=0.2]: The fraction of the data that should be left out if no outer_cv is given and eval_final_performance is set to True

  • set_random_seed [bool, default=False]: If True, sets the random seed to 42

  • verbosity [int, default=0]: The level of verbosity: 0 is least talkative and gives only warnings and errors, 1 adds info and 2 adds debug

  • groups [array-like, default=None]: Information for advanced cross-validation strategies, such as LeaveOneSiteOut-CV, about the group affiliation of the rows in the data

  • filter_element [SourceFilter, default=None]: Instance of the SourceFilter class that transforms the input data, e.g. extracts certain columns

  • imbalanced_data_strategy_filter [str, default=None]: Uses the imblearn package to handle imbalanced class distributions in the data. A strategy is used to transform the data into a more balanced distribution before the hyperparameter search is started (see the usage sketch after this list). Strategies to choose from are:

      • imbalance_type = OVERSAMPLING:
        • RandomOverSampler
        • SMOTE
        • ADASYN
      • imbalance_type = UNDERSAMPLING:
        • ClusterCentroids
        • RandomUnderSampler
        • NearMiss
        • InstanceHardnessThreshold
        • CondensedNearestNeighbour
        • EditedNearestNeighbours
        • RepeatedEditedNearestNeighbours
        • AllKNN
        • NeighbourhoodCleaningRule
        • OneSidedSelection
      • imbalance_type = COMBINE:
        • SMOTEENN
        • SMOTETomek

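    For illustration, a hedged sketch of selecting one of these strategies by name; the strategy choice and the pipe configuration are illustrative:

      example_pipe = Hyperpipe('imbalanced_example',
                               inner_cv=KFold(n_splits=5, shuffle=True),
                               metrics=['accuracy', 'f1_score'],
                               best_config_metric='f1_score',
                               imbalanced_data_strategy_filter='SMOTE')  # oversample the minority class before the search
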
Attributes

  • optimum_pipe [Pipeline]: An sklearn pipeline object that is fitted to the training data according to the best hyperparameter configuration found. Currently, we don't create an ensemble of all best hyperparameter configs over all folds. We find the best config by comparing the test error across outer folds. The hyperparameter config of the best fold is used as the optimal model and is then trained on the complete set.

  • best_config [dict]: Dictionary containing the hyperparameters of the best configuration. Contains the parameters in the sklearn interface of model_name__parameter_name: parameter_value

  • result_tree [MDBHyperpipe]: Object containing all information about the performed hyperparameter search. Holds the training and test metrics for all outer folds, inner folds and configurations, as well as additional information.

  • pipeline_elements [list]: Contains all PipelineElement or Hyperpipe objects that are added to the pipeline.

Example

    manager = Hyperpipe('test_manager',
                        optimizer='timeboxed_random_grid_search', optimizer_params={'limit_in_minutes': 1},
                        outer_cv=ShuffleSplit(test_size=0.2, n_splits=1),
                        inner_cv=KFold(n_splits=10, shuffle=True),
                        metrics=['accuracy', 'precision', 'recall', "f1_score"],
                        best_config_metric='accuracy', eval_final_performance=True,
                        verbosity=2)
    
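    After fitting, the attributes listed above can be inspected and the optimum pipeline can be reused directly. A hedged sketch (X, y and X_new stand for your own numpy arrays and are not defined here):

        manager.fit(X, y)                     # run the nested cross-validated hyperparameter search
        print(manager.best_config)            # best hyperparameter configuration as a dict
        predictions = manager.predict(X_new)  # delegates to manager.optimum_pipe
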
class Hyperpipe(BaseEstimator):
    """
    Wrapper class for a machine learning pipeline, holding all pipeline elements
    and managing the optimization of the hyperparameters

    Parameters
    ----------
    * `name` [str]:
        Name of hyperpipe instance

    * `inner_cv` [BaseCrossValidator]:
        Cross validation strategy to test hyperparameter configurations, generates the validation set

    * `outer_cv` [BaseCrossValidator]:
        Cross validation strategy to use for the hyperparameter search itself, generates the test set

    * `optimizer` [str or object, default="grid_search"]:
        Hyperparameter optimization algorithm

        - In case a string literal is given:
            - "grid_search": optimizer that iteratively tests all possible hyperparameter combinations
            - "random_grid_search": a variation of the grid search optimization that randomly picks hyperparameter
               combinations from all possible hyperparameter combinations
            - "timeboxed_random_grid_search": randomly chooses hyperparameter combinations from the set of all
               possible hyperparameter combinations and tests until the given time limit is reached
               - `limit_in_minutes`: int

        - In case an object is given:
          expects the object to have the following methods:
           - `next_config_generator`: returns a hyperparameter configuration in the form of a dictionary containing
              key->value pairs in the sklearn parameter encoding `model_name__parameter_name: parameter_value`
           - `prepare`: takes a list of pipeline elements and their particular hyperparameters to test
           - `evaluate_recent_performance`: gets a tested config and the respective performance in order to
              calculate a smart next configuration to process

    * `metrics` [list of metric names as str]:
        Metrics that should be calculated for the training, validation and test set.
        Use the pre-imported metrics from sklearn and photonai, or register your own

        - Metrics for `classification`:
            - `accuracy`: sklearn.metrics.accuracy_score
            - `matthews_corrcoef`: sklearn.metrics.matthews_corrcoef
            - `confusion_matrix`: sklearn.metrics.confusion_matrix
            - `f1_score`: sklearn.metrics.f1_score
            - `hamming_loss`: sklearn.metrics.hamming_loss
            - `log_loss`: sklearn.metrics.log_loss
            - `precision`: sklearn.metrics.precision_score
            - `recall`: sklearn.metrics.recall_score
        - Metrics for `regression`:
            - `mean_squared_error`: sklearn.metrics.mean_squared_error
            - `mean_absolute_error`: sklearn.metrics.mean_absolute_error
            - `explained_variance`: sklearn.metrics.explained_variance_score
            - `r2`: sklearn.metrics.r2_score
        - Other metrics
            - `pearson_correlation`: photon_core.framework.Metrics.pearson_correlation
            - `variance_explained`:  photon_core.framework.Metrics.variance_explained_score
            - `categorical_accuracy`: photon_core.framework.Metrics.categorical_accuracy_score

    * `best_config_metric` [str]:
        The metric that should be maximized or minimized in order to choose the best hyperparameter configuration

    * `eval_final_performance` [bool, default=True]:
        Whether the metrics should be calculated for the test set; otherwise the test set is separated but not used

    * `test_size` [float, default=0.2]:
        The fraction of the data that should be left out if no outer_cv is given and
        eval_final_performance is set to True

    * `set_random_seed` [bool, default=False]:
        If True, sets the random seed to 42

    * `verbosity` [int, default=0]:
        The level of verbosity: 0 is least talkative and gives only warnings and errors, 1 adds info and 2 adds debug

    * `groups` [array-like, default=None]:
        Info for advanced cross validation strategies, such as LeaveOneSiteOut-CV about the affiliation
        of the rows in the data

    * `filter_element` [SourceFilter, default=None]:
        Instance of SourceFilter Class that transforms the input data, e.g. extracts certain columns

    * `imbalanced_data_strategy_filter` [str, default=None]:
        Uses the imblearn package to handle imbalanced class distributions in the data
        A strategy is used to transform the data into more balanced distributions before the hyperparameter search
        is started.
        Strategies to choose from are:
        - imbalance_type = OVERSAMPLING:
            - RandomOverSampler
            - SMOTE
            - ADASYN

        - imbalance_type = UNDERSAMPLING:
            - ClusterCentroids,
            - RandomUnderSampler,
            - NearMiss,
            - InstanceHardnessThreshold,
            - CondensedNearestNeighbour,
            - EditedNearestNeighbours,
            - RepeatedEditedNearestNeighbours,
            - AllKNN,
            - NeighbourhoodCleaningRule,
            - OneSidedSelection

        - imbalance_type = COMBINE:
            - SMOTEENN,
            - SMOTETomek

    Attributes
    ----------
    * `optimum_pipe` [Pipeline]:
        An sklearn pipeline object that is fitted to the training data according to the best hyperparameter
        configuration found. Currently, we don't create an ensemble of all best hyperparameter configs over all folds.
        We find the best config by comparing the test error across outer folds. The hyperparameter config of the best
        fold is used as the optimal model and is then trained on the complete set.

    * `best_config` [dict]:
        Dictionary containing the hyperparameters of the best configuration.
        Contains the parameters in the sklearn interface of model_name__parameter_name: parameter_value

    * `result_tree` [MDBHyperpipe]:
        Object containing all information about the performed hyperparameter search.
        Holds the training and test metrics for all outer folds, inner folds and configurations, as well as
        additional information.

    * `pipeline_elements` [list]:
        Contains all PipelineElement or Hyperpipe objects that are added to the pipeline.

    Example
    -------
        manager = Hyperpipe('test_manager',
                            optimizer='timeboxed_random_grid_search', optimizer_params={'limit_in_minutes': 1},
                            outer_cv=ShuffleSplit(test_size=0.2, n_splits=1),
                            inner_cv=KFold(n_splits=10, shuffle=True),
                            metrics=['accuracy', 'precision', 'recall', "f1_score"],
                            best_config_metric='accuracy', eval_final_performance=True,
                            verbosity=2)

   """

    OPTIMIZER_DICTIONARY = {'grid_search': GridSearchOptimizer,
                            'random_grid_search': RandomGridSearchOptimizer,
                            'timeboxed_random_grid_search': TimeBoxedRandomGridSearchOptimizer}

    def __init__(self, name, inner_cv: BaseCrossValidator, outer_cv=None,
                 optimizer='grid_search', optimizer_params: dict = {}, metrics=None,
                 best_config_metric=None, eval_final_performance=True, test_size: float = 0.2,
                 calculate_metrics_per_fold: bool = True, calculate_metrics_across_folds: bool = False,
                 groups=None, set_random_seed: bool=False,
                 filter_element=None, imbalanced_data_strategy_filter: str = '',
                 verbosity=0,
                 persist_options=None,
                 performance_constraints=None):

        # Re eval_final_performance:
        # set eval_final_performance to False because
        # 1. if no cv-object is given, no split is performed --> seems more logical
        #    than passing nothing, passing no cv-object but getting
        #    an 80/20 split by default
        # 2. if cv-object is given, split is performed but we don't peek
        #    into the test set --> thus we can evaluate more hp configs
        #    later without double dipping

        self.name = name
        self.inner_cv = inner_cv
        self.outer_cv = outer_cv
        self.eval_final_performance = eval_final_performance
        self.test_size = test_size
        self.cv_iter = None
        self.data_test_cases = None

        self.calculate_metrics_per_fold = calculate_metrics_per_fold
        self.calculate_metrics_across_folds = calculate_metrics_across_folds

        # Todo: if self.outer_cv is LeaveOneOut: Set calculate metrics across folds to True -> Print

        self.X = None
        self.y = None

        self.groups = groups
        self.filter_element = filter_element
        if imbalanced_data_strategy_filter:
            self.imbalanced_data_strategy_filter = ImbalancedDataTransform(imbalanced_data_strategy_filter)
        else:
            self.imbalanced_data_strategy_filter = None

        self.fit_duration = 0

        if set_random_seed:
            import random
            random.seed(42)
            print('set random seed to 42')

        # set verbosity level
        Logger().set_verbosity(verbosity)

        # MongoDBWriter setup
        if persist_options:
            self.persist_options = persist_options
            if self.persist_options.log_file:
                Logger().set_custom_log_file(self.persist_options.log_file)
        else:
            self.persist_options = PersistOptions()
        self.mongodb_writer = MongoDBWriter(self.persist_options)

        self.pipeline_elements = []
        self._pipe = None
        self.optimum_pipe = None

        self.metrics = metrics
        #  Todo: raise error or warning if metrics and best config_metric is None
        self.best_config_metric = best_config_metric
        self.config_optimizer = None

        self.result_tree = None
        self.best_config = None
        self.best_children_config = None
        self.best_performance = None
        self.is_final_fit = False

        self.__mother_outer_fold_counter = 0
        self.__mother_inner_fold_counter = 0
        self.__mother_config_counter = 0

        # containers for optimization history and logging
        self._performance_history_list = []

        if isinstance(optimizer, str):
            # instantiate optimizer from string
            #  Todo: check if optimizer strategy is already implemented
            optimizer_class = self.OPTIMIZER_DICTIONARY[optimizer]
            optimizer_instance = optimizer_class(**optimizer_params)
            self.optimizer = optimizer_instance
        else:
            # Todo: check if correct object
            self.optimizer = optimizer

        self._validation_X = None
        self._validation_y = None
        self._test_X = None
        self._test_y = None
        self._last_fit_data_hash = None
        self._current_fold = -1
        self._num_of_folds = 0
        self._is_mother_pipe = True
        self._fold_data_hashes = []

        self.inner_cv_callback_function = performance_constraints

    def _set_verbosity(self, verbosity):
        """
        Set verbosity level manually
        Returns None

        Parameters
        ----------
        * `verbosity` [Integer]:
            Verbosity level can be 0, 1, or 2.

        """
        Logger().set_verbosity(verbosity)

    def _set_persist_options(self, persist_options):
        """
        Set persist options manually
        Returns None

        Parameters
        ----------
        * `persist_options` [PersistOptions]:

        """
        self.persist_options = persist_options
        if self.persist_options.log_file:
            Logger().set_custom_log_file(self.persist_options.log_file)
        self.mongodb_writer = MongoDBWriter(self.persist_options)

    def __iadd__(self, pipe_element):
        """
        Add an element to the machine learning pipeline
        Returns self

        Parameters
        ----------
        * 'pipe_element' [PipelineElement or Hyperpipe]:
            The object to add to the machine learning pipeline, being either a transformer or an estimator.

        """
        # if isinstance(pipe_element, PipelineElement):
        self.pipeline_elements.append(pipe_element)
        # Todo: is repeated each time element is added....
        self._prepare_pipeline()
        return self
        # else:
        #     Todo: raise error
        # raise TypeError("Element must be of type Pipeline Element")

    def add(self, pipe_element):
        """
           Add an element to the machine learning pipeline
           Returns self

           Parameters
           ----------
           * `pipe_element` [PipelineElement or Hyperpipe]:
               The object to add to the machine learning pipeline, being either a transformer or an estimator.

           """
        self.__iadd__(pipe_element)

    def __yield_all_data(self):
        """
        Helper function that iteratively returns the data stored in self.X
        Returns an iterable version of self.X
        """
        if hasattr(self.X, 'shape'):
            yield list(range(self.X.shape[0])), []
        else:
            yield list(range(len(self.X))), []

    def _generate_outer_cv_indices(self):
        """
        Generates the training and test set indices for the hyperparameter search.
        Stores a generator of (training indices, test indices) tuples in self.data_test_cases.

        - If there is a strategy given for the outer cross validation, the strategy is called to split the data
        - If no strategy is given and eval_final_performance is False, all data is used for training
        - If no strategy is given and eval_final_performance is True, a test set is separated from the
          training and validation set by the parameter test_size with ShuffleSplit
        """
        # if there is a CV Object for cross validating the hyperparameter search
        if self.outer_cv:
            self.data_test_cases = self.outer_cv.split(self.X, self.y)
        # in case we do not want to divide between validation and test set
        elif not self.eval_final_performance:
            self.data_test_cases = self.__yield_all_data()
        # the default is dividing one time into a validation and test set
        else:
            train_test_cv_object = ShuffleSplit(n_splits=1, test_size=self.test_size)
            self.data_test_cases = train_test_cv_object.split(self.X, self.y)

    def __distribute_cv_info_to_hyperpipe_children(self, reset: bool =False, reset_final_fit: bool=False,
                                                   outer_fold_counter: int=None, inner_fold_counter: int =None,
                                                   num_of_folds: int = None, config_counter: int =None):
        """
        Informs all elements of the pipeline that are of type hyperpipe (hyperpipe children)
        about the mother's configuration or current state

        Parameters
        ----------
        * 'num_of_folds' [int]:
            how many inner folds the mother hyperpipe has

        * 'outer_fold_counter' [int]:
            in which outer fold the mother hyperpipe currently is

        * 'inner_fold_counter' [int]:
            in which inner fold the mother hyperpipe currently is

        * 'config_counter' [int]:
            in which config_nr the mother hyperpipe currently is

        * 'reset' [bool, default = False]:
            if the hyperparameter search starts anew

        * 'reset_final_fit' [bool, default = False]:
            reset the is_final_fit parameter so that children hyperpipe train anew for outer fold of mother pipe

        """

        def _distribute_info_to_object(pipe_object, number_of_folds, reset_folds, reset_final_fit,
                                      outer_fold_counter, inner_fold_counter, config_counter):
            if pipe_object.local_search:
                if number_of_folds is not None:
                    pipe_object.num_of_folds = number_of_folds
                    pipe_object.is_mother_pipe = False
                if reset_folds:
                    pipe_object.current_fold = -1
                if outer_fold_counter is not None:
                    pipe_object.mother_outer_fold_counter = outer_fold_counter
                if inner_fold_counter is not None:
                    pipe_object.mother_inner_fold_counter = inner_fold_counter
                if config_counter:
                    pipe_object.mother_config_counter = config_counter
                if reset_final_fit:
                    pipe_object.is_final_fit = False

        # walk through all children of pipeline, if its a hyperpipe distribute the information
        for element_tuple in self._pipe.steps:
            element_object = element_tuple[1]
            if isinstance(element_object, Hyperpipe):
                _distribute_info_to_object(element_object, num_of_folds, reset, reset_final_fit,
                                          outer_fold_counter, inner_fold_counter, config_counter)
            elif isinstance(element_object, PipelineStacking):
                for child_pipe_name, child_pipe_object in element_object.pipe_elements.items():
                    if isinstance(child_pipe_object, Hyperpipe):
                        _distribute_info_to_object(child_pipe_object, num_of_folds, reset, reset_final_fit,
                                                  outer_fold_counter, inner_fold_counter, config_counter)

    def update_mother_inner_fold_nr(self, new_inner_fold_nr: int):
        """
        Function handle so that the TestPipeline class from Photon's validation module can pass the information to hyperpipe children

        Parameters
        ----------
        * 'new_inner_fold_nr' [int]:
            in which inner_fold the mother hyperpipe currently is
        """
        self.__distribute_cv_info_to_hyperpipe_children(inner_fold_counter=new_inner_fold_nr)

    def fit(self, data, targets, **fit_params):
        """
        Starts the hyperparameter search and/or fits the pipeline to the data and targets

        Manages the nested cross validated hyperparameter search:

        1. Filters the data according to filter strategy (1) and according to the imbalanced_data_strategy (2)
        2. requests new configurations from the hyperparameter search strategy, the optimizer,
        3. initializes the testing of a specific configuration,
        4. communicates the result to the optimizer,
        5. repeats 2-4 until optimizer delivers no more configurations to test
        6. finally searches for the best config in all tested configs,
        7. trains the pipeline with the best config and evaluates the performance on the test set

        Parameters
        ----------
         * `data` [array-like, shape=[N, D]]:
            the training and test data, where N is the number of samples and D is the number of features.

         * `targets` [array-like, shape=[N]]:
            the truth values, where N is the number of samples.


        Returns
        -------
         * 'self'
            Returns self

        """

        # in case we want to inject some data from outside the pipeline

        self.X = data
        self.y = targets


        # !!!!!!!!!!!!!!!! FIT ONLY IF DATA CHANGED !!!!!!!!!!!!!!!!!!!
        # -------------------------------------------------------------

        # in case we need to reduce the dimension of the data due to parallelity of the outer pipe, lets do it.
        if self.filter_element:
            self.X = self.filter_element.transform(self.X)

        # if the groups are imbalanced, and a strategy is chosen, apply it here
        if self.imbalanced_data_strategy_filter:
            self.imbalanced_data_strategy_filter.fit(self.X, self.y)
            self.X, self.y = self.imbalanced_data_strategy_filter.transform()

        self._current_fold += 1

        # be compatible to list of (image-) files
        if isinstance(self.X, list):
            self.X = np.asarray(self.X)
        if isinstance(self.y, list):
            self.y = np.asarray(self.y)

        # handle neuro image paths as data
        # ToDo: Need to check the DATA, not the img paths for neuro
        new_data_hash = sha1(np.asarray(self.X, order='C')).hexdigest()

        # fit
        # 1. if it is first time ever or
        # 2. the data did change for that fold or
        # 3. if it is the mother pipe (then number_of_folds = 0)
        if (len(self._fold_data_hashes) < self._num_of_folds) \
                or (self._num_of_folds > 0 and self._fold_data_hashes[self._current_fold] != new_data_hash) \
                or self._num_of_folds == 0:

            # save data hash for that fold
            if self._num_of_folds > 0:
                if len(self._fold_data_hashes) < self._num_of_folds:
                    self._fold_data_hashes.append(new_data_hash)
                else:
                    self._fold_data_hashes[self._current_fold] = new_data_hash

            # optimize: iterate through configs and save results
            if not self.is_final_fit:

                # first check if correct optimizer metric has been chosen
                # pass pipeline_elements so that OptimizerMetric can look for last
                # element and use the corresponding score method
                self.config_optimizer = OptimizerMetric(self.best_config_metric, self.pipeline_elements, self.metrics)
                self.metrics = self.config_optimizer.check_metrics()

                if 'score' in self.metrics:
                    Logger().warn('Attention: Scoring with default score function of estimator can slow down calculations!')

                # generate OUTER ! cross validation splits to iterate over
                self._generate_outer_cv_indices()

                outer_fold_counter = 0

                if not self._is_mother_pipe:
                    self.result_tree_name = self.name + '_outer_fold_' + str(self.__mother_outer_fold_counter)  \
                                            + '_inner_fold_' + str(self.__mother_inner_fold_counter)
                else:
                    self.result_tree_name = self.name

                # initialize result logging with hyperpipe class
                self.result_tree = MDBHyperpipe(name=self.result_tree_name)
                self.result_tree.outer_folds = []
                self.result_tree.eval_final_performance = self.eval_final_performance
                self.result_tree.best_config_metric = self.best_config_metric

                # loop over outer cross validation
                for train_indices, test_indices in self.data_test_cases:

                    # give the optimizer the chance to inform about elements
                    self.optimizer.prepare(self.pipeline_elements)

                    outer_fold_counter += 1
                    outer_fold_fit_start_time = time.time()

                    Logger().info('HYPERPARAMETER SEARCH OF {0}, Outer Cross validation Fold {1}'
                                  .format(self.name, outer_fold_counter))

                    t1 = time.time()

                    # Prepare Train and validation set data
                    self._validation_X = self.X[train_indices]
                    self._validation_y = self.y[train_indices]
                    self._test_X = self.X[test_indices]
                    self._test_y = self.y[test_indices]

                    # Prepare inner cross validation
                    cv_iter = list(self.inner_cv.split(self._validation_X, self._validation_y))
                    num_folds = len(cv_iter)
                    num_samples_train = len(self._validation_y)
                    num_samples_test = len(self._test_y)

                    # distribute number of folds to encapsulated child hyperpipes
                    self.__distribute_cv_info_to_hyperpipe_children(num_of_folds=num_folds,
                                                                    outer_fold_counter=outer_fold_counter)

                    tested_config_counter = 0

                    # add outer fold info object to result tree
                    outer_fold = MDBOuterFold(fold_nr=outer_fold_counter)
                    outer_fold.tested_config_list = []
                    self.result_tree.outer_folds.append(outer_fold)

                    # do the optimizing
                    for current_config in self.optimizer.next_config:
                        self.__distribute_cv_info_to_hyperpipe_children(reset=True, config_counter=tested_config_counter)
                        hp = TestPipeline(self._pipe, current_config, self.metrics, self.update_mother_inner_fold_nr,
                                          mongo_db_settings=self.persist_options,
                                          callback_function=self.inner_cv_callback_function)
                        Logger().debug('optimizing of:' + self.name)
                        Logger().debug(self._optimize_printing(current_config))
                        Logger().debug('calculating...')

                        # Test the configuration cross validated by inner_cv object
                        current_config_mdb = hp.calculate_cv_score(self._validation_X, self._validation_y, cv_iter,
                                                            calculate_metrics_per_fold=self.calculate_metrics_per_fold,
                                                            calculate_metrics_across_folds=self.calculate_metrics_across_folds)

                        current_config_mdb.config_nr = tested_config_counter
                        current_config_mdb.config_dict = current_config
                        current_config_mdb.pipe_name = self.name
                        tested_config_counter += 1
                        current_config_mdb.human_readable_config = self.config_to_dict(current_config)

                        # save the configuration of all children pipelines
                        children_config = {}
                        children_config_ref_list = []
                        for pipe_step in self._pipe.steps:
                            item = pipe_step[1]
                            if isinstance(item, Hyperpipe):
                                if item.local_search and item.best_config is not None:
                                    children_config[item.name] = item.best_config
                            elif isinstance(item, PipelineStacking):
                                for subhyperpipe_name, hyperpipe in item.pipe_elements.items():
                                    if isinstance(hyperpipe, Hyperpipe):
                                        if hyperpipe.local_search and hyperpipe.best_config is not None:
                                            # special case: we need to access pipe over pipeline_stacking element
                                            children_config[item.name + '__' + subhyperpipe_name] = hyperpipe.best_config.config_dict
                                        # children_config_ref_list.append(hyperpipe.best_config_outer_fold._id)
                        specific_parameters = self._pipe.get_params()
                        #current_config_mdb.full_model_spec = specific_parameters

                        current_config_mdb.children_config_dict = children_config
                        current_config_mdb.children_config_ref = children_config_ref_list

                        Logger().verbose(self._optimize_printing(current_config))

                        if not current_config_mdb.config_failed:
                            # get optimizer_metric and forward to optimizer
                            # todo: also pass greater_is_better=True/False to optimizer
                            metric_train = MDBHelper.get_metric(current_config_mdb, FoldOperations.MEAN, self.config_optimizer.metric)
                            metric_test = MDBHelper.get_metric(current_config_mdb, FoldOperations.MEAN, self.config_optimizer.metric, train=False)
                            #
                            # if not metric_train or metric_test:
                            #     raise Exception("Config did not fail, but did not get any metrics either....!!?")
                            config_performance = (metric_train, metric_test)

                            # Print Result for config
                            Logger().debug('...done:')
                            Logger().verbose(self.config_optimizer.metric + str(config_performance))
                        else:
                             config_performance = (-1, -1)
                             # Print Result for config
                             Logger().debug('...failed:')
                             Logger().error(current_config_mdb.config_error)

                        # add config to result tree and do intermediate saving
                        self.result_tree.outer_folds[-1].tested_config_list.append(current_config_mdb)
                        # Todo: add try catch in case config cannot be written
                        self.mongodb_writer.save(self.result_tree)

                        # 3. inform optimizer about performance
                        self.optimizer.evaluate_recent_performance(current_config, config_performance)

                    if tested_config_counter > 0:
                        best_config_outer_fold = self.config_optimizer.get_optimum_config(outer_fold.tested_config_list)

                        if not best_config_outer_fold:
                            raise Exception("No best config was found!")
                        best_config_outer_fold_mdb = MDBConfig()
                        best_config_outer_fold_mdb.children_config_dict = best_config_outer_fold.children_config_dict
                        best_config_outer_fold_mdb.pipe_name = self.name
                        best_config_outer_fold_mdb.children_config_ref = best_config_outer_fold.children_config_ref
                        # best_config_outer_fold_mdb.best_config_ref_to_train_item = best_config_outer_fold._id
                        best_config_outer_fold_mdb.config_dict = best_config_outer_fold.config_dict
                        best_config_outer_fold_mdb.human_readable_config = best_config_outer_fold.human_readable_config


                        # inform user
                        Logger().info('finished optimization of ' + self.name)
                        Logger().verbose('Result')
                        Logger().verbose('Number of tested configurations:' + str(tested_config_counter))
                        Logger().verbose('Optimizer metric: ' + self.config_optimizer.metric + '\n' +
                                         '   --> Greater is better: ' + str(self.config_optimizer.greater_is_better))
                        Logger().info('Best config: ' + self._optimize_printing(best_config_outer_fold_mdb.config_dict) +
                                      '\n' + '... with children config: '
                                      + self._optimize_printing(best_config_outer_fold_mdb.children_config_dict))


                        # ... and create optimal pipeline
                        self.optimum_pipe = self._pipe
                        # set self to best config
                        self.optimum_pipe.set_params(**best_config_outer_fold_mdb.config_dict)

                        # set all children to best config and inform to NOT optimize again, ONLY fit
                        for child_name, child_config in best_config_outer_fold_mdb.children_config_dict.items():
                            if child_config:
                                # in case we have a pipeline stacking we need to identify the particular subhyperpipe
                                splitted_name = child_name.split('__')
                                if len(splitted_name) > 1:
                                    stacking_element = self.optimum_pipe.named_steps[splitted_name[0]]
                                    pipe_element = stacking_element.pipe_elements[splitted_name[1]]
                                else:
                                    pipe_element = self.optimum_pipe.named_steps[child_name]
                                pipe_element.set_params(**child_config)
                                pipe_element.is_final_fit = True

                        self.__distribute_cv_info_to_hyperpipe_children(reset=True)

                        Logger().verbose('...now fitting ' + self.name + ' with optimum configuration')
                        fit_time_start = time.time()
                        self.optimum_pipe.fit(self._validation_X, self._validation_y)
                        final_fit_duration = time.time() - fit_time_start

                        #self.best_config_outer_fold.full_model_spec = self.optimum_pipe.get_params()
                        best_config_outer_fold_mdb.fit_duration_minutes = final_fit_duration
                        self.result_tree.outer_folds[-1].best_config = best_config_outer_fold_mdb
                        self.result_tree.outer_folds[-1].best_config.inner_folds = []

                        if self.eval_final_performance:
                            # Todo: generate mean and std over outer folds as well. move this items to the top
                            Logger().verbose('...now predicting ' + self.name + ' unseen data')

                            test_score_mdb = TestPipeline.score(self.optimum_pipe, self._test_X, self._test_y,
                                                                self.metrics,
                                                                save_predictions=self.persist_options.save_predictions,
                                                                save_feature_importances=self.persist_options.save_feature_importances)

                            Logger().info('.. calculating metrics for test set (' + self.name + ')')
                            Logger().verbose('...now predicting ' + self.name + ' final model with training data')

                            train_score_mdb = TestPipeline.score(self.optimum_pipe, self._validation_X, self._validation_y,
                                                                 self.metrics,
                                                                 save_predictions=self.persist_options.save_predictions,
                                                                 save_feature_importances=self.persist_options.save_feature_importances)

                            # save test fold
                            outer_fold_mdb = MDBInnerFold()
                            outer_fold_mdb.fold_nr = 1
                            outer_fold_mdb.number_samples_training = num_samples_train
                            outer_fold_mdb.number_samples_validation = num_samples_test
                            outer_fold_mdb.training = train_score_mdb
                            outer_fold_mdb.validation = test_score_mdb
                            self.result_tree.outer_folds[-1].best_config.inner_folds = [outer_fold_mdb]

                            Logger().info('PERFORMANCE TRAIN:')
                            for m_key, m_value in train_score_mdb.metrics.items():
                                Logger().info(str(m_key) + ": " + str(m_value))

                            Logger().info('PERFORMANCE TEST:')
                            for m_key, m_value in test_score_mdb.metrics.items():
                                    Logger().info(str(m_key) + ": " + str(m_value))
                        else:

                            # save test fold
                            outer_fold_mdb = MDBInnerFold()
                            outer_fold_mdb.fold_nr = 1
                            outer_fold_mdb.number_samples_training = num_samples_train
                            outer_fold_mdb.number_samples_validation = num_samples_test

                            def _copy_inner_fold_means(metric_dict):
                                # We copy all mean values from validation to the best config
                                # training
                                train_item_metrics = {}
                                for m in metric_dict:
                                    if m.operation == str(FoldOperations.MEAN):
                                        train_item_metrics[m.metric_name] = m.value
                                train_item = MDBScoreInformation()
                                train_item.metrics_copied_from_inner = True
                                train_item.metrics = train_item_metrics
                                return train_item

                            # training
                            outer_fold_mdb.training = _copy_inner_fold_means(best_config_outer_fold.metrics_train)
                            # validation
                            outer_fold_mdb.validation = _copy_inner_fold_means(best_config_outer_fold.metrics_test)

                            self.result_tree.outer_folds[-1].best_config.inner_folds = [outer_fold_mdb]

                    Logger().info('This took {} minutes.'.format((time.time() - t1) / 60))
                    self.result_tree.time_of_results = datetime.datetime.now()
                    self.mongodb_writer.save(self.result_tree)
                    self.__distribute_cv_info_to_hyperpipe_children(reset_final_fit=True, outer_fold_counter=outer_fold_counter)

                # Compute all final metrics
                self.result_tree.metrics_train, self.result_tree.metrics_test = MDBHelper.aggregate_metrics(self.result_tree.outer_folds,
                                                                                                            self.metrics)
                # save result tree to db or file or both
                self.mongodb_writer.save(self.result_tree)
                Logger().info("Saved result tree to database")

                # Find best config across outer folds
                self.best_config = self.config_optimizer.get_optimum_config_outer_folds(self.result_tree.outer_folds)
                self.result_tree.best_config = self.best_config
                Logger().info('OVERALL BEST CONFIGURATION')
                Logger().info('--------------------------')
                Logger().info(self._optimize_printing(self.best_config.config_dict) +
                              '\n' + '... with children config: '
                              + self._optimize_printing(self.best_config.children_config_dict))
                # set self to best config
                self.optimum_pipe = self._pipe
                self.optimum_pipe.set_params(**self.best_config.config_dict)
                self.optimum_pipe.fit(self._validation_X, self._validation_y)

                # save results again
                self.mongodb_writer.save(self.result_tree)
                Logger().info("Saved overall best config to database")
            ###############################################################################################
            else:
                self._pipe.fit(self.X, self.y, **fit_params)

        else:
            Logger().verbose("Avoided fitting of " + self.name + " on fold "
                             + str(self._current_fold) + " because data did not change")
            Logger().verbose('Best config of ' + self.name + ' : ' + str(self.best_config))

        return self

    def predict(self, data):
        """
        Use the optimum pipe to predict the data

        Returns
        -------
            predicted targets

        """
        # Todo: if local_search = true then use optimized pipe here?
        if self._pipe:
            if self.filter_element:
                data = self.filter_element.transform(data)
            return self.optimum_pipe.predict(data)

    def predict_proba(self, data):
        """
        Predict probabilities

        Returns
        -------
        predicted probabilities

        """
        if self._pipe:
            if self.filter_element:
                data = self.filter_element.transform(data)
            return self.optimum_pipe.predict_proba(data)

    def transform(self, data):
        """
        Use the optimum pipe to transform the data
        """
        if self._pipe:
            if self.filter_element:
                data = self.filter_element.transform(data)
            return self.optimum_pipe.transform(data)

    def get_params(self, deep=True):
        """
        Retrieve parameters from sklearn pipeline
        """
        if self._pipe is not None:
            return self._pipe.get_params(deep)
        else:
            return None

    def set_params(self, **params):
        """
        Give parameter values to the pipeline elements
        """
        if self._pipe is not None:
            self._pipe.set_params(**params)
        return self

    def _prepare_pipeline(self):
        """
        build sklearn pipeline from PipelineElements and
        calculate parameter grid for all combinations of pipeline element hyperparameters
        """
        # prepare pipeline
        pipeline_steps = []
        for item in self.pipeline_elements:
            # pipeline_steps.append((item.name, item.base_element))
            pipeline_steps.append((item.name, item))

        # build pipeline...
        self._pipe = Pipeline(pipeline_steps)

    def copy_me(self):
        """
        Helper function to copy all pipeline elements
        """
        item_list =[]
        for item in self.pipeline_elements:
            item_list.append(item.copy_me())
        return item_list

    def _copy_pipeline(self):
        """
        Copy Pipeline by building a new sklearn Pipeline with Pipeline Elements

        Returns
        -------
        new sklearn Pipeline object
        """
        pipeline_steps = []
        for item in self.pipeline_elements:
            cpy = item.copy_me()
            if isinstance(cpy, list):
                for new_step in cpy:
                    pipeline_steps.append((new_step.name, new_step))
            else:
                pipeline_steps.append((cpy.name, cpy))
        return Pipeline(pipeline_steps)

    def save_optimum_pipe(self, file):
        """
        Save the optimal pipeline only. The complete hyperpipe will not be saved.

        Parameters
        ----------
        * 'file' [str]:
            File path as string specifying file to save pipeline to
        """
        element_number = 0
        element_identifier = list()
        folder = os.path.splitext(file)[0]
        file = os.path.splitext(file)[0] + '.photon'

        if os.path.exists(folder):
            Logger().warn('The file you specified already exists as a folder.')
        else:
            os.mkdir(folder)
            folder = folder + '/'
        wrapper_files = list()

        for element_name, element in self.optimum_pipe.named_steps.items():
            filename = '_optimum_pipe_' + str(element_number) + '_' + element_name
            element_identifier.append({'element_name': element_name,
                                       'filename': filename})
            base_element = element.base_element.base_element
            if hasattr(base_element, 'save'):
                base_element.save(folder + filename)
                element_identifier[-1]['mode'] = 'custom'
                element_identifier[-1]['wrapper_script'] = os.path.basename(inspect.getfile(base_element.__class__))
                wrapper_files.append(inspect.getfile(base_element.__class__))
                element_identifier[-1]['test_disabled'] = element.test_disabled
                element_identifier[-1]['disabled'] = element.disabled
                element_identifier[-1]['hyperparameters'] = element.hyperparameters

            else:
                try:
                    joblib.dump(element, folder + filename + '.pkl', compress=1)
                    element_identifier[-1]['mode'] = 'pickle'
                except:
                    raise NotImplementedError("Custom pipeline element must implement .save() method or "
                                              "allow pickle.")
            element_number += 1
        # save pipeline blueprint to make loading of pipeline easier
        with open(folder + '_optimum_pipe_blueprint.pkl', 'wb') as f:
            pickle.dump(element_identifier, f)

        # get all files
        files = glob.glob(folder + '_optimum_pipe_*')
        with zipfile.ZipFile(file, 'w') as myzip:
            for f in files:
                myzip.write(f, os.path.basename(f))
                os.remove(f)
            for f in wrapper_files:
                myzip.write(f, os.path.splitext(os.path.basename(f))[0] + '.py')
        os.removedirs(folder)

    @staticmethod
    def load_optimum_pipe(file):
        """
        Load optimal pipeline.


        Parameters
        ----------
        * `file` [str]:
            File path specifying .photon file to load optimal pipeline from

        Returns
        -------
        sklearn Pipeline with all trained photon_pipelines
        """
        if file.endswith('.photon'):
            archive_name = os.path.splitext(file)[0]
            folder = archive_name + '/'
            zf = zipfile.ZipFile(file)
            zf.extractall(folder)
        else:
            raise FileNotFoundError('Specify .photon file that holds PHOTON optimum pipe.')

        setup_info = pickle.load(open(folder + '_optimum_pipe_blueprint.pkl', 'rb'))
        element_list = list()
        for element_info in setup_info:
            if element_info['mode'] == 'custom':
                spec = importlib.util.spec_from_file_location(element_info['element_name'],
                                                              folder + element_info['wrapper_script'])
                imported_module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(imported_module)
                base_element = getattr(imported_module, element_info['element_name'])
                custom_element = PipelineElement(name=element_info['element_name'], base_element=base_element(),
                                                 hyperparameters=element_info['hyperparameters'],
                                                 test_disabled=element_info['test_disabled'],
                                                 disabled=element_info['disabled'])
                custom_element.base_element.load(folder + element_info['filename'])
                element_list.append((element_info['element_name'], custom_element))
            else:
                element_list.append((element_info['element_name'], joblib.load(folder + element_info['filename'] + '.pkl')))

        return Pipeline(element_list)


    def inverse_transform_pipeline(self, hyperparameters: dict, data, targets, data_to_inverse):
        """
        Inverse transform data for a pipeline with specific hyperparameter configuration

        1. Copy Sklearn Pipeline,
        2. Set Parameters
        3. Fit Pipeline to data and targets
        4. Inverse transform data with that pipeline

        Parameters
        ----------
        * `hyperparameters` [dict]:
            The concrete configuration settings for the pipeline elements
        * `data` [array-like]:
            The training data to which the pipeline is fitted
        * `targets` [array-like]:
            The truth values for training
        * `data_to_inverse` [array-like]:
            The data that should be inversed after training

        Returns
        -------
        Inverse-transformed data as array
        """
        copied_pipe = self._copy_pipeline()
        copied_pipe.set_params(**hyperparameters)
        copied_pipe.fit(data, targets)
        return copied_pipe.inverse_transform(data_to_inverse)

    def _optimize_printing(self, config: dict):
        """
        Make the sklearn config syntax more readable for humans
        """
        prettified_config = [self.name + '\n']
        for el_key, el_value in config.items():
            items = el_key.split('__')
            name = items[0]
            rest = '__'.join(items[1::])
            if name in self._pipe.named_steps:
                new_pretty_key = '    ' + name + '->'
                prettified_config.append(new_pretty_key +
                                         self._pipe.named_steps[name].prettify_config_output(rest, el_value) + '\n')
            else:
                Logger().error('ValueError: Item is not contained in pipeline:' + name)
                raise ValueError('Item is not contained in pipeline:' + name)
        return ''.join(prettified_config)

    @staticmethod
    def prettify_config_output(config_name: str, config_value):
        """
        Print the disabled = False as Enabled = True for better human reading
        """
        if config_name == "disabled" and config_value is False:
            return "enabled = True"
        else:
            return config_name + '=' + str(config_value)


    def config_to_dict(self, specific_config):
        """
        """
        config = {}
        for key, value in specific_config.items():
            items = key.split('__')
            name = items[0]
            rest = '__'.join(items[1::])
            if name in self._pipe.named_steps:
                config.update(self._pipe.named_steps[name].prettify_config_output(rest, value, return_dict=True))
                #config[name] = value
        return config

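
For completeness, a hedged sketch of persisting and reloading the optimum pipeline after fitting; the file name is illustrative:

    manager.save_optimum_pipe('my_model.photon')                  # writes a .photon archive
    loaded_pipe = Hyperpipe.load_optimum_pipe('my_model.photon')  # returns a plain sklearn Pipeline
    predictions = loaded_pipe.predict(X_new)
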
Ancestors (in MRO)

  • Hyperpipe
  • sklearn.base.BaseEstimator
  • builtins.object

Class variables

var OPTIMIZER_DICTIONARY

Static methods

def __init__(self, name, inner_cv, outer_cv=None, optimizer='grid_search', optimizer_params={}, metrics=None, best_config_metric=None, eval_final_performance=True, test_size=0.2, calculate_metrics_per_fold=True, calculate_metrics_across_folds=False, groups=None, set_random_seed=False, filter_element=None, imbalanced_data_strategy_filter='', verbosity=0, persist_options=None, performance_constraints=None)

Initialize self. See help(type(self)) for accurate signature.

def __init__(self, name, inner_cv: BaseCrossValidator, outer_cv=None,
             optimizer='grid_search', optimizer_params: dict = {}, metrics=None,
             best_config_metric=None, eval_final_performance=True, test_size: float = 0.2,
             calculate_metrics_per_fold: bool = True, calculate_metrics_across_folds: bool = False,
             groups=None, set_random_seed: bool=False,
             filter_element=None, imbalanced_data_strategy_filter: str = '',
             verbosity=0,
             persist_options=None,
             performance_constraints=None):
    # Notes on eval_final_performance: set it to False because
    # 1. if no outer cv-object is given, no test split is performed at all,
    #    which seems more logical than passing no cv-object but still
    #    getting an 80/20 split by default
    # 2. if an outer cv-object is given, the split is performed but the test
    #    set is never inspected, so more hyperparameter configurations can be
    #    evaluated later without double dipping
    self.name = name
    self.inner_cv = inner_cv
    self.outer_cv = outer_cv
    self.eval_final_performance = eval_final_performance
    self.test_size = test_size
    self.cv_iter = None
    self.data_test_cases = None
    self.calculate_metrics_per_fold = calculate_metrics_per_fold
    self.calculate_metrics_across_folds = calculate_metrics_across_folds
    # Todo: if self.outer_cv is LeaveOneOut: Set calculate metrics across folds to True -> Print
    self.X = None
    self.y = None
    self.groups = groups
    self.filter_element = filter_element
    if imbalanced_data_strategy_filter:
        self.imbalanced_data_strategy_filter = ImbalancedDataTransform(imbalanced_data_strategy_filter)
    else:
        self.imbalanced_data_strategy_filter = None
    self.fit_duration = 0
    if set_random_seed:
        import random
        random.seed(42)
        print('set random seed to 42')
    # set verbosity level
    Logger().set_verbosity(verbosity)
    # MongoDBWriter setup
    if persist_options:
        self.persist_options = persist_options
        if self.persist_options.log_file:
            Logger().set_custom_log_file(self.persist_options.log_file)
    else:
        self.persist_options = PersistOptions()
    self.mongodb_writer = MongoDBWriter(self.persist_options)
    self.pipeline_elements = []
    self._pipe = None
    self.optimum_pipe = None
    self.metrics = metrics
    #  Todo: raise error or warning if metrics and best config_metric is None
    self.best_config_metric = best_config_metric
    self.config_optimizer = None
    self.result_tree = None
    self.best_config = None
    self.best_children_config = None
    self.best_performance = None
    self.is_final_fit = False
    self.__mother_outer_fold_counter = 0
    self.__mother_inner_fold_counter = 0
    self.__mother_config_counter = 0
    # containers for optimization history and logging
    self._performance_history_list = []
    if isinstance(optimizer, str):
        # instantiate optimizer from string
        #  Todo: check if optimizer strategy is already implemented
        optimizer_class = self.OPTIMIZER_DICTIONARY[optimizer]
        optimizer_instance = optimizer_class(**optimizer_params)
        self.optimizer = optimizer_instance
    else:
        # Todo: check if correct object
        self.optimizer = optimizer
    self._validation_X = None
    self._validation_y = None
    self._test_X = None
    self._test_y = None
    self._last_fit_data_hash = None
    self._current_fold = -1
    self._num_of_folds = 0
    self._is_mother_pipe = True
    self._fold_data_hashes = []
    self.inner_cv_callback_function = performance_constraints
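
As a quick orientation, a minimal construction sketch that only uses parameters from the signature above; the pipeline name, the cross-validation settings and the import path are assumptions and may need to be adapted to your installation:

from sklearn.model_selection import KFold
from photonai.base.PhotonBase import Hyperpipe  # assumed import path

my_pipe = Hyperpipe('basic_svm_pipe',                            # hypothetical name
                    optimizer='grid_search',
                    metrics=['accuracy', 'precision', 'recall'],
                    best_config_metric='accuracy',
                    inner_cv=KFold(n_splits=5, shuffle=True),    # generates the validation set
                    outer_cv=KFold(n_splits=3, shuffle=True),    # generates the test set
                    eval_final_performance=True,
                    verbosity=1)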

def add(self, pipe_element)

Add an element to the machine learning pipeline. Returns self

Parameters

  • pipe_element [PipelineElement or Hyperpipe]: The object to add to the machine learning pipeline, being either a transformer or an estimator.
def add(self, pipe_element):
    """
       Add an element to the machine learning pipeline
       Returns self
       Parameters
       ----------
       * `pipe_element` [PipelineElement or Hyperpipe]:
           The object to add to the machine learning pipeline, being either a transformer or an estimator.
       """
    return self.__iadd__(pipe_element)
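
A short sketch of building up the pipeline. The element names and the PipelineElement arguments follow the constructor parameters visible in load_optimum_pipe below, but they are assumptions about the element registry, not guaranteed by this class:

from photonai.base.PhotonBase import PipelineElement  # assumed import path

# hypothetical elements: a scaler, a dimensionality reduction step and an estimator
my_pipe.add(PipelineElement('StandardScaler'))
my_pipe.add(PipelineElement('PCA', hyperparameters={'n_components': [5, 10]}))
my_pipe.add(PipelineElement('SVC', hyperparameters={'C': [0.5, 1.0], 'kernel': ['rbf', 'linear']}))
# add() delegates to __iadd__, so "my_pipe += PipelineElement(...)" works as well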

def fit(self, data, targets, **fit_params)

Starts the hyperparameter search and/or fits the pipeline to the data and targets

Manages the nested cross validated hyperparameter search:

  1. Filters the data according to the filter strategy and to the imbalanced_data_strategy
  2. requests new configurations from the hyperparameter search strategy, the optimizer,
  3. initializes the testing of a specific configuration,
  4. communicates the result to the optimizer,
  5. repeats 2-4 until optimizer delivers no more configurations to test
  6. finally searches for the best config in all tested configs,
  7. trains the pipeline with the best config and evaluates the performance on the test set

Parameters

  • data [array-like, shape=[N, D]]: the training and test data, where N is the number of samples and D is the number of features.

  • targets [array-like, shape=[N]]: the truth values, where N is the number of samples.

Returns

  • 'self' Returns self
def fit(self, data, targets, **fit_params):
    """
    Starts the hyperparameter search and/or fits the pipeline to the data and targets
    Manages the nested cross validated hyperparameter search:
    1. Filters the data according to the filter strategy and to the imbalanced_data_strategy
    2. requests new configurations from the hyperparameter search strategy, the optimizer,
    3. initializes the testing of a specific configuration,
    4. communicates the result to the optimizer,
    5. repeats 2-4 until optimizer delivers no more configurations to test
    6. finally searches for the best config in all tested configs,
    7. trains the pipeline with the best config and evaluates the performance on the test set
    Parameters
    ----------
     * `data` [array-like, shape=[N, D]]:
        the training and test data, where N is the number of samples and D is the number of features.
     * `targets` [array-like, shape=[N]]:
        the truth values, where N is the number of samples.
    Returns
    -------
     * 'self'
        Returns self
    """
    # in case we want to inject some data from outside the pipeline
    self.X = data
    self.y = targets
    # !!!!!!!!!!!!!!!! FIT ONLY IF DATA CHANGED !!!!!!!!!!!!!!!!!!!
    # -------------------------------------------------------------
    # in case we need to reduce the dimension of the data due to parallelism of the outer pipe, let's do it.
    if self.filter_element:
        self.X = self.filter_element.transform(self.X)
    # if the groups are imbalanced, and a strategy is chosen, apply it here
    if self.imbalanced_data_strategy_filter:
        self.imbalanced_data_strategy_filter.fit(self.X, self.y)
        self.X, self.y = self.imbalanced_data_strategy_filter.transform()
    self._current_fold += 1
    # be compatible to list of (image-) files
    if isinstance(self.X, list):
        self.X = np.asarray(self.X)
    if isinstance(self.y, list):
        self.y = np.asarray(self.y)
    # handle neuro image paths as data
    # ToDo: Need to check the DATA, not the img paths for neuro
    new_data_hash = sha1(np.asarray(self.X, order='C')).hexdigest()
    # fit
    # 1. if it is first time ever or
    # 2. the data did change for that fold or
    # 3. if it is the mother pipe (then number_of_folds = 0)
    if (len(self._fold_data_hashes) < self._num_of_folds) \
            or (self._num_of_folds > 0 and self._fold_data_hashes[self._current_fold] != new_data_hash) \
            or self._num_of_folds == 0:
        # save data hash for that fold
        if self._num_of_folds > 0:
            if len(self._fold_data_hashes) < self._num_of_folds:
                self._fold_data_hashes.append(new_data_hash)
            else:
                self._fold_data_hashes[self._current_fold] = new_data_hash
        # optimize: iterate through configs and save results
        if not self.is_final_fit:
            # first check if correct optimizer metric has been chosen
            # pass pipeline_elements so that OptimizerMetric can look for last
            # element and use the corresponding score method
            self.config_optimizer = OptimizerMetric(self.best_config_metric, self.pipeline_elements, self.metrics)
            self.metrics = self.config_optimizer.check_metrics()
            if 'score' in self.metrics:
                Logger().warn('Attention: Scoring with default score function of estimator can slow down calculations!')
            # generate OUTER ! cross validation splits to iterate over
            self._generate_outer_cv_indices()
            outer_fold_counter = 0
            if not self._is_mother_pipe:
                self.result_tree_name = self.name + '_outer_fold_' + str(self.__mother_outer_fold_counter)  \
                                        + '_inner_fold_' + str(self.__mother_inner_fold_counter)
            else:
                self.result_tree_name = self.name
            # initialize result logging with hyperpipe class
            self.result_tree = MDBHyperpipe(name=self.result_tree_name)
            self.result_tree.outer_folds = []
            self.result_tree.eval_final_performance = self.eval_final_performance
            self.result_tree.best_config_metric = self.best_config_metric
            # loop over outer cross validation
            for train_indices, test_indices in self.data_test_cases:
                # give the optimizer the chance to inform about elements
                self.optimizer.prepare(self.pipeline_elements)
                outer_fold_counter += 1
                outer_fold_fit_start_time = time.time()
                Logger().info('HYPERPARAMETER SEARCH OF {0}, Outer Cross validation Fold {1}'
                              .format(self.name, outer_fold_counter))
                t1 = time.time()
                # Prepare Train and validation set data
                self._validation_X = self.X[train_indices]
                self._validation_y = self.y[train_indices]
                self._test_X = self.X[test_indices]
                self._test_y = self.y[test_indices]
                # Prepare inner cross validation
                cv_iter = list(self.inner_cv.split(self._validation_X, self._validation_y))
                num_folds = len(cv_iter)
                num_samples_train = len(self._validation_y)
                num_samples_test = len(self._test_y)
                # distribute number of folds to encapsulated child hyperpipes
                self.__distribute_cv_info_to_hyperpipe_children(num_of_folds=num_folds,
                                                                outer_fold_counter=outer_fold_counter)
                tested_config_counter = 0
                # add outer fold info object to result tree
                outer_fold = MDBOuterFold(fold_nr=outer_fold_counter)
                outer_fold.tested_config_list = []
                self.result_tree.outer_folds.append(outer_fold)
                # do the optimizing
                for current_config in self.optimizer.next_config:
                    self.__distribute_cv_info_to_hyperpipe_children(reset=True, config_counter=tested_config_counter)
                    hp = TestPipeline(self._pipe, current_config, self.metrics, self.update_mother_inner_fold_nr,
                                      mongo_db_settings=self.persist_options,
                                      callback_function=self.inner_cv_callback_function)
                    Logger().debug('optimizing of:' + self.name)
                    Logger().debug(self._optimize_printing(current_config))
                    Logger().debug('calculating...')
                    # Test the configuration cross validated by inner_cv object
                    current_config_mdb = hp.calculate_cv_score(self._validation_X, self._validation_y, cv_iter,
                                                        calculate_metrics_per_fold=self.calculate_metrics_per_fold,
                                                        calculate_metrics_across_folds=self.calculate_metrics_across_folds)
                    current_config_mdb.config_nr = tested_config_counter
                    current_config_mdb.config_dict = current_config
                    current_config_mdb.pipe_name = self.name
                    tested_config_counter += 1
                    current_config_mdb.human_readable_config = self.config_to_dict(current_config)
                    # save the configuration of all children pipelines
                    children_config = {}
                    children_config_ref_list = []
                    for pipe_step in self._pipe.steps:
                        item = pipe_step[1]
                        if isinstance(item, Hyperpipe):
                            if item.local_search and item.best_config is not None:
                                children_config[item.name] = item.best_config
                        elif isinstance(item, PipelineStacking):
                            for subhyperpipe_name, hyperpipe in item.pipe_elements.items():
                                if isinstance(hyperpipe, Hyperpipe):
                                    if hyperpipe.local_search and hyperpipe.best_config is not None:
                                        # special case: we need to access pipe over pipeline_stacking element
                                        children_config[item.name + '__' + subhyperpipe_name] = hyperpipe.best_config.config_dict
                                    # children_config_ref_list.append(hyperpipe.best_config_outer_fold._id)
                    specific_parameters = self._pipe.get_params()
                    #current_config_mdb.full_model_spec = specific_parameters
                    current_config_mdb.children_config_dict = children_config
                    current_config_mdb.children_config_ref = children_config_ref_list
                    Logger().verbose(self._optimize_printing(current_config))
                    if not current_config_mdb.config_failed:
                        # get optimizer_metric and forward to optimizer
                        # todo: also pass greater_is_better=True/False to optimizer
                        metric_train = MDBHelper.get_metric(current_config_mdb, FoldOperations.MEAN, self.config_optimizer.metric)
                        metric_test = MDBHelper.get_metric(current_config_mdb, FoldOperations.MEAN, self.config_optimizer.metric, train=False)
                        #
                        # if not metric_train or metric_test:
                        #     raise Exception("Config did not fail, but did not get any metrics either....!!?")
                        config_performance = (metric_train, metric_test)
                        # Print Result for config
                        Logger().debug('...done:')
                        Logger().verbose(self.config_optimizer.metric + str(config_performance))
                    else:
                        config_performance = (-1, -1)
                        # Print Result for config
                        Logger().debug('...failed:')
                        Logger().error(current_config_mdb.config_error)
                    # add config to result tree and do intermediate saving
                    self.result_tree.outer_folds[-1].tested_config_list.append(current_config_mdb)
                    # Todo: add try catch in case config cannot be written
                    self.mongodb_writer.save(self.result_tree)
                    # 3. inform optimizer about performance
                    self.optimizer.evaluate_recent_performance(current_config, config_performance)
                if tested_config_counter > 0:
                    best_config_outer_fold = self.config_optimizer.get_optimum_config(outer_fold.tested_config_list)
                    if not best_config_outer_fold:
                        raise Exception("No best config was found!")
                    best_config_outer_fold_mdb = MDBConfig()
                    best_config_outer_fold_mdb.children_config_dict = best_config_outer_fold.children_config_dict
                    best_config_outer_fold_mdb.pipe_name = self.name
                    best_config_outer_fold_mdb.children_config_ref = best_config_outer_fold.children_config_ref
                    # best_config_outer_fold_mdb.best_config_ref_to_train_item = best_config_outer_fold._id
                    best_config_outer_fold_mdb.config_dict = best_config_outer_fold.config_dict
                    best_config_outer_fold_mdb.human_readable_config = best_config_outer_fold.human_readable_config
                    # inform user
                    Logger().info('finished optimization of ' + self.name)
                    Logger().verbose('Result')
                    Logger().verbose('Number of tested configurations:' + str(tested_config_counter))
                    Logger().verbose('Optimizer metric: ' + self.config_optimizer.metric + '\n' +
                                     '   --> Greater is better: ' + str(self.config_optimizer.greater_is_better))
                    Logger().info('Best config: ' + self._optimize_printing(best_config_outer_fold_mdb.config_dict) +
                                  '\n' + '... with children config: '
                                  + self._optimize_printing(best_config_outer_fold_mdb.children_config_dict))
                    # ... and create optimal pipeline
                    self.optimum_pipe = self._pipe
                    # set self to best config
                    self.optimum_pipe.set_params(**best_config_outer_fold_mdb.config_dict)
                    # set all children to best config and inform to NOT optimize again, ONLY fit
                    for child_name, child_config in best_config_outer_fold_mdb.children_config_dict.items():
                        if child_config:
                            # in case we have a pipeline stacking we need to identify the particular subhyperpipe
                            splitted_name = child_name.split('__')
                            if len(splitted_name) > 1:
                                stacking_element = self.optimum_pipe.named_steps[splitted_name[0]]
                                pipe_element = stacking_element.pipe_elements[splitted_name[1]]
                            else:
                                pipe_element = self.optimum_pipe.named_steps[child_name]
                            pipe_element.set_params(**child_config)
                            pipe_element.is_final_fit = True
                    self.__distribute_cv_info_to_hyperpipe_children(reset=True)
                    Logger().verbose('...now fitting ' + self.name + ' with optimum configuration')
                    fit_time_start = time.time()
                    self.optimum_pipe.fit(self._validation_X, self._validation_y)
                    final_fit_duration = time.time() - fit_time_start
                    #self.best_config_outer_fold.full_model_spec = self.optimum_pipe.get_params()
                    best_config_outer_fold_mdb.fit_duration_minutes = final_fit_duration
                    self.result_tree.outer_folds[-1].best_config = best_config_outer_fold_mdb
                    self.result_tree.outer_folds[-1].best_config.inner_folds = []
                    if self.eval_final_performance:
                        # Todo: generate mean and std over outer folds as well. Move these items to the top
                        Logger().verbose('...now predicting ' + self.name + ' unseen data')
                        test_score_mdb = TestPipeline.score(self.optimum_pipe, self._test_X, self._test_y,
                                                            self.metrics,
                                                            save_predictions=self.persist_options.save_predictions,
                                                            save_feature_importances=self.persist_options.save_feature_importances)
                        Logger().info('.. calculating metrics for test set (' + self.name + ')')
                        Logger().verbose('...now predicting ' + self.name + ' final model with training data')
                        train_score_mdb = TestPipeline.score(self.optimum_pipe, self._validation_X, self._validation_y,
                                                             self.metrics,
                                                             save_predictions=self.persist_options.save_predictions,
                                                             save_feature_importances=self.persist_options.save_feature_importances)
                        # save test fold
                        outer_fold_mdb = MDBInnerFold()
                        outer_fold_mdb.fold_nr = 1
                        outer_fold_mdb.number_samples_training = num_samples_train
                        outer_fold_mdb.number_samples_validation = num_samples_test
                        outer_fold_mdb.training = train_score_mdb
                        outer_fold_mdb.validation = test_score_mdb
                        self.result_tree.outer_folds[-1].best_config.inner_folds = [outer_fold_mdb]
                        Logger().info('PERFORMANCE TRAIN:')
                        for m_key, m_value in train_score_mdb.metrics.items():
                            Logger().info(str(m_key) + ": " + str(m_value))
                        Logger().info('PERFORMANCE TEST:')
                        for m_key, m_value in test_score_mdb.metrics.items():
                            Logger().info(str(m_key) + ": " + str(m_value))
                    else:
                        # save test fold
                        outer_fold_mdb = MDBInnerFold()
                        outer_fold_mdb.fold_nr = 1
                        outer_fold_mdb.number_samples_training = num_samples_train
                        outer_fold_mdb.number_samples_validation = num_samples_test
                        def _copy_inner_fold_means(metric_dict):
                            # We copy all mean values from validation to the best config
                            # training
                            train_item_metrics = {}
                            for m in metric_dict:
                                if m.operation == str(FoldOperations.MEAN):
                                    train_item_metrics[m.metric_name] = m.value
                            train_item = MDBScoreInformation()
                            train_item.metrics_copied_from_inner = True
                            train_item.metrics = train_item_metrics
                            return train_item
                        # training
                        outer_fold_mdb.training = _copy_inner_fold_means(best_config_outer_fold.metrics_train)
                        # validation
                        outer_fold_mdb.validation = _copy_inner_fold_means(best_config_outer_fold.metrics_test)
                        self.result_tree.outer_folds[-1].best_config.inner_folds = [outer_fold_mdb]
                Logger().info('This took {} minutes.'.format((time.time() - t1) / 60))
                self.result_tree.time_of_results = datetime.datetime.now()
                self.mongodb_writer.save(self.result_tree)
                self.__distribute_cv_info_to_hyperpipe_children(reset_final_fit=True, outer_fold_counter=outer_fold_counter)
            # Compute all final metrics
            self.result_tree.metrics_train, self.result_tree.metrics_test = MDBHelper.aggregate_metrics(self.result_tree.outer_folds,
                                                                                                        self.metrics)
            # save result tree to db or file or both
            self.mongodb_writer.save(self.result_tree)
            Logger().info("Saved result tree to database")
            # Find best config across outer folds
            self.best_config = self.config_optimizer.get_optimum_config_outer_folds(self.result_tree.outer_folds)
            self.result_tree.best_config = self.best_config
            Logger().info('OVERALL BEST CONFIGURATION')
            Logger().info('--------------------------')
            Logger().info(self._optimize_printing(self.best_config.config_dict) +
                          '\n' + '... with children config: '
                          + self._optimize_printing(self.best_config.children_config_dict))
            # set self to best config
            self.optimum_pipe = self._pipe
            self.optimum_pipe.set_params(**self.best_config.config_dict)
            self.optimum_pipe.fit(self._validation_X, self._validation_y)
            # save results again
            self.mongodb_writer.save(self.result_tree)
            Logger().info("Saved overall best config to database")
        ###############################################################################################
        else:
            self._pipe.fit(self.X, self.y, **fit_params)
    else:
        Logger().verbose("Avoided fitting of " + self.name + " on fold "
                         + str(self._current_fold) + " because data did not change")
        Logger().verbose('Best config of ' + self.name + ' : ' + str(self.best_config))
    return self
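
A hedged end-to-end sketch of the call above on a toy sklearn dataset, assuming my_pipe was configured and filled with elements as in the earlier sketches:

from sklearn.datasets import load_breast_cancer

# toy data: X has shape [N, D], y has shape [N], as required by fit()
X, y = load_breast_cancer(return_X_y=True)

# runs the nested cross-validated hyperparameter search described above and
# finally refits the optimum pipe with the overall best configuration
my_pipe.fit(X, y)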

def get_params(self, deep=True)

Retrieve parameters from sklearn pipeline

def get_params(self, deep=True):
    """
    Retrieve parameters from sklearn pipeline
    """
    if self._pipe is not None:
        return self._pipe.get_params(deep)
    else:
        return None

def inverse_transform_pipeline(self, hyperparameters, data, targets, data_to_inverse)

Inverse transform data for a pipeline with specific hyperparameter configuration

  1. Copy Sklearn Pipeline,
  2. Set Parameters
  3. Fit Pipeline to data and targets
  4. Inverse transform data with that pipeline

Parameters

  • hyperparameters [dict]: The concrete configuration settings for the pipeline elements
  • data [array-like]: The training data to which the pipeline is fitted
  • targets [array-like]: The truth values for training
  • data_to_inverse [array-like]: The data that should be inverse-transformed after training

Returns

Inverse-transformed data as array

def inverse_transform_pipeline(self, hyperparameters: dict, data, targets, data_to_inverse):
    """
    Inverse transform data for a pipeline with specific hyperparameter configuration
    1. Copy Sklearn Pipeline,
    2. Set Parameters
    3. Fit Pipeline to data and targets
    4. Inverse transform data with that pipeline
    Parameters
    ----------
    * `hyperparameters` [dict]:
        The concrete configuration settings for the pipeline elements
    * `data` [array-like]:
        The training data to which the pipeline is fitted
    * `targets` [array-like]:
        The truth values for training
    * `data_to_inverse` [array-like]:
        The data that should be inverse-transformed after training
    Returns
    -------
    Inverse-transformed data as array
    """
    copied_pipe = self._copy_pipeline()
    copied_pipe.set_params(**hyperparameters)
    copied_pipe.fit(data, targets)
    return copied_pipe.inverse_transform(data_to_inverse)
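
A hedged sketch of the four steps above. The 'PCA' element name is hypothetical, and the sketch assumes a hyperpipe whose elements all implement inverse_transform; neither is guaranteed by the source:

import numpy as np
from sklearn.datasets import load_breast_cancer

X, y = load_breast_cancer(return_X_y=True)

# hypothetical configuration for a pipeline that contains a 'PCA' element;
# data_to_inverse must live in the pipeline's transformed (component) space
config = {'PCA__n_components': 2}
point_in_component_space = np.zeros((1, 2))

back_projected = my_pipe.inverse_transform_pipeline(config, X, y, point_in_component_space)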

def load_optimum_pipe(file)

Load optimal pipeline.

Parameters

  • file [str]: File path specifying .photon file to load optimal pipeline from

Returns

sklearn Pipeline with all trained photon_pipelines

@staticmethod
def load_optimum_pipe(file):
    """
    Load optimal pipeline.
    Parameters
    ----------
    * `file` [str]:
        File path specifying .photon file to load optimal pipeline from
    Returns
    -------
    sklearn Pipeline with all trained photon_pipelines
    """
    if file.endswith('.photon'):
        archive_name = os.path.splitext(file)[0]
        folder = archive_name + '/'
        zf = zipfile.ZipFile(file)
        zf.extractall(folder)
    else:
        raise FileNotFoundError('Specify .photon file that holds PHOTON optimum pipe.')
    setup_info = pickle.load(open(folder + '_optimum_pipe_blueprint.pkl', 'rb'))
    element_list = list()
    for element_info in setup_info:
        if element_info['mode'] == 'custom':
            spec = importlib.util.spec_from_file_location(element_info['element_name'],
                                                          folder + element_info['wrapper_script'])
            imported_module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(imported_module)
            base_element = getattr(imported_module, element_info['element_name'])
            custom_element = PipelineElement(name=element_info['element_name'], base_element=base_element(),
                                             hyperparameters=element_info['hyperparameters'],
                                             test_disabled=element_info['test_disabled'],
                                             disabled=element_info['disabled'])
            custom_element.base_element.load(folder + element_info['filename'])
            element_list.append((element_info['element_name'], custom_element))
        else:
            element_list.append((element_info['element_name'], joblib.load(folder + element_info['filename'] + '.pkl')))
    return Pipeline(element_list)
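
A minimal loading sketch; the file name is hypothetical and the import path is an assumption:

from photonai.base.PhotonBase import Hyperpipe  # assumed import path

# load a previously exported optimum pipeline (see save_optimum_pipe below)
loaded_pipe = Hyperpipe.load_optimum_pipe('my_analysis.photon')
# loaded_pipe is a plain sklearn Pipeline holding the trained elements
print(loaded_pipe.named_steps)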

def predict(self, data)

Use the optimum pipe to predict the data

Returns

predicted targets
def predict(self, data):
    """
    Use the optimum pipe to predict the data
    Returns
    -------
        predicted targets
    """
    # Todo: if local_search = true then use optimized pipe here?
    if self._pipe:
        if self.filter_element:
            data = self.filter_element.transform(data)
        return self.optimum_pipe.predict(data)

def predict_proba(self, data)

Predict probabilities

Returns

predicted probabilities

def predict_proba(self, data):
    """
    Predict probabilities
    Returns
    -------
    predicted probabilities
    """
    if self._pipe:
        if self.filter_element:
            data = self.filter_element.transform(data)
        return self.optimum_pipe.predict_proba(data)
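
Both predict and predict_proba delegate to the optimum pipe after fit(). A short usage sketch, assuming my_pipe was fitted as in the earlier sketches and that the final estimator supports probability estimates:

from sklearn.datasets import load_breast_cancer

X_new, _ = load_breast_cancer(return_X_y=True)

y_pred = my_pipe.predict(X_new[:10])        # predicted targets
y_prob = my_pipe.predict_proba(X_new[:10])  # class probabilities, if the estimator supports them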

def save_optimum_pipe(self, file)

Save the optimal pipeline only. The complete hyperpipe will not be saved.

Parameters

  • file [str]: File path specifying the file to save the pipeline to
def save_optimum_pipe(self, file):
    """
    Save the optimal pipeline only. The complete hyperpipe will not be saved.
    Parameters
    ----------
    * `file` [str]:
        File path specifying the file to save the pipeline to
    """
    element_number = 0
    element_identifier = list()
    folder = os.path.splitext(file)[0]
    file = os.path.splitext(file)[0] + '.photon'
    if os.path.exists(folder):
        Logger().warn('The file you specified already exists as a folder.')
    else:
        os.mkdir(folder)
        folder = folder + '/'
    wrapper_files = list()
    for element_name, element in self.optimum_pipe.named_steps.items():
        filename = '_optimum_pipe_' + str(element_number) + '_' + element_name
        element_identifier.append({'element_name': element_name,
                                   'filename': filename})
        base_element = element.base_element.base_element
        if hasattr(base_element, 'save'):
            base_element.save(folder + filename)
            element_identifier[-1]['mode'] = 'custom'
            element_identifier[-1]['wrapper_script'] = os.path.basename(inspect.getfile(base_element.__class__))
            wrapper_files.append(inspect.getfile(base_element.__class__))
            element_identifier[-1]['test_disabled'] = element.test_disabled
            element_identifier[-1]['disabled'] = element.disabled
            element_identifier[-1]['hyperparameters'] = element.hyperparameters
        else:
            try:
                joblib.dump(element, folder + filename + '.pkl', compress=1)
                element_identifier[-1]['mode'] = 'pickle'
            except:
                raise NotImplementedError("Custom pipeline element must implement .save() method or "
                                          "allow pickle.")
        element_number += 1
    # save pipeline blueprint to make loading of pipeline easier
    with open(folder + '_optimum_pipe_blueprint.pkl', 'wb') as f:
        pickle.dump(element_identifier, f)
    # get all files
    files = glob.glob(folder + '_optimum_pipe_*')
    with zipfile.ZipFile(file, 'w') as myzip:
        for f in files:
            myzip.write(f, os.path.basename(f))
            os.remove(f)
        for f in wrapper_files:
            myzip.write(f, os.path.splitext(os.path.basename(f))[0] + '.py')
    os.removedirs(folder)
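
A minimal saving sketch that pairs with load_optimum_pipe above; the file name is hypothetical and my_pipe is assumed to have been fitted already:

# export only the trained optimum pipeline; the extension is normalized to .photon
my_pipe.save_optimum_pipe('my_analysis.photon')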

def set_params(self, **params)

Give parameter values to the pipeline elements

def set_params(self, **params):
    """
    Give parameter values to the pipeline elements
    """
    if self._pipe is not None:
        self._pipe.set_params(**params)
    return self

def transform(self, data)

Use the optimum pipe to transform the data

def transform(self, data):
    """
    Use the optimum pipe to transform the data
    """
    if self._pipe:
        if self.filter_element:
            data = self.filter_element.transform(data)
        return self.optimum_pipe.transform(data)
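
A short usage sketch, assuming my_pipe was fitted as above and that its elements implement transform, as with sklearn pipelines:

from sklearn.datasets import load_breast_cancer

X_new, _ = load_breast_cancer(return_X_y=True)

# transform new data with the trained optimum pipe, e.g. to inspect the
# representation produced by the preprocessing elements
X_transformed = my_pipe.transform(X_new[:10])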