Package pilot :: Package impl :: Module pilotcompute_manager

Source Code for Module pilot.impl.pilotcompute_manager

  1  """ B{PilotCompute Module}: Implementation of L{PilotComputeService}, L{PilotCompute}, L{ComputeUnit}. 
  2  """ 
  3   
  4  import sys 
  5  import getopt 
  6  import time 
  7  import pdb 
  8  import os 
  9  import traceback 
 10  import logging 
 11  import uuid 
 12   
 13   
 14  """ import bigjob classes """ 
 15  from bigjob import logger 
 16  from bigjob import bigjob, subjob, description 
 17   
 18  """ import API objects """ 
 19  from pilot.api import PilotCompute, PilotComputeService, ComputeUnit, State 
 20   
 21  """ Configure coordination backend here """ 
 22  #from pilot.coordination.advert import AdvertCoordinationAdaptor as CoordinationAdaptor 
 23  from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor 
 24   
 25   
 26  """ This variable defines the coordination system that is used by BigJob 
 27      e.g.  
 28          advert://localhost (SAGA/Advert SQLITE) 
 29          advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL) 
 30          redis://localhost:6379 (Redis at localhost) 
 31          tcp://localhost (ZMQ) 
 32  """ 
 33  COORDINATION_URL = "redis://localhost" 
 34   
 35   
class PilotComputeService(PilotComputeService):
    """ B{PilotComputeService (PCS).}

        Factory for L{PilotCompute}s.

        Every pilot created through this service is backed by one C{bigjob}
        instance that is started with the service's coordination URL.
    """

    PJS_ID_PREFIX = "pcs-"

    def __init__(self, coordination_url=COORDINATION_URL, pcs_url=None):
        """ Create a PilotComputeService object.

            Keyword arguments:
            coordination_url -- URL of the coordination backend; it is passed
                                to every BigJob started by this service
            pcs_url -- URL of an existing service to reconnect to (optional).
                       Reconnecting is not supported yet: an error is logged
                       and C{id}/C{url} stay C{None}.
        """
        # Function-scope import: URLs must always be joined with "/",
        # independent of the path separator of the local operating system.
        import posixpath

        self.pilot_computes = []
        self.coordination_url = coordination_url
        self.coordination_queue = ""
        # Defined up front so that both attributes exist even when the
        # (unsupported) reconnect path is taken.
        self.id = None
        self.url = None
        if pcs_url is None:  # new pcs
            self.id = self.PJS_ID_PREFIX + str(uuid.uuid1())
            self.url = posixpath.join(self.coordination_url, "pcs", self.id)
            self.coordination_queue = "PilotComputeServiceQueue-" + str(self.id)
            logger.debug("Created Pilot Compute Service: %s" % self.url)
        else:
            logger.error("Reconnect to PilotComputeService currently not supported.")

    def create_pilot(self, rm=None, pilot_compute_description=None, pj_type=None, context=None):
        """ Start a new pilot and add it to this service.

            Keyword arguments:
            pilot_compute_description -- dictionary-like description of the
                                         pilot (e.g. service_url,
                                         number_of_processes, queue)
            rm, pj_type, context -- accepted for interface compatibility;
                                    not used by this implementation

            Return value:
            A L{PilotCompute} object

            Raises KeyError if the description lacks "service_url" or
            "number_of_processes".
        """
        bj_dict = self.__translate_pj_bj_description(pilot_compute_description)
        bj = self.__start_bigjob(bj_dict)
        pj = PilotCompute(self, bj, pilot_compute_description)
        self.pilot_computes.append(pj)
        return pj

    def list_pilots(self):
        """ List managed L{PilotCompute}s.

            Return value:
            The list of L{PilotCompute} objects created by this service.
            This is the service's own list, not a copy.
        """
        return self.pilot_computes

    def cancel(self):
        """ Cancel the PilotComputeService.

            This cancels all the pilots that were created by this service.

            Keyword arguments:
            None

            Return value:
            None
        """
        for pilot in self.pilot_computes:
            pilot.cancel()

    def __repr__(self):
        """ Return a multi-line summary with URL and state of every pilot. """
        entries = [" PJ: %s, State: %s;" % (pilot.get_url(), pilot.get_state())
                   for pilot in self.pilot_computes]
        return "{\n" + "".join(entries) + "\n}"

    ###########################################################################
    # Internal methods

    def __translate_pj_bj_description(self, pilot_compute_description):
        """ Translate a pilot compute description into the flat dictionary
            consumed by C{__start_bigjob}.

            The result contains every key of the description, "resource_url"
            (copied from "service_url" if present), "queue" and "allocation"
            (C{None} unless given) and, under "pilot_compute_description",
            the original description object itself.
        """
        # Defaults first; the copy loop below overrides them when the
        # description provides its own values.
        resource_description = {"queue": None, "allocation": None}
        if "service_url" in pilot_compute_description:
            resource_description["resource_url"] = pilot_compute_description["service_url"]

        for key in pilot_compute_description.keys():
            resource_description[key] = pilot_compute_description[key]

        resource_description["pilot_compute_description"] = pilot_compute_description
        return resource_description

    def __start_bigjob(self, bj_dict):
        """ private method - starts a bigjob on the defined resource

            bj_dict is the dictionary produced by
            C{__translate_pj_bj_description}. "resource_url" and
            "number_of_processes" are mandatory (KeyError otherwise);
            "processes_per_node" defaults to "1" and "walltime" to 3600.

            Returns the started C{bigjob} object.
        """
        gram_url = bj_dict["resource_url"]
        logger.debug("start bigjob at: " + gram_url)
        bj = bigjob(self.coordination_url)

        # setdefault also records the default in bj_dict, as before.
        ppn = bj_dict.setdefault("processes_per_node", "1")
        walltime = bj_dict.get("walltime", 3600)
        working_directory = bj_dict.get("working_directory")
        bj_filetransfer = bj_dict.get("file_transfer")

        bj.start_pilot_job(lrms_url=gram_url,
                           number_nodes=bj_dict["number_of_processes"],
                           queue=bj_dict["queue"],
                           project=bj_dict["allocation"],
                           working_directory=working_directory,
                           walltime=walltime,
                           processes_per_node=ppn,
                           filetransfers=bj_filetransfer,
                           external_queue=self.coordination_queue,
                           pilot_compute_description=bj_dict["pilot_compute_description"]
                           )
        return bj
