# Source code for lalegpl.lib.lale.nsga2

# Copyright 2021 IBM Corporation
#
# Licensed under the GNU General Public License 3.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.gnu.org/licenses/gpl-3.0.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import logging
import time

import jsonschema
import numpy as np
import pandas as pd
# Platypus is an optional dependency: fail early with an actionable message
# if it is missing.
try:
    from platypus import (
        HUX,
        NSGAII,
        PM,
        SBX,
        Binary,
        BitFlip,
        CompoundOperator,
        Integer,
        Problem,
        Real,
        nondominated,
    )
except ImportError as exc:
    # The module is imported as `platypus`, but the distribution that ships
    # it on PyPI is named `platypus-opt`.
    raise ImportError(
        "NSGA2 needs a Python package called `platypus`. "
        "You can install it using `pip install platypus-opt` "
        "or install lalegpl[full] which will install it for you."
    ) from exc

from sklearn.metrics import get_scorer
from sklearn.model_selection import check_cv, train_test_split

import lale.docstrings
import lale.operators
from lale.lib.lale._common_schemas import (
    check_scoring_best_score_constraint,
    schema_best_score,
    schema_cv,
    schema_estimator,
    schema_max_opt_time,
    schema_scoring_list,
)
from lale.lib.sklearn import LogisticRegression

logger = logging.getLogger(__name__)


# Exception for handling max optimization time
class MaxBudgetExceededException(Exception):
    """Raised when the optimization time budget (``max_opt_time``) is exceeded."""
def _take_rows(data, indices):
    """Select rows of ``data`` by integer position.

    pandas objects are indexed with ``.iloc`` so the selection is positional
    regardless of their index labels; anything else is indexed directly.
    """
    if isinstance(data, (pd.DataFrame, pd.Series)):
        return data.iloc[indices]
    return data[indices]


class _ModelHelper:
    """Maps a Lale operator's hyperparameter ranges onto Platypus variable types.

    Attributes set by ``__init__``:
      - ``param_choices``: hyperparameter name -> ``[min, max]``
      - ``param_type``: hyperparameter name -> "number" / "integer" /
        "boolean" / "enum" (or whatever the schema's "type" entry says)
      - ``param_categories``: categorical hyperparameter name -> list of values
      - ``types``: Platypus decision-variable types, in ``param_choices`` order
      - ``model``: the operator that is being tuned
    """

    def __init__(self, model):
        (
            self.param_choices,
            self.param_type,
            self.param_categories,
        ) = self.__get_param_choices_types(model)
        type_map = {
            "number": Real,
            "integer": Integer,
            "boolean": Binary,
            "enum": Integer,
        }
        logger.info(self.param_choices)
        logger.info(self.param_type)
        logger.info(self.param_categories)

        types = []
        for key, (low, high) in self.param_choices.items():
            platypus_type = type_map[self.param_type[key]]
            if self.param_type[key] == "boolean":
                # A boolean hyperparameter is encoded as a 1-bit Binary variable.
                types.append(platypus_type(1))
            else:
                types.append(platypus_type(low, high))
        self.types = types
        logger.info(self.types)
        self.model = model

    def __get_param_choices_types(self, model):
        """Collect the tunable hyperparameters of ``model``.

        Hyperparameters whose range is degenerate (min == max) are skipped.

        Returns
        -------
        (param_choices, param_type, param_categories) : tuple of dict
        """
        range_dict, cat_idx = model.get_param_ranges()
        param_choices = {}
        param_type = {}
        param_categories = {}
        for key in range_dict:
            if key not in cat_idx:
                minval, maxval, defval = range_dict[key]
                if minval == maxval:
                    continue
                hp_schema = model.hyperparam_schema(key)
                if "type" in hp_schema:
                    ptype = hp_schema["type"]
                elif isinstance(defval, int):
                    # No top-level type in the schema: infer it from the default.
                    ptype = "integer"
                elif isinstance(defval, float):
                    ptype = "number"
                else:
                    ptype = hp_schema["anyOf"][0]["type"]
                param_choices[key] = [minval, maxval]
                param_type[key] = ptype
            else:
                # Categorical: optimize over the index range and keep the
                # actual category values for decoding.
                minval, maxval, defval = cat_idx[key]
                if minval == maxval:
                    continue
                param_choices[key] = [minval, maxval]
                param_type[key] = "enum"
                param_categories[key] = range_dict[key]
        return param_choices, param_type, param_categories

    def create_instance(self, parameter):
        """Return a copy of the model configured with the ``parameter`` dict."""
        logger.debug("Creating model instance with params: \n" f"{parameter}")
        clf = self.model.with_params(**parameter)
        return clf


