Source code for pyPhenology.models.ensemble_models

import numpy as np
from . import utils
from ..utils import load_model, load_model_parameters
from . import validation
from copy import deepcopy
import warnings
from joblib import Parallel, delayed

class EnsembleBase():
    def __init__(self):
        pass
    
    def score(self, metric='rmse', doy_observed=None,
              to_predict=None, predictors=None):
        """Get the scoring metric for fitted data

        Gets the score on the dataset used for fitting (if fitting was
        done). Otherwise, set ``to_predict`` and ``predictors`` as used in
        ``model.predict()``; in that case the score is calculated against
        the observed values ``doy_observed``.

        Metrics available are root mean square error (``rmse``).

        Parameters:
            metric : str
                Currently only rmse is available.
        """
        self._check_parameter_completeness()
        doy_estimated = self.predict(to_predict=to_predict,
                                     predictors=predictors)

        if doy_observed is None:
            doy_observed = self.observations.doy.values
        elif not isinstance(doy_observed, np.ndarray):
            raise TypeError('Unknown doy_observed parameter type. Expected ndarray, got ' + str(type(doy_observed)))

        error_function = utils.optimize.get_loss_function(method=metric)

        return error_function(doy_observed, doy_estimated)
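
    # A quick sketch of both scoring modes (``held_out`` is a hypothetical
    # dataframe of observations not used for fitting):
    #
    #     model.fit(observations, predictors)
    #     model.score()  # rmse on the data used for fitting
    #     model.score(doy_observed=held_out.doy.values,
    #                 to_predict=held_out,
    #                 predictors=predictors)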
    
    def save_params(self, filename, overwrite=False):
        """Save model parameters

        Parameters:
            filename : str
                Filename to save the model to. Note this can be loaded
                again by passing the filename to the same ensemble class
                it was saved from (the ``parameters`` argument of
                BootstrapModel, or the ``core_models`` argument of
                Ensemble and WeightedEnsemble).
            overwrite : bool
                Overwrite the file if it exists
        """
        self._check_parameter_completeness()

        utils.misc.write_saved_model(model_info=self._get_model_info(),
                                     model_file=filename,
                                     overwrite=overwrite)
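
    # Save/reload round trip, as a sketch ('fitted.json' is a hypothetical
    # filename). The saved file records the ensemble's class name, so it must
    # be reloaded through the same class it was saved from:
    #
    #     model.save_params('fitted.json')
    #     reloaded = models.BootstrapModel(parameters='fitted.json')
    #     # or, for the other ensemble types:
    #     reloaded = models.Ensemble(core_models='fitted.json')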

    def _fit_job(self, model, **kwargs):
        """A wrapper to fit models within joblib.Parallel"""
        model.fit(self.observations, self.predictors, **kwargs)
        return model
    
    def _predict_job(self, model, to_predict, predictors, **kwargs):
        """A wrapper to predict new data within joblib.Parallel"""
        return model.predict(to_predict=to_predict, predictors=predictors, **kwargs)
    
    def ensemble_shape(self, shape=()):
        """Returns a tuple signifying the layers of submodels.

        e.g. for a single 50-bootstrap model the shape is (50,);
        for an ensemble of four 50-bootstrap models the shape is (4, 50).
        """
        num_sub_models = len(self.model_list)
        if isinstance(self.model_list[0], EnsembleBase):
            return self.model_list[0].ensemble_shape(shape=shape + (num_sub_models,))
        else:
            return shape + (num_sub_models,)
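
    # Illustrative only: ensembles can be nested, and ensemble_shape()
    # reports one tuple entry per layer. Model choices here are hypothetical:
    #
    #     bootstraps = [models.BootstrapModel(core_model=models.ThermalTime,
    #                                         num_bootstraps=50)
    #                   for _ in range(4)]
    #     nested = models.Ensemble(core_models=bootstraps)
    #     nested.ensemble_shape()   # (4, 50)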
    
class BootstrapModel(EnsembleBase):
    """Fit a model using bootstrapping of the data.

    Bootstrapping is a technique to estimate model uncertainty. Many models
    of the same form are fit, but each uses a random selection, with
    replacement, of the data.

    Note that the core model must be passed uninitialized::

        from pyPhenology import models

        thermaltime_bootstrapped = models.BootstrapModel(core_model=models.ThermalTime)

    Starting parameters for the core model can be adjusted as normal.
    For example, to fix the start day of the ThermalTime model::

        thermaltime_bootstrapped = models.BootstrapModel(core_model=models.ThermalTime,
                                                         parameters={'t1': 1})
    """

    def __init__(self, core_model=None, num_bootstraps=None, parameters={}):
        """Bootstrap Model

        Parameters:
            core_model : pyPhenology model
                The model to fit n number of times. Must be uninitialized.
            num_bootstraps : int
                Number of times to fit the model.
            parameters : dictionary | filename, optional
                Parameter search ranges or fixed values for the core model.
                If a filename, it must be a bootstrap model saved via
                ``save_params()``. If a saved model file is given then the
                core_model and num_bootstraps arguments are ignored.
        """
        EnsembleBase.__init__(self)
        self.observations = None
        self.predictors = None
        self.model_list = []
        if isinstance(parameters, str):
            # A filename pointing toward a file from save_params().
            # core_model and num_bootstraps are ignored.
            model_info = utils.misc.read_saved_model(model_file=parameters)
            self._parse_fully_fitted_model(model_info)
        elif isinstance(parameters, dict):
            # A dictionary passed in the parameters argument can either be
            # a set of parameters to pass to the core model (such as in the
            # example above), OR a fully fitted model specification
            # (essentially the parsed json from a saved model file). The
            # latter is only used when loading from another ensemble method.
            if 'model_name' in parameters:
                # A saved model
                self._parse_fully_fitted_model(parameters)
            else:
                # Custom parameter values to pass to each bootstrap model
                if core_model is None or num_bootstraps is None:
                    raise TypeError('core_model and num_bootstraps must be set')
                validation.validate_model(core_model())
                for i in range(num_bootstraps):
                    self.model_list.append(core_model(parameters=parameters))
        elif isinstance(parameters, list):
            # If it's the output of BootstrapModel._get_model_info()
            for bootstrap_iteration in parameters:
                bootstrap_iteration.pop('bootstrap_num')
                self.model_list.append(core_model(parameters=bootstrap_iteration))
        else:
            raise TypeError('parameters must be str, dict, or list, got: ' + str(type(parameters)))

    def _parse_fully_fitted_model(self, model_info):
        # This loads a model from a saved json file
        if type(self).__name__ != model_info['model_name']:
            raise RuntimeError('Saved model file does not match model class. ' +
                               'Saved file is {a}, this model is {b}'.format(a=model_info['model_name'],
                                                                             b=type(self).__name__))
        for bootstrap_iteration in model_info['parameters']:
            fitted_bootstrap_iteration = load_model_parameters(bootstrap_iteration)
            self.model_list.append(fitted_bootstrap_iteration)

    def _fit_job(self, model, **kwargs):
        """A wrapper to fit models within joblib.Parallel"""
        obs_shuffled = self.observations.sample(frac=1, replace=True).copy()
        model.fit(obs_shuffled, self.predictors, **kwargs)
        return model

    def fit(self, observations, predictors, n_jobs=1, verbose=False, debug=False, **kwargs):
        """Fit the underlying core models

        Parameters:
            observations : dataframe
                pandas dataframe of phenology observations
            predictors : dataframe
                pandas dataframe of associated predictors
            n_jobs : int
                number of parallel processes to use
            kwargs :
                Other arguments passed to core model fitting (eg. optimizer methods)
        """
        self.observations = observations
        self.predictors = predictors
        self.model_list = Parallel(n_jobs=n_jobs)(delayed(self._fit_job)(m, **kwargs)
                                                  for m in self.model_list)

    def predict(self, to_predict=None, predictors=None, aggregation='mean', n_jobs=1, **kwargs):
        """Make predictions from the bootstrapped models.

        Predictions will be made using each of the bootstrapped models.
        The final result will be the mean or median of all bootstraps, or
        all bootstrapped model results as a 2d array.

        Parameters:
            aggregation : str
                Either 'mean', 'median', or 'none'. 'none' returns *all*
                predictions in an array of shape (num_bootstraps, num_samples).
            n_jobs : int
                number of parallel processes to use
        """
        # Get the organized predictors. This is done from the 1st model in the
        # list, but since they're all the same model it should be valid for all.
        # Only works if there are new predictors. Otherwise the data used for
        # fitting will be used.
        # TODO: implement this
        # if predictors is not None:
        #     predictors = self.model_list[0]._organize_predictors(observations=to_predict,
        #                                                          predictors=predictors,
        #                                                          for_prediction=True)

        if predictors is None:
            predictors = self.predictors
        if to_predict is None:
            to_predict = self.observations

        # If the models within this ensemble are themselves ensembles,
        # then let those models do the parallel stuff.
        if isinstance(self.model_list[0], EnsembleBase):
            predictions = []
            for m in self.model_list:
                predictions.append(m.predict(to_predict=to_predict,
                                             predictors=predictors,
                                             aggregation=aggregation,
                                             n_jobs=n_jobs,
                                             **kwargs))
        else:
            predictions = Parallel(n_jobs=n_jobs)(delayed(self._predict_job)
                                                  (m, to_predict=to_predict,
                                                   predictors=predictors,
                                                   aggregation=aggregation,
                                                   **kwargs)
                                                  for m in self.model_list)

        predictions = np.array(predictions)
        if aggregation == 'mean':
            predictions = np.mean(predictions, 0)
        elif aggregation == 'median':
            predictions = np.median(predictions, 0)
        elif aggregation == 'none':
            pass
        else:
            raise ValueError('Unknown aggregation: ' + str(aggregation))

        return predictions

    def _check_parameter_completeness(self):
        """Make sure all parameters have been set from fitting or loading at initialization"""
        [m._check_parameter_completeness() for m in self.model_list]

    def get_params(self):
        """Returns a list of dictionaries with the parameters of each bootstrap model"""
        self._check_parameter_completeness()
        all_params = []
        for i, model in enumerate(self.model_list):
            all_params.append(deepcopy(model.get_params()))
            all_params[-1].update({'bootstrap_num': i})

        return all_params

    def _get_model_info(self):
        # Essentially the same as get_params(), but in a format capable of
        # being loaded again later by _parse_fully_fitted_model()
        core_model_info = []
        for i, model in enumerate(self.model_list):
            core_model_info.append(deepcopy(model._get_model_info()))
            core_model_info[-1].update({'bootstrap_num': i})

        return {'model_name': type(self).__name__,
                'parameters': core_model_info}

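
# A minimal end-to-end sketch of BootstrapModel, using the packaged test data
# referenced in the docstrings above. It is wrapped in a function so nothing
# runs on import; the 50-bootstrap count is arbitrary.
def _example_bootstrap_usage():
    from pyPhenology import models, utils
    observations, predictors = utils.load_test_data(name='vaccinium')
    model = models.BootstrapModel(core_model=models.ThermalTime,
                                  num_bootstraps=50)
    model.fit(observations, predictors)
    mean_doy = model.predict()                     # mean across bootstraps
    all_doy = model.predict(aggregation='none')    # shape (50, n_samples)
    return mean_doy, all_doy
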
class Ensemble(EnsembleBase):
    """Fit an ensemble of different models.

    This model can fit multiple models into an ensemble where the weights
    are equal among all ensemble members.

    Note that the core models must be passed initialized. They will be fit
    within the Ensemble model::

        from pyPhenology import models, utils

        observations, predictors = utils.load_test_data(name='vaccinium')

        m1 = models.ThermalTime(parameters={'T': 0})
        m2 = models.ThermalTime(parameters={'T': 5})
        m3 = models.Uniforc(parameters={'t1': 1})
        m4 = models.Uniforc(parameters={'t1': 30})

        ensemble = models.Ensemble(core_models=[m1, m2, m3, m4])
        ensemble.fit(observations, predictors)
    """

    def __init__(self, core_models):
        """Ensemble model

        core_models : list of pyPhenology models, or a saved model file
        """
        EnsembleBase.__init__(self)
        self.observations = None
        self.predictors = None
        if isinstance(core_models, list):
            # List of models to fit
            self.model_list = core_models
        elif isinstance(core_models, str):
            # A filename pointing toward a file from save_params()
            model_info = utils.misc.read_saved_model(model_file=core_models)
            self._parse_fully_fitted_model(model_info)
        elif isinstance(core_models, dict):
            # A saved model file read by another model or utility
            self._parse_fully_fitted_model(core_models)
        else:
            raise TypeError('core_models must be a list of pyPhenology models ' +
                            'or a filename for a saved model')

    def _parse_fully_fitted_model(self, model_info):
        # This loads a model from a saved json file
        if type(self).__name__ != model_info['model_name']:
            raise RuntimeError('Saved model file does not match model class. ' +
                               'Saved file is {a}, this model is {b}'.format(a=model_info['model_name'],
                                                                             b=type(self).__name__))
        self.model_list = []
        for model in model_info['core_models']:
            fitted_ensemble_member = load_model_parameters(model)
            self.model_list.append(fitted_ensemble_member)

    def fit(self, observations, predictors, n_jobs=1, verbose=False, debug=False, **kwargs):
        """Fit the underlying core models

        Parameters:
            observations : dataframe
                pandas dataframe of phenology observations
            predictors : dataframe
                pandas dataframe of associated predictors
            n_jobs : int
                number of parallel processes to use
            kwargs :
                Other arguments passed to core model fitting (eg. optimizer methods)
        """
        self.observations = observations
        self.predictors = predictors
        self.model_list = Parallel(n_jobs=n_jobs)(delayed(self._fit_job)(m, **kwargs)
                                                  for m in self.model_list)

    def predict(self, to_predict=None, predictors=None, aggregation='mean', n_jobs=1, **kwargs):
        """Make predictions.

        Predictions will be made using each core model, then a final
        prediction for each observation using the specified aggregation
        method.

        Parameters:
            see core model description

            aggregation : str
                Either 'mean', 'median', or 'none'. If 'none', this returns
                an array of shape (number of members, number of predictions).
            n_jobs : int
                number of parallel processes to use
        """
        if predictors is None:
            predictors = self.predictors
        if to_predict is None:
            to_predict = self.observations

        # If the models within this ensemble are themselves ensembles,
        # then let those models do the parallel stuff.
        if isinstance(self.model_list[0], EnsembleBase):
            predictions = []
            for m in self.model_list:
                predictions.append(m.predict(to_predict=to_predict,
                                             predictors=predictors,
                                             aggregation=aggregation,
                                             n_jobs=n_jobs,
                                             **kwargs))
        else:
            predictions = Parallel(n_jobs=n_jobs)(delayed(self._predict_job)
                                                  (m, to_predict=to_predict,
                                                   predictors=predictors,
                                                   aggregation=aggregation,
                                                   **kwargs)
                                                  for m in self.model_list)

        predictions = np.array(predictions)
        if aggregation == 'mean':
            predictions = np.mean(predictions, 0)
        elif aggregation == 'median':
            predictions = np.median(predictions, 0)
        elif aggregation == 'none':
            pass
        else:
            raise ValueError('Unknown aggregation: ' + str(aggregation))

        return predictions

    def _check_parameter_completeness(self):
        """Make sure all parameters have been set from fitting or loading at initialization"""
        [m._check_parameter_completeness() for m in self.model_list]

    def get_params(self):
        """Returns a list of dictionaries with the parameters of each model in the ensemble"""
        self._check_parameter_completeness()
        all_params = []
        for model in self.model_list:
            all_params.append(deepcopy(model.get_params()))

        return all_params

    def _get_model_info(self):
        # Essentially the same as get_params(), but in a format capable of
        # being loaded again later by _parse_fully_fitted_model()
        core_model_info = []
        for model in self.model_list:
            core_model_info.append(deepcopy(model._get_model_info()))

        return {'model_name': type(self).__name__,
                'core_models': core_model_info}

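
# A sketch of an equally weighted Ensemble, mirroring the class docstring
# example above; the prediction is the mean (or median) across the four
# fitted core models.
def _example_ensemble_usage():
    from pyPhenology import models, utils
    observations, predictors = utils.load_test_data(name='vaccinium')
    core_models = [models.ThermalTime(parameters={'T': 0}),
                   models.ThermalTime(parameters={'T': 5}),
                   models.Uniforc(parameters={'t1': 1}),
                   models.Uniforc(parameters={'t1': 30})]
    ensemble = models.Ensemble(core_models=core_models)
    ensemble.fit(observations, predictors)
    return ensemble.predict(aggregation='median')
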
class WeightedEnsemble(EnsembleBase):
    """Fit an ensemble of many models with associated weights

    This model can combine multiple models into an ensemble where
    predictions are the weighted average of the predictions from each
    model. The weights are derived via "stacking" as described in
    Dormann et al. 2018. The steps are as follows:

    1. Subset the data into random training/testing sets.
    2. Fit each core model on the training set.
    3. Make predictions on the testing set.
    4. Find the weights which minimize RMSE of the testing set.
    5. Repeat 1-4 for H iterations.
    6. Take the average weight for each model from all iterations as the
       final weight used in the ensemble. These will sum to 1.
    7. Fit the core models a final time on the full dataset given to the
       fit() method. Parameters derived from this final fit will be used
       to make predictions.

    Note that the core models must be passed initialized. They will be fit
    within the WeightedEnsemble model::

        from pyPhenology import models, utils

        observations, predictors = utils.load_test_data(name='vaccinium')

        m1 = models.ThermalTime(parameters={'T': 0})
        m2 = models.ThermalTime(parameters={'T': 5})
        m3 = models.ThermalTime(parameters={'T': -5})
        m4 = models.ThermalTime(parameters={'T': 10})
        m5 = models.Uniforc(parameters={'t1': 1})
        m6 = models.Uniforc(parameters={'t1': 30})
        m7 = models.Uniforc(parameters={'t1': 60})

        ensemble = models.WeightedEnsemble(core_models=[m1, m2, m3, m4, m5, m6, m7])
        ensemble.fit(observations, predictors)

    Notes:
        Dormann, Carsten F., et al. 2018. Model averaging in ecology: a
        review of Bayesian, information-theoretic, and tactical approaches
        for predictive inference. Ecological Monographs.
        https://doi.org/10.1002/ecm.1309
    """

    def __init__(self, core_models):
        """Weighted Ensemble model

        core_models : list of pyPhenology models, or a saved model file
        """
        EnsembleBase.__init__(self)
        self.observations = None
        self.predictors = None
        if isinstance(core_models, list):
            # List of models to fit
            self.model_list = core_models
            self.weights = np.array([None] * len(core_models))
        elif isinstance(core_models, str):
            # A filename pointing toward a file from save_params()
            model_info = utils.misc.read_saved_model(model_file=core_models)
            self._parse_fully_fitted_model(model_info)
        elif isinstance(core_models, dict):
            # A saved model file read by another model or utility
            self._parse_fully_fitted_model(core_models)
        else:
            raise TypeError('core_models must be a list of pyPhenology models ' +
                            'or a filename for a saved model')

    def _parse_fully_fitted_model(self, model_info):
        # This loads a model from a saved json file
        if type(self).__name__ != model_info['model_name']:
            raise RuntimeError('Saved model file does not match model class. ' +
                               'Saved file is {a}, this model is {b}'.format(a=model_info['model_name'],
                                                                             b=type(self).__name__))
        self.model_list = []
        self.weights = []
        for model in model_info['core_models']:
            Model = load_model(model['model_name'])
            self.model_list.append(Model(parameters=model['parameters']))
            self.weights.append(model['weight'])
        self.weights = np.array(self.weights)

    def _fit_job(self, model, observations, predictors, **kwargs):
        """A wrapper to fit models within joblib.Parallel"""
        model.fit(observations, predictors, **kwargs)
        return model

    def fit(self, observations, predictors, iterations=10,
            held_out_percent=0.2, loss_function='rmse', method='DE',
            optimizer_params='practical', n_jobs=1, verbose=False, debug=False):
        """Fit the underlying core models

        Parameters:
            observations : dataframe
                pandas dataframe of phenology observations
            predictors : dataframe
                pandas dataframe of associated predictors
            iterations : int
                Number of stacking iterations to use.
            held_out_percent : float
                Percent of randomly held out data to use in each stacking
                iteration. Must be between 0 and 1.
            loss_function, method, optimizer_params :
                Arguments passed to core model fitting (eg. optimizer methods)
            n_jobs : int
                number of parallel processes to use
        """
        self.observations = observations
        self.predictors = predictors
        self.fitted_weights = np.empty((iterations, len(self.model_list)))
        loss = utils.optimize.get_loss_function(loss_function)
        weight_bounds = [(1, 10)] * len(self.model_list)
        translate_scipy_weights = lambda w: np.array(w)

        with Parallel(n_jobs=n_jobs) as parallel:
            for i in range(iterations):
                held_out_observations = self.observations.sample(frac=held_out_percent,
                                                                 replace=False)
                training_observations = self.observations[~self.observations.index.isin(held_out_observations.index)]

                # Fit every model with a new set of random training data
                self.model_list = parallel(delayed(self._fit_job)
                                           (m, training_observations, predictors,
                                            loss_function=loss_function, method=method,
                                            optimizer_params=optimizer_params,
                                            verbose=verbose, debug=debug)
                                           for m in self.model_list)

                # Predict with every model on a new set of random test data
                held_out_predictions = [model.predict(held_out_observations, predictors=predictors)
                                        for model in self.model_list]
                held_out_predictions = np.array(held_out_predictions).T

                # Special function for use inside scipy.optimize routines
                def weighted_loss(w):
                    w = np.array([w])
                    w = w / w.sum()
                    pred = (held_out_predictions * w).sum(1)
                    return loss(held_out_observations.doy.values, pred)

                # Find the weights for each model which minimize the RMSE
                iteration_weights = utils.optimize.fit_parameters(function_to_minimize=weighted_loss,
                                                                  bounds=weight_bounds,
                                                                  method='DE',
                                                                  optimizer_params=optimizer_params,
                                                                  results_translator=translate_scipy_weights)
                iteration_weights = iteration_weights / iteration_weights.sum()
                self.fitted_weights[i] = iteration_weights

            self.weights = self.fitted_weights.mean(0)

            # Sum of weights should equal, or be extremely close to, 1
            summed_weights = self.weights.sum().round(5)
            if summed_weights != 1.0:
                raise RuntimeError('Weights do not sum to 1, got ' + str(summed_weights))

            # Refit the core models one last time on the full dataset. These
            # fitted models will be combined with the weights for predictions.
            self.model_list = parallel(delayed(self._fit_job)
                                       (m, observations, predictors,
                                        loss_function=loss_function, method=method,
                                        optimizer_params=optimizer_params,
                                        verbose=verbose, debug=debug)
                                       for m in self.model_list)

    def predict(self, to_predict=None, predictors=None, aggregation='mean', n_jobs=1, **kwargs):
        """Make predictions.

        Predictions will be made using each core model, then a final
        prediction derived using the fitted weights.

        Parameters:
            see core model description

            aggregation : str
                Either 'mean' to get the weighted mean prediction, or
                'none' to get predictions for all models. If using 'none'
                this returns a tuple of (weights, predictions).
            n_jobs : int
                number of parallel processes to use
        """
        self._check_parameter_completeness()
        if not isinstance(aggregation, str):
            raise TypeError('aggregation should be a string. got: ' + str(type(aggregation)))

        if predictors is None and to_predict is None:
            predictors = self.predictors
            to_predict = self.observations

        predictions = Parallel(n_jobs=n_jobs)(delayed(self._predict_job)
                                              (m, to_predict=to_predict,
                                               predictors=predictors,
                                               aggregation=aggregation,
                                               **kwargs)
                                              for m in self.model_list)
        predictions = np.array(predictions)

        if aggregation == 'mean':
            # Transpose to align the model axis with the 1D weight array,
            # then transpose back to sum the weighted predictions.
            return (predictions.T * self.weights).T.sum(0)
        elif aggregation == 'none':
            return self.weights, predictions
        else:
            raise ValueError('Unknown aggregation: ' + str(aggregation))

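
    # For intuition, the weighted aggregation above reduces to the following
    # (dummy numbers, illustration only):
    #
    #     predictions = np.array([[100., 110.],    # model 1, two samples
    #                             [104., 118.]])   # model 2
    #     weights = np.array([0.75, 0.25])
    #     (predictions.T * weights).T.sum(0)       # -> array([101., 112.])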
    def _check_parameter_completeness(self):
        """Make sure all parameters have been set from fitting or loading at initialization"""
        if not len(self.weights) == len(self.model_list):
            raise RuntimeError('Model not fit')
        [m._check_parameter_completeness() for m in self.model_list]

    def get_params(self):
        self._check_parameter_completeness()
        core_model_info = []
        for i, model in enumerate(self.model_list):
            core_model_info.append(deepcopy(model._get_model_info()))
            core_model_info[-1].update({'weight': self.weights[i]})

        return core_model_info

    def _get_model_info(self):
        # Essentially the same as get_params(), but in a format capable of
        # being loaded again later by _parse_fully_fitted_model()
        return {'model_name': type(self).__name__,
                'core_models': self.get_params()}

    def get_weights(self):
        self._check_parameter_completeness()
        return self.weights
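
# A sketch of the stacking workflow described in the WeightedEnsemble
# docstring: fit() derives the weights from repeated train/test splits, after
# which the weights are inspectable and predict() returns their weighted mean.
def _example_weighted_ensemble_usage():
    from pyPhenology import models, utils
    observations, predictors = utils.load_test_data(name='vaccinium')
    core_models = [models.ThermalTime(parameters={'T': 0}),
                   models.ThermalTime(parameters={'T': 5}),
                   models.Uniforc(parameters={'t1': 1}),
                   models.Uniforc(parameters={'t1': 30})]
    ensemble = models.WeightedEnsemble(core_models=core_models)
    ensemble.fit(observations, predictors, iterations=10, held_out_percent=0.2)
    weights = ensemble.get_weights()    # one weight per model, summing to 1
    return weights, ensemble.predict()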