1  """ 
  2  B{ComputeDataService Module}: A central implementation of the L{ComputeDataService} 
  3   
  4  A Meta-Scheduling service for pilots (both PilotCompute and PilotData) 
  5   
  6  """ 
  7   
  8  import sys 
  9  import os 
 10  import time 
 11  import threading 
 12  import logging 
 13  import pdb 
 14  import Queue 
 15  import uuid 
 16  import traceback 
 17  import urlparse 
 18   
 19  from bigjob import logger 
 20   
 21  import pilot 
 22  from pilot.api import ComputeDataService, State 
 23  from pilot.impl.pilotdata_manager import DataUnit 
 24  from pilot.impl.pilotcompute_manager import ComputeUnit 
 25   
 26   
 27   
 28   
 29  from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor 
 30   
 31  """ The scheduler module imported below determines the scheduler: 
 32       
 33      pilot.scheduler.data_compute_scheduler - selects random locations for PD and CUs 
 34      pilot.scheduler.data_compute_affinity_scheduler - considers affinity descriptions 
 35       
 36  """ 
 37  from pilot.scheduler.data_compute_affinity_scheduler import Scheduler 
 38   
 40      """ B{ComputeDataService (CDS).} 
 41       
 42          The ComputeDataService is the application's interface to submit  
 43          ComputeUnits and PilotData/DataUnit to the Pilot-Manager  
 44          in the P* Model. 
 45      """     
 46      CDS_ID_PREFIX="cds-"   
 47   
 48   
 50          """ Create a ComputeDataService object. 
 51   
 52              Keyword arguments: 
 53              cds_url -- Reconnect to an existing CDS (optional). 
 54          """ 
 55           
 56          self.data_units={} 
 57          self.pilot_data_services=[] 
 58           
 59           
 60          self.compute_units={} 
 61          self.pilot_job_services=[] 
 62               
 63          if cds_url == None: 
 64              self.id=self.CDS_ID_PREFIX + str(uuid.uuid1()) 
 65              application_url = CoordinationAdaptor.get_base_url(pilot.application_id) 
 66              self.url = CoordinationAdaptor.add_cds(application_url, self)             
 67          else: 
 68              self.id = self.__get_cds_id(cds_url) 
 69              self.url = cds_url 
 70              
 71           
 72          self.scheduler = Scheduler() 
 73          self.cu_queue = Queue.Queue() 
 74          self.du_queue = Queue.Queue() 
 75          self.stop=threading.Event() 
 76          self.scheduler_thread=threading.Thread(target=self._scheduler_thread) 
 77          self.scheduler_thread.daemon=True 
 78          self.scheduler_thread.start() 
  79           
 80   
 82          start = cds_url.index(self.CDS_ID_PREFIX) 
 83          end =cds_url.index("/", start) 
 84          return cds_url[start:end] 
  85   
 86   
 87       
 88       
 90          """ Add a PilotComputeService to this CDS. 
 91   
 92              @param pcs: The PilotComputeService to which this ComputeDataService will connect. 
 93   
 94          """ 
 95          self.pilot_job_services.append(pcs) 
 96          CoordinationAdaptor.update_cds(self.url, self) 
  97           
 98   
100          """ Remove a PilotComputeService from this CDS. 
101   
102              Note that it won't cancel the PilotComputeService, it will just no 
103              longer be connected to this CDS. 
104   
105              Keyword arguments: 
106              @param pcs: The PilotComputeService to remove from this ComputeDataService.  
107          """ 
108          self.pilot_job_services.remove(pcs) 
109          CoordinationAdaptor.update_cds(self.url, self) 
 110   
111   
113          """ Submit a CU to this Compute Data Service. 
114   
115              @param compute_unit_description: The ComputeUnitDescription from the application 
116              @return: ComputeUnit object 
117          """ 
118          cu = ComputeUnit(compute_unit_description, self) 
119          self.compute_units[cu.id]=cu 
120          self.cu_queue.put(cu) 
121          CoordinationAdaptor.update_cds(self.url, self) 
122          return cu 
 123       
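125          """ List all pilot compute of CDS. NOTE(review): the statement below reads self.pilot_job_service, but only self.pilot_job_services is assigned in this class - likely a typo that raises AttributeError; confirm and fix. """ 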
126          return self.pilot_job_service 
 127       
128       
129       
131          """ Add a PilotDataService  
132   
133              @param pds: The PilotDataService to add. 
134          """ 
135          self.pilot_data_services.append(pds) 
136          CoordinationAdaptor.update_cds(self.url, self) 
 137       
139          """ Remove a PilotDataService  
140              @param pds: The PilotDataService to remove  
141          """ 
142          self.pilot_data_services.remove(pds) 
143          CoordinationAdaptor.update_cds(self.url, self) 
 144       
145        
147          """ List all PilotDataServices of this CDS """ 
148          return self.pilot_data_services 
 149       
150       
152          """ List all DUs of this CDS as (id, DataUnit) pairs """ 
153          return self.data_units.items() 
 154       
155       
157          if self.data_units.has_key(du_id): 
158              return self.data_units[du_id] 
159          return None 
 160       
161       
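163          """ Creates a data unit object (initially without a pilot data) and queues it for scheduling; it is bound to a physical resource (a pilotdata) later, when the DU queue is processed """ 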
164          du = DataUnit(pilot_data=None,  
165                        data_unit_description=data_unit_description) 
166          self.data_units[du.id]=du 
167          self.du_queue.put(du) 
168           
169          CoordinationAdaptor.update_cds(self.url, self) 
170          return du 
 171       
172       
174          """ Cancel the CDS.  
175              All associated PD and PC objects are canceled.             
176          """ 
177           
178          self.stop.set() 
179          CoordinationAdaptor.delete_cds(self.url) 
 180      
182          """ Wait for CUs and DUs. Returns after all DUs have been placed (i.e. in state Running) 
183              and all CUs have been completed (i.e. in state Done), or if a fault has occurred or 
184              the user has cancelled a CU or DU.             
185          """ 
186          try: 
187              logger.debug("### START WAIT ###") 
188              self.cu_queue.join() 
189              logger.debug("CU queue empty")         
190              self.du_queue.join() 
191              logger.debug("DU queue empty")         
192       
193              for i in self.data_units.values(): 
194                  i.wait() 
195              logger.debug("DUs done")         
196                   
197              for i in self.compute_units.values(): 
198                  i.wait()      
199              logger.debug("CUs done")         
200                      
201              logger.debug("### END WAIT ###") 
202          except: 
203              logger.debug("Ctrl-c detected. Terminating ComputeDataService...") 
204              self.cancel() 
205              raise KeyboardInterrupt 
 206                   
207           
209          "@return: State of the ComputeDataService" 
210          return self.state 
 211       
212       
214          "@return: id of ComputeDataService" 
215          return str(self.id) 
 216       
217       
219          """ Make sure that background thread terminates""" 
220          self.cancel() 
 221      
222       
223       
231       
233          """ Schedule DU to a suitable pilot data 
234           
235              Currently one level of scheduling is used: 
236                  1.) Add all resources managed by the contained PDS  
237                  2.) Select one resource 
238          """  
239          logger.debug("Schedule PD") 
240          self.__update_scheduler_resources() 
241          selected_pilot_data = self.scheduler.schedule_pilot_data(du.data_unit_description) 
242          return selected_pilot_data  
 243       
245          logger.debug("Schedule CU") 
246          self.__update_scheduler_resources() 
247          selected_pilot_job = self.scheduler.schedule_pilot_job(cu.compute_unit_description) 
248          return selected_pilot_job 
 249       
251          while True and self.stop.isSet()==False:             
252              try: 
253                   
254                  du = self.du_queue.get(True, 1)   
255                   
256                  if isinstance(du, DataUnit): 
257                      pd=self._schedule_du(du)                 
258                      if(pd!=None):                         
259                          logger.debug("Initiate Transfer to PD.") 
260                          du.add_pilot_data(pd) 
261                          logger.debug("Transfer to PD finished.") 
262                          du._update_state(State.Running)  
263                          self.du_queue.task_done()                    
264                      else: 
265                          self.du_queue.task_done()  
266                          self.du_queue.put(du) 
267              except Queue.Empty: 
268                  pass 
269                       
270              try:     
271                   
272                  cu = self.cu_queue.get(True, 1)                 
273                  if isinstance(cu, ComputeUnit):   
274                      self.__wait_for_du(cu)                   
275                      pj=self._schedule_cu(cu)  
276                      if pj !=None: 
277                          cu = self.__expand_working_directory(cu, pj)                         
278                          pj._submit_cu(cu)            
279                          self.cu_queue.task_done()          
280                      else: 
281                          logger.debug("No resource found.") 
282                          self.cu_queue.task_done()  
283                          self.cu_queue.put(cu) 
284              except Queue.Empty: 
285                  pass 
286              except: 
287                  exc_type, exc_value, exc_traceback = sys.exc_info() 
288                  logger.error("*** print_tb:") 
289                  traceback.print_tb(exc_traceback, limit=1, file=sys.stderr) 
290                  logger.error("*** print_exception:") 
291                  traceback.print_exception(exc_type, exc_value, exc_traceback, 
292                                limit=2, file=sys.stderr) 
293               
294              if self.cu_queue.empty() and self.du_queue.empty(): 
295                  time.sleep(5)         
296   
297          logger.debug("Re-Scheduler terminated") 
 298       
299      
301          """ wait for Data Units that are required for Compute Unit """ 
302          cu_description = compute_unit.compute_unit_description 
303          if cu_description.has_key("input_data") and len(cu_description["input_data"])>0: 
304              for input_du_url in cu_description["input_data"]: 
305                  for du in self.data_units.values(): 
306                      if input_du_url == du.get_url(): 
307                          logger.debug("Wait for DU: %s"%du.get_url()) 
308                          du.wait()       
 309       
311          """  
312              DEPRECATED capability! 
313              Expand pilotdata:// url specified in the compute_unit_description  
314              to a local url on the machine of the PJ 
315               
316              pilotdata://localhost/434bfc5c-23fd-11e1-a43f-00264a13ca4c 
317               
318              to 
319               
320             /tmp/pilotstore//434bfc5c-23fd-11e1-a43f-00264a13ca4c on machine running pilot_job         
321          """  
322           
323           
324           
325           
326           
327           
328           
329           
330           
331           
332           
333           
334           
335           
336           
337           
338           
339           
340           
341           
342           
343           
344           
345           
346           
347           
348           
349           
350           
351           
352           
353           
354           
355           
356           
357           
358           
359           
360           
361           
362           
363           
364          return compute_unit 
 365               
366               
368          """ 
369              stage required files to machine of pilot job 
370          """ 
371          pass 
 372       
 375      
376   
377   
378   
379   
382          raise NotImplementedError("Please use ComputeDataService.") 
  383   
384   
387          raise NotImplementedError("Please use ComputeDataService.") 
  388