class _NSGA2Impl:
    """Multi-objective hyperparameter optimizer built on Platypus' NSGA-II."""

    def __init__(
        self,
        estimator=None,
        scoring=None,
        best_score=0.0,
        cv=5,
        max_evals=50,
        max_opt_time=None,
        population_size=10,
        random_seed=42,
    ):
        if estimator is None:
            self.model = LogisticRegression()
        else:
            self.model = estimator
        assert isinstance(self.model, lale.operators.IndividualOp), (
            "Multi-objective optimization is supported for only "
            "Individual Operators currently and not supported over Pipelines."
        )
        logger.info(f"Optimizing model {self.model} with type {type(self.model)}")
        logger.info("Lale param ranges - \n" f"{self.model.get_param_ranges()}")
        self.model_helper = _ModelHelper(self.model)
        self.moo_solutions = []
        # Initialised here so get_pipeline() before fit() reports
        # "No pipelines found" instead of raising AttributeError.
        self.pareto_models = []
        self.scoring = scoring
        assert self.scoring is not None, "scoring parameter not specified."
        assert len(self.scoring) >= 2, "Less than two scorers specified in scoring"
        if isinstance(best_score, list):
            # Work on a copy: padding must not mutate the caller's list.
            padded = list(best_score)
            if len(padded) < len(scoring):
                padded.extend([0.0] * (len(scoring) - len(padded)))
            self.best_score = padded
        else:
            self.best_score = [best_score] * len(scoring)
        self.cv = cv
        self.max_evals = max_evals
        self.max_opt_time = max_opt_time
        self.population_size = population_size
        self.random_seed = random_seed

    @classmethod
    def validate_hyperparams(cls, scoring=None, best_score=0, **hyperparams):
        check_scoring_best_score_constraint(scoring, best_score)

    # Internal class
    class Soln(object):
        """One pareto-optimal solution: hyperparameter dict and objective values."""

        def __init__(self, variables, objectives):
            self.variables = variables
            self.objectives = objectives

    def param_to_dict(self, parameter, param_choices, param_categories, param_type):
        """Convert a positional parameter list into a hyperparameter dict.

        ``parameter`` is expected to be ordered like ``param_choices``.
        Categorical entries are indices into ``param_categories[key]``;
        boolean entries are sequences whose first element is the value.
        """
        temp = {}
        for key, value in zip(param_choices, parameter):
            if key in param_categories:
                temp[key] = param_categories[key][value]
            elif param_type[key] == "boolean":
                temp[key] = value[0]
            else:
                temp[key] = value
        return temp

    def fit(self, X, y):
        """Run NSGA-II and refit every pareto-optimal configuration on (X, y).

        When ``cv`` is the integer 1 a single 80/20 train/validation split is
        used; otherwise ``cv`` is resolved through sklearn's ``check_cv``.
        The objectives minimised are ``best_score[i] - score[i]``.

        Returns
        -------
        self
        """
        opt_start_time = time.time()
        is_clf = self.model.is_classifier()
        kfold = None
        if isinstance(self.cv, int) and self.cv == 1:
            X_train, X_val, y_train, y_val = train_test_split(
                X,
                y,
                test_size=0.2,
                random_state=self.random_seed,
                # Stratifying only makes sense for class labels.
                stratify=y if is_clf else None,
            )
            logger.info(
                "Not using Cross-Validation. " "Performing single train/test split"
            )
        else:
            kfold = check_cv(self.cv, y=y, classifier=is_clf)
            logger.info(f"Using Cross-Validation - {kfold}")

        self.ind = 0
        # Resolve the scorers once instead of on every objective evaluation.
        scorers = [get_scorer(scorer) for scorer in self.scoring]
        nscorers = len(scorers)

        def train_test_model(parameter):
            # First check if we exceeded allocated time budget
            elapsed_time = time.time() - opt_start_time
            if (self.max_opt_time is not None) and (elapsed_time > self.max_opt_time):
                msg = (
                    f"Max optimization time exceeded. "
                    f"Max Opt time = {self.max_opt_time}, Elapsed Time = {elapsed_time}, "
                    f"NFE Completed - {self.ind}"
                )
                raise MaxBudgetExceededException(msg)

            self.ind = self.ind + 1
            logger.info(f"Training population {self.ind}")
            parameter = self.param_to_dict(
                parameter,
                self.model_helper.param_choices,
                self.model_helper.param_categories,
                self.model_helper.param_type,
            )
            try:
                if kfold is None:
                    clf = self.model_helper.create_instance(parameter)
                    clf_trained = clf.fit(X_train, y_train)
                    obj_val = [scorer(clf_trained, X_val, y_val) for scorer in scorers]
                else:
                    obj_scores = [[] for _ in range(nscorers)]
                    # Perform k-fold cross-validation
                    for train_index, test_index in kfold.split(X, y):
                        # X and y are sliced independently so that mixed
                        # pandas / numpy inputs are handled correctly.
                        X_train_split = _take_rows(X, train_index)
                        X_val_split = _take_rows(X, test_index)
                        y_train_split = _take_rows(y, train_index)
                        y_val_split = _take_rows(y, test_index)

                        clf = self.model_helper.create_instance(parameter)
                        clf_trained = clf.fit(X_train_split, y_train_split)
                        for fold_scores, scorer in zip(obj_scores, scorers):
                            fold_scores.append(
                                scorer(clf_trained, X_val_split, y_val_split)
                            )

                    # Aggregate CV score
                    obj_val = [np.mean(fold_scores) for fold_scores in obj_scores]
                    logger.debug(f"Obj k-fold scores - {obj_scores}")

                # By default we are solving a minimization MOO problem
                fitnessValue = [
                    self.best_score[i] - obj_val[i] for i in range(nscorers)
                ]
                logger.info(f"Train fitnessValue - {fitnessValue}")
            except jsonschema.ValidationError as e:
                logger.error(f"Caught JSON schema validation error.\n{e}")
                logger.error("Setting fitness (loss) values to infinity")
                fitnessValue = [np.inf for _ in range(nscorers)]
                logger.info(f"Train fitnessValue - {fitnessValue}")

            return fitnessValue

        def time_check_callback(alg):
            elapsed_time = time.time() - opt_start_time
            logger.info(f"NFE Complete - {alg.nfe}, Elapsed Time - {elapsed_time}")

        parameter_num = len(self.model_helper.param_choices)
        target_num = len(self.scoring)

        # Adjust max_evals if not a multiple of population size. This is
        # required as Platypus performs evaluations in multiples of
        # population_size.
        adjusted_max_evals = (
            self.max_evals // self.population_size
        ) * self.population_size
        if adjusted_max_evals == 0:
            # max_evals < population_size would otherwise run zero evaluations
            # and produce no solutions; run one full generation instead.
            adjusted_max_evals = self.population_size
        if adjusted_max_evals != self.max_evals:
            logger.info(
                f"Adjusting max_evals to {adjusted_max_evals} from specified {self.max_evals}"
            )

        problem = Problem(parameter_num, target_num)
        problem.types[:] = self.model_helper.types
        problem.function = train_test_model

        # Set the variator based on types of decision variables
        varg = {}
        first_type = problem.types[0].__class__
        all_type_same = all(isinstance(t, first_type) for t in problem.types)
        # use compound operator for mixed types
        if not all_type_same:
            varg["variator"] = CompoundOperator(SBX(), HUX(), PM(), BitFlip())

        algorithm = NSGAII(
            problem,
            population_size=self.population_size,
            **varg,
        )

        try:
            algorithm.run(adjusted_max_evals, callback=time_check_callback)
        except MaxBudgetExceededException as e:
            logger.warning(
                f"Max optimization time budget exceeded. Optimization exited prematurely.\n{e}"
            )

        solutions = nondominated(algorithm.result)

        moo_solutions = []
        for solution in solutions:
            decoded = [
                var_type.decode(value)
                for var_type, value in zip(problem.types, solution.variables)
            ]
            vars_dict = self.param_to_dict(
                decoded,
                self.model_helper.param_choices,
                self.model_helper.param_categories,
                self.model_helper.param_type,
            )
            moo_solutions.append(self.Soln(vars_dict, solution.objectives))
            logger.info(f"{decoded}, {solution.objectives}")

        self.moo_solutions = moo_solutions

        # Refit each pareto-optimal configuration on the full data.
        pareto_models = []
        for solution in self.moo_solutions:
            est = self.model_helper.create_instance(solution.variables)
            est_trained = est.fit(X, y)
            pareto_models.append((solution.variables, est_trained))

        self.pareto_models = pareto_models
        return self

    def get_pareto_solutions(self):
        """Return the list of pareto-optimal ``Soln`` objects found by fit."""
        return self.moo_solutions

    def get_pareto_models(self):
        """Return the list of ``(hyperparams, trained model)`` pairs."""
        return self.pareto_models

    # Predict using first pareto-optimal estimator
    def predict(self, X, **kwargs):
        """Predict with one pareto-optimal pipeline.

        An optional ``pipeline_name`` keyword selects the pipeline (see
        ``get_pipeline``); the remaining keywords are forwarded to that
        pipeline's ``predict``.
        """
        pipeline_name = kwargs.pop("pipeline_name", None)
        pipeline = self.get_pipeline(pipeline_name=pipeline_name)
        return pipeline.predict(X, **kwargs)

    # Return pareto-optimal estimator
    def get_pipeline(self, pipeline_name=None, astype="lale"):
        """Retrieve one of the pareto-optimal pipelines.

        Parameters
        ----------
        pipeline_name : union type, default None

            - string
                Key (name) from the table returned by summary(), return a trained pipeline.

            - None
                When not specified, return the first (trained) pipeline in the table
                returned by summary()

        astype : 'lale' or 'sklearn', default 'lale'
            Type of resulting pipeline.

        Returns
        -------
        result : Trained operator."""
        index = 0
        if pipeline_name is not None:
            # Names have the form "p<index>", as produced by summary().
            index = int(pipeline_name[1:])

        assert 0 < len(self.pareto_models), "No pipelines found"
        assert 0 <= index < len(self.pareto_models), "Invalid pipeline name"
        _, pareto_model = self.pareto_models[index]
        result = pareto_model

        if astype == "lale":
            return result
        assert astype == "sklearn", "Invalid astype " + astype
        if hasattr(result, "export_to_sklearn_pipeline"):
            result = result.export_to_sklearn_pipeline()
        else:
            logger.warning("Cannot return sklearn pipeline.")
        return result

    def summary(self):
        """Table displaying the pareto-optimal solutions (pipelines) obtained
        after multi-objective optimization (name, ID, loss for each specified scorer).

        Returns
        -------
        result : DataFrame"""
        nscoring = len(self.scoring)
        records = []
        for isol, solution in enumerate(self.moo_solutions):
            record_dict = {"name": f"p{isol}", "id": isol}
            for iobj in range(nscoring):
                record_dict[f"loss{iobj+1}"] = solution.objectives[iobj]
            records.append(record_dict)
        result = pd.DataFrame.from_records(records, index="name")
        return result


