1  """ B{PilotCompute Module}: Implementation of L{PilotComputeService}, L{PilotCompute}, L{ComputeUnit}. 
  2  """ 
  3   
  4  import sys 
  5  import getopt 
  6  import time 
  7  import pdb 
  8  import os 
  9  import traceback 
 10  import logging 
 11  import uuid 
 12   
 13   
 14  """ import bigjob classes """ 
 15  from bigjob import logger 
 16  from bigjob import bigjob, subjob, description 
 17   
 18  """ import API objects """ 
 19  from pilot.api import PilotCompute, PilotComputeService, ComputeUnit, State 
 20   
 21  """ Configure coordination backend here """ 
 22   
 23  from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor 
 24   
 25   
 26  """ This variable defines the coordination system that is used by BigJob 
 27      e.g.  
 28          advert://localhost (SAGA/Advert SQLITE) 
 29          advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL) 
 30          redis://localhost:6379 (Redis at localhost) 
 31          tcp://localhost (ZMQ) 
 32  """ 
 33  COORDINATION_URL = "redis://localhost" 
 34   
 35   
 37      """ B{PilotComputeService (PCS).} 
 38       
 39          Factory for L{PilotCompute}s.
 40                       
 41      """ 
 42      PJS_ID_PREFIX="pcs-"    
 43   
 44   
 46          """ Create a PilotComputeService object. 
 47   
 48              Keyword arguments: 
 49              pcs_url -- Don't create a new service, but connect to an existing one (optional; reconnecting is currently not supported) 
 50          """ 
 51          self.pilot_computes=[] 
 52          self.coordination_url=coordination_url 
 53          self.coordination_queue="" 
 54          if pcs_url==None:       
 55              self.id = self.PJS_ID_PREFIX+str(uuid.uuid1()) 
 56              self.url = os.path.join(self.coordination_url, "pcs", self.id) 
 57              self.coordination_queue = "PilotComputeServiceQueue-" + str(self.id) 
 58              logger.debug("Created Pilot Compute Service: %s"%self.url) 
 59          else: 
 60              logger.error("Reconnect to PilotComputeService currently not supported.") 
  61           
 62               
 63   
 def create_pilot(self, rm=None, pilot_compute_description=None, pj_type=None, context=None):
     """ Start a new pilot and register it with this service.

         Keyword arguments:
         rm                        -- accepted, but not used by this method
         pilot_compute_description -- description of the pilot to start
         pj_type                   -- accepted, but not used by this method
         context                   -- accepted, but not used by this method

         Return value:
         The newly created PilotCompute object
     """
     # The description is first translated into a BigJob resource
     # description, the BigJob is started from it, and the running BigJob
     # is then wrapped in a PilotCompute that this service keeps track of.
     pilot = PilotCompute(
         self,
         self.__start_bigjob(self.__translate_pj_bj_description(pilot_compute_description)),
         pilot_compute_description,
     )
     self.pilot_computes.append(pilot)
     return pilot
  78              
 79       
 81          """ List managed L{PilotCompute}s. 
 82   
 83              Return value: 
 84              A list of L{PilotCompute} objects 
 85   
 86          """ 
 87          return self.pilot_computes 
  88           
 89       
 91          """ Cancel the PilotComputeService. 
 92   
 93              This also cancels all the PilotJobs that were under control of this PJS. 
 94   
 95              Keyword arguments: 
 96              None 
 97   
 98              Return value: 
 99              Result of operation 
100          """ 
101          for i in self.pilot_computes: 
102              i.cancel() 
 103               
104       
106          status_string = "{\n" 
107          for i in self.pilot_computes: 
108              status_string = status_string + " PJ: %s, State: %s;"%(i.get_url(), i.get_state()) 
109          status_string = status_string + "\n}" 
110          return status_string 
 111       
112       
113       
114       
116          resource_description={} 
117          if pilot_compute_description.has_key("service_url"): 
118              resource_description["resource_url"] = pilot_compute_description["service_url"]  
119               
120          if pilot_compute_description.has_key("queue"): 
121              resource_description["queue"] = pilot_compute_description["queue"]  
122          else: 
123              resource_description["queue"] = None 
124               
125          if pilot_compute_description.has_key("allocation"): 
126              resource_description["allocation"] = pilot_compute_description["allocation"]  
127          else: 
128              resource_description["allocation"] = None 
129           
130          for i in pilot_compute_description.keys(): 
131              resource_description[i] = pilot_compute_description[i]  
132           
133          resource_description["pilot_compute_description"] = pilot_compute_description 
134           
135          return resource_description 
 136   
