Stacking

Stacking is a way of combining multiple models that introduces the concept of a meta-learner. You can think of it as a stack of vertical pipeline paths. PHOTON provides a straightforward implementation.

You need two classes: PipelineBranch, to create the subpipelines to stack, and PipelineStacking, a container class that stacks them vertically and makes the data flow through all items in parallel. You can also stack single PipelineElements, and even Hyperpipes, although the latter should be used with caution while PHOTON is in beta.

See an example
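For illustration, here is a minimal, hypothetical sketch of the stacking data flow in plain NumPy; the two "branches" are toy stand-ins for real PHOTON elements:

```python
import numpy as np

X = np.arange(10)

# two hypothetical branches, each producing one prediction per sample
branch_a = lambda x: (x > 4).astype(int)
branch_b = lambda x: (x > 6).astype(int)

# the data flows through all branches in parallel ...
predictions = np.column_stack([branch_a(X), branch_b(X)])  # shape (10, 2)

# ... and the results are either kept horizontally concatenated,
# or (with voting=True) averaged into a single prediction
voted = np.mean(predictions, axis=1).astype(int)           # shape (10,)
```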

Pipeline Branch

You can use the class PipelineBranch to define subpipelines for your stacking container. In contrast to stacking single pipeline elements, a branch can contain several items.

class PipelineBranch

A substream of pipeline elements that is encapsulated e.g. for parallelization

Parameters

  • name [str]: Name of the encapsulated item and/or summary of the encapsulated element's functions
class PipelineBranch(PipelineElement):
    """
     A substream of pipeline elements that is encapsulated e.g. for parallelization

     Parameters
     ----------
        * `name` [str]:
            Name of the encapsulated item and/or summary of the encapsulated element's functions

        """

    def __init__(self, name):

        super().__init__(name, {}, test_disabled=False, disabled=False, base_element=True)

        self.pipeline_elements = []

    def __iadd__(self, pipe_element):
        """
        Add an element to the sub pipeline
        Returns self

        Parameters
        ----------
        * `pipe_element` [PipelineElement or Hyperpipe]:
            The object to add, being either a transformer or an estimator.

        """
        self.pipeline_elements.append(pipe_element)
        self._prepare_pipeline()
        return self

    def add(self, pipe_element):
        """
           Add an element to the sub pipeline
           Returns self
    
           Parameters
           ----------
           * `pipe_element` [PipelineElement or Hyperpipe]:
               The object to add, being either a transformer or an estimator.
    
           """
        return self.__iadd__(pipe_element)

    def _prepare_pipeline(self):
        """ Generates sklearn pipeline with all underlying steps """
        pipeline_steps = []

        for item in self.pipeline_elements:
            # pipeline_steps.append((item.name, item.base_element))
            pipeline_steps.append((item.name, item))
            self._hyperparameters[item.name] = item.hyperparameters

        self.generate_sklearn_hyperparameters()
        self.base_element = Pipeline(pipeline_steps)

    @property
    def hyperparameters(self):
        return self._hyperparameters

    @hyperparameters.setter
    def hyperparameters(self, value):
        """
        Setting hyperparameters does not make sense; only the items that were added can be optimized, not the container (self)
        """
        return None

    def generate_config_grid(self):
        return create_global_config_grid(self.pipeline_elements, self.name)

    def generate_sklearn_hyperparameters(self):
        """
        Generates a dictionary according to the sklearn convention of element_name__parameter_name: parameter_value
        """
        self._hyperparameters = {}
        for element in self.pipeline_elements:
            for attribute, value_list in element.hyperparameters.items():
                self._hyperparameters[self.name + '__' + attribute] = value_list

Ancestors (in MRO)

Static methods

def __init__(self, name)

Creates a new PipelineBranch object with the given name and an empty list of pipeline elements

Returns

instantiated class object

def __init__(self, name):
    super().__init__(name, {}, test_disabled=False, disabled=False, base_element=True)
    self.pipeline_elements = []

def add(self, pipe_element)

Add an element to the sub pipeline. Returns self.

Parameters

  • pipe_element [PipelineElement or Hyperpipe]: The object to add, being either a transformer or an estimator.
def add(self, pipe_element):
    """
       Add an element to the sub pipeline
       Returns self

       Parameters
       ----------
       * `pipe_element` [PipelineElement or Hyperpipe]:
           The object to add, being either a transformer or an estimator.

       """
    return self.__iadd__(pipe_element)

def fit(self, data, targets=None)

Calls the fit function of the base element

Returns

self

def fit(self, data, targets=None):
    """
    Calls the fit function of the base element
    Returns
    ------
    self
    """
    if not self.disabled:
        obj = self.base_element
        obj.fit(data, targets)
        # self.base_element.fit(data, targets)
    return self

def get_params(self, deep=True)

Forwards the get_params request to the wrapped base element

def get_params(self, deep: bool=True):
    """
    Forwards the get_params request to the wrapped base element
    """
    return self.base_element.get_params(deep)

def inverse_transform(self, data)

Calls inverse_transform on the base element

def inverse_transform(self, data):
    """
    Calls inverse_transform on the base element
    """
    if hasattr(self.base_element, 'inverse_transform'):
        return self.base_element.inverse_transform(data)
    else:
        # raise Warning('Element ' + self.name + ' has no method inverse_transform')
        return data

def predict(self, data)

Calls predict function on the base element.

IF PREDICT IS NOT AVAILABLE, CALLS TRANSFORM. This covers the case that the encapsulated hyperpipe is only part of another hyperpipe and works as a transformer; sklearn usually expects the last element to predict. It is also needed when using an autoencoder that is first trained via predict and after training only used for transforming.

def predict(self, data):
    """
    Calls predict function on the base element.
    IF PREDICT IS NOT AVAILABLE CALLS TRANSFORM.
    This is for the case that the encapsulated hyperpipe is only part of another hyperpipe, and works as a transformer.
    Sklearn usually expects the last element to predict.
    Also this is needed in case we are using an autoencoder which is firstly trained by using predict, and after
    training only used for transforming.
    """
    if not self.disabled:
        if hasattr(self.base_element, 'predict'):
            return self.base_element.predict(data)
        elif hasattr(self.base_element, 'transform'):
            return self.base_element.transform(data)
        else:
            Logger().error('BaseException. base Element should have function ' +
                           'predict, or at least transform.')
            raise BaseException('base Element should have function predict, or at least transform.')
    else:
        return data

def predict_proba(self, data)

Predict probabilities. The base element needs a predict_proba() function; otherwise a BaseException is raised.

def predict_proba(self, data):
    """
    Predict probabilities
    base element needs predict_proba() function, otherwise throw
    base exception.
    """
    if not self.disabled:
        if hasattr(self.base_element, 'predict_proba'):
            return self.base_element.predict_proba(data)
        else:
            Logger().error('BaseException. base Element should have "predict_proba" function.')
            raise BaseException('base Element should have predict_proba function.')
    return data

def score(self, X_test, y_test)

Calls the score function on the base element. Returns a goodness-of-fit measure or a likelihood of unseen data.

def score(self, X_test, y_test):
    """
    Calls the score function on the base element:
    Returns a goodness of fit measure or a likelihood of unseen data:
    """
    return self.base_element.score(X_test, y_test)

def set_params(self, **kwargs)

Forwards the set_params request to the wrapped base element. Takes care of the disabled parameter, which is additionally attached by the PHOTON wrapper.

def set_params(self, **kwargs):
    """
    Forwards the set_params request to the wrapped base element
    Takes care of the disabled parameter which is additionally attached by the PHOTON wrapper
    """
    # element disable is a construct used for this container only
    if self._sklearn_disabled in kwargs:
        self.disabled = kwargs[self._sklearn_disabled]
        del kwargs[self._sklearn_disabled]
    elif 'disabled' in kwargs:
        self.disabled = kwargs['disabled']
        del kwargs['disabled']
    self.base_element.set_params(**kwargs)
    return self

def transform(self, data)

Calls transform on the base element.

IN CASE THERE IS NO TRANSFORM METHOD, CALLS PREDICT. This is used if we are using an estimator as a preprocessing step.

def transform(self, data):
    """
    Calls transform on the base element.
    IN CASE THERE IS NO TRANSFORM METHOD, CALLS PREDICT.
    This is used if we are using an estimator as a preprocessing step.
    """
    if not self.disabled:
        if hasattr(self.base_element, 'transform'):
            return self.base_element.transform(data)
        elif hasattr(self.base_element, 'predict'):
            return self.base_element.predict(data)
        else:
            Logger().error('BaseException: transform-predict-mess')
            raise BaseException('transform-predict-mess')
    else:
        return data

Instance variables

var hyperparameters

var pipeline_elements

Pipeline Stacking

The PipelineStacking class is a container pipeline element that vertically stacks subpipeline streams. You can add single pipeline items, or create a PipelineBranch and add it to the stacking element. Your data then flows through all added items in parallel. In the end, the outputs are horizontally concatenated or, if voting=True, joined into a single prediction.

class PipelineStacking

Creates a vertical stacking/parallelization of pipeline items.

The object acts as single pipeline element and encapsulates several vertically stacked other pipeline elements, each child receiving the same input data. The data is iteratively distributed to all children, the results are collected and horizontally concatenated.

class PipelineStacking(PipelineElement):
    """
    Creates a vertical stacking/parallelization of pipeline items.

    The object acts as single pipeline element and encapsulates several vertically stacked other pipeline elements, each
    child receiving the same input data. The data is iteratively distributed to all children, the results are collected
    and horizontally concatenated.

    """
    def __init__(self, name: str, stacking_elements=None, voting: bool=True):
        """
        Creates a new PipelineStacking element.
        Collects all possible hyperparameter combinations of the children

        Parameters
        ----------
        * `name` [str]:
            Give the pipeline element a name
        * `stacking_elements` [list, optional]:
            List of pipeline elements that should run in parallel
        * `voting` [bool]:
            If true, the predictions of the encapsulated pipeline elements are joined to a single prediction
        """
        super(PipelineStacking, self).__init__(name, hyperparameters={}, test_disabled=False, disabled=False,
                                               base_element=True)

        self._hyperparameters = {}
        self.pipe_elements = OrderedDict()
        self.voting = voting
        if stacking_elements is not None:
            for item_to_stack in stacking_elements:
                self.__iadd__(item_to_stack)

    def __iadd__(self, item):
        """
        Adds a new element to the stack.
        Generates sklearn hyperparameter names in order to set the item's hyperparameters in the optimization process.

        * `item` [PipelineElement or PipelineBranch or Hyperpipe]:
            The Element that should be stacked and will run in a vertical parallelization in the original pipe.
        """
        self.pipe_elements[item.name] = item
        self._hyperparameters[item.name] = item.hyperparameters

        # for each configuration
        tmp_dict = dict(item.hyperparameters)
        for key, element in tmp_dict.items():
            if isinstance(item, PipelineElement):
                self._hyperparameters[self.name + '__' + key] = tmp_dict[key]
            else:
                self._hyperparameters[self.name + '__' + item.name + '__' + key] = tmp_dict[key]
        return self

    def add(self, item):
        self.__iadd__(item)

    @property
    def hyperparameters(self):
        return self._hyperparameters

    @hyperparameters.setter
    def hyperparameters(self, value):
        """
        Setting hyperparameters does not make sense; only the items that were added can be optimized, not the container (self)
        """
        pass

    def generate_config_grid(self):
        return create_global_config_grid(self.pipe_elements.values(), self.name)

    def get_params(self, deep=True):
        all_params = {}
        for name, element in self.pipe_elements.items():
            all_params[name] = element.get_params(deep)
        return all_params

    def set_params(self, **kwargs):
        """
        Find the particular child and distribute the params to it
        """
        spread_params_dict = {}
        for k, val in kwargs.items():
            splitted_k = k.split('__')
            item_name = splitted_k[0]
            if item_name not in spread_params_dict:
                spread_params_dict[item_name] = {}
            dict_entry = {'__'.join(splitted_k[1::]): val}
            spread_params_dict[item_name].update(dict_entry)

        for name, params in spread_params_dict.items():
            if name in self.pipe_elements:
                self.pipe_elements[name].set_params(**params)
            else:
                Logger().error('NameError: Could not find element ' + name)
                raise NameError('Could not find element ', name)
        return self

    def fit(self, data, targets=None):
        """
        Calls fit iteratively on every child
        """
        for name, element in self.pipe_elements.items():
            # Todo: parallelize fitting
            element.fit(data, targets)
        return self

    def predict(self, data):
        """
        Iteratively calls predict on every child.
        """
        # Todo: strategy for concatenating data from different pipes
        # todo: parallelize prediction
        predicted_data = np.empty((0, 0))
        for name, element in self.pipe_elements.items():
            element_transform = element.predict(data)
            predicted_data = PipelineStacking.stack_data(predicted_data, element_transform)
        if self.voting:
            if hasattr(predicted_data, 'shape'):
                if len(predicted_data.shape) > 1:
                    predicted_data = np.mean(predicted_data, axis=1).astype(int)
        return predicted_data

    def predict_proba(self, data):
        """
        Predict probabilities for every pipe element and
        stack them together. Alternatively, do voting instead.
        """
        predicted_data = np.empty((0, 0))
        for name, element in self.pipe_elements.items():
            element_transform = element.predict_proba(data)
            predicted_data = PipelineStacking.stack_data(predicted_data, element_transform)
        if self.voting:
            if hasattr(predicted_data, 'shape'):
                if len(predicted_data.shape) > 1:
                    predicted_data = np.mean(predicted_data, axis=1).astype(int)
        return predicted_data

    def transform(self, data):
        """
        Calls transform on every child.

        If the encapsulated child is a hyperpipe, also calls predict on the last element in the pipeline.
        """
        transformed_data = np.empty((0, 0))
        for name, element in self.pipe_elements.items():
            # if it is a hyperpipe with a final estimator, we want to use predict:
            if hasattr(element, 'pipe'):
                if element.overwrite_x is not None:
                    element_data = element.overwrite_x
                else:
                    element_data = data
                if element.pipe._final_estimator:
                    element_transform = element.predict(element_data)
                else:
                    # if it is just a preprocessing pipe we want to use transform
                    element_transform = element.transform(element_data)
            else:
                raise TypeError('Cannot transform stacked element ' + name + ': expected a hyperpipe with a pipe attribute.')

            transformed_data = PipelineStacking.stack_data(transformed_data, element_transform)

        return transformed_data

    # def fit_predict(self, data, targets):
    #     predicted_data = None
    #     for name, element in self.pipe_elements.items():
    #         element_transform = element.fit_predict(data)
    #         predicted_data = PipelineStacking.stack_data(predicted_data, element_transform)
    #     return predicted_data
    #
    # def fit_transform(self, data, targets=None):
    #     transformed_data = np.empty((0, 0))
    #     for name, element in self.pipe_elements.items():
    #         # if it is a hyperpipe with a final estimator, we want to use predict:
    #         if hasattr(element, 'pipe'):
    #             if element.pipe._final_estimator:
    #                 element.fit(data, targets)
    #                 element_transform = element.predict(data)
    #             else:
    #                 # if it is just a preprocessing pipe we want to use transform
    #                 element.fit(data)
    #                 element_transform = element.transform(data)
    #             transformed_data = PipelineStacking.stack_data(transformed_data, element_transform)
    #     return transformed_data

    @classmethod
    def stack_data(cls, a, b):
        """
        Helper method to horizontally join the outcome of each child

        Parameters
        ----------
        * `a` [ndarray]:
            The existing matrix
        * `b` [ndarray]:
            The matrix that is to be attached horizontally

        Returns
        -------
        New matrix, that is a and b horizontally joined

        """
        if not a.any():
            a = b
        else:
            # Todo: check for right dimensions!
            if a.ndim == 1 and b.ndim == 1:
                a = np.column_stack((a, b))
            else:
                b = np.reshape(b, (b.shape[0], 1))
                a = np.concatenate((a, b), 1)
        return a

    def score(self, X_test, y_test):
        """
        Calculate accuracy for predictions made with this object.
        This function should probably never be called.

        """
        # Todo: invent strategy for this ?
        # raise BaseException('PipelineStacking.score should probably never be reached.')
        # return 16
        predicted = self.predict(X_test)

        return accuracy_score(y_test, predicted)

Ancestors (in MRO)

Class variables

var ELEMENT_DICTIONARY

Static methods

def __init__(self, name, stacking_elements=None, voting=True)

Creates a new PipelineStacking element. Collects all possible hyperparameter combinations of the children

Parameters

  • name [str]: Give the pipeline element a name
  • stacking_elements [list, optional]: List of pipeline elements that should run in parallel
  • voting [bool]: If true, the predictions of the encapsulated pipeline elements are joined to a single prediction
def __init__(self, name: str, stacking_elements=None, voting: bool=True):
    """
    Creates a new PipelineStacking element.
    Collects all possible hyperparameter combinations of the children
    Parameters
    ----------
    * `name` [str]:
        Give the pipeline element a name
    * `stacking_elements` [list, optional]:
        List of pipeline elements that should run in parallel
    * `voting` [bool]:
        If true, the predictions of the encapsulated pipeline elements are joined to a single prediction
    """
    super(PipelineStacking, self).__init__(name, hyperparameters={}, test_disabled=False, disabled=False,
                                           base_element=True)
    self._hyperparameters = {}
    self.pipe_elements = OrderedDict()
    self.voting = voting
    if stacking_elements is not None:
        for item_to_stack in stacking_elements:
            self.__iadd__(item_to_stack)

def add(self, item)

def add(self, item):
    self.__iadd__(item)

def copy_me(self)

def copy_me(self):
    return deepcopy(self)

def fit(self, data, targets=None)

Calls fit iteratively on every child

def fit(self, data, targets=None):
    """
    Calls fit iteratively on every child
    """
    for name, element in self.pipe_elements.items():
        # Todo: parallelize fitting
        element.fit(data, targets)
    return self

def generate_config_grid(self)

def generate_config_grid(self):
    return create_global_config_grid(self.pipe_elements.values(), self.name)

def generate_sklearn_hyperparameters(self, value)

Generates a dictionary according to the sklearn convention of element_name__parameter_name: parameter_value

def generate_sklearn_hyperparameters(self, value: dict):
    """
    Generates a dictionary according to the sklearn convention of element_name__parameter_name: parameter_value
    """
    self._hyperparameters = {}
    for attribute, value_list in value.items():
        self._hyperparameters[self.name + '__' + attribute] = value_list
    if self.test_disabled:
        self._hyperparameters[self._sklearn_disabled] = [False, True]

def get_params(self, deep=True)

Forwards the get_params request to the wrapped base element

def get_params(self, deep=True):
    all_params = {}
    for name, element in self.pipe_elements.items():
        all_params[name] = element.get_params(deep)
    return all_params

def inverse_transform(self, data)

Calls inverse_transform on the base element

def inverse_transform(self, data):
    """
    Calls inverse_transform on the base element
    """
    if hasattr(self.base_element, 'inverse_transform'):
        return self.base_element.inverse_transform(data)
    else:
        # raise Warning('Element ' + self.name + ' has no method inverse_transform')
        return data

def predict(self, data)

Iteratively calls predict on every child.

def predict(self, data):
    """
    Iteratively calls predict on every child.
    """
    # Todo: strategy for concatenating data from different pipes
    # todo: parallelize prediction
    predicted_data = np.empty((0, 0))
    for name, element in self.pipe_elements.items():
        element_transform = element.predict(data)
        predicted_data = PipelineStacking.stack_data(predicted_data, element_transform)
    if self.voting:
        if hasattr(predicted_data, 'shape'):
            if len(predicted_data.shape) > 1:
                predicted_data = np.mean(predicted_data, axis=1).astype(int)
    return predicted_data

def predict_proba(self, data)

Predict probabilities for every pipe element and stack them together. Alternatively, do voting instead.

def predict_proba(self, data):
    """
    Predict probabilities for every pipe element and
    stack them together. Alternatively, do voting instead.
    """
    predicted_data = np.empty((0, 0))
    for name, element in self.pipe_elements.items():
        element_transform = element.predict_proba(data)
        predicted_data = PipelineStacking.stack_data(predicted_data, element_transform)
    if self.voting:
        if hasattr(predicted_data, 'shape'):
            if len(predicted_data.shape) > 1:
                predicted_data = np.mean(predicted_data, axis=1).astype(int)
    return predicted_data

def prettify_config_output(self, config_name, config_value, return_dict=False)

Make hyperparameter combinations human readable

def prettify_config_output(self, config_name: str, config_value, return_dict:bool=False):
    """Make hyperparameter combinations human readable """
    if config_name == "disabled" and config_value is False:
        if return_dict:
            return {'enabled':True}
        else:
            return "enabled = True"
    else:
        if return_dict:
            return {config_name:config_value}
        else:
            return config_name + '=' + str(config_value)

def score(self, X_test, y_test)

Calculate accuracy for predictions made with this object. This function should probably never be called.

def score(self, X_test, y_test):
    """
    Calculate accuracy for predictions made with this object.
    This function should probably never be called.
    """
    # Todo: invent strategy for this ?
    # raise BaseException('PipelineStacking.score should probably never be reached.')
    # return 16
    predicted = self.predict(X_test)
    return accuracy_score(y_test, predicted)

def set_params(self, **kwargs)

Find the particular child and distribute the params to it

def set_params(self, **kwargs):
    """
    Find the particular child and distribute the params to it
    """
    spread_params_dict = {}
    for k, val in kwargs.items():
        splitted_k = k.split('__')
        item_name = splitted_k[0]
        if item_name not in spread_params_dict:
            spread_params_dict[item_name] = {}
        dict_entry = {'__'.join(splitted_k[1::]): val}
        spread_params_dict[item_name].update(dict_entry)
    for name, params in spread_params_dict.items():
        if name in self.pipe_elements:
            self.pipe_elements[name].set_params(**params)
        else:
            Logger().error('NameError: Could not find element ' + name)
            raise NameError('Could not find element ', name)
    return self
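The key-splitting logic in set_params can be exercised in isolation. This sketch uses made-up element and parameter names:

```python
# split 'element__param' keys on the first '__' and group values by element,
# as PipelineStacking.set_params does before distributing them to children
kwargs = {'svc__C': 1.0, 'svc__kernel': 'rbf', 'pca__n_components': 5}

spread_params_dict = {}
for k, val in kwargs.items():
    splitted_k = k.split('__')
    item_name = splitted_k[0]
    spread_params_dict.setdefault(item_name, {})['__'.join(splitted_k[1:])] = val
```

Each child element would then receive only its own parameter dictionary via its set_params.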

def transform(self, data)

Calls transform on every child.

If the encapsulated child is a hyperpipe, also calls predict on the last element in the pipeline.

def transform(self, data):
    """
    Calls transform on every child.
    If the encapsulated child is a hyperpipe, also calls predict on the last element in the pipeline.
    """
    transformed_data = np.empty((0, 0))
    for name, element in self.pipe_elements.items():
        # if it is a hyperpipe with a final estimator, we want to use predict:
        if hasattr(element, 'pipe'):
            if element.overwrite_x is not None:
                element_data = element.overwrite_x
            else:
                element_data = data
            if element.pipe._final_estimator:
                element_transform = element.predict(element_data)
            else:
                # if it is just a preprocessing pipe we want to use transform
                element_transform = element.transform(element_data)
        else:
            raise TypeError('Cannot transform stacked element ' + name + ': expected a hyperpipe with a pipe attribute.')
        transformed_data = PipelineStacking.stack_data(transformed_data, element_transform)
    return transformed_data

Instance variables

var hyperparameters

var pipe_elements

var voting

Methods

def create(cls, name, base_element, hyperparameters, test_disabled=False, disabled=False, **kwargs)

Takes an instantiated object and encapsulates it into the PHOTON structure, adds the disabled function, and attaches information about the hyperparameters that should be tested

@classmethod
def create(cls, name, base_element, hyperparameters: dict, test_disabled=False, disabled=False, **kwargs):
    """
    Takes an instantiated object and encapsulates it into the PHOTON structure,
    add the disabled function and attaches information about the hyperparameters that should be tested
    """
    return PipelineElement(name, hyperparameters, test_disabled, disabled, base_element=base_element, **kwargs)

def stack_data(cls, a, b)

Helper method to horizontally join the outcome of each child

Parameters

  • a [ndarray]: The existing matrix
  • b [ndarray]: The matrix that is to be attached horizontally

Returns

New matrix, that is a and b horizontally joined

@classmethod
def stack_data(cls, a, b):
    """
    Helper method to horizontally join the outcome of each child
    Parameters
    ----------
    * `a` [ndarray]:
        The existing matrix
    * `b` [ndarray]:
        The matrix that is to be attached horizontally
    Returns
    -------
    New matrix, that is a and b horizontally joined
    """
    if not a.any():
        a = b
    else:
        # Todo: check for right dimensions!
        if a.ndim == 1 and b.ndim == 1:
            a = np.column_stack((a, b))
        else:
            b = np.reshape(b, (b.shape[0], 1))
            a = np.concatenate((a, b), 1)
    return a
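The shape behaviour of stack_data can be seen with a few toy arrays; this reimplements the method as a free function so it runs standalone:

```python
import numpy as np

def stack_data(a, b):
    # same logic as PipelineStacking.stack_data
    if not a.any():
        a = b
    else:
        if a.ndim == 1 and b.ndim == 1:
            a = np.column_stack((a, b))
        else:
            b = np.reshape(b, (b.shape[0], 1))
            a = np.concatenate((a, b), 1)
    return a

out = np.empty((0, 0))
out = stack_data(out, np.array([1, 2, 3]))  # first child's output is taken as-is
out = stack_data(out, np.array([4, 5, 6]))  # two 1-D arrays -> column_stack, shape (3, 2)
out = stack_data(out, np.array([7, 8, 9]))  # 2-D + 1-D -> reshape and concatenate, shape (3, 3)
```

Note that the emptiness check uses a.any(), so an all-zero first result would also be treated as empty and silently replaced by the second one.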