1  """ B{ComputeDataServiceDecentral Module}: A decentral implementation of a ComputeDataService (see L{ComputeDataServiceDecentral}). 
  2   
  3  A Meta-Scheduling service for pilots (both PilotCompute and PilotData) 
  4  """ 
  5   
  6  import sys 
  7  import os 
  8  import time 
  9  import threading 
 10  import logging 
 11  import pdb 
 12  import Queue 
 13  import uuid 
 14  import traceback 
 15  import urlparse 
 16   
 17  import bigjob 
 18  from bigjob import logger, bigjob, subjob, description 
 19   
 20  import pilot 
 21  from pilot.api import ComputeDataService, State 
 22  from pilot.api.api import PilotError 
 23  from pilot.impl.pilotdata_manager import PilotData, DataUnit 
 24  from pilot.impl.pilotcompute_manager import PilotCompute, PilotComputeService 
 25  from pilot.impl.pilot_manager import ComputeUnit 
 26   
 27   
 28   
 29  from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor 
 30   
 31  """ Loaded Module determines scheduler: 
 32       
 33      bigdata.scheduler.data_compute_scheduler - selects random locations for PD and CUs 
 34      bigdata.scheduler.data_compute_affinity_scheduler - considers affinity descriptions 
 35       
 36  """ 
 37  from pilot.scheduler.data_compute_affinity_scheduler import Scheduler 
 38   
    """ B{ComputeDataServiceDecentral.}

        The ComputeDataService is the application's interface to submit
        ComputeUnits and PilotData/DataUnit to the Pilot-Manager
        in the P* Model.

        The decentral ComputeDataService will only work with Redis!
    """
    # Prefix of every CDS id; also used to locate the id inside a CDS URL.
    CDS_ID_PREFIX="cds-"
 49   
 51          """ Create a ComputeDataService (Decentral) object. 
 52   
 53              @param cds_url: Reconnect to an existing CDS (optional). 
 54          """ 
 55           
 56          self.data_units={} 
 57          self.pilot_data_services=[] 
 58           
 59           
 60          self.compute_units={} 
 61          self.pilot_job_services=[] 
 62               
 63          if cds_url == None: 
 64              self.id=self.CDS_ID_PREFIX + str(uuid.uuid1()) 
 65              application_url = CoordinationAdaptor.get_base_url(pilot.application_id) 
 66              self.url = CoordinationAdaptor.add_cds(application_url, self)             
 67          else: 
 68              self.id = self.__get_cds_id(cds_url) 
 69              self.url = cds_url 
 70              
 71           
 72          self.scheduler = Scheduler() 
 73          self.du_queue = Queue.Queue() 
 74           
 75          self.stop=threading.Event() 
 76          self.scheduler_thread=threading.Thread(target=self._scheduler_thread) 
 77          self.scheduler_thread.daemon=True 
 78          self.scheduler_thread.start() 
 79          logger.debug("Created ComputeDataServiceDecentral") 
  80              
 81           
 83          start = cds_url.index(self.CDS_ID_PREFIX) 
 84          end =cds_url.index("/", start) 
 85          return cds_url[start:end] 
  86   
 87   
 88       
 89       
 91          """ Add a PilotComputeService to this CDS. 
 92   
 93              @param pcs: The PilotComputeService to which this ComputeDataService will connect. 
 94   
 95          """ 
 96          self.pilot_job_services.append(pcs) 
 97          CoordinationAdaptor.update_cds(self.url, self) 
 98          if len(self.pilot_job_services)>1: 
 99              logger.error("Decentral ComputeDataService only supports 1 PilotComputeService") 
100              raise PilotError("Decentral ComputeDataService only supports 1 PilotComputeService") 
 101           
102   
104          """ Remove a PilotJobService from this CDS. 
105   
106              Note that it won't cancel the PilotJobService, it will just no 
107              longer be connected to this WUS. 
108   
109              Keyword arguments: 
110              pilotjob_services -- The PilotJob Service(s) to remove from this 
111                                   Work Unit Service.  
112   
113              Return: 
114              Result 
115          """ 
116          self.pilot_job_services.remove(pcs) 
117          CoordinationAdaptor.update_cds(self.url, self) 
118          if len(self.pilot_job_services)>1: 
119              logger.error("Decentral ComputeDataService only supports 1 PilotComputeService") 
120              raise PilotError("Decentral ComputeDataService only supports 1 PilotComputeService") 
 121   
122   
124          """ Submit a CU to this Compute Data Service. 
125              @param compute_unit_description:  The L{ComputeUnitDescription} from the application 
126              @return: L{ComputeUnit} object 
127          """ 
128          cu = ComputeUnit(compute_unit_description, self) 
129          self.compute_units[cu.id]=cu 
130          self.__submit_cu(cu)         
131          return cu 
 132       
133       
135          """ List all pilot compute of CDS """ 
136          return self.pilot_job_service 
 137       
