Source code for mcpele.parallel_tempering._base_mpi_ptmc

from __future__ import division
from __future__ import print_function
import abc
import numpy as np
import random
import os
from mpi4py import MPI
import copy

class _MPI_Parallel_Tempering(object):
    r"""Abstract class for MPI Parallel Tempering calculations

    :class:`_MPI_Parallel_Tempering` implements all the basic MPI routines. The
    initialisation function and the method to find the swap pattern need to be
    implemented in a specific child class.

    The replica exchange method (REM), or parallel tempering (PT), is a Monte Carlo
    scheme that targets the slow equilibration of systems characterised by large
    barriers in their free energy landscape. In REM, :math:`n` replicas of the system
    are simulated simultaneously in the canonical (NVT) ensemble. These systems differ
    in temperature and, while the high temperature replicas explore the high energy
    regions of phase space, easily crossing large free energy barriers, the low
    temperature replicas explore the low-lying regions of the energy landscape. In the
    hierarchical picture of the energy landscape, in which intra-funnel equilibration
    is fast and inter-funnel equilibration is slow, the high temperature replicas
    explore the energy landscape by hopping between funnels, while the low temperature
    replicas explore individual funnels accurately. The idea of REM is to introduce
    moves that swap configurations between the different replicas, thus making the
    high energy regions available to the low temperature simulations and *vice versa*.
    It is not hard to see why this makes the exploration of configurational space more
    efficient, thus accelerating equilibration. The acceptance rule for parallel
    tempering is

    .. math:: P(x_i \Rightarrow x_j) = \min \{ 1, \exp [- (\beta_j - \beta_i) (E_i - E_j)] \}

    The choice of temperature scale is very important for efficient equilibration to
    occur. From the acceptance rule we see that acceptance is suppressed exponentially
    by large differences in energy between the two configurations, just as for the
    Metropolis algorithm, but also by large differences in temperature. Thus, we must
    keep this product of differences as small as possible to allow for efficient
    swapping. As a rule of thumb this can be achieved by using a scale of
    geometrically increasing temperatures.

    It is important to note that REM is a truly equilibrium Monte Carlo method: the
    microscopic equilibrium of each ensemble is not disturbed by the swaps, hence the
    ensemble averages for each replica are just as valid as for ordinary Monte Carlo
    simulations (unlike simulated annealing, for which ensemble averages are not well
    defined). In addition, ensemble averages for the extended ensemble can be obtained
    by histogram reweighting techniques. We also highlight the fact that REM moves are
    very cheap to perform since they do not require additional energy evaluations.

    .. note :: An optimal Parallel Tempering strategy should make sure that all MCMC
        walks take roughly the same amount of time. Besides this fundamental
        consideration, note that root (rank=0) is not an evil master but rather an
        enlightened dictator that leads by example: root is responsible for assigning
        jobs and control parameters (e.g. temperature) to the slaves, but it also
        performs MCMC walks along with them. For this reason it might be optimal to
        give root a set of control parameters for which the simulation is leaner, so
        that it can start doing its own things while the slaves finish their work.

    Parameters
    ----------
    mcrunner : :class:`_BaseMCrunner`
        object of :class:`_BaseMCrunner` that performs the MCMC walks
    Tmax : double
        maximum temperature to simulate (or equivalent control parameters)
    Tmin : double
        minimum temperature to simulate (or equivalent control parameters)
    max_ptiter : int
        maximum number of Parallel Tempering iterations
    pfreq : int
        frequency with which histogram and other information is dumped to a file
    skip : int
        number of parallel tempering iterations for which swaps should not be
        performed. Swaps should be avoided, for instance, while adjusting the step size
    print_status : bool
        choose whether to print MCrunner status at each iteration
    base_directory : string
        path to base directory where to save output
    verbose : bool
        print verbose output to terminal

    Attributes
    ----------
    mcrunner : :class:`_BaseMCrunner`
        object of :class:`_BaseMCrunner` that performs the MCMC walks
    nproc : int
        number of parallel cores
    rank : int
        MPI rank (identifier of the particular core), master has rank=0
    Tmax : double
        maximum temperature to simulate (or equivalent control parameters)
    Tmin : double
        minimum temperature to simulate (or equivalent control parameters)
    max_ptiter : int
        maximum number of Parallel Tempering iterations
    ptiter : int
        count of current parallel tempering iteration
    pfreq : int
        frequency with which histogram and other information is dumped to a file
    no_exchange_int : neg int
        this NEGATIVE number in the exchange pattern means that no exchange should be
        attempted
    skip : int
        number of parallel tempering iterations for which swaps should not be
        performed. Swaps should be avoided, for instance, while adjusting the step size
    swap_accepted_count : int
        count of accepted swaps
    swap_rejected_count : int
        count of rejected swaps
    nodelist : list
        list of ranks (integers from 0 to ``nproc``-1)
    base_directory : string
        path to base directory where to save output
    ex_outstream : stream
        stream to output exchange information
    initialised : bool
        records whether PT has been initialised
    print_status : bool
        choose whether to print MCrunner status at each iteration
    verbose : bool
        print verbose output to terminal
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self, mcrunner, Tmax, Tmin, max_ptiter, pfreq=1, skip=0,
                 print_status=True, base_directory=None, verbose=False):
        self.mcrunner = mcrunner
        self.comm = MPI.COMM_WORLD
        self.nproc = self.comm.Get_size()  # total number of processors (replicas)
        self.rank = self.comm.Get_rank()  # the unique identifier for this process
        self.Tmax = Tmax
        self.Tmin = Tmin
        self.max_ptiter = max_ptiter
        self.ex_outstream = open("exchanges", "w")
        self.verbose = verbose
        self.ptiter = 0
        self.print_status = print_status
        self.skip = skip  # might want to skip the first few swaps to allow for equilibration
        self.pfreq = pfreq
        self.no_exchange_int = -12345  # this NEGATIVE number in the exchange pattern means that no exchange should be attempted
        self.initialised = False  # flag
        self.nodelist = [i for i in range(self.nproc)]
        self.swap_accepted_count = 0
        self.swap_rejected_count = 0
        if base_directory is None:
            self.base_directory = os.path.join(os.getcwd(), 'ptmc_results')
        else:
            self.base_directory = base_directory
        print("processor {0} ready".format(self.rank))
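
    # Hedged illustration (not part of the class API): the docstring above recommends a
    # geometrically increasing temperature scale. Assuming one temperature per replica,
    # such a ladder between Tmin and Tmax could be built as
    #
    #     temps = Tmin * (Tmax / Tmin) ** (np.arange(nproc) / (nproc - 1.0))
    #
    # so that the ratio temps[k + 1] / temps[k] is constant and neighbouring replicas
    # keep comparable swap acceptance.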

    @abc.abstractmethod
    def _find_exchange_buddy(self, Earray):
        """Abstract method that determines the exchange pattern; it needs to be
        overwritten.

        An exchange pattern array is constructed, filled with ``self.no_exchange_int``,
        which signifies that no exchange should be attempted. This value is replaced
        with the ``rank`` of the processor with which to perform the swap if the swap
        attempt is successful. The exchange partner is then scattered to the other
        processors.

        Parameters
        ----------
        Earray : numpy.array
            array of energies (one from each core)
        """
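
    # Hedged example of the exchange pattern semantics (not from the source): with
    # nproc = 4 and no_exchange_int = -12345, a pattern of [-12345, 2, 1, -12345]
    # instructs ranks 1 and 2 to swap with each other while ranks 0 and 3 attempt no
    # exchange.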

    @abc.abstractmethod
    def _initialise(self):
        """Perform all the tasks required prior to starting the computation, including
        initialising the output files
        """

    @abc.abstractmethod
    def _print_data(self):
        """Abstract method responsible for printing and/or dumping the data, be it the
        histograms or otherwise"""

    @abc.abstractmethod
    def _print_status(self):
        """Abstract method responsible for printing and/or dumping the status, be it
        the histograms or otherwise"""

    @abc.abstractmethod
    def _close_flush(self):
        """Abstract method responsible for flushing and closing all the output streams
        at the end of the calculation"""

    def one_iteration(self):
        """Perform one parallel tempering iteration

        Each PT iteration consists of the following steps:

        * set the coordinates
        * run the MCrunner for a predefined number of steps
        * collect the results (energy and new coordinates)
        * attempt an exchange
        """
        # set the configuration and energy from which to start the run
        self.mcrunner.set_config(np.array(self.config, dtype='d'), self.energy)
        # now run the MCMC walk
        self.mcrunner.run()
        # collect the results
        result = self.mcrunner.get_results()
        self.energy = result.energy
        self.config = np.array(result.coords, dtype='d')
        if self.ptiter >= self.skip:
            self._attempt_exchange()
        # print and increase the parallel tempering count
        if self.ptiter % self.pfreq == 0:
            self._print_data()
        if self.print_status:
            self._print_status()
        self.ptiter += 1

    def run(self):
        """Run multiple single iterations, plus initialisation if MPI_PT has not been
        initialised yet
        """
        if self.initialised is False:
            self._initialise()
        ptiter = 0
        while ptiter < self.max_ptiter:
            if self.verbose:
                print("processor {0} iteration {1}".format(self.rank, ptiter))
            self.one_iteration()
            ptiter += 1
            # assure that data are not thrown away since the last print
            if ptiter == self.max_ptiter:
                old_max_ptiter = self.max_ptiter
                # here we should call a convergence test, instead of doing so through _print_data (hack)
                self._print_data()
                # check that on printing of data max_ptiter hasn't changed due to the convergence test
                if self.max_ptiter == old_max_ptiter:
                    self._print_status()
        self._close_flush()
        print('process {0} terminated'.format(self.rank))

    def _scatter_data(self, in_send_array, adim, dtype='d'):
        """Method to scatter data in equal ordered chunks among replicas (it relies on
        the rank of the replica)

        In simple terms it requires that ``adim % nproc == 0``. If root scatters an
        array of size ``nproc`` then each core will receive the rank-th element of the
        array.

        Parameters
        ----------
        in_send_array : numpy.array
            incoming array (from the master) to be scattered
        adim : int
            size of the in_send_array
        dtype : dtype
            type of the elements of the array

        Returns
        -------
        recv_array : numpy.array
            array of length ``adim/nproc``
        """
        if self.rank == 0:
            # process 0 is the root, it has data to scatter
            assert len(in_send_array) == adim
            assert adim % self.nproc == 0
            send_array = np.array(in_send_array, dtype=dtype)
        else:
            # processes other than root do not send
            assert adim % self.nproc == 0
            send_array = None
        # use floor division: with "from __future__ import division", adim / nproc
        # would be a float and an invalid shape for np.empty
        recv_array = np.empty(adim // self.nproc, dtype=dtype)
        self.comm.Scatter(send_array, recv_array, root=0)
        return recv_array
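
    # Hedged illustration of the scatter semantics above: if root scatters
    # in_send_array = [0.1, 0.2, 0.3, 0.4] with nproc = 4 and adim = 4, rank 2 receives
    # the length-1 chunk [0.3]; with adim = 8 each rank would instead receive a
    # contiguous chunk of two elements.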

    def _scatter_single_value(self, send_array, dtype='d'):
        """Returns a single value from a scattered array for each replica (e.g.
        temperature or swap partner)

        .. note :: the send array must be of the same length as the number of
            processors

        Parameters
        ----------
        send_array : numpy.array
            incoming array (from the master) to be scattered
        dtype : dtype
            type of the elements of the array

        Returns
        -------
        dtype
            temperature or swap partner or else
        """
        if self.rank == 0:
            assert len(send_array) == self.nproc
        T = self._scatter_data(send_array, self.nproc, dtype=dtype)
        assert len(T) == 1
        return T[0]

    def _broadcast_data(self, in_data, adim, dtype='d'):
        """Identical arrays are broadcast from root to all other processes

        Parameters
        ----------
        in_data : numpy.array
            incoming array (from the master) to be broadcast
        adim : int
            size of the in_data array
        dtype : dtype
            type of the elements of the array

        Returns
        -------
        bcast_data : numpy.array
            array of length ``adim``
        """
        if self.rank == 0:
            bcast_data = np.array(in_data, dtype=dtype)
            assert len(bcast_data) == adim
        else:
            bcast_data = np.empty(adim, dtype=dtype)
        self.comm.Bcast(bcast_data, root=0)
        return bcast_data

    def _gather_data(self, in_send_array, dtype='d'):
        """Method to gather data in equal ordered chunks from replicas (it relies on
        the rank of the replica)

        .. note :: gather assumes that all the subprocesses are sending the same
            amount of data to root; to send variable amounts of data one must use the
            MPI_Gatherv directive

        Parameters
        ----------
        in_send_array : numpy.array
            incoming array (from each process) to be sent to master
        dtype : dtype
            type of the elements of the array

        Returns
        -------
        recv_array : numpy.array
            array of length ``len(in_send_array) * nproc``
        """
        in_send_array = np.array(in_send_array, dtype=dtype)
        if self.rank == 0:
            recv_array = np.empty(len(in_send_array) * self.nproc, dtype=dtype)
        else:
            recv_array = None
        self.comm.Gather(in_send_array, recv_array, root=0)
        if self.rank != 0:
            assert recv_array is None
        return recv_array
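
    # Hedged illustration, mirroring _scatter_data: with nproc = 4 and every rank
    # sending the length-1 array [E_rank], root receives [E_0, E_1, E_2, E_3] ordered
    # by rank, while all other ranks receive None.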

    def _gather_energies(self, E):
        """Gather the energies of the configurations from all processors

        Parameters
        ----------
        E : double
            energy of each processor respectively

        Returns
        -------
        recv_Earray : numpy.array
            array of length ``nproc``
        """
        send_Earray = np.array([E], dtype='d')
        recv_Earray = self._gather_data(send_Earray)
        return recv_Earray

    def _point_to_point_exchange_replace(self, dest, source, data):
        """Swap data between two processors

        .. note :: the sent message buffer is replaced with the received message

        Parameters
        ----------
        dest : int
            rank of processor with which to swap
        source : int
            rank of processor with which to swap
        data : numpy.array
            array of data to exchange

        Returns
        -------
        data : numpy.array
            the send data buffer is replaced with the received data
        """
        assert dest == source
        data = np.array(data, dtype='d')
        self.comm.Sendrecv_replace(data, dest=dest, source=source)
        return data
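
    # Hedged illustration of the swap above: if ranks 1 and 2 are paired, both call
    # comm.Sendrecv_replace(data, dest=buddy, source=buddy) with buddy set to the other
    # rank; MPI matches the two calls so each buffer is overwritten with the partner's
    # data without deadlock.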

    def _exchange_pairs(self, exchange_buddy, data):
        """Return data from the pair exchange, otherwise return the data unaltered.

        .. warning :: the replica sends to exchange_buddy and receives from it;
            replacing source with self.rank would cause a deadlock

        Parameters
        ----------
        exchange_buddy : int
            rank of processor with which to swap
        data : numpy.array
            array of data to exchange

        Returns
        -------
        data : numpy.array
            the send data buffer is replaced with the received data
        """
        if exchange_buddy != self.no_exchange_int:
            # print("processor {0} p-to-p exchange, old data {1}".format(self.rank, data))
            data = self._point_to_point_exchange_replace(exchange_buddy, exchange_buddy, data)
            # print("processor {0} p-to-p exchange, new data {1}".format(self.rank, data))
            self.swap_accepted_count += 1
        else:
            self.swap_rejected_count += 1
        return data

    def _attempt_exchange(self):
        """This function brings together all the functions necessary to attempt a
        configuration swap

        This function is structured as follows:

        * root gathers the energies from the slaves
        * root decides who will swap with whom
        * root scatters to each processor the rank of its chosen partner (buddy)
        * processors exchange configuration and energy
        """
        # gather energies, only root will receive them
        Earray = self._gather_energies(self.energy)
        if self.verbose:
            if Earray is not None:
                print("Earray", Earray)
        # find the exchange pattern (list of exchange buddies)
        exchange_pattern = self._find_exchange_buddy(Earray)
        # now scatter the exchange pattern so that everybody knows who their buddy is
        exchange_buddy = self._scatter_single_value(np.array(exchange_pattern, dtype='d'))
        exchange_buddy = int(exchange_buddy)
        # attempt the configuration swap
        self.config = self._exchange_pairs(exchange_buddy, np.array(self.config, dtype='d'))
        # swap energies
        E = self._exchange_pairs(exchange_buddy, np.array([self.energy], dtype='d'))
        assert len(E) == 1
        self.energy = E[0]
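

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the library API): the acceptance rule quoted
# in the class docstring, P(x_i => x_j) = min{1, exp[-(beta_j - beta_i)(E_i - E_j)]},
# can be evaluated directly. The helper below is hypothetical and only meant to
# make the formula concrete; concrete subclasses implement the actual decision
# inside _find_exchange_buddy.
def _swap_acceptance_probability_sketch(E_i, E_j, T_i, T_j):
    """Hypothetical helper: PT swap acceptance probability for replicas i and j."""
    beta_i = 1.0 / T_i
    beta_j = 1.0 / T_j
    return min(1.0, np.exp(-(beta_j - beta_i) * (E_i - E_j)))

# Usage: a swap that hands the lower-energy configuration to the colder replica
# is always accepted, while the reverse move is exponentially suppressed:
#
#     _swap_acceptance_probability_sketch(E_i=-3.0, E_j=-5.0, T_i=1.0, T_j=2.0)  # 1.0
#     _swap_acceptance_probability_sketch(E_i=-5.0, E_j=-3.0, T_i=1.0, T_j=2.0)  # exp(-1)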