174 175 ############################################################################### 176
class PilotCompute(PilotCompute):
    """ B{Pilot Compute (PC).}

        This is the object that is returned by the PilotComputeService when a
        new PilotCompute is created based on a PilotComputeDescription.

        The PilotCompute object can be used by the application to keep track
        of active pilots.

        A PilotCompute has state, can be queried and cancelled. All of these
        operations are delegated to the wrapped C{bigjob} object.

        Properties:

            - pilot_compute_description:
              The PilotComputeDescription used to create this pilot
              (C{None} for a pilot that was reconnected via its URL).

            - coordination_queue:
              Queue name of the owning PilotComputeService. Only set when
              the pilot was created with a service.
    """

    def __init__(self, pilot_compute_service=None,
                 bigjob_object=None,
                 pilot_compute_description=None,
                 pilot_url=None):  # for reconnecting
        """ Create/reconnect to a Pilot Compute.

            Keyword arguments:
            pilot_compute_service -- service that created this pilot
            bigjob_object -- already started BigJob backing this pilot
            pilot_compute_description -- description the pilot was created from
            pilot_url -- reconnect to the pilot referenced by this URL; when
                         given, the three arguments above are ignored
        """
        self.__subjobs = []
        self.__pilot_compute_service = None
        # Always defined, so that reconnected pilots expose the attribute too.
        self.pilot_compute_description = None
        if pilot_url is None:
            logger.debug("Create PilotCompute for BigJob: " + str(bigjob_object))
            self.pilot_compute_description = pilot_compute_description
            self.__pilot_compute_service = pilot_compute_service
            self.__bigjob = bigjob_object
        else:
            logger.debug("Reconnect to an existing Pilot Compute")
            self.__bigjob = bigjob(pilot_url=pilot_url)

        # Store the URL of pilot compute service for later reference
        # This URL is used as central queue for a set of BJs in the
        # ComputeDataServiceDecentral
        if self.__pilot_compute_service is not None:
            self.coordination_queue = pilot_compute_service.coordination_queue

    def cancel(self):
        """ Terminates the pilot """
        self.__bigjob.cancel()

    def get_state(self):
        """ Returns the state of the pilot """
        return self.__bigjob.get_state()

    def wait(self):
        """ Wait until Pilot Compute to enter a final state (Done, Cancel or Failed)

            It is not an error to call wait() in a final state -- the call simply
            returns immediately.
        """
        self.__bigjob.wait()

    def list_compute_units(self):
        """ list managed L{ComputeUnit}s.

            Return value:
            A list of L{ComputeUnit} objects, one for every subjob URL
            reported by the underlying BigJob.
        """
        return [ComputeUnit(cu_url=sj_url)
                for sj_url in self.__bigjob.list_subjobs()]

    def get_url(self):
        """ Get unique URL referencing the Pilot Compute
            This URL can be used to reconnect to the Pilot Compute
        """
        return self.__bigjob.get_url()

    def get_free_nodes(self):
        """ Returns the number of free slots available within the pilot """
        return self.__bigjob.get_free_nodes()

    def get_details(self):
        """ returns a dict that contains the details of the Pilot Compute,
                - job state
                - description
                - ...
        """
        return self.__bigjob.get_details()

    def submit_compute_unit(self, compute_unit_description):
        """ Submit a CU to this pilot.

            @param compute_unit_description: The L{ComputeUnitDescription} or dictionary describing
                                             the compute task

            @return: L{ComputeUnit} object

            The description is wrapped in a new L{ComputeUnit} which is
            submitted directly to the BigJob of this pilot.
        """
        cu = ComputeUnit(compute_unit_description)
        return self._submit_cu(cu)

    def __repr__(self):
        """ Return the string representation of the wrapped BigJob. """
        return str(self.__bigjob)

    ###########################################################################
    # Internal methods

    def _submit_cu(self, compute_unit):
        """ Submits compute unit to Bigjob

            A new subjob is created from the unit's subjob description,
            remembered by this pilot and attached to the unit.

            Returns the compute unit that was passed in.
        """
        logger.debug("Submit CU to big-job")
        sj = subjob()
        sj.submit_job(self.__bigjob.pilot_url, compute_unit.subjob_description)
        self.__subjobs.append(sj)
        compute_unit._update_subjob(sj)
        return compute_unit
