#!/usr/bin/env python3
import os, sys, random, warnings
import numpy as np
import pymetamodels.obj_data as obj_data
[docs]
class objsamplingsensitivity(object):
"""Python class representing the sampling and sensitivity analysis
:platform: Windows
:synopsis: object optimization calculation
:Dependences: SALib
:ivar version: optimization model version
:ivar conf_level: the confidence interval level (default 0.95)
.. note::
* See tutorials :ref:`Tutorials <pymetamodels_tutoriales>`
|
"""
## Common functions
###################
def __init__(self, model):
self.version = 0
self.model = model
self.conf_level = 0.95
def ini_analisis_type(self):
## Initialise and return dictionary with the sampling and sensitivity anaylis info
out = obj_data.objdata()
ID = "Sobol"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "Sobol sensitivity analysis"
out[ID]["Sampling name"] = "Saltelli sampling"
out[ID]["References"] = ["SOBOL2001271","Saltelli2010","Campolongo2011a","Saltelli2002"]
ID = "Morris"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "Morris Analysis"
out[ID]["Sampling name"] = "Method of Morris"
out[ID]["References"] = ["Ruano2012","Campolongo2007","Morris1991"]
ID = "RBD-Fast"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "Latin hypercube sampling (LHS)"
out[ID]["Sampling name"] = "RBD-FAST - Random balance designs fourier amplitude sensitivity test"
out[ID]["References"] = ["McKay2000a","Iman1981","Tarantola2006","Plischke2010","Tissot2012"]
ID = "Fast"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "FAST - Fourier amplitude sensitivity Test"
out[ID]["Sampling name"] = "Fourier amplitude sensitivity test model (eFAST)"
out[ID]["References"] = ["Cukier1973","Saltelli1999"]
ID = "Delta-MIM"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "Delta moment-independent measure"
out[ID]["Sampling name"] = "Latin hypercube sampling (LHS)"
out[ID]["References"] = ["McKay2000a","Iman1981","Borgonovo2007","Plischke2013"]
ID = "DGSM"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "Derivative-based Global Sensitivity Measure (DGSM)"
out[ID]["Sampling name"] = "Sampling for derivative-based global sensitivity measure (DGSM)"
out[ID]["References"] = ["Sobol2009","Sobol2010"]
ID = "Factorial"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "Fractional factorial"
out[ID]["Sampling name"] = "Fractional factorial sampling"
out[ID]["References"] = ["Saltelli2008"]
ID = "PAWN"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "PAWN sensitivity analysis"
out[ID]["Sampling name"] = "Latin hypercube sampling (LHS)"
out[ID]["References"] = ["Pianosi2015","Pianosi2018","Baroni2020"]
"""
# Not working
ID = "HDMR"
out[ID] = obj_data.objdata()
out[ID]["Analysis name"] = "High-dimensional model representation (HDMR)"
out[ID]["Sampling name"] = "Latin hypercube sampling (LHS)"
out[ID]["References"] = ["Rabitz2010"]
"""
return out
def _run_analisis(self, case, sensitivity = True, sampling = True):
## Run the sesitivity or sampling routines
# Grab the sensitivity method
sensitivity_type = self.sensitivity_type(case)
# Define the problem
self.problem_definition(case)
# Run sensitivity anlysis
if sensitivity_type == "Sobol":
if sampling: self.sampling_sobol(case)
if sensitivity: self.sensi_sobol(case)
elif sensitivity_type == "Morris":
if sampling: self.sampling_morris(case)
if sensitivity: self.sensi_morris(case)
elif sensitivity_type == "RBD-Fast":
if sampling: self.sampling_RBD_Fast(case)
if sensitivity: self.sensi_RBD_Fast(case)
elif sensitivity_type == "Fast":
if sampling: self.sampling_Fast(case)
if sensitivity: self.sensi_Fast(case)
elif sensitivity_type == "Delta-MIM":
if sampling: self.sampling_delta(case)
if sensitivity: self.sensi_delta(case)
elif sensitivity_type == "DGSM":
if sampling: self.sampling_dgsm(case)
if sensitivity: self.sensi_dgsm(case)
elif sensitivity_type == "Factorial":
if sampling: self.sampling_factorial(case)
if sensitivity: self.sensi_factorial(case)
elif sensitivity_type == "PAWN":
if sampling: self.sampling_PAWN(case)
if sensitivity: self.sensi_PAWN(case)
elif sensitivity_type == "HDMR":
if sampling: self.sampling_hdmr(case)
if sensitivity: self.sensi_hdmr(case)
else:
self.error("Sensibility analyisis type unknown %s" % sensitivity_type)
def normalize_sensitivity(self, case, out_var, var_in):
## Method of sensitivity analysis normalisation
# doi::10.1007/s12273-015-0245-4
# Grab the sensitivity method
sensitivity_type = self.sensitivity_type(case)
[s1,cov] = self.model.SiY_key_vals(case)
# get arr
lst = self.model.vars_keys(case, not_cte = True)
s1_arr = np.asarray(self.model.case[case][self.model.si_out][out_var][s1].copy())
cov_arr = np.asarray(self.model.case[case][self.model.si_out][out_var][cov].copy())
np.seterr(divide='ignore', invalid='ignore')
if sensitivity_type == "PAWN":
cov_arr1 = cov_arr
else:
cov_arr1 = cov_arr / s1_arr
np.seterr(divide='warn', invalid='warn')
s1_arr1 = np.abs(s1_arr) / np.max(np.abs(s1_arr))
# list of order of importance
lsto = s1_arr1.tolist()
lst_ord= sorted(range(len(lsto)), key=lambda k: lsto[k])
for ii in range(0,len(lst_ord)):
lsto[lst_ord[ii]] = len(lst_ord)-ii
s1_arr1_ord = np.asarray(lsto, dtype=type(float(0)))
#print(s1_arr1,s1_arr1_ord, len(s1_arr1)==len(s1_arr1_ord))
#
indx = lst.index(var_in)
return (s1_arr1[indx], cov_arr1[indx], s1_arr[indx], s1_arr1_ord[indx])
def problem_definition(self, case):
## Builds the problem definition
lst_vars = self.model.vars_keys(case, not_cte = True)
num_vars = len(lst_vars)
bounds = self.model.vars_bounds(case, lst_vars)
dist = self.model.vars_dist(case, lst_vars)
problem = {}
problem['num_vars'] = num_vars
problem['names'] = lst_vars
problem['bounds'] = bounds
problem['dists'] = dist
self.model.case[case]["problem"] = problem
##
def sensitivity_type(self, case):
## Returns the sensitivity type analysis
sensitivity_type = self.model.case[case][self.model.sensitivity_method]
if sensitivity_type not in self.model.v_analisis_type.keys():
self.error("The sensitivity type is not recognize: %s" % sensitivity_type)
return sensitivity_type
def add_Si_inputs(self, case, Si, key):
##Add Si analysis to data, one per Y variables
if not self.model.si_out in self.model.case[case].keys():
self.model.case[case][self.model.si_out] = obj_data.objdata()
self.model.case[case][self.model.si_out][key] = Si
def add_doe_inputs(self, case, X):
## Add sampling arrays to data doe as numpy arrays
# doe_in
self.model.case[case][self.model.doe_in] = obj_data.objdata()
ii = 0
for key in self.model.case[case]["problem"]["names"]:
self.model.case[case][self.model.doe_in][key] = X[:,ii].copy()
ii = ii + 1
for key in self.model.vars_keys(case, not_cte = False):
if self.model.case[case][self.model.vars_key][key] is None:
vval = np.zeros(X.shape[0])
vval[:] = np.nan
else:
vval = np.zeros(X.shape[0]) + self.model.case[case][self.model.vars_key][key]
self.model.case[case][self.model.doe_in][key] = vval
# doe_out
self.model.case[case][self.model.doe_out] = obj_data.objdata()
for key in self.model.vars_out_keys(case):
if self.model.case[case][self.model.vars_out_key][key] is None:
vval = np.zeros(X.shape[0])
vval[:] = np.nan
elif not self.model.is_number(self.model.case[case][self.model.vars_out_key][key]):
vval = np.zeros(X.shape[0])
vval[:] = np.nan
else:
vval = np.zeros(X.shape[0]) + self.model.case[case][self.model.vars_out_key][key]
self.model.case[case][self.model.doe_out][key] = vval
def doe_inputs_X(self, case):
## Return sampling arrays in the X format
# obtain the shape of the X format array
lst = []
ii = 0
for key in self.model.case[case]["problem"]["names"]:
lst1 = []
lst1.append(key)
lst1.append(self.model.case[case][self.model.doe_in][key].shape[0])
lst1.append(ii)
ii = ii + 1
lst.append(lst1)
rows = None
for ll in lst:
if rows is None:
rows = ll[1]
else:
if rows != ll[1]:
self.error("Error on arrays shape")
cols = len(lst)
# generate the X array
X= np.empty((rows,cols))
ii = 0
for key in self.model.case[case]["problem"]["names"]:
X[:,ii] = self.model.case[case][self.model.doe_in][key][:]
ii = ii + 1
return X
def sampling_sobol(self, case):
import SALib.sample.saltelli as saltelli
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
calc_second_order = True
skip_values = None
X = saltelli.sample(problem, N,
calc_second_order = calc_second_order, skip_values=skip_values)
self.msg("Case %s sampling value %i by Sobol method. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_sobol(self, case):
import SALib.analyze.sobol as sobol
## Runs semsitivity analysis SOBOL
problem = self.model.case[case]["problem"]
calc_second_order = True
skip_values = None
num_resamples = 100
conf_level = self.conf_level
print_to_console = False
for key in self.model.vars_out_keys(case):
Y = self.model.case[case][self.model.doe_out][key]
Si = sobol.analyze(problem, Y, calc_second_order,
num_resamples=num_resamples, conf_level=conf_level, print_to_console=print_to_console,
parallel=False, n_processors=None, keep_resamples=False, seed=None)
self.add_Si_inputs(case, Si, key)
pass
def sampling_morris(self, case):
import SALib.sample.morris as morris
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
num_levels = 4
X = morris.sample(problem, N, num_levels = num_levels,
optimal_trajectories = None, local_optimization = True, seed = None)
self.msg("Case %s sampling value %i by Morris method. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_morris(self, case):
import SALib.analyze.morris as morris
## Runs semsitivity analysis Morris
problem = self.model.case[case]["problem"]
num_resamples = 100
conf_level = self.conf_level
print_to_console = False
num_levels = 4
for key in self.model.vars_out_keys(case):
X = self.doe_inputs_X(case)
Y = self.model.case[case][self.model.doe_out][key]
Si = morris.analyze(problem, X, Y, num_resamples = num_resamples,
conf_level = conf_level, print_to_console = print_to_console,
num_levels = num_levels, seed=None)
self.add_Si_inputs(case, Si, key)
pass
def sampling_RBD_Fast(self, case):
import SALib.sample.latin as latin
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
X = latin.sample(problem, N, seed = None)
self.msg("Case %s sampling value %i by latin Hypercube for RBD-FAST. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_RBD_Fast(self, case):
import SALib.analyze.rbd_fast as rbd_fast
## Runs semsitivity analysis RBd_Fast
problem = self.model.case[case]["problem"]
M = 10
num_resamples = 100
conf_level = self.conf_level
print_to_console = False
for key in self.model.vars_out_keys(case):
X = self.doe_inputs_X(case)
Y = self.model.case[case][self.model.doe_out][key]
Si = rbd_fast.analyze(problem, X, Y, M = M, num_resamples = num_resamples,
conf_level = conf_level, print_to_console = print_to_console, seed=None)
self.add_Si_inputs(case, Si, key)
pass
def sampling_Fast(self, case):
import SALib.sample.fast_sampler as fast_sampler
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
M = 4
X = fast_sampler.sample(problem, N, M = M, seed = None)
self.msg("Case %s sampling value %i by latin Hypercube for FAST. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_Fast(self, case):
import SALib.analyze.fast as fast
## Runs semsitivity analysis Fast
problem = self.model.case[case]["problem"]
M = 4
num_resamples = 100
conf_level = self.conf_level
print_to_console = False
for key in self.model.vars_out_keys(case):
X = self.doe_inputs_X(case)
Y = self.model.case[case][self.model.doe_out][key]
Si = fast.analyze(problem, Y, M=M, num_resamples=num_resamples, conf_level=conf_level,
print_to_console=print_to_console, seed=None)
self.add_Si_inputs(case, Si, key)
pass
def sampling_delta(self, case):
import SALib.sample.latin as latin
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
X = latin.sample(problem, N, seed = None)
self.msg("Case %s sampling value %i by latin Hypercube for Delta-MIM. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_delta(self, case):
import SALib.analyze.delta as delta
## Runs semsitivity analysis delta
problem = self.model.case[case]["problem"]
M = 4
num_resamples = 100
conf_level = self.conf_level
print_to_console = False
for key in self.model.vars_out_keys(case):
X = self.doe_inputs_X(case)
Y = self.model.case[case][self.model.doe_out][key]
Si = delta.analyze(problem, X, Y, num_resamples=num_resamples, conf_level=conf_level,
print_to_console=print_to_console, seed=None)
self.add_Si_inputs(case, Si, key)
pass
def sampling_dgsm(self, case):
import SALib.sample.finite_diff as finite_diff
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
delta = 0.01
skip_values = 1024
X = finite_diff.sample(problem, N, delta = delta, seed = None, skip_values = skip_values)
self.msg("Case %s sampling value %i by DGSM. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_dgsm(self, case):
import SALib.analyze.dgsm as dgsm
## Runs semsitivity analysis dgsm
problem = self.model.case[case]["problem"]
M = 4
num_resamples = 100
conf_level = self.conf_level
print_to_console = False
for key in self.model.vars_out_keys(case):
X = self.doe_inputs_X(case)
Y = self.model.case[case][self.model.doe_out][key]
Si = dgsm.analyze(problem, X, Y, num_resamples=num_resamples, conf_level=conf_level,
print_to_console=print_to_console, seed=None)
self.add_Si_inputs(case, Si, key)
pass
def sampling_factorial(self, case):
import SALib.sample.ff as ff
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
X = ff.sample(problem, seed = None)
self.msg("Case %s sampling value %i by Factorial. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_factorial(self, case):
import SALib.analyze.ff as ffa
## Runs semsitivity analysis factorial
problem = self.model.case[case]["problem"]
second_order = False
print_to_console = False
for key in self.model.vars_out_keys(case):
X = self.doe_inputs_X(case)
Y = self.model.case[case][self.model.doe_out][key]
Si = ffa.analyze(problem, X, Y, second_order=second_order,
print_to_console=print_to_console, seed=None)
self.add_Si_inputs(case, Si, key)
pass
def sampling_PAWN(self, case):
import SALib.sample.latin as latin
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
X = latin.sample(problem, N, seed = None)
self.msg("Case %s sampling value %i by latin Hypercube for PAWN. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_PAWN(self, case):
import SALib.analyze.pawn as pawn
## Runs semsitivity analysis pawn
problem = self.model.case[case]["problem"]
S = 10
print_to_console = False
for key in self.model.vars_out_keys(case):
X = self.doe_inputs_X(case)
Y = self.model.case[case][self.model.doe_out][key]
Si = pawn.analyze(problem, X, Y, S = S,
print_to_console=print_to_console, seed=None)
self.add_Si_inputs(case, Si, key)
pass
def sampling_hdmr(self, case):
import SALib.sample.latin as latin
## Runs DOE build up
problem = self.model.case[case]["problem"]
N = self.nextPowerOf2(int(self.model.case[case][self.model.samples]))
X = latin.sample(problem, N, seed = None)
self.msg("Case %s sampling value %i by latin Hypercube for HDMR. DOEX shape %s" % (case,N,str(X.shape)))
self.add_doe_inputs(case, X)
return X
def sensi_hdmr(self, case):
import SALib.analyze.hdmr as hdmr
## Runs semsitivity analysis pawn
problem = self.model.case[case]["problem"]
maxorder = 2
maxiter = 100
m = 2
K = 20
print_to_console = False
for key in self.model.vars_out_keys(case):
X = self.doe_inputs_X(case)
Y = self.model.case[case][self.model.doe_out][key]
Si = hdmr.analyze(problem, X, Y, maxorder = maxorder, maxiter = maxiter, m = m, K = K,
R = None, alpha = 0.95, lambdax = 0.01,
print_to_console = print_to_console, seed = None)
self.add_Si_inputs(case, Si, key)
pass
#### Other functions
#####################################
def error(self, msg):
print("Error: " + msg)
if self.model.objlog:
self.model.objlog.warning("ID04 - %s" % msg)
raise ValueError(msg)
def msg(self, msg):
print("Msg: " + msg)
if self.model.objlog:
self.model.objlog.info("ID04 - %s" % msg)
def nextPowerOf2(self, n):
# Finds next power of two
# for n. If n itself is a
# power of two then returns n
n -= 1
n |= n >> 1
n |= n >> 2
n |= n >> 4
n |= n >> 8
n |= n >> 16
n += 1
return n