Source code for cg_openmm.parameters.optimize

from statistics import mean

import matplotlib.pyplot as pyplot
import mdtraj as md
import numpy as np
from cg_openmm.parameters.evaluate_energy import *
from cg_openmm.simulation.tools import *
from cg_openmm.thermo.calc import bootstrap_heat_capacity
from openmm import unit
from scipy.optimize import minimize, minimize_scalar


[docs]def optimize_force_field_parameters_Cv_FWHM(cgmodel, file_list, temperature_list, param_bounds_dict, frame_begin=0, frame_end=-1, sample_spacing=1, sparsify_stride=1, output_data='output.nc', verbose=False, n_cpu=12, min_eff_samples=50, n_trial_boot=200, num_intermediate_states=0, plotfile='optimize_FWHM_iterations.pdf', min_method='TNC'): """ Generalized function for optimizating a set of cgmodel force field parameters to minimize the full-width half-maximum (FWHM) of heat capacity as a function of temperature. Energy re-evaluation and MBAR reweighting is used in place of a full simulation whenever possible. :param cgmodel: CGModel() class object used to generate the original simulation energies :type cgmodel: class :param file_list: List of replica trajectory files to evaluate the energies of :type file_list: list or str :param temperature_list: List of temperatures associated with file_list :type temperature_list: List( float * simtk.unit.temperature ) :param param_bounds_dict: dictionary containing force field parameter names and bounds. Parameters not specified will use the fixed values defined in the cgmodel. :type param_bounds_dict: dict{'param_name': (bound_lo * simtk.unit, bound_hi * simtk.unit)} :param frame_begin: analyze starting from this frame, discarding all prior as equilibration period (default=0) :type frame_begin: int :param frame_end: analyze up to this frame only, discarding the rest (default=-1). :type frame_end: int :param sample_spacing: spacing of uncorrelated data points, for example determined from pymbar timeseries subsampleCorrelatedData (default=1) :type sample_spacing: int :param sparsify_stride: advance by this many frames between each energy evaluation (default=1) :type sparsify_stride: int :param verbose: option to print out detailed per-particle parameter changes (default=False) :type verbose: Boolean :param n_cpu: number of cpus for running parallel energy evaluations (default=1) :type n_cpu: int :param min_eff_samples: minimum number of effective samples determined from the mbar object, below which a full simulation is needed (default=50) :type min_eff_samples: int :param min_method: SciPy minimize method to use (default='TNC') :type min_method: str :param num_intermediate_states: The number of states to insert between existing states in 'temperature_list' (default=0) :type num_intermediate_states: int :param plotfile: path to filename to output plot (default='optimize_FWHM_iterations.pdf') :type plotfile: str :returns: - opt_param_dict - a dictionary containing the optimized force field parameters - opt_results - scipy minimize results summary - opt_FWHM - value of heat capacity full-width half-maximum for the optimal parameters """ # Parse the force field parameter change dict: x0 = [] param_names = [] bounds = [] units = [] for key,value in param_bounds_dict.items(): # value should be [(bound_lo, bound_hi)] # key should be a valid force field parameter name param_names.append(key) # Every parameter except periodicity should have units # For now, changing periodicity is not supported. # TODO: add support for sums of periodic torsion terms units.append(value[0].unit) bounds.append((value[0].value_in_unit(units[-1]),value[1].value_in_unit(units[-1]))) # Use mean value as starting guess: x0.append((value[1].value_in_unit(units[-1])+value[0].value_in_unit(units[-1]))/2) if verbose: print(f'param_names: {param_names}') print(f'unit: {units}') print(f'bounds: {bounds}') print(f'x0: {x0}') def get_reeval_FWHM(param_values, cgmodel, file_list, temperature_list, output_data, param_names, units, frame_begin, sample_spacing, sparsify_stride, frame_end, n_cpu, n_trial_boot, num_intermediate_states): """ Objective function to be minimized """ # Construct dictionary of parameter update instructions: param_dict = {} # if len(param_names) == 1: # # 1D optimization: # param_dict[param_names[0]] = param_values * units[0] for i in range(len(param_names)): param_dict[param_names[i]] = param_values[i] * units[i] if verbose: print(f'Current parameters: {param_dict}') # Re-evaluate energy with current force field parameters: # For bootstrapping, evaluate all frames between [frame_begin:sparsify_stride:frame_end], and # apply the sample_spacing only to the heat capacity part U_eval, simulation = eval_energy( cgmodel, file_list, temperature_list, param_dict, frame_begin=frame_begin, frame_stride=sparsify_stride, frame_end=frame_end, n_cpu=n_cpu, verbose=verbose, ) # Evaluate heat capacity and full-width half-maximum from bootstrapping: (new_temperature_list, C_v_values, C_v_uncertainty, Tm_value, Tm_uncertainty, Cv_height_value, Cv_height_uncertainty, FWHM_value, FWHM_uncertainty, N_eff_values) = bootstrap_heat_capacity( U_kln=U_eval, output_data=output_data, frame_begin=frame_begin, frame_end=frame_end, sample_spacing=sample_spacing, sparsify_stride=sparsify_stride, num_intermediate_states=num_intermediate_states, n_trial_boot=n_trial_boot, plot_file=f'heat_capacity_boot_{param_names[0]}_{param_values}.pdf', ) if verbose: print(f'Current FWHM: {FWHM_value} +/- {FWHM_uncertainty[0]}') print(f'Current minimum N_eff: {np.min(N_eff_values)}') # Check for minimum N_eff criteria. # If too small, the minimization should stop if we're using a gradient method. # If we're not using a gradient method, return a large value. if np.min(N_eff_values) < min_eff_samples: print(f'Insufficient number of effective samples ({np.min(N_eff_values)})') # print(f'Creating a cgmodel with current parameters...,end='') # Create the cgmodel # print('done') exit() return FWHM_value.value_in_unit(unit.kelvin) # Run optimization: # if len(param_names) == 1: # # Do scalar optimization: # opt_results = minimize_scalar(get_reeval_FWHM, x0, # args=(cgmodel, file_list, temperature_list, output_data, param_names, units, # frame_begin, sample_spacing, sparsify_stride, frame_end, n_cpu, n_trial_boot, num_intermediate_states), # method='bounded', # bounds=[bounds[0][0],bounds[0][1]], # options={'maxiter': 25}, # ) # else: # Do multivariate optimization: opt_results = minimize(get_reeval_FWHM, x0, jac='2-point', args=(cgmodel, file_list, temperature_list, output_data, param_names, units, frame_begin, sample_spacing, sparsify_stride, frame_end, n_cpu, n_trial_boot, num_intermediate_states), method=min_method, bounds=bounds, options={'maxfun': 25, 'finite_diff_rel_step': 0.005, 'eta': 0.5}, # This should be user input ) # TODO: plot the heat capacity curves at each iteration, and make a plot of all FWHM_values # Construct dictionary of optimal parameters: opt_param_dict = {} k = 0 for key,value in param_bounds_dict.items(): opt_param_dict[key] = opt_results.x[k] * units[k] k += 1 return opt_param_dict, opt_results
[docs]def get_fwhm_symmetry(C_v, T_list): """ """ symmetry_list = [] fwhm_list = [] for i in range(1, len(C_v) - 1): if C_v[i] >= C_v[i - 1] and C_v[i] >= C_v[i + 1]: max_value = C_v[i] max_value_T = T_list[i] half_max = 0.5 * max_value for j in range(i, 0, -1): if C_v[j] < half_max: break # interpolate to get lower HM lower_C = C_v[j] lower_T = T_list[j] upper_C = C_v[j + 1] upper_T = T_list[j + 1] delta_T = upper_T - lower_T delta_C = upper_C - lower_C lower_delta_C = half_max - lower_C delta_hm = lower_delta_C / delta_C lower_hm_T = delta_hm * delta_T for j in range(i, len(C_v)): if C_v[j] < half_max: break # interpolate to get lower HM lower_C = C_v[j] lower_T = T_list[j - 1] upper_C = C_v[j - 1] upper_T = T_list[j] delta_T = upper_T - lower_T delta_C = upper_C - lower_C upper_delta_C = C_v[j - 1] - half_max delta_hm = upper_delta_C / delta_C upper_hm_T = delta_hm * delta_T fwhm = upper_hm_T - lower_hm_T fwhm_list.append(fwhm) lower_delta = max_value_T - lower_hm_T upper_delta = upper_hm_T - max_value_T symmetry = abs(lower_delta - upper_delta) symmetry_list.append(symmetry) print("Symmetry = " + str(symmetry_list)) print("FWHM = " + str(fwhm_list)) symmetry = mean(symmetry_list) fwhm = mean(fwhm_list) return (symmetry, fwhm)
[docs]def get_num_maxima(C_v): """ """ maxima = 0 for i in range(1, len(C_v)): if C_v[i] >= C_v[i - 1] and C_v[i] >= C_v[i + 1]: maxima = maxima + 1 return maxima
[docs]def calculate_C_v_fitness(C_v, T_list): """ """ num_maxima = get_num_maxima(C_v) symmetry = get_fwhm_symmetry(C_v, T_list) num_maxima_weight = 0.33 symmetry_wieght = 0.33 fwhm_weight = 0.33 maxima_fitness = math.exp(-(num_maxima - 1) * num_maxima_weight) symmetry_fitness = math.exp(-(abs(symmetry - 1)) * symmetry_weight) fwhm_fitness = math.exp(-(fwhm) * fwhm_weight) fitness = maxima_fitness * symmetry_fitness * fwhm_fitness return fitness
[docs]def optimize_parameter( cgmodel, optimization_parameter, optimization_range_min, optimization_range_max, steps=None ): """ """ if steps == None: steps = 100 step_size = (optimization_range_max - optimization_range_min) / 100 parameter_values = [step * step_size for step in range(1, steps)] potential_energies = [] for parameter in parameter_values: cgmodel.optimization_parameter = parameter positions, potential_energy, time_step = minimize_structure( cgmodel.topology, cgmodel.system, cgmodel.positions, temperature=300.0 * unit.kelvin, simulation_time_step=None, total_simulation_time=1.0 * unit.picosecond, output_pdb="minimum.pdb", output_data="minimization.dat", print_frequency=1, ) potential_energies.append(potential_energy) best_value = min(potential_energies) return (best_value, potential_energies, parameter_values)