Source code for mppi.Calculators.QeCalculator

"""
This module manages parallel calculations with QuantumESPRESSO.
Both a scheduler like slurm or the python multiprocessing package can be used.
"""

from .Runner import Runner
import os

[docs]class QeCalculator(Runner): """ Manage (multiple) QuantumESPRESSO calculations performed in parallel. Computations are managed by a scheduler that, in the actual implementation of the class, can be `direct` or `slurm`. Parameters: omp (:py:class:`int`) : value of the OMP_NUM_THREADS variable mpi (:py:class:`int`) : number of mpi processes mpi_run (:py:class:`string`) : command for the execution of mpirun, e.g. 'mpirun -np' or 'mpiexec -np' executable (:py:class:`string`) : set the executable (pw.x, ph.x, ..) of the QuantumESPRESSO package scheduler (:py:class:`string`) : choose the scheduler used to submit the job, actually the choices implemented are 'direct' that runs the computation using the python multiprocessing package and 'slurm' that creates a slurm script multiTask (:py:class:`bool`) : if true a single run_script is built and all the computations are performed in parallel, otherwise an independent script is built for each elements of inputs and the computations are performed sequentially skip (:py:class:`bool`) : if True evaluate if one (or many) computations can be skipped. This is done by checking if the file $name.xml is present in the prefix folder, for each name in names verbose (:py:class:`bool`) : set the amount of information provided on terminal IO_time (int) : time step (in second) used by the wait method to check that the job is completed kwargs : other parameters that are stored in the _global_options dictionary. For instance the variable sbatch_options = [option1,option2,....] allows the user to include further options in the slurm script Example: >>> code = calculator(omp=1,mpi=4,mpi_run='mpirun -np',skip=True,verbose=True,scheduler='direct') >>> code.run(inputs = [...], run_dir = ...,names = [...], source_dir = ..., **kwargs) where the arguments of the run method are: Args: run_dir (:py:class:`string`) : the folder in which the simulation is performed inputs (:py:class:`list`) : list with the instances of the :class:`PwInput` class that define the input objects names (:py:class:`list`) : list with the names associated to the input files, given in the same order of the inputs list. Usually you can set the name equal to the prefix of the input object so the name of the input file and the prefix folder built by QuantumESPRESSO are equal source_dir (:py:class:`string`) : location of the scf source folder for a nscf computation. If present the class copies this folder in the run_dir with the name $prefix.save kwargs : other parameters that are stored in the run_options dictionary The calculator looks for the following variables in the run_options dictionary. These options may be useful for _asincronous_ computation managed the slurm scheduler. `dry_run=True` with this option the calculator setup the calculations and write the scrpt for submitting the jobs, but the computations are not run. `wait_end_run=False` with this option the wait of the end of the run is suppressed. """ def __init__(self, omp = os.environ.get('OMP_NUM_THREADS', 1), mpi = 2, mpi_run = 'mpirun -np', executable = 'pw.x', scheduler = 'direct', multiTask = True, skip = True, verbose = True, IO_time = 5, **kwargs): # Use the initialization from the Runner class (all options inside _global_options) Runner.__init__(self, omp=omp, mpi=mpi, mpi_run=mpi_run, executable=executable, scheduler=scheduler, multiTask=multiTask, skip=skip, verbose=verbose, IO_time=IO_time, **kwargs) if multiTask: task_str = 'parallel' else: task_str = 'serial' print('Initialize a %s QuantumESPRESSO calculator with scheduler %s' % (task_str,self._global_options['scheduler']))
[docs] def pre_processing(self): """ Process local run dictionary to create the run directory and input files. If skip = False clean the run_dir. If the 'source_dir' key is passed to the run method copy the source folder in the run_dir with the name $prefix. This procedure is performed after the deletion run_dir/prefix.save since otherwise the copy of the source_dir is deleted. """ run_dir = self.run_options.get('run_dir', '.') inputs = self.run_options.get('inputs') names = self.run_options.get('names') skip = self.run_options.get('skip') # Create the run_dir and write the input file self._ensure_run_directory() if inputs is not None: for input,name in zip(inputs,names): input.write(os.path.join(run_dir,name)+'.in') else: print('input list not provided') # if skip = False clean the run_dir if not skip: self._clean_run_dir() # Copy the source folder in the run_dir source_dir = self.run_options.get('source_dir') if source_dir is not None: self._copy_source_dir(source_dir) return {}
[docs] def process_run(self): """ Method associated to the running of the executable. The method prepares the jobs script(s), then submit the jobs and wait the end of the computation before passing to the :meth:`post_processing` method. Computations are performed in parallel or serially accordingly to the value of the multiTask option. """ multiTask = self.run_options.get('multiTask') dry_run = self.run_options.get('dry_run',False) wait_end_run = self.run_options.get('wait_end_run',True) to_run = self.select_to_run() jobs = self.build_run_script(to_run) if not dry_run: if multiTask: self.submit_job(jobs) if wait_end_run: self.wait(jobs,to_run) else: for index,job in zip(to_run,jobs): self.submit_job([job]) if wait_end_run: self.wait([job],[index]) return {}
[docs] def post_processing(self): """ Return a list with the names, including the path, of the data-file-schema.xml files for each element of inputs. If a file is absent the method returns None in the associated element of the list, making easy to understand if a specific computation has been correctly performed. Return: :py:class:`dict` : dictionary {'output' : []} where [] is a list with the names of the xml files (if the file exists) otherwise the associated element is set to None. """ run_dir = self.run_options.get('run_dir', '.') inputs = self.run_options['inputs'] results = {'output' : []} for input in inputs: prefix = input['control']['prefix'].strip("'") prefix += '.save' result = os.path.join(run_dir,prefix,'data-file-schema.xml') if os.path.isfile(result): results['output'].append(result) else: results['output'].append(None) return results
[docs] def select_to_run(self): """ If the skip attribute of run_options is True the method evaluates which computations can be skipped. This is done by checking if the file $prefix.xml is already present in the run_dir. Return: :py:class:`list` : list with numbers of the computations that have to be performed, in the same order provided in the run method """ skip = self.run_options.get('skip') run_dir = self.run_options.get('run_dir', '.') inputs = self.run_options.get('inputs') names = self.run_options.get('names') verbose = self.run_options.get('verbose') if not skip: to_run = [index for index in range(len(inputs))] return to_run else: to_run = [] for index,input in enumerate(inputs): prefix = input['control']['prefix'].strip("'") skipfile = os.path.join(run_dir,prefix)+'.xml' if os.path.isfile(skipfile): if verbose: print('Skip the run of',names[index]) else: to_run.append(index) return to_run
[docs] def build_run_script(self,to_run): """ Create the run script(s) that are executed by the :meth:`submit_job` method. The scripts depend on the scheduler adopted, and specific methods for `direct` and `slurm` scheduler are implemented. Args: to_run (:py:class:`string`) : list with the cardinal numbers of the runs to be performed Return: :py:class:`list` : list with jobs to run. The type of the object in the list depends on the chosen scheduler """ scheduler = self.run_options['scheduler'] jobs = None if scheduler == 'direct': jobs = self.direct_scheduler(to_run) elif scheduler == 'slurm': jobs = self.slurm_scheduler(to_run) else: print('scheduler unknown') return jobs
[docs] def submit_job(self,jobs): """ Submit the job. Args: jobs : The reference to the jobs to be executed. If the scheduler is `direct` jobs is a list with the instance of :py:class:multiprocessing. If the scheduler is `slurm` jobs is a list with the names of the slurm scripts """ scheduler = self.run_options['scheduler'] if scheduler == 'direct': # Set the OMP_NUM_THREADS variable in the environment os.environ['OMP_NUM_THREADS'] = str(self.run_options['omp']) for run in jobs: run.start() if scheduler == 'slurm': run_dir = self.run_options.get('run_dir', '.') for job in jobs: slurm_submit = 'cd %s ; sbatch %s.sh' %(run_dir,job) print('slurm submit: ',slurm_submit ) os.system(slurm_submit)
[docs] def wait(self,jobs,to_run): """ Wait the end of the jobs. Args: jobs : The reference to the jobs to be executed. If the scheduler is `direct` jobs is a list with the instance of :py:class:multiprocessing. If the scheduler is `slurm` jobs is a list with the names of the slurm scripts to_run (:py:class:`string`) : list with the cardinal numbers of the runs to be performed """ verbose = self.run_options.get('verbose') IO_time = self.run_options.get('IO_time') import time while not all(self._jobs_terminated(jobs)): if verbose: s = '' for index,status in zip(to_run,self._jobs_terminated(jobs)): s+='run'+str(index)+'_is_running: '+str(not status) + ' ' print(s) time.sleep(IO_time) if verbose : print('Job completed')
[docs] def direct_scheduler(self,to_run): """ Define the list of Process (methods of multiprocessing) associated to the runs specified in the list to_run. Args: to_run (:py:class:`string`) : list with the cardinal numbers of the runs to be performed Return: :py:class:`list` : list of the :py:class:`multiprocessing` objects associated to the runs of the job """ import multiprocessing def os_system_run(comm_str): os.system(comm_str) jobs = [] for index in to_run: comm_str = self.run_command(index) p = multiprocessing.Process(target=os_system_run, args=(comm_str,)) jobs.append(p) return jobs
[docs] def slurm_scheduler(self,to_run): """ Create the slurm script(s) associated to the runs specified in the list to_run. Args: to_run (:py:class:`string`) : list with the cardinal numbers of the runs to be performed Return: :py:class:`list`: list with the names of the slurm scripts associated to the computations that are not skipped """ omp = self.run_options.get('omp') mpi = self.run_options.get('mpi') names = self.run_options.get('names') run_dir = self.run_options.get('run_dir', '.') sbatch_options = self.run_options.get('sbatch_options', None) lines_options = [] lines_options.append('#!/bin/bash') lines_options.append('#SBATCH --ntasks=%s ### Number of tasks (MPI processes)'%mpi) lines_options.append('#SBATCH --cpus-per-task=%s ### Number of threads per task (OMP threads)'%omp) if sbatch_options is not None: # add other options if present in the run_options of the calculator for option in sbatch_options: lines_options.append('#SBATCH %s'%option) jobs = [] for index in to_run: job_name = 'job_'+names[index] jobs.append(job_name) comm_str = self.run_command(index) lines_run = [] lines_run.append('#SBATCH --output=%s.out'%job_name) lines_run.append('') lines_run.append('export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK') lines_run.append('') lines_run.append('echo "Job id $SLURM_JOB_ID"') lines_run.append('echo "Number of mpi $SLURM_NTASKS"') lines_run.append('echo "Number of threads per task $SLURM_CPUS_PER_TASK"') lines_run.append('') lines_run.append('echo "execute : %s"'%comm_str) lines_run.append(comm_str) lines_run.append('') lines_run.append('echo "JOB_DONE"') f = open(os.path.join(run_dir,job_name+'.sh'),'w') f.write('\n'.join(lines_options+lines_run)) f.close() return jobs
[docs] def run_command(self,index): """ Define the run command used to run the computation associated to the input file $names[index]. The value of the command depends on the chosen scheduler. Args: index (:py:class:`int`) : index of the computation to be performed Return: :py:class:`string` : command that runs the computation associated to the $names[index] input file """ scheduler = self.run_options.get('scheduler') executable = self.run_options.get('executable') mpi = self.run_options.get('mpi') mpi_run = self.run_options.get('mpi_run') run_dir = self.run_options.get('run_dir', '.') names = self.run_options.get('names') verbose = self.run_options.get('verbose') if scheduler == 'direct': set_run_dir = 'cd %s; '%run_dir command = set_run_dir + mpi_run + ' ' + str(mpi) + ' ' + executable if scheduler == 'slurm': command = mpi_run + ' ' + str(mpi) + ' ' + executable input_name = names[index] + '.in' output_name = names[index] + '.log' comm_str = command + ' -inp %s > %s'%(input_name,output_name) if verbose: print('run %s command: %s' %(index,comm_str)) return comm_str
def _jobs_terminated(self,jobs): """ Check the status of the running jobs. Args: jobs (:py:class:`list`) : list with the reference to the running jobs Return: :py:class:`list`: list with the status of the jobs. The elements are True if the associated computation is terminated and False if it is running """ scheduler = self.run_options.get('scheduler') run_dir = self.run_options.get('run_dir', '.') if scheduler == 'direct': jobs_terminated = [not job.is_alive() for job in jobs] if scheduler == 'slurm': jobs_terminated = [] for job in jobs: job_out = os.path.join(run_dir,job+'.out') if not os.path.isfile(job_out): jobs_terminated.append(False) else: with open(job_out, 'r') as f: last_line = f.read().splitlines()[-1] if last_line == 'JOB_DONE': jobs_terminated.append(True) else: jobs_terminated.append(False) return jobs_terminated def _ensure_run_directory(self): from mppi.Utilities import FutileUtils as f run_dir = self.run_options.get('run_dir', '.') # Restrict run_dir to a sub-directory if ("/" in run_dir or run_dir == ".."): raise ValueError( "run_dir '%s' must be a sub-directory"% run_dir) # Create the run_dir if not exist if f.ensure_dir(run_dir) and self.run_options['verbose']: print("Create the sub-directory '%s'" % run_dir) def _copy_source_dir(self,source_dir): """ Copy the source_dir in the run_dir and atttibute to the copied folder the name $prefix, for all the inputs. Args: source_dir: the name of the source_dir including its relative path. A source_dir outer respect to the actual run_dir of the instance of QeCalculator can be used. """ from shutil import copytree verbose = self.run_options.get('verbose') run_dir = self.run_options.get('run_dir', '.') inputs = self.run_options.get('inputs') for input in inputs: prefix = input['control']['prefix'].strip("'") dest_dir = os.path.join(run_dir,prefix)+'.save' if not os.path.isdir(dest_dir): if verbose: print('Copy source_dir %s in the %s'%(source_dir,dest_dir)) copytree(source_dir,dest_dir) else: if verbose: print('The folder %s already exsists. Source folder % s not copied' %(dest_dir,source_dir)) def _clean_run_dir(self): """ Clean the run_dir before performing the computation. Delete the $name.log file, the $prefix.xml file and the folder run_dir/prefix.save associated to all the inputs and names. """ run_dir = self.run_options.get('run_dir', '.') names = self.run_options.get('names','default') inputs = self.run_options.get('inputs') verbose = self.run_options.get('verbose') for input,name in zip(inputs,names): logfile = os.path.join(run_dir,name)+'.log' prefix = input['control']['prefix'].strip("'") xmlfile = os.path.join(run_dir,prefix)+'.xml' outdir = os.path.join(run_dir,prefix)+'.save' if os.path.isfile(logfile): if verbose: print('delete log file:',logfile) os.system('rm %s'%logfile) if os.path.isfile(xmlfile): if verbose: print('delete xml file:',xmlfile) os.system('rm %s'%xmlfile) if os.path.isdir(outdir): if verbose: print('delete folder:',outdir) os.system('rm -r %s'%outdir)