_combined_schemas = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "description": """Multi Objective Optimizer based on NSGA-II algorithm.

Example
--------
>>> import lale.datasets.openml
>>> (X_train, y_train), (X_test, y_test) = lale.datasets.openml.fetch(
...     'credit-g', 'classification', preprocess=True, astype='pandas')
>>>
>>> # Create sklearn scorer for computing FPR
>>> def compute_fpr(y_true, y_pred):
...     from sklearn.metrics import confusion_matrix
...     tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
...     fpr = round(fp / (fp + tn), 4)
...     return fpr
>>>
>>> from sklearn.metrics import make_scorer
>>> fpr_scorer = make_scorer(compute_fpr, greater_is_better=False)
>>>
>>> from lale.lib.xgboost import XGBClassifier
>>> clf = XGBClassifier()
>>> nsga2_args = {'estimator': clf, 'scoring': ['accuracy', fpr_scorer],
...               'best_score': [1, 0], 'cv': 3,
...               'max_evals': 20, 'population_size': 10}
>>> opt = NSGA2(**nsga2_args)
>>> trained = opt.fit(X_train, y_train)
>>> # Predict using first pareto-optimal solution (pipeline)
>>> predictions = trained.predict(X_test, pipeline_name='p0')
>>> from sklearn.metrics import accuracy_score
>>> acc = accuracy_score(y_test, predictions)
>>> fpr = compute_fpr(y_test, predictions)
>>> print('Accuracy, FPR - %.3f, %.3f' % (acc, fpr))
""",
    "documentation_url": "https://lale-gpl.readthedocs.io/en/latest/modules/lalegpl.lib.lale.nsga2.html",
    "import_from": "lalegpl.lib.lale",
    "type": "object",
    "tags": {"pre": [], "op": ["estimator"], "post": []},
    "properties": {
        "hyperparams": {
            "allOf": [
                {
                    "type": "object",
                    "properties": {
                        "estimator": schema_estimator,
                        "scoring": schema_scoring_list,
                        "best_score": schema_best_score,
                        "cv": schema_cv,
                        "max_evals": {
                            "description": "Maximum number of NSGA-II function evaluations (models trained); adjusted to a multiple of population_size.",
                            "type": "integer",
                            "minimum": 1,
                            "default": 50,
                        },
                        "max_opt_time": schema_max_opt_time,
                        "population_size": {
                            "description": "Population size of the NSGA-II algorithm.",
                            "default": 10,
                        },
                        "random_seed": {
                            "description": "Random seed for the train/validation split used when cv is 1.",
                            "default": 42,
                        },
                    },
                    "additionalProperties": False,
                    "required": ["estimator", "scoring"],
                    "relevantToOptimizer": [],
                }
            ]
        },
        "input_fit": {
            "type": "object",
            "properties": {
                "X": {"laleType": "Any"},
                "y": {"laleType": "Any"},
            },
            "additionalProperties": False,
            "required": ["X", "y"],
        },
        "input_predict": {
            "type": "object",
            "properties": {
                "X": {"laleType": "Any"},
                "pipeline_name": {
                    "description": "Name of the pipeline to use for prediction",
                    "anyOf": [
                        {
                            "type": "string",
                            "description": "Which pipeline to pick. Must be in the list returned by summary.",
                        },
                        {
                            "enum": [None],
                            "description": "Run predict on the first pipeline.",
                        },
                    ],
                },
            },
            "additionalProperties": True,
            "required": ["X"],
        },
        "output_predict": {"laleType": "Any"},
    },
}

NSGA2 = lale.operators.make_operator(_NSGA2Impl, _combined_schemas, name="NSGA2")

lale.docstrings.set_docstrings(NSGA2)