331 332 333 ############################################################################### 334
class ComputeUnit(ComputeUnit):
    """ B{ComputeUnit (CU).}

        This is the object that is returned by the ComputeDataService when a
        new ComputeUnit is submitted based on a ComputeUnitDescription.

        The ComputeUnit object can be used by the application to keep track
        of ComputeUnits that are active.

        A ComputeUnit has state, can be queried and can be cancelled.
        Once a BigJob subjob is attached, these operations are delegated
        to that subjob.
    """

    CU_ID_PREFIX = "cu-"

    def __init__(self, compute_unit_description=None, compute_data_service=None, cu_url=None):
        """ Create a new Compute Unit or reconnect to an existing one.

            Keyword arguments:
            compute_unit_description -- dictionary-like description of the task
            compute_data_service -- owning service (optional); when given,
                                    C{self.url} is derived from its URL
            cu_url -- reconnect to the subjob referenced by this URL; when
                      given, the other arguments are ignored
        """
        if cu_url is None:
            self.id = self.CU_ID_PREFIX + str(uuid.uuid1())
            if compute_data_service is not None:
                self.url = compute_data_service.url + "/" + self.id
                logger.debug("Created CU: %s" % self.url)
            self.state = State.New
            self.__subjob = None  # reference to BigJob Subjob
            self.compute_unit_description = compute_unit_description  # CU Description
            # Without a description there is nothing to translate yet; it can
            # be supplied later via _update_compute_unit_description().
            if compute_unit_description is not None:
                self.subjob_description = self.__translate_cu_sj_description(compute_unit_description)
            else:
                self.subjob_description = None
        else:
            # No separate id is known for a reconnected unit, so the URL
            # doubles as identifier. This keeps get_id(), wait() and
            # __repr__() usable on units returned by list_compute_units().
            self.id = cu_url
            self.state = None  # refreshed from the subjob by get_state()
            self.compute_unit_description = None
            self.subjob_description = None
            self.__subjob = subjob(subjob_url=cu_url)

    def get_id(self):
        """Returns unique identifier of Compute Unit. Deprecated: Please use get_url() instead."""
        return self.id

    def get_url(self):
        """Returns URL of Compute Unit. This URL can be used to reconnect to this Compute Unit later on.

           Falls back to the id while no subjob is attached.
        """
        if self.__subjob is not None:
            return self.__subjob.get_url()
        return self.get_id()

    def get_details(self):
        """Returns dict with Compute Unit Details (e.g. job description, timings),
           or None while no subjob is attached."""
        if self.__subjob is not None:
            return self.__subjob.get_details()
        return None

    def get_state(self):
        """Returns current state of Compute Unit.

           The cached state is refreshed from the subjob when one is attached.
        """
        if self.__subjob is not None:
            self.state = self.__subjob.get_state()
        return self.state

    def wait(self):
        """ Wait until in Done state
            (or Failed state)

            The state is polled every two seconds.
        """
        # NOTE(review): only Done and Failed end the loop; presumably a
        # cancelled unit never reaches either state, so wait() would not
        # return for it -- confirm against the State definition.
        while True:
            state = self.get_state()
            logger.debug("Compute Unit: %s, State: %s" % (self.id, state))
            if state == State.Done or state == State.Failed:
                break
            time.sleep(2)

    def cancel(self):
        """Terminates Compute Unit.

           Returns the result of the subjob's cancel(), or None if no
           subjob is attached.
        """
        if self.__subjob is not None:
            return self.__subjob.cancel()
        return None

    def __repr__(self):
        """Returns the identifier of the Compute Unit as string."""
        return str(self.id)

    def _update_compute_unit_description(self, compute_unit_description):
        """Replaces the CU description and re-derives the subjob description."""
        self.compute_unit_description = compute_unit_description  # CU Description
        self.subjob_description = self.__translate_cu_sj_description(compute_unit_description)

    def _update_subjob(self, subjob):
        """Attaches the BigJob subjob that executes this Compute Unit."""
        self.__subjob = subjob

    # INTERNAL
    def __translate_cu_sj_description(self, compute_unit_description):
        """ Translate a compute unit description into a BigJob job description.

            Keys that are present are copied to the attribute of the same
            name. "spmd_variation" defaults to "single". The process count
            is taken from "number_of_processes", else "total_cpu_count",
            else 1, and is converted to int.
        """
        jd = description()

        def copy_if_present(*keys):
            # Copy each given key to the equally named attribute of jd.
            for key in keys:
                if key in compute_unit_description:
                    setattr(jd, key, compute_unit_description[key])

        copy_if_present("executable")
        if "spmd_variation" in compute_unit_description:
            jd.spmd_variation = compute_unit_description["spmd_variation"]
        else:
            jd.spmd_variation = "single"
        copy_if_present("arguments", "environment")

        # handling number of processes
        if "number_of_processes" in compute_unit_description:
            jd.number_of_processes = int(compute_unit_description["number_of_processes"])
        elif "total_cpu_count" in compute_unit_description:
            jd.number_of_processes = int(compute_unit_description["total_cpu_count"])
        else:
            jd.number_of_processes = 1

        copy_if_present("working_directory", "output", "error",
                        "file_transfer", "input_data", "output_data")
        return jd
457