pymtt
 All Classes Namespaces Files Functions Variables Groups
OpenMPI.py
Go to the documentation of this file.
1 # -*- coding: utf-8; tab-width: 4; indent-tabs-mode: f; python-indent: 4 -*-
2 #
3 # Copyright (c) 2015-2018 Intel, Inc. All rights reserved.
4 # Copyright (c) 2017 Los Alamos National Security, LLC. All rights
5 # reserved.
6 # $COPYRIGHT$
7 #
8 # Additional copyrights may follow
9 #
10 # $HEADER$
11 #
12 
13 from __future__ import print_function
14 import os
15 from LauncherMTTTool import *
16 import shlex
17 
18 ## @addtogroup Tools
19 # @{
20 # @addtogroup Launcher
21 # @section OpenMPI
22 # Plugin for using the Open MPI mpirun launch tool
23 # @param hostfile The hostfile for OpenMPI to use
24 # @param command Command for executing the application
25 # @param np Number of processes to run
26 # @param ppn Number of processes per node to run
27 # @param timeout Maximum execution time - terminate a test if it exceeds this time
28 # @param options Comma-delimited sets of command line options that shall be used on each test
29 # @param skipped Exit status of a test that declares it was skipped
30 # @param merge_stdout_stderr Merge stdout and stderr into one output stream
31 # @param stdout_save_lines Number of lines of stdout to save
32 # @param stderr_save_lines Number of lines of stderr to save
33 # @param test_dir Names of directories to be scanned for tests
34 # @param fail_tests Names of tests that are expected to fail
35 # @param fail_returncodes Expected return codes of tests expected to fail
36 # @param fail_timeout Maximum execution time for tests expected to fail
37 # @param skip_tests Names of tests to be skipped
38 # @param max_num_tests Maximum number of tests to run
39 # @param test_list List of tests to run, default is all
40 # @param allocate_cmd Command to use for allocating nodes from the resource manager
41 # @param deallocate_cmd Command to use for deallocating nodes from the resource manager
42 # @}
44 
def __init__(self):
    """Initialise the plugin: option table plus per-run bookkeeping fields."""
    # the parent launcher class gets to set itself up first
    LauncherMTTTool.__init__(self)

    # (option name, default value, description) - turned into the
    # name -> (default, description) table below
    option_table = (
        ('hostfile', None, "The hostfile for OpenMPI to use"),
        ('command', "mpirun", "Command for executing the application"),
        ('np', None, "Number of processes to run"),
        ('ppn', None, "Number of processes per node to run"),
        ('timeout', None, "Maximum execution time - terminate a test if it exceeds this time"),
        ('options', None, "Comma-delimited sets of command line options that shall be used on each test"),
        ('skipped', "77", "Exit status of a test that declares it was skipped"),
        ('merge_stdout_stderr', False, "Merge stdout and stderr into one output stream"),
        ('stdout_save_lines', -1, "Number of lines of stdout to save"),
        ('stderr_save_lines', -1, "Number of lines of stderr to save"),
        ('test_dir', None, "Names of directories to be scanned for tests"),
        ('fail_tests', None, "Names of tests that are expected to fail"),
        ('fail_returncodes', None, "Expected returncodes of tests expected to fail"),
        ('fail_timeout', None, "Maximum execution time for tests expected to fail"),
        ('skip_tests', None, "Names of tests to be skipped"),
        ('max_num_tests', None, "Maximum number of tests to run"),
        ('test_list', None, "List of tests to run, default is all"),
        ('allocate_cmd', None, "Command to use for allocating nodes from the resource manager"),
        ('deallocate_cmd', None, "Command to use for deallocating nodes from the resource manager"),
    )
    self.options = {}
    for opt_name, default, description in option_table:
        self.options[opt_name] = (default, description)

    # state shared between execute() and deactivate()
    self.allocated = False
    self.testDef = None
    self.cmds = None
73 
74 
def activate(self):
    """Activate the plugin by delegating to IPlugin's standard procedure."""
    IPlugin.activate(self)
79 
80 
def deactivate(self):
    """Deactivate the plugin, releasing any allocation still held.

    After the IPlugin deactivation, the stored ``deallocate_cmd`` is run
    through the test definition's executor if an allocation is still
    flagged and both the test definition and the parsed options are known.
    """
    IPlugin.deactivate(self)

    # nothing to release unless an allocation is outstanding and we still
    # have what is needed to run the deallocation command
    if not (self.allocated and self.testDef and self.cmds):
        return

    release_args = shlex.split(self.cmds['deallocate_cmd'])
    _rc, _out, _err, _elapsed = self.testDef.execmd.execute(self.cmds, release_args, self.testDef)
    self.allocated = False
87 
def print_name(self):
    """Return the display name of this launcher plugin."""
    plugin_name = "OpenMPI"
    return plugin_name
90 
def print_options(self, testDef, prefix):
    """Print this plugin's options, one per line, each preceded by ``prefix``.

    The option lines themselves are produced by ``testDef.printOptions``
    from ``self.options``.
    """
    for text in testDef.printOptions(self.options):
        print(prefix + text)