137       
139          """ private method - starts a bigjob on the defined resource """ 
140          gram_url = bj_dict["resource_url"] 
141          logger.debug("start bigjob at: " + gram_url) 
142          bj = bigjob(self.coordination_url) 
143          ppn="1" 
144          if ("processes_per_node" in bj_dict): 
145              ppn=bj_dict["processes_per_node"] 
146          else: 
147              bj_dict["processes_per_node"]="1" 
148   
149          walltime = 3600 
150          if ("walltime" in bj_dict): 
151              walltime=bj_dict["walltime"] 
152   
153          working_directory = None 
154          if ("working_directory" in bj_dict): 
155              working_directory=bj_dict["working_directory"] 
156               
157          bj_filetransfer = None 
158          if ("file_transfer" in bj_dict): 
159              bj_filetransfer = bj_dict["file_transfer"] 
160   
161   
162          bj.start_pilot_job(lrms_url = gram_url, 
163                             number_nodes = bj_dict["number_of_processes"], 
164                             queue = bj_dict["queue"], 
165                             project = bj_dict["allocation"], 
166                             working_directory = working_directory,  
167                             walltime = walltime, 
168                             processes_per_node = ppn, 
169                             filetransfers = bj_filetransfer, 
170                             external_queue = self.coordination_queue, 
171                             pilot_compute_description = bj_dict["pilot_compute_description"] 
172                             ) 
173          return bj 
  174       
175   
176       
178      """ B{Pilot Compute (PC).} 
179       
180          This is the object that is returned by the PilotComputeService when a  
181          new PilotCompute is created based on a PilotComputeDescription. 
182   
183          The PilotCompute object can be used by the application to keep track  
184          of active pilots. 
185           
186          A ComputePilot has state, can be queried and cancelled. 
187   
188          Properties: 
189   
190            - state:             
191              The state of the pilot. 
192            
193            - id: 
194              The id may be 'None' if the Pilot is not yet in Running state. 
195              The returned ID can be used to connect to the CP instance later  
196              on, for example from within a different application instance.   
197              type: string (url) 
198   
199            - pilot_compute_description: 
200              The PilotComputeDescription used to create this pilot.             
201       
202      """      
def __init__(self, pilot_compute_service=None,
             bigjob_object=None,
             pilot_compute_description=None,
             pilot_url=None):
    """ Create a new Pilot Compute or reconnect to an existing one.

        Keyword arguments:
        pilot_compute_service     -- service that manages this pilot; its
                                     coordination_queue is copied to the pilot
        bigjob_object             -- BigJob object backing a newly created pilot
        pilot_compute_description -- description the pilot was created from
        pilot_url                 -- if given, a bigjob object is created from
                                     this URL and the other arguments are
                                     ignored
    """
    self.__subjobs = []
    self.__pilot_compute_service = None

    # Define the public attributes on every code path: previously they were
    # missing after a reconnect (or without a service), so simply reading
    # them raised AttributeError.
    self.pilot_compute_description = None
    self.coordination_queue = None

    if pilot_url is None:
        logger.debug("Create PilotCompute for BigJob: " + str(bigjob_object))
        self.pilot_compute_description = pilot_compute_description
        self.__pilot_compute_service = pilot_compute_service
        self.__bigjob = bigjob_object
    else:
        logger.debug("Reconnect to an existing Pilot Compute")
        self.__bigjob = bigjob(pilot_url=pilot_url)

    # Only a pilot that knows its service can inherit the service's queue.
    if self.__pilot_compute_service is not None:
        self.coordination_queue = self.__pilot_compute_service.coordination_queue
 233               
234           
236          """ Terminates the pilot """ 
237          self.__bigjob.cancel()     
 238       
239       
241          """ Returns the state of the pilot """ 
242          return self.__bigjob.get_state()     
 243       
244       
246          """ Wait for the Pilot Compute to enter a final state (Done, Cancel or Failed)  
247   
248          It is not an error to call wait() in a final state -- the call simply 
249          returns immediately.         
250          """ 
251          self.__bigjob.wait() 
 252       
253       
255          """ list managed L{ComputeUnit}s. 
256   
257              Return value: 
258              A list of L{ComputeUnit} objects 
259   
260              The returned list can include units which have been submitted to 
261              this pilot. 
262          """ 
263          sj_list = self.__bigjob.list_subjobs() 
264          cu_list = [] 
265          for i in sj_list: 
266              cu_list.append(ComputeUnit(cu_url=i)) 
267          return cu_list 
 268       
269       
271          """ Get unique URL referencing the Pilot Compute  
272              This URL can be used to reconnect to the Pilot Compute         
273          """ 
274          return self.__bigjob.get_url() 
 275       
276       
278          """ Returns the number of free slots available within the pilot """ 
279          return self.__bigjob.get_free_nodes() 
 280       
281       
283          """ returns a dict that contains the details of the Pilot Compute,  
284                  - job state 
285                  - description 
286                  - ...         
287          """ 
288          return self.__bigjob.get_details() 
 289          
290       
292          """ 
293              Submit a CU to this pilot. 
294   
295              @param compute_unit_description: The L{ComputeUnitDescription} or dictionary describing 
296                                               the compute task 
297   
298              @return: L{ComputeUnit} object 
299   
300              The CUD is (possibly translated and) passed on to the PDS scheduler, 
301              which will attempt to instantiate the described workload process on 
302              the managed set of Pilot Computes.   
303               
304              On success, the returned CU is in Pending state (or moved into any 
305              state downstream from Pending). 
306   
307              The call will honor all attributes set on the CUD.  Attributes which 
308              are not explicitly set are interpreted as having default values (see 
309              documentation of CUD), or, where default values are not specified, 
310              are ignored. 
311          """ 
312          cu = ComputeUnit(compute_unit_description) 
313          return self._submit_cu(cu) 
 314       