138       
139       
140       
142          """ Submits compute unit to Bigjob """ 
143                   
144          if len(self.pilot_job_services)!=1: 
145              raise PilotError("No PilotComputeService found. Please start a PCS before submitting ComputeUnits.") 
146           
147          self.__wait_for_du(compute_unit) 
148           
149          sj = subjob() 
150          self.pcs_coordination_namespace=self.pilot_job_services[0].coordination_queue 
151          logger.debug("Submit CU to big-job via external queue: %s"%self.pcs_coordination_namespace) 
152          sj.submit_job(self.pcs_coordination_namespace, compute_unit.subjob_description) 
153          compute_unit._update_subjob(sj) 
154          return compute_unit 
 155       
156       
158          """ wait for Data Units that are required for Compute Unit """ 
159          cu_description = compute_unit.compute_unit_description 
160          if cu_description.has_key("input_data") and len(cu_description["input_data"])>0: 
161              for input_du_url in cu_description["input_data"]: 
162                  for du in self.data_units.values(): 
163                      if input_du_url == du.get_url(): 
164                          logger.debug("Wait for DU: %s"%du.get_url()) 
165                          du.wait() 
 166       
167       
168       
169       
        """ Add a PilotDataService.

            The service is appended to this CDS' list of PilotDataServices
            and CoordinationAdaptor.update_cds is called afterwards.

            @param pds: The PilotDataService to add.
        """
        self.pilot_data_services.append(pds)
        CoordinationAdaptor.update_cds(self.url, self)
 177       
        """ Remove a PilotDataService.

            The service is removed from this CDS' list of PilotDataServices
            and CoordinationAdaptor.update_cds is called afterwards.

            @param pds: The PilotDataService to remove
        """
        self.pilot_data_services.remove(pds)
        CoordinationAdaptor.update_cds(self.url, self)
 184       
185        
        """ List all pilot data services of the CDS.

            @return: the list of PilotDataService objects added to this CDS
                     (the internal list itself, not a copy)
        """
        return self.pilot_data_services
 189       
190       
        """ List all DUs of CDS.

            @return: the items() of the internal DataUnit dictionary, i.e.
                     (DataUnit id, DataUnit) pairs rather than bare DataUnits
        """
        return self.data_units.items()
 194       
195       
197          if self.data_units.has_key(du_id): 
198              return self.data_units[du_id] 
199          return None 
 200       
201       
203          """ creates a data unit object and binds it to a physical resource (a pilotdata) """ 
204          du = DataUnit(pilot_data=None,  
205                        data_unit_description=data_unit_description) 
206          self.data_units[du.id]=du 
207          self.du_queue.put(du) 
208           
209          CoordinationAdaptor.update_cds(self.url, self) 
210          return du 
 211       
212       
213       
214       
215      
        """ Cancel the CDS.

            Sets the stop event, which ends the background scheduler loop,
            and deletes the CDS via CoordinationAdaptor.delete_cds.

            NOTE(review): this was described as cancelling all associated PD
            and PC objects; no such call is made here -- confirm whether
            delete_cds takes care of that.
        """

        self.stop.set()
        CoordinationAdaptor.delete_cds(self.url)
 223      
224      
226          """ Waits for CUs and DUs. Return after all DU's have been placed (i.e. in state Running) 
227              and all CU's have been completed (i.e. in state Done) or if a fault has occurred or 
228              the user has cancelled a CU or DU.             
229          """ 
230          try: 
231              logger.debug("### START WAIT ###") 
232              for i in self.data_units.values(): 
233                  i.wait() 
234              logger.debug("Wait for DUs finished")         
235                   
236              for i in self.compute_units.values(): 
237                  i.wait()      
238              logger.debug("CUs done")         
239                      
240              logger.debug("### END WAIT ###") 
241          except: 
242              logger.debug("Ctrl-c detected. Terminating ComputeDataService...") 
243              self.cancel() 
244              raise KeyboardInterrupt 
 245                   
246           
249       
250       
253       
254       
        """ Make sure that background thread terminates.

            Delegates to cancel().
        """
        self.cancel()
 258      
259      
260       
261       
269       
270       
272          """ Schedule DU to a suitable pilot data 
273           
274              Currently one level of scheduling is used: 
275                  1.) Add all resources managed by the contained PDS  
276                  2.) Select one resource 
277          """  
278          logger.debug("Schedule PD") 
279          self.__update_scheduler_resources() 
280          selected_pilot_data = self.scheduler.schedule_pilot_data(du.data_unit_description) 
281          return selected_pilot_data  
 282       
283       
285          while True and self.stop.isSet()==False:             
286              try: 
287                   
288                  du = self.du_queue.get(True, 1)   
289                   
290                  if isinstance(du, DataUnit): 
291                      pd=self._schedule_du(du)                 
292                      if(pd!=None):                         
293                          logger.debug("Initiate Transfer to PD.") 
294                          du.add_pilot_data(pd) 
295                          logger.debug("Transfer to PD finished.") 
296                          du._update_state(State.Running)  
297                          self.du_queue.task_done()                    
298                      else: 
299                          self.du_queue.task_done()  
300                          self.du_queue.put(du) 
301              except Queue.Empty: 
302                  pass 
303                  
304              if self.du_queue.empty(): 
305                  time.sleep(5)         
306   
307          logger.debug("Re-Scheduler terminated") 
  308