96 
def execute(self, log, keyvals, testDef):
    """Find the built tests and run each one under the configured launcher.

    A section whose title contains "Default" only updates this plugin's
    default option values. Any other section must name, via
    ``keyvals['parent']``, the build stage whose log records where the
    tests were built; the tests found there are executed one at a time.

    @param log      Dict receiving the results: 'status' and 'stderr' always,
                    plus 'testresults', 'numTests', 'numPass', 'numSkip',
                    'numFail' and 'np' when tests were run
    @param keyvals  Key/value settings of the section; 'modules' may be
                    extended with modules recorded by the build/middleware
    @param testDef  Test definition object supplying logger, parseOptions,
                    execmd and harasser
    """

    def lookup(entry, key):
        """Return entry[key], or None when the key is absent."""
        try:
            return entry[key]
        except KeyError:
            return None

    def merge_modules(parameters):
        """Fold the first 'modules' parameter of a stage into keyvals['modules']."""
        if parameters is None:
            return
        for param in parameters:
            if "modules" != param[0]:
                continue
            try:
                current = keyvals['modules']
            except KeyError:
                # nothing requested by this section - adopt the stage's list
                keyvals['modules'] = param[1]
            else:
                # an explicit None is left untouched, as before
                if current is not None:
                    # stage modules first, then the ones already requested
                    keyvals['modules'] = ','.join(param[1].split(',') + current.split(','))
            break

    def prepend_env_path(name, directory):
        """Prepend directory to a ':'-separated variable; return its prior value (None if unset)."""
        previous = os.environ.get(name)
        pieces = previous.split(':') if previous is not None else []
        pieces.insert(0, directory)
        os.environ[name] = ":".join(pieces)
        return previous

    def executables_under(root, wanted=None):
        """List absolute paths of executable files below root, optionally limited to wanted names."""
        found = []
        for dirName, _subdirs, fileList in os.walk(root):
            if wanted is None:
                names = fileList
            else:
                names = [name for name in wanted if name in fileList]
            for fname in names:
                filename = os.path.abspath(os.path.join(dirName, fname))
                if os.path.isfile(filename) and os.access(filename, os.X_OK):
                    found.append(filename)
        return found

    self.testDef = testDef
    testDef.logger.verbose_print("OpenMPI Launcher")

    # check the section title to see if this only sets our default behavior
    try:
        section = log['section']
    except KeyError:
        # error - the section should have been there
        log['status'] = 1
        log['stderr'] = "Section not specified"
        return
    if section is not None and "Default" in section:
        myopts = {}
        testDef.parseOptions(log, self.options, keyvals, myopts)
        # keep each option's description, replace only its default value
        for key, value in myopts.items():
            if key in self.options:
                self.options[key] = (value, self.options[key][1])
        log['status'] = 0
        return

    # must be executing a test of some kind - the build stage must be
    # specified so we can find the tests to be run
    try:
        parent = keyvals['parent']
        bldlog = testDef.logger.getLog(parent) if parent is not None else None
    except KeyError:
        parent = None
    if parent is None:
        log['status'] = 1
        log['stderr'] = "Parent test build stage was not provided"
        return

    try:
        location = bldlog['location']
    except (KeyError, TypeError):
        # no build log, or the location wasn't recorded - nothing we can do
        log['status'] = 1
        log['stderr'] = "Location of built tests was not provided"
        return

    # pick up modules used during the build of these tests
    merge_modules(lookup(bldlog, 'parameters'))

    # point the environment at the middleware, remembering what to restore
    saved_env = []
    try:
        midlog = testDef.logger.getLog(bldlog['middleware'])
    except KeyError:
        midlog = None
    if midlog is not None:
        # if the middleware was already installed, no location is recorded
        midloc = lookup(midlog, 'location')
        if midloc is not None:
            for name, subdir in (('PATH', "bin"), ('LD_LIBRARY_PATH', "lib")):
                saved_env.append((name, prepend_env_path(name, os.path.join(midloc, subdir))))
        # pick up modules required by the middleware
        merge_modules(lookup(midlog, 'parameters'))

    cwd = os.getcwd()
    try:
        # parse any provided options - these will override the defaults
        cmds = {}
        testDef.parseOptions(log, self.options, keyvals, cmds)
        self.cmds = cmds

        os.chdir(location)

        # which directories to scan, and (optionally) which test names
        wanted = cmds['test_list']
        if wanted is not None:
            wanted = [name.strip() for name in wanted.split(",")]
        test_dir = cmds.get('test_dir')
        if test_dir is not None:
            # entries are whitespace separated; drop stray commas and quotes
            roots = [dr.strip().replace('"', '').replace(',', '') for dr in test_dir.split()]
        else:
            roots = ['.']
        tests = []
        for root in roots:
            tests.extend(executables_under(root, wanted))

        # check that we found something
        if not tests:
            log['status'] = 1
            log['stderr'] = "No tests found"
            return

        # get the "skip" exit status
        skipStatus = int(cmds['skipped'])

        # assemble the part of the command shared by every test
        cmdargs = cmds['command'].split()
        for flag, key in (("-np", 'np'), ("-N", 'ppn'),
                          ("-hostfile", 'hostfile'), ("--timeout", 'timeout')):
            if cmds[key] is not None:
                cmdargs.append(flag)
                cmdargs.append(cmds[key])

        log['testresults'] = []
        finalStatus = 0
        finalError = ""
        numTests = 0
        numPass = 0
        numSkip = 0
        numFail = 0
        if cmds['max_num_tests'] is not None:
            maxTests = int(cmds['max_num_tests'])
        else:
            maxTests = 10000000

        # map the names of tests expected to fail onto the discovered paths
        fail_tests = cmds['fail_tests']
        if fail_tests is not None:
            fail_tests = [t.strip() for t in fail_tests.split(",")]
        else:
            fail_tests = []
        for i, t in enumerate(fail_tests):
            for t2 in tests:
                if t2.split("/")[-1] == t:
                    fail_tests[i] = t2

        # expected return code per test; None means "any non-zero code"
        fail_returncodes = cmds['fail_returncodes']
        if fail_returncodes is None:
            expected_to_fail = set(fail_tests)
            expected_returncodes = {test: (None if test in expected_to_fail else 0) for test in tests}
        else:
            codes = [int(t.strip()) for t in fail_returncodes.split(",")]
            code_for = dict(zip(fail_tests, codes))
            expected_returncodes = {test: code_for.get(test, 0) for test in tests}

        # Allocate cluster
        self.allocated = False
        if cmds['allocate_cmd'] is not None and cmds['deallocate_cmd'] is not None:
            self.allocated = True
            allocate_cmdargs = shlex.split(cmds['allocate_cmd'])
            _status, _stdout, _stderr, _time = testDef.execmd.execute(cmds, allocate_cmdargs, testDef)
            if 0 != _status:
                log['status'] = _status
                log['stderr'] = _stderr
                return

        # names listed in "skip_tests" (whitespace separated)
        if cmds['skip_tests'] is not None:
            skip_names = set(st.strip() for st in cmds['skip_tests'].split())
        else:
            skip_names = set()

        for test in tests:
            if test.split('/')[-1] in skip_names:
                numTests += 1
                numSkip += 1
                if numTests == maxTests:
                    break
                continue

            # build the command per test so nothing leaks into the next one
            testcmd = cmdargs + [test]
            testLog = {'test': test, 'cmd': " ".join(testcmd)}

            harass_exec_ids = testDef.harasser.start(testDef)
            harass_check = testDef.harasser.check(harass_exec_ids, testDef)
            if harass_check is not None:
                testLog['stderr'] = 'Not all harasser scripts started. These failed to start: ' \
                    + ','.join([h_info[1]['start_script'] for h_info in harass_check[0]])
                testLog['time'] = sum([r_info[3] for r_info in harass_check[1]])
                testLog['status'] = 1
                finalStatus = 1
                finalError = testLog['stderr']
                numFail = numFail + 1
                testDef.harasser.stop(harass_exec_ids, testDef)
                # NOTE(review): this entry is not added to log['testresults']
                # and numTests is not advanced - kept as before; confirm intent
                continue

            status, stdout, stderr, elapsed = testDef.execmd.execute(cmds, testcmd, testDef)

            testDef.harasser.stop(harass_exec_ids, testDef)

            expected = expected_returncodes[test]
            if expected is None:
                passed = (0 != status)
            else:
                passed = (expected == status)

            # the first unexpected, non-skipped result decides the stage status
            if not passed and skipStatus != status and 0 == finalStatus:
                finalStatus = status if expected == 0 else 1
                finalError = stderr

            if passed:
                numPass = numPass + 1
            elif skipStatus == status:
                numSkip = numSkip + 1
            else:
                numFail = numFail + 1

            # ordinary tests report their raw status; expected-to-fail tests
            # report 0 when they behaved as expected and 1 otherwise
            if expected == 0:
                testLog['status'] = status
            else:
                testLog['status'] = 0 if passed else 1
            testLog['stdout'] = stdout
            testLog['stderr'] = stderr
            testLog['time'] = elapsed
            log['testresults'].append(testLog)

            numTests = numTests + 1
            if numTests == maxTests:
                break

        # Deallocate cluster
        if self.allocated:
            deallocate_cmdargs = shlex.split(cmds['deallocate_cmd'])
            _status, _stdout, _stderr, _time = testDef.execmd.execute(cmds, deallocate_cmdargs, testDef)
            if 0 != _status:
                log['status'] = _status
                log['stderr'] = _stderr
                return
            self.allocated = False

        log['status'] = finalStatus
        log['stderr'] = finalError
        log['numTests'] = numTests
        log['numPass'] = numPass
        log['numSkip'] = numSkip
        log['numFail'] = numFail
        log['np'] = cmds.get('np')
    finally:
        # always return to where we started and undo the middleware paths,
        # whichever way the run ended
        os.chdir(cwd)
        for name, previous in saved_env:
            if previous is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = previous
    return
def print_options
Definition: OpenMPI.py:91
def deactivate
Definition: OpenMPI.py:81
def print_name
Definition: OpenMPI.py:88