315       
317          return str(self.__bigjob) 
 318           
319           
320       
321       
322           
324          """ Submits compute unit to Bigjob """ 
325          logger.debug("Submit CU to big-job") 
326          sj = subjob() 
327          sj.submit_job(self.__bigjob.pilot_url, compute_unit.subjob_description) 
328          self.__subjobs.append(sj) 
329          compute_unit._update_subjob(sj) 
330          return compute_unit 
 331           
332   
333   
334           
336      """ B{ComputeUnit (CU).} 
337       
338          This is the object that is returned by the ComputeDataService when a  
339          new ComputeUnit is submitted based on a ComputeUnitDescription. 
340   
341          The ComputeUnit object can be used by the application to keep track  
342          of ComputeUnits that are active. 
343   
344          A ComputeUnit has state, can be queried and can be cancelled.  
345           
346      """ 
347       
348      CU_ID_PREFIX="cu-"   
349   
def __init__(self, compute_unit_description=None, compute_data_service=None, cu_url=None):
    """ Create a new Compute Unit or reconnect to an existing one.

        Keyword arguments:
        compute_unit_description -- description of the task; it is translated
                                    into a subjob description when a new
                                    Compute Unit is created
        compute_data_service     -- optional service; when given, the URL of
                                    the Compute Unit is derived from its url
        cu_url                   -- if given, a subjob is created from this
                                    URL and the other arguments are ignored
    """
    # Give every public attribute a value on all code paths, so that e.g.
    # get_id() on a reconnected Compute Unit returns None instead of
    # raising AttributeError.
    self.id = None
    self.url = None
    self.state = None
    self.compute_unit_description = None
    self.subjob_description = None

    if cu_url is None:
        self.id = self.CU_ID_PREFIX + str(uuid.uuid1())
        if compute_data_service is not None:
            self.url = compute_data_service.url + "/" + self.id
            logger.debug("Created CU: %s"%self.url)
        self.state = State.New
        self.__subjob = None
        self.compute_unit_description = compute_unit_description
        self.subjob_description = self.__translate_cu_sj_description(compute_unit_description)
    else:
        # Reconnect: the state is read from the subjob by get_state().
        self.__subjob = subjob(subjob_url=cu_url)
 363              
364       
366          """Returns unique identifier of Compute Unit. Deprecated: Please use get_url() instead.""" 
367          return self.id 
 368       
369       
371          """Returns URL of Compute Unit. This URL can be used to reconnect to this Compute Unit later on.""" 
372          if self.__subjob!=None:       
373              return self.__subjob.get_url() 
374          else: 
375              return self.get_id()    
 376           
377       
379          """Returns dict with Compute Unit Details (e.g. job description, timings)"""    
380          if self.__subjob!=None:       
381              return self.__subjob.get_details() 
382          else: 
383              return None       
 384       
385       
387          """Returns current state of Compute Unit""" 
388          if self.__subjob != None: 
389              self.state = self.__subjob.get_state() 
390          return self.state 
 391       
392       
403   
404       
406          """Terminates Compute Unit""" 
407          if self.__subjob != None: 
408              return self.__subjob.cancel() 
409          return None 
 410       
413   
414       
416          self.compute_unit_description = compute_unit_description  
417          self.subjob_description = self.__translate_cu_sj_description(compute_unit_description) 
 418   
420          self.__subjob = subjob 
 421           
422       
424          jd = description() 
425          if compute_unit_description.has_key("executable"):  
426              jd.executable = compute_unit_description["executable"] 
427          if compute_unit_description.has_key("spmd_variation"): 
428              jd.spmd_variation = compute_unit_description["spmd_variation"] 
429          else: 
430              jd.spmd_variation = "single" 
431          if compute_unit_description.has_key("arguments"):  
432              jd.arguments = compute_unit_description["arguments"] 
433          if compute_unit_description.has_key("environment"): 
434              jd.environment = compute_unit_description["environment"]  
435           
436           
437          if compute_unit_description.has_key("number_of_processes"): 
438              jd.number_of_processes=int(compute_unit_description["number_of_processes"]) 
439          elif compute_unit_description.has_key("total_cpu_count"): 
440              jd.number_of_processes=int(compute_unit_description["total_cpu_count"]) 
441          else: 
442              jd.number_of_processes=1 
443               
444          if compute_unit_description.has_key("working_directory"):  
445              jd.working_directory = compute_unit_description["working_directory"] 
446          if compute_unit_description.has_key("output"):  
447              jd.output =  compute_unit_description["output"] 
448          if compute_unit_description.has_key("error"):  
449              jd.error = compute_unit_description["error"] 
450          if compute_unit_description.has_key("file_transfer"): 
451              jd.file_transfer=compute_unit_description["file_transfer"]   
452          if compute_unit_description.has_key("input_data"): 
453              jd.input_data=compute_unit_description["input_data"]   
454          if compute_unit_description.has_key("output_data"): 
455              jd.output_data=compute_unit_description["output_data"]             
456          return jd 
 457