import os
import subprocess
import time
import matplotlib.cm as cm
import matplotlib.pyplot as pyplot
import numpy as np
import openmmtools
from cg_openmm.utilities.util import get_box_vectors, set_box_vectors
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.colors import Normalize
from mdtraj import Topology, Trajectory
from mdtraj.formats import PDBTrajectoryFile
from mpl_toolkits.axes_grid1 import make_axes_locatable
from openmm import unit
from openmm.app.dcdfile import DCDFile
from openmm.app.pdbfile import PDBFile
from openmmtools.multistate import (MultiStateReporter, MultiStateSampler,
ReplicaExchangeAnalyzer,
ReplicaExchangeSampler)
from pymbar import timeseries
from scipy.optimize import minimize_scalar
from scipy.special import erf
# quiet down some citation spam
MultiStateSampler._global_citation_silence = True
kB = (unit.MOLAR_GAS_CONSTANT_R).in_units_of(unit.kilojoule / (unit.kelvin * unit.mole))
[docs]def make_replica_dcd_files(
topology, timestep=5*unit.femtosecond, time_interval=200,
output_dir="output", output_data="output.nc", checkpoint_data="output_checkpoint.nc",
frame_begin=0, frame_stride=1, center=False):
"""
Make dcd files from replica exchange simulation trajectory data.
:param topology: OpenMM Topology
:type topology: `Topology() <https://simtk.org/api_docs/openmm/api4_1/python/classsimtk_1_1openmm_1_1app_1_1topology_1_1Topology.html>`_
:param timestep: Time step used in the simulation (default=5*unit.femtosecond)
:type timestep: `Quantity() <http://docs.openmm.org/development/api-python/generated/simtk.unit.quantity.Quantity.html>` float * simtk.unit
:param time_interval: frequency, in number of time steps, at which positions were recorded (default=200)
:type time_interval: int
:param output_dir: path to which we will write the output (default='output')
:type output_dir: str
:param output_data: name of output .nc data file (default='output.nc')
:type output_data: str
:param checkpoint_data: name of checkpoint .nc data file (default='output_checkpoint.nc')
:type checkpoint_data: str
:param frame_begin: Frame at which to start writing the dcd trajectory (default=0)
:type frame_begin: int
:param frame_stride: advance by this many time intervals when writing dcd trajectories (default=1)
:type frame_stride: int
:param center: align all frames in the replica trajectories (default=False)
:type center: Boolean
"""
file_list = []
output_data_path = os.path.join(output_dir, output_data)
# Get number of replicas:
reporter = MultiStateReporter(output_data_path, open_mode='r', checkpoint_storage=checkpoint_data)
states = reporter.read_thermodynamic_states()[0]
n_replicas=len(states)
sampler_states = reporter.read_sampler_states(iteration=0)
xunit = sampler_states[0].positions[0].unit
for replica_index in range(n_replicas):
replica_positions = extract_trajectory(topology, reporter, replica_index=replica_index,
frame_begin=frame_begin, frame_stride=frame_stride)
n_frames_tot = replica_positions.shape[0]
# Determine simulation time (in ps) for each frame:
time_delta_ps = (timestep*time_interval).value_in_unit(unit.picosecond)
traj_times = np.linspace(
frame_begin*time_delta_ps,
(frame_begin+frame_stride*(n_frames_tot-1))*time_delta_ps,
num=n_frames_tot,
)
file_name = f"{output_dir}/replica_{replica_index+1}.dcd"
# Trajectories are written in nanometers:
replica_traj = Trajectory(
replica_positions,
Topology.from_openmm(topology),
time=traj_times,
)
if center:
ref_traj = replica_traj[0]
replica_traj.superpose(ref_traj)
# This rewrites to replica_traj
Trajectory.save_dcd(replica_traj,file_name)
reporter.close()
return file_list
[docs]def make_replica_pdb_files(
topology, output_dir="output", output_data="output.nc", checkpoint_data="output_checkpoint.nc",
frame_begin=0, frame_stride=1, center=False):
"""
Make pdb files from replica exchange simulation trajectory data.
:param topology: OpenMM Topology
:type topology: `Topology() <https://simtk.org/api_docs/openmm/api4_1/python/classsimtk_1_1openmm_1_1app_1_1topology_1_1Topology.html>`_
:param output_dir: path to which we will write the output (default='output')
:type output_dir: str
:param output_data: name of output .nc data file (default='output.nc')
:type output_data: str
:param checkpoint_data: name of checkpoint .nc data file (default='output_checkpoint.nc')
:type checkpoint_data: str
:param frame_begin: Frame at which to start writing the pdb trajectory (default=0)
:type frame_begin: int
:param frame_stride: advance by this many frames when writing pdb trajectories (default=1)
:type frame_stride: int
:param center: align all frames in the replica trajectories (default=False)
:type center: Boolean
:returns:
- file_list ( List( str ) ) - A list of names for the files that were written
"""
file_list = []
output_data_path = os.path.join(output_dir, output_data)
# Get number of replicas:
reporter = MultiStateReporter(output_data_path, open_mode='r', checkpoint_storage=checkpoint_data)
states = reporter.read_thermodynamic_states()[0]
n_replicas = len(states)
sampler_states = reporter.read_sampler_states(iteration=0)
xunit = sampler_states[0].positions[0].unit
for replica_index in range(n_replicas):
replica_positions = extract_trajectory(topology, reporter, replica_index=replica_index,
frame_begin=frame_begin, frame_stride=frame_stride)
file_name = f"{output_dir}/replica_{replica_index+1}.pdb"
# Trajectories are written in nanometers:
replica_traj = Trajectory(
replica_positions,
Topology.from_openmm(topology),
)
if center:
ref_traj = replica_traj[0]
replica_traj.superpose(ref_traj)
# This rewrites to replica_traj
Trajectory.save_pdb(replica_traj,file_name)
reporter.close()
return file_list
[docs]def make_state_dcd_files(
topology, timestep=5*unit.femtosecond, time_interval=200,
output_dir="output", output_data="output.nc", checkpoint_data="output_checkpoint.nc",
frame_begin=0, frame_stride=1, center=True):
"""
Make dcd files by state from replica exchange simulation trajectory data.
Note: these are discontinuous trajectories with constant temperature state.
:param topology: OpenMM Topology
:type topology: `Topology() <https://simtk.org/api_docs/openmm/api4_1/python/classsimtk_1_1openmm_1_1app_1_1topology_1_1Topology.html>`_
:param timestep: Time step used in the simulation (default=5*unit.femtosecond)
:type timestep: `Quantity() <http://docs.openmm.org/development/api-python/generated/simtk.unit.quantity.Quantity.html>` float * simtk.unit
:param time_interval: frequency, in number of time steps, at which positions were recorded (default=200)
:type time_interval: int
:param output_dir: path to which we will write the output (default='output')
:type output_dir: str
:param output_data: name of output .nc data file (default='output.nc')
:type output_data: str
:param checkpoint_data: name of checkpoint .nc data file (default='output_checkpoint.nc')
:type checkpoint_data: str
:param frame_begin: Frame at which to start writing the dcd trajectory (default=0)
:type frame_begin: int
:param frame_stride: advance by this many time intervals when writing dcd trajectories (default=1)
:type frame_stride: int
:param center: align the center of mass of each structure in the discontinuous state trajectory (default=True)
:type center: Boolean
"""
file_list = []
output_data_path = os.path.join(output_dir, output_data)
# Get number of states:
reporter = MultiStateReporter(output_data_path, open_mode='r', checkpoint_storage=checkpoint_data)
states = reporter.read_thermodynamic_states()[0]
sampler_states = reporter.read_sampler_states(iteration=0)
xunit = sampler_states[0].positions[0].unit
for state_index in range(len(states)):
state_positions = extract_trajectory(topology, reporter, state_index=state_index,
frame_begin=frame_begin, frame_stride=frame_stride)
n_frames_tot = state_positions.shape[0]
# Determine simulation time (in ps) for each frame:
time_delta_ps = (timestep*time_interval).value_in_unit(unit.picosecond)
traj_times = np.linspace(
frame_begin*time_delta_ps,
(frame_begin+frame_stride*(n_frames_tot-1))*time_delta_ps,
num=n_frames_tot,
)
file_name = f"{output_dir}/state_{state_index+1}.dcd"
# Trajectories are written in nanometers:
state_traj = Trajectory(
state_positions,
Topology.from_openmm(topology),
time=traj_times,
)
if center:
ref_traj = state_traj[0]
state_traj.superpose(ref_traj)
# This rewrites to state_traj
Trajectory.save_dcd(state_traj,file_name)
reporter.close()
return file_list
[docs]def make_state_pdb_files(
topology, output_dir="output", output_data="output.nc", checkpoint_data="output_checkpoint.nc",
frame_begin=0, frame_stride=1, center=True):
"""
Make pdb files by state from replica exchange simulation trajectory data.
Note: these are discontinuous trajectories with constant temperature state.
:param topology: OpenMM Topology
:type topology: `Topology() <https://simtk.org/api_docs/openmm/api4_1/python/classsimtk_1_1openmm_1_1app_1_1topology_1_1Topology.html>`_
:param output_dir: path to which we will write the output (default='output')
:type output_dir: str
:param output_data: name of output .nc data file (default='output.nc')
:type output_data: str
:param checkpoint_data: name of checkpoint .nc data file (default='output_checkpoint.nc')
:type checkpoint_data: str
:param frame_begin: Frame at which to start writing the pdb trajectory (default=0)
:type frame_begin: int
:param frame_stride: advance by this many frames when writing pdb trajectories (default=1)
:type frame_stride: int
:param center: align the center of mass of each structure in the discontinuous state trajectory (default=True)
:type center: Boolean
:returns:
- file_list ( List( str ) ) - A list of names for the files that were written
"""
file_list = []
output_data_path = os.path.join(output_dir, output_data)
# Get number of states:
reporter = MultiStateReporter(output_data_path, open_mode='r', checkpoint_storage=checkpoint_data)
states = reporter.read_thermodynamic_states()[0]
sampler_states = reporter.read_sampler_states(iteration=0)
xunit = sampler_states[0].positions[0].unit
for state_index in range(len(states)):
state_positions = extract_trajectory(topology, reporter, state_index=state_index,
frame_begin=frame_begin, frame_stride=frame_stride)
file_name = f"{output_dir}/state_{state_index+1}.pdb"
# Trajectories are written in nanometers:
state_traj = Trajectory(
state_positions,
Topology.from_openmm(topology),
)
if center:
ref_traj = state_traj[0]
state_traj.superpose(ref_traj)
# This rewrites to state_traj
Trajectory.save_pdb(state_traj,file_name)
reporter.close()
return file_list
[docs]def process_replica_exchange_data(
output_data="output/output.nc", output_directory="output", series_per_page=4,
write_data_file=True, plot_production_only=False, print_timing=False,
equil_nskip=1, frame_begin=0, frame_end=-1,
):
"""
Read replica exchange simulation data, detect equilibrium and decorrelation time, and plot replica exchange results.
:param output_data: path to output .nc file from replica exchange simulation, (default='output/output.nc')
:type output_data: str
:param output_directory: path to which output files will be written (default='output')
:type output_directory: stry
:param series_per_page: number of replica data series to plot per pdf page (default=4)
:type series_per_page: int
:param write_data_file: Option to write a text data file containing the state_energies array (default=True)
:type write_data_file: Boolean
:param plot_production_only: Option to plot only the production region, as determined from pymbar detectEquilibration (default=False)
:type plot_production_only: Boolean
:param equil_nskip: skip this number of frames to sparsify the energy timeseries for pymbar detectEquilibration (default=1) - this is used only when frame_begin=0 and the trajectory has less than 40000 frames.
:type equil_nskip: Boolean
:param frame_begin: analyze starting from this frame, discarding all prior as equilibration period (default=0)
:type frame_begin: int
:param frame_end: analyze up to this frame only, discarding the rest (default=-1).
:type frame_end: int
:returns:
- replica_energies ( `Quantity() <http://docs.openmm.org/development/api-python/generated/simtk.unit.quantity.Quantity.html>`_ ( np.float( [number_replicas,number_simulation_steps] ), simtk.unit ) ) - The potential energies for all replicas at all (printed) time steps
- replica_state_indices ( np.int64( [number_replicas,number_simulation_steps] ), simtk.unit ) - The thermodynamic state assignments for all replicas at all (printed) time steps
- production_start ( int - The frame at which the production region begins for all replicas, as determined from pymbar detectEquilibration
- sample_spacing ( int - The number of frames between uncorrelated state energies, estimated using heuristic algorithm )
- n_transit ( np.float( [number_replicas] ) ) - Number of half-transitions between state 0 and n for each replica
- mixing_stats ( tuple ( np.float( [number_replicas x number_replicas] ) , np.float( [ number_replicas ] ) , float( statistical inefficiency ) ) ) - transition matrix, corresponding eigenvalues, and statistical inefficiency
"""
t1 = time.perf_counter()
# Read the simulation coordinates for individual temperature replicas
reporter = MultiStateReporter(output_data, open_mode="r")
t2 = time.perf_counter()
if print_timing:
print(f"open data time: {t2-t1}")
# figure out what the time between output is.
# We assume all use the same time step (which i think is required)
mcmove = reporter.read_mcmc_moves()[0]
time_interval = mcmove.n_steps*mcmove.timestep
t3 = time.perf_counter()
if print_timing:
print(f"read_mcmc_moves time: {t3-t2}")
# figure out what the temperature list is
states = reporter.read_thermodynamic_states()[0]
t4 = time.perf_counter()
if print_timing:
print(f"read_thermodynamics_states time: {t4-t3}")
temperature_list = []
for s in states:
temperature_list.append(s.temperature)
analyzer = ReplicaExchangeAnalyzer(reporter)
t5 = time.perf_counter()
(
replica_energies,
unsampled_state_energies,
neighborhoods,
replica_state_indices,
) = analyzer.read_energies()
# Truncate output of read_energies() to last frame of interest
if frame_end > 0:
# Use frames from frame_begin to frame_end
replica_energies = replica_energies[:,:,:frame_end]
unsampled_state_energies = unsampled_state_energies[:,:,:frame_end]
neighborhoods = neighborhoods[:,:,:frame_end]
replica_state_indices = replica_state_indices[:,:frame_end]
t6 = time.perf_counter()
if print_timing:
print(f"read_energies time: {t6-t5}")
n_particles = np.shape(reporter.read_sampler_states(iteration=0)[0].positions)[0]
temps = np.array([temp._value for temp in temperature_list])
beta_k = 1 / (kB * temps)
n_replicas = len(temperature_list)
for k in range(n_replicas):
replica_energies[:, k, :] *= beta_k[k] ** (-1)
t7 = time.perf_counter()
if print_timing:
print(f"reduce replica energies time: {t7-t6}")
total_steps = len(replica_energies[0][0])
state_energies = np.zeros([n_replicas, total_steps])
t8 = time.perf_counter()
# there must be some better way to do this as list comprehension.
for step in range(total_steps):
for state in range(n_replicas):
state_energies[state, step] = replica_energies[
np.where(replica_state_indices[:, step] == state)[0], 0, step
]
t9 = time.perf_counter()
if print_timing:
print(f"assign state energies time: {t9-t8}")
# can run physical-valication on these state_energies
# Use pymbar timeseries module to detect production period
t10 = time.perf_counter()
# Start of equilibrated data:
t0 = np.zeros((n_replicas))
# Statistical inefficiency:
g = np.zeros((n_replicas))
subsample_indices = {}
# If sufficiently large, discard the first 20000 frames as equilibration period and use
# subsampleCorrelatedData to get the energy decorrelation time.
if total_steps >= 40000 or frame_begin > 0:
if frame_begin > 0:
# If specified, use frame_begin as the start of the production region
production_start=frame_begin
else:
# Otherwise, use frame 20000
production_start=20000
for state in range(n_replicas):
subsample_indices[state] = timeseries.subsampleCorrelatedData(
state_energies[state][production_start:],
conservative=True,
)
g[state] = subsample_indices[state][1]-subsample_indices[state][0]
else:
# For small trajectories, use detectEquilibration
for state in range(n_replicas):
t0[state], g[state], Neff_max = timeseries.detectEquilibration(state_energies[state], nskip=equil_nskip)
# Choose the latest equil timestep to apply to all states
production_start = int(np.max(t0))
# Assume a normal distribution (very rough approximation), and use mean plus
# the number of standard deviations which leads to (n_replica-1)/n_replica coverage
# For 12 replicas this should be the mean + 1.7317 standard deviations
# x standard deviations is the solution to (n_replica-1)/n_replica = erf(x/sqrt(2))
# This is equivalent to a target of 23/24 CDF value
print(f"Correlation times (frames): {g.astype(int)}")
def erf_fun(x):
return np.power((erf(x/np.sqrt(2))-(n_replicas-1)/n_replicas),2)
# x must be larger than zero
opt_g_results = minimize_scalar(
erf_fun,
bounds=(0,10),
method='bounded',
)
if not opt_g_results.success:
print("Error solving for correlation time, exiting...")
print(f"erf opt results: {opt_g_results}")
exit()
sample_spacing = int(np.ceil(np.mean(g)+opt_g_results.x*np.std(g)))
t11 = time.perf_counter()
if print_timing:
print(f"detect equil and subsampling time: {t11-t10}")
print("state mean energies variance")
for state in range(n_replicas):
state_mean = np.mean(state_energies[state,production_start::sample_spacing])
state_std = np.std(state_energies[state,production_start::sample_spacing])
print(
f" {state:4d} {state_mean:10.6f} {state_std:10.6f}"
)
t12 = time.perf_counter()
if write_data_file == True:
f = open(os.path.join(output_directory, "replica_energies.dat"), "w")
for step in range(total_steps):
f.write(f"{step:10d}")
for replica_index in range(n_replicas):
f.write(f"{replica_energies[replica_index,replica_index,step]:12.6f}")
f.write("\n")
f.close()
t13 = time.perf_counter()
if print_timing:
print(f"Optionally write .dat file: {t13-t12}")
t14 = time.perf_counter()
if plot_production_only==True:
plot_replica_exchange_energies(
state_energies[:,production_start:],
temperature_list,
series_per_page,
time_interval=time_interval,
time_shift=production_start*time_interval,
file_name=f"{output_directory}/rep_ex_ener.pdf",
)
plot_replica_exchange_energy_histograms(
state_energies[:,production_start:],
temperature_list,
file_name=f"{output_directory}/rep_ex_ener_hist.pdf",
)
plot_replica_exchange_summary(
replica_state_indices[:,production_start:],
temperature_list,
series_per_page,
time_interval=time_interval,
time_shift=production_start*time_interval,
file_name=f"{output_directory}/rep_ex_states.pdf",
)
plot_replica_state_matrix(
replica_state_indices[:,production_start:],
file_name=f"{output_directory}/state_probability_matrix.pdf",
)
else:
plot_replica_exchange_energies(
state_energies,
temperature_list,
series_per_page,
time_interval=time_interval,
file_name=f"{output_directory}/rep_ex_ener.pdf",
)
plot_replica_exchange_energy_histograms(
state_energies,
temperature_list,
file_name=f"{output_directory}/rep_ex_ener_hist.pdf",
)
plot_replica_exchange_summary(
replica_state_indices,
temperature_list,
series_per_page,
time_interval=time_interval,
file_name=f"{output_directory}/rep_ex_states.pdf",
)
plot_replica_state_matrix(
replica_state_indices,
file_name=f"{output_directory}/state_probability_matrix.pdf",
)
t15 = time.perf_counter()
if print_timing:
print(f"plotting time: {t15-t14}")
# Analyze replica exchange state transitions
# For each replica, how many times does the thermodynamic state go between state 0 and state n
# For consistency with the other mixing statistics, use only the production region here
replica_state_indices_prod = replica_state_indices[:,production_start:]
# Number of one-way transitions from states 0 to n or states n to 0
n_transit = np.zeros((n_replicas,1))
# Replica_state_indices is [n_replicas x n_iterations]
for rep in range(n_replicas):
last_bound = None
for i in range(replica_state_indices_prod.shape[1]):
if replica_state_indices_prod[rep,i] == 0 or replica_state_indices_prod[rep,i] == (n_replicas-1):
if last_bound is None:
# This is the first time state 0 or n is visited
pass
else:
if last_bound != replica_state_indices_prod[rep,i]:
# This is a completed transition from 0 to n or n to 0
n_transit[rep] += 1
last_bound = replica_state_indices_prod[rep,i]
t16 = time.perf_counter()
if print_timing:
print(f"replica transition analysis: {t16-t15}")
# Compute transition matrix from the analyzer
mixing_stats = analyzer.generate_mixing_statistics(number_equilibrated=production_start)
t17 = time.perf_counter()
if print_timing:
print(f"compute transition matrix: {t17-t16}")
print(f"total time elapsed: {t17-t1}")
# Close reporter/.nc file:
reporter.close()
return (replica_energies, replica_state_indices, production_start, sample_spacing, n_transit, mixing_stats)
[docs]def run_replica_exchange(
topology,
system,
positions,
total_simulation_time=1.0 * unit.picosecond,
simulation_time_step=None,
temperature_list=None,
friction=1.0 / unit.picosecond,
minimize=True,
exchange_frequency=1000,
output_data="output/output.nc",
):
"""
Run a OpenMMTools replica exchange simulation using an OpenMM coarse grained model.
:param topology: OpenMM Topology
:type topology: `Topology() <https://simtk.org/api_docs/openmm/api4_1/python/classsimtk_1_1openmm_1_1app_1_1topology_1_1Topology.html>`_
:param system: OpenMM System()
:type system: `System() <https://simtk.org/api_docs/openmm/api4_1/python/classsimtk_1_1openmm_1_1openmm_1_1System.html>`_
:param positions: Positions array for the model we would like to test
:type positions: `Quantity() <http://docs.openmm.org/development/api-python/generated/simtk.unit.quantity.Quantity.html>`_ ( np.array( [cgmodel.num_beads,3] ), simtk.unit )
:param total_simulation_time: Total run time for individual simulations
:type total_simulation_time: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param simulation_time_step: Simulation integration time step
:type simulation_time_step: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param temperature_list: List of temperatures for which to perform replica exchange simulations, default = None
:type temperature: List( float * simtk.unit.temperature )
:param friction: Langevin thermostat friction coefficient, default = 1 / ps
:type friction: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param minimize: Whether minimization is done before running the simulation
:type minimize: bool
:param output_data: Name of NETCDF file where we will write simulation data
:type output_data: string
:param exchange_frequency: Number of time steps between replica exchange attempts, Default = None
:type exchange_frequency: int
:param output_data: file to put the output .nc
:type output_data: netCDF4 file as generated by OpenMM
:returns:
- replica_energies ( `Quantity() <http://docs.openmm.org/development/api-python/generated/simtk.unit.quantity.Quantity.html>`_ ( np.float( [number_replicas,number_simulation_steps] ), simtk.unit ) ) - The potential energies for all replicas at all (printed) time steps
- replica_positions ( `Quantity() <http://docs.openmm.org/development/api-python/generated/simtk.unit.quantity.Quantity.html>`_ ( np.float( [number_replicas,number_simulation_steps,cgmodel.num_beads,3] ), simtk.unit ) ) - The positions for all replicas at all (printed) time steps
- replica_state_indices ( np.int64( [number_replicas,number_simulation_steps] ), simtk.unit ) - The thermodynamic state assignments for all replicas at all (printed) time steps
:Example:
>>> from foldamers.cg_model.cgmodel import CGModel
>>> from cg_openmm.simulation.rep_exch import *
>>> cgmodel = CGModel()
>>> replica_energies,replica_positions,replica_state_indices = run_replica_exchange(cgmodel.topology,cgmodel.system,cgmodel.positions)
"""
simulation_steps = int(np.floor(total_simulation_time / simulation_time_step))
exchange_attempts = int(np.floor(simulation_steps / exchange_frequency))
if temperature_list is None:
temperature_list = [((300.0 + i) * unit.kelvin) for i in range(-50, 50, 10)]
num_replicas = len(temperature_list)
sampler_states = list()
thermodynamic_states = list()
# Define thermodynamic states.
# box_vectors = system.getDefaultPeriodicBoxVectors()
for temperature in temperature_list:
thermodynamic_state = openmmtools.states.ThermodynamicState(
system=system, temperature=temperature
)
thermodynamic_states.append(thermodynamic_state)
sampler_states.append(
openmmtools.states.SamplerState(positions)
) # no box vectors, non-periodic system.
# Create and configure simulation object.
move = openmmtools.mcmc.LangevinDynamicsMove(
timestep=simulation_time_step,
collision_rate=friction,
n_steps=exchange_frequency,
reassign_velocities=False,
)
simulation = ReplicaExchangeSampler(
mcmc_moves=move,
number_of_iterations=exchange_attempts,
replica_mixing_scheme='swap-neighbors',
)
if os.path.exists(output_data):
os.remove(output_data)
reporter = MultiStateReporter(output_data, checkpoint_interval=1)
simulation.create(thermodynamic_states, sampler_states, reporter)
if minimize:
simulation.minimize()
print("Running OpenMM replica exchange simulation...")
print(f"Time step: {simulation_time_step}")
print(f"Iterations: {exchange_attempts}",flush=True)
try:
simulation.run()
except BaseException:
print("Replica exchange simulation failed, try verifying your model/simulation settings.")
exit()
return
[docs]def restart_replica_exchange(
total_simulation_time,
simulation_time_step=5*unit.picosecond,
exchange_frequency=200,
output_data="output/output.nc",
):
"""
Restart an OpenMMTools replica exchange simulation using an OpenMM coarse grained model and
output .nc files from the previous segment of the simulation.
:param total_simulation_time: Total run time for original + new simulation segments
:type total_simulation_time: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param simulation_time_step: Simulation integration time step (default=5*unit.picosecond)
:type simulation_time_step: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param exchange_frequency: Number of time steps between replica exchange attempts (default=200)
:type exchange_frequency: int
:param output_data: Path to the NETCDF file for previous segment of simulation - this will be appended to (default="output/output.nc")
:type output_data: str
"""
# Compute number of total time steps and iterations:
simulation_steps = int(np.floor(total_simulation_time / simulation_time_step))
exchange_attempts = int(np.floor(simulation_steps / exchange_frequency))
# Load in the reporter from the original simulation:
reporter = MultiStateReporter(output_data, open_mode="r+")
simulation = ReplicaExchangeSampler.from_storage(reporter)
# Compute how many more iterations are needed:
n_iter_remain = exchange_attempts-simulation.iteration
print(f"Continuing OpenMM replica exchange simulation from iteration {simulation.iteration}")
print(f"Time step: {simulation_time_step}")
print(f"New iterations: {n_iter_remain}",flush=True)
simulation.extend(n_iterations=n_iter_remain)
return
[docs]def get_minimum_energy_ensemble(
topology, replica_energies, replica_positions, ensemble_size=5, file_name=None
):
"""
Get an ensemble of low (potential) energy poses, and write the lowest energy structure to a PDB file if a file_name is provided.
:param topology: OpenMM Topology()
:type topology: `Topology() <https://simtk.org/api_docs/openmm/api4_1/python/classsimtk_1_1openmm_1_1app_1_1topology_1_1Topology.html>`_
:param replica_energies: List of dimension num_replicas X simulation_steps, which gives the energies for all replicas at all simulation steps
:type replica_energies: List( List( float * simtk.unit.energy for simulation_steps ) for num_replicas )
:param replica_positions: List of positions for all output frames for all replicas
:type replica_positions: np.array( ( float * simtk.unit.positions for num_beads ) for simulation_steps )
:param file_name: Output destination for PDB coordinates of minimum energy pose, Default = None
:returns:
- ensemble ( List() ) - A list of poses that are in the minimum energy ensemble.
:Example:
>>> from foldamers.cg_model.cgmodel import CGModel
>>> from cg_openmm.simulation.rep_exch import *
>>> cgmodel = CGModel()
>>> replica_energies,replica_positions,replica_state_indices = run_replica_exchange(cgmodel.topology,cgmodel.system,cgmodel.positions)
>>> ensemble_size = 5
>>> file_name = "minimum.pdb"
>>> minimum_energy_ensemble = get_minimum_energy_ensemble(cgmodel.topology,replica_energies,replica_positions,ensemble_size=ensemble_size,file_name=file_name)
"""
# Get the minimum energy structure sampled during the simulation
ensemble = []
ensemble_energies = []
for replica in range(len(replica_energies)):
energies = np.array([energy for energy in replica_energies[replica][replica]])
for energy in range(len(energies)):
if len(ensemble) < ensemble_size:
ensemble.append(replica_positions[replica][energy])
ensemble_energies.append(energies[energy])
else:
for comparison in range(len(ensemble_energies)):
if energies[energy] < ensemble_energies[comparison]:
ensemble_energies[comparison] = energies[energy]
ensemble[comparison] = replica_positions[replica][energy]
if file_name is None:
index = 1
for pose in ensemble:
file = open(str("re_min_" + str(index) + ".pdb"), "w")
PDBFile.writeFile(topology, pose, file=file)
else:
file = open(file_name, "w")
for pose in ensemble:
PDBFile.writeFile(topology, pose, file=file)
return ensemble
[docs]def plot_replica_exchange_energies(
state_energies,
temperature_list,
series_per_page,
time_interval=1.0 * unit.picosecond,
time_shift=0.0 * unit.picosecond,
file_name="rep_ex_ener.pdf",
legend=True,
):
"""
Plot the potential energies for a batch of replica exchange trajectories
:param state_energies: List of dimension num_replicas X simulation_steps, which gives the energies for all replicas at all simulation steps
:type state_energies: List( List( float * simtk.unit.energy for simulation_steps ) for num_replicas )
:param temperature_list: List of temperatures for which to perform replica exchange simulations, default = [(300.0 * unit.kelvin).__add__(i * unit.kelvin) for i in range(-20,100,10)]
:type temperature: List( float * simtk.unit.temperature )
:param time_interval: interval between energy exchanges.
:type time_interval: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param time_shift: amount of time before production period to shift the time axis(default = 0)
:type time_shift: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param file_name: The pathname of the output file for plotting results, default = "replica_exchange_energies.png"
:type file_name: str
:param legend: Controls whether a legend is added to the plot
:type legend: Logical
"""
simulation_times = np.array(
[
step * time_interval.value_in_unit(unit.picosecond)
for step in range(len(state_energies[0]))
]
)
simulation_times += time_shift.value_in_unit(unit.picosecond)
# To improve pdf render speed, sparsify data to display less than 2000 data points
n_xdata = len(simulation_times)
if n_xdata <= 1000:
plot_stride = 1
else:
plot_stride = int(np.floor(n_xdata/1000))
# If more than series_per_page replicas, split into separate pages for better visibility
nmax = series_per_page
npage = int(np.ceil(len(temperature_list)/nmax))
with PdfPages(file_name) as pdf:
page_num=1
plotted_per_page=0
pyplot.figure()
for state in range(len(temperature_list)):
if plotted_per_page <= (nmax):
pyplot.plot(
simulation_times[::plot_stride],
state_energies[state,::plot_stride],
alpha=0.5,
linewidth=1,
)
plotted_per_page += 1
if (plotted_per_page >= nmax) or (state==(len(temperature_list)-1)):
# Save and close previous page
pyplot.xlabel("Simulation Time ( Picoseconds )")
pyplot.ylabel("Potential Energy ( kJ / mol )")
pyplot.title("Replica Exchange Simulation")
if legend:
pyplot.legend(
[round(temperature.value_in_unit(unit.kelvin), 1) for temperature in temperature_list[(0+(page_num-1)*nmax):(page_num*nmax)]],
loc="center left",
bbox_to_anchor=(1, 0.5),
title="T (K)",
)
pdf.savefig(bbox_inches="tight") # Save current fig to pdf page
pyplot.close()
plotted_per_page = 0
page_num += 1
return
[docs]def plot_replica_exchange_energy_histograms(
state_energies,
temperature_list,
file_name="rep_ex_ener_hist.pdf",
legend=True,
):
"""
Plot the potential energies for a batch of replica exchange trajectories
:param state_energies: List of dimension num_replicas X simulation_steps, which gives the energies for all replicas at all simulation steps
:type state_energies: List( List( float * simtk.unit.energy for simulation_steps ) for num_replicas )
:param temperature_list: List of temperatures for which to perform replica exchange simulations, default = [(300.0 * unit.kelvin).__add__(i * unit.kelvin) for i in range(-20,100,10)]
:type temperature: List( float * simtk.unit.temperature )
:param file_name: The pathname of the output file for plotting results, default = "replica_exchange_energies.png"
:type file_name: str
:param legend: Controls whether a legend is added to the plot
:type legend: Logical
"""
figure = pyplot.figure(figsize=(8.5,11))
for state in range(len(temperature_list)):
n_out, bin_edges_out = np.histogram(
state_energies[state,:],bins=20,density=True,
)
bin_centers = np.zeros((len(bin_edges_out)-1,1))
for i in range(len(bin_edges_out)-1):
bin_centers[i] = (bin_edges_out[i]+bin_edges_out[i+1])/2
pyplot.plot(bin_centers,n_out,'o-',alpha=0.5,linewidth=1,markersize=6)
pyplot.xlabel("Potential Energy ( kJ / mol )")
pyplot.ylabel("Probability")
pyplot.title("Replica Exchange Energy Histogram")
if legend:
pyplot.legend(
[round(temperature._value, 1) for temperature in temperature_list],
loc="center left",
bbox_to_anchor=(1, 0.5),
title="T (K)",
)
pyplot.savefig(file_name, bbox_inches="tight")
pyplot.close()
return
def plot_replica_state_matrix(
replica_state_indices,
file_name='state_probability_matrix.pdf'
):
# Plot a matrix of replica vs. state, coloring each box in the grid by normalized frequency
# For each replica, histogram the state indices data
# Then normalize the data and create [n_replica x n_state] patch graph
n_replicas = replica_state_indices.shape[0]
hist_all = np.zeros((n_replicas, n_replicas))
state_bin_edges = np.linspace(-0.5,n_replicas-0.5,n_replicas+1)
state_bin_centers = 0.5+state_bin_edges[0:n_replicas]
for rep in range(n_replicas):
hist_all[rep,:], bin_edges = np.histogram(
replica_state_indices[rep,:],bins=state_bin_edges,density=True,
)
# No need for global normalization, since each replica's state probabilities must sum to 1
hist_norm = np.zeros_like(hist_all)
for rep in range(n_replicas):
for state in range(n_replicas):
hist_norm[rep,state] = hist_all[rep,state]/np.max(hist_all[rep,:])
mean_score = np.mean(hist_norm)
min_score = np.amin(hist_norm)
ax = pyplot.subplot(111)
cmap=pyplot.get_cmap('nipy_spectral')
norm=Normalize(vmin=0,vmax=1)
ax.imshow(hist_norm,cmap=cmap,norm=norm)
ax.set_aspect('equal', 'box')
# Append colorbar axis to right side
divider = make_axes_locatable(ax)
cax = divider.append_axes("right",size="5%",pad=0.20)
pyplot.colorbar(
cm.ScalarMappable(cmap=cmap,norm=norm),
cax=cax,
label='normalized frequency',
)
ax.set_xlabel("State")
ax.set_ylabel("Replica")
pyplot.suptitle(f"Replica exchange state probabilities\n(Mean: {mean_score:.4f} Min: {min_score:.4f})")
pyplot.savefig(file_name)
pyplot.close()
return hist_all
[docs]def plot_replica_exchange_summary(
replica_states,
temperature_list,
series_per_page,
time_interval=1.0 * unit.picosecond,
time_shift=0.0 * unit.picosecond,
file_name="rep_ex_states.pdf",
legend=True,
):
"""
Plot the thermodynamic state assignments for individual temperature replicas as a function of the simulation time, in order to obtain a visual summary of the replica exchanges from a OpenMM simulation.
:param replica_states: List of dimension num_replicas X simulation_steps, which gives the thermodynamic state indices for all replicas at all simulation steps
:type replica_states: List( List( float * simtk.unit.energy for simulation_steps ) for num_replicas )
:param temperature_list: List of temperatures for which to perform replica exchange simulations, default = [(300.0 * unit.kelvin).__add__(i * unit.kelvin) for i in range(-20,100,10)]
:type temperature: List( float * simtk.unit.temperature )
:param time_interval: interval between energy exchanges.
:type time_interval: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param time_shift: amount of time before production period to shift the time axis(default = 0)
:type time_shift: `SIMTK <https://simtk.org/>`_ `Unit() <http://docs.openmm.org/7.1.0/api-python/generated/simtk.unit.unit.Unit.html>`_
:param file_name: The pathname of the output file for plotting results, default = "replica_exchange_state_transitions.png"
:type file_name: str
:param legend: Controls whether a legend is added to the plot
:type legend: Logical
"""
simulation_times = np.array(
[
step * time_interval.value_in_unit(unit.picosecond)
for step in range(len(replica_states[0]))
]
)
simulation_times += time_shift.value_in_unit(unit.picosecond)
# To improve pdf render speed, sparsify data to display less than 2000 data points
n_xdata = len(simulation_times)
if n_xdata <= 1000:
plot_stride = 1
else:
plot_stride = int(np.floor(n_xdata/1000))
# If more than series_per_page replicas, split into separate pages for better visibility
nmax = series_per_page
npage = int(np.ceil(len(temperature_list)/nmax))
with PdfPages(file_name) as pdf:
page_num=1
plotted_per_page=0
pyplot.figure()
for replica in range(len(replica_states)):
state_indices = np.array([int(round(state)) for state in replica_states[replica]])
if plotted_per_page <= (nmax):
pyplot.plot(
simulation_times[::plot_stride],
state_indices[::plot_stride],
alpha=0.5,
linewidth=1
)
plotted_per_page += 1
if (plotted_per_page >= nmax) or (replica==(len(replica_states)-1)):
# Save and close previous page
pyplot.xlabel("Simulation Time ( Picoseconds )")
pyplot.ylabel("Thermodynamic State Index")
pyplot.title("State Exchange Summary")
if legend:
pyplot.legend(
[i for i in range((page_num-1)*nmax,page_num*nmax)],
loc="center left",
bbox_to_anchor=(1, 0.5),
title="Replica Index",
)
pdf.savefig(bbox_inches="tight") # Save current fig to pdf page
pyplot.close()
plotted_per_page = 0
page_num += 1
return