Package pilot :: Package impl :: Module pilot_manager_decentral

Source Code for Module pilot.impl.pilot_manager_decentral

  1  """ B{ComputeDataServiceDecentral Module}: A decentral implementation of a ComputeDataService (see L{ComputeDataServiceDecentral}). 
  2   
  3  A Meta-Scheduling service for pilots (both PilotCompute and PilotData) 
  4  """ 
  5   
  6  import sys 
  7  import os 
  8  import time 
  9  import threading 
 10  import logging 
 11  import pdb 
 12  import Queue 
 13  import uuid 
 14  import traceback 
 15  import urlparse 
 16   
 17  import bigjob 
 18  from bigjob import logger, bigjob, subjob, description 
 19   
 20  import pilot 
 21  from pilot.api import ComputeDataService, State 
 22  from pilot.api.api import PilotError 
 23  from pilot.impl.pilotdata_manager import PilotData, DataUnit 
 24  from pilot.impl.pilotcompute_manager import PilotCompute, PilotComputeService 
 25  from pilot.impl.pilot_manager import ComputeUnit 
 26   
 27   
 28  #from pilot.coordination.advert import AdvertCoordinationAdaptor as CoordinationAdaptor 
 29  from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor 
 30   
 31  """ Loaded Module determines scheduler: 
 32       
 33      pilot.scheduler.data_compute_scheduler - selects random locations for PD and CUs 
 34      pilot.scheduler.data_compute_affinity_scheduler - considers affinity descriptions 
 35       
 36  """ 
 37  from pilot.scheduler.data_compute_affinity_scheduler import Scheduler 
 38   
class ComputeDataServiceDecentral(ComputeDataService):
    """ B{ComputeDataServiceDecentral.}

        The ComputeDataService is the application's interface to submit
        ComputeUnits and PilotData/DataUnit to the Pilot-Manager
        in the P* Model.

        The decentral ComputeDataService will only work with Redis!
    """
    CDS_ID_PREFIX="cds-"

    def __init__(self, cds_url=None):
        """ Create a ComputeDataService (Decentral) object.

            Without a URL a new id (CDS_ID_PREFIX + uuid1) is generated and the
            CDS is registered through the CoordinationAdaptor. With a URL the
            id is parsed out of that URL and nothing is registered.
            In both cases a daemon thread running L{_scheduler_thread} is started.

            @param cds_url: Reconnect to an existing CDS (optional).
        """
        # Pilot Data
        self.data_units = {}
        self.pilot_data_services = []

        # Pilot Compute
        self.compute_units = {}
        self.pilot_job_services = []

        if cds_url is None:
            self.id = self.CDS_ID_PREFIX + str(uuid.uuid1())
            application_url = CoordinationAdaptor.get_base_url(pilot.application_id)
            self.url = CoordinationAdaptor.add_cds(application_url, self)
        else:
            self.id = self.__get_cds_id(cds_url)
            self.url = cds_url

        # Background Thread for scheduling
        self.scheduler = Scheduler()
        self.du_queue = Queue.Queue()

        self.stop = threading.Event()
        self.scheduler_thread = threading.Thread(target=self._scheduler_thread)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()
        logger.debug("Created ComputeDataServiceDecentral")


    def __get_cds_id(self, cds_url):
        """ Extract the CDS id (the path segment starting with CDS_ID_PREFIX) from a URL.

            @param cds_url: URL containing a segment that starts with CDS_ID_PREFIX
            @return: the id, i.e. everything from the prefix up to the next "/"
                     or up to the end of the URL if no "/" follows
            @raise ValueError: if the URL does not contain CDS_ID_PREFIX
        """
        start = cds_url.index(self.CDS_ID_PREFIX)
        # split() instead of index("/"): a URL ending with the id (no trailing
        # slash) is valid and must not raise.
        return cds_url[start:].split("/", 1)[0]


    ###########################################################################
    # Pilot Compute

    def add_pilot_compute_service(self, pcs):
        """ Add a PilotComputeService to this CDS.

            Only one PilotComputeService is supported. The limit is checked
            before anything is modified, so a rejected call leaves the CDS
            untouched.

            @param pcs: The PilotComputeService to which this ComputeDataService will connect.
            @raise PilotError: if a PilotComputeService is already attached
        """
        if len(self.pilot_job_services) >= 1:
            logger.error("Decentral ComputeDataService only supports 1 PilotComputeService")
            raise PilotError("Decentral ComputeDataService only supports 1 PilotComputeService")
        self.pilot_job_services.append(pcs)
        CoordinationAdaptor.update_cds(self.url, self)


    def remove_pilot_compute_service(self, pcs):
        """ Remove a PilotComputeService from this CDS.

            The service itself is not cancelled here; it is only detached from
            this CDS and the change is published via the CoordinationAdaptor.

            @param pcs: The PilotComputeService to remove
            @raise ValueError: if pcs is not attached to this CDS (from list.remove)
        """
        self.pilot_job_services.remove(pcs)
        CoordinationAdaptor.update_cds(self.url, self)


    def submit_compute_unit(self, compute_unit_description):
        """ Submit a CU to this Compute Data Service.

            The CU is recorded under its id and handed to the attached
            PilotComputeService; see L{__submit_cu} for the preconditions.

            @param compute_unit_description: The L{ComputeUnitDescription} from the application
            @return: L{ComputeUnit} object
            @raise PilotError: if not exactly one PilotComputeService is attached
        """
        cu = ComputeUnit(compute_unit_description, self)
        self.compute_units[cu.id] = cu
        self.__submit_cu(cu)
        return cu


    def list_pilot_compute(self):
        """ List all pilot compute of CDS

            @return: the list of attached PilotComputeService objects
        """
        return self.pilot_job_services


    ###########################################################################
    # Compute Data Service private methods

    def __submit_cu(self, compute_unit):
        """ Submits compute unit to Bigjob

            Waits for the CU's input Data Units first, then submits a subjob
            to the coordination queue of the single attached PilotComputeService.

            @param compute_unit: the L{ComputeUnit} to submit
            @return: the same compute unit, updated with its subjob
            @raise PilotError: if not exactly one PilotComputeService is attached
        """
        if len(self.pilot_job_services) != 1:
            raise PilotError("No PilotComputeService found. Please start a PCS before submitting ComputeUnits.")

        self.__wait_for_du(compute_unit)

        sj = subjob()
        self.pcs_coordination_namespace = self.pilot_job_services[0].coordination_queue
        logger.debug("Submit CU to big-job via external queue: %s"%self.pcs_coordination_namespace)
        sj.submit_job(self.pcs_coordination_namespace, compute_unit.subjob_description)
        compute_unit._update_subjob(sj)
        return compute_unit


    def __wait_for_du(self, compute_unit):
        """ wait for Data Units that are required for Compute Unit

            Every URL listed under "input_data" in the CU description is
            matched against the URLs of the Data Units known to this CDS;
            wait() is called on each match. URLs without a match are ignored.

            @param compute_unit: the L{ComputeUnit} whose inputs are awaited
        """
        cu_description = compute_unit.compute_unit_description
        # has_key() is kept on purpose: the description type is defined outside
        # this module and the module targets Python 2.
        if not cu_description.has_key("input_data"):
            return
        for input_du_url in cu_description["input_data"]:
            for du in self.data_units.values():
                if input_du_url == du.get_url():
                    logger.debug("Wait for DU: %s"%du.get_url())
                    du.wait()


    ###########################################################################
    # Pilot Data

    def add_pilot_data_service(self, pds):
        """ Add a PilotDataService

            @param pds: The PilotDataService to add.
        """
        self.pilot_data_services.append(pds)
        CoordinationAdaptor.update_cds(self.url, self)


    def remove_pilot_data_service(self, pds):
        """ Remove a PilotDataService

            @param pds: The PilotDataService to remove
            @raise ValueError: if pds is not attached to this CDS (from list.remove)
        """
        self.pilot_data_services.remove(pds)
        CoordinationAdaptor.update_cds(self.url, self)


    def list_pilot_data(self):
        """ List all pilot data of CDS

            @return: the list of attached PilotDataService objects
        """
        return self.pilot_data_services


    def list_data_units(self):
        """ List all DUs of CDS

            @return: (id, DataUnit) pairs of all Data Units submitted to this CDS
        """
        return self.data_units.items()


    def get_data_unit(self, du_id):
        """ Look up a Data Unit by id.

            @param du_id: id of the Data Unit
            @return: the L{DataUnit}, or None if the id is unknown
        """
        return self.data_units.get(du_id)


    def submit_data_unit(self, data_unit_description):
        """ creates a data unit object and binds it to a physical resource (a pilotdata)

            The DU is created without a pilot data and put on the internal
            queue; the background scheduler thread performs the placement.

            @param data_unit_description: description of the Data Unit
            @return: the new L{DataUnit}
        """
        du = DataUnit(pilot_data=None,
                      data_unit_description=data_unit_description)
        self.data_units[du.id] = du
        self.du_queue.put(du)
        # queue currently not persisted
        CoordinationAdaptor.update_cds(self.url, self)
        return du


    ###########################################################################
    # General

    def cancel(self):
        """ Cancel the CDS.

            Signals the background scheduler thread to stop and deletes the
            CDS entry through the CoordinationAdaptor.
        """
        # terminate background thread
        self.stop.set()
        CoordinationAdaptor.delete_cds(self.url)


    def wait(self):
        """ Waits for CUs and DUs. Return after all DU's have been placed (i.e. in state Running)
            and all CU's have been completed (i.e. in state Done) or if a fault has occurred or
            the user has cancelled a CU or DU.

            On Ctrl-C or on any other error the CDS is cancelled and the
            original exception is re-raised.

            @raise KeyboardInterrupt: if the wait was interrupted by the user
        """
        try:
            logger.debug("### START WAIT ###")
            for du in self.data_units.values():
                du.wait()
            logger.debug("Wait for DUs finished")

            for cu in self.compute_units.values():
                cu.wait()
            logger.debug("CUs done")

            logger.debug("### END WAIT ###")
        except KeyboardInterrupt:
            logger.debug("Ctrl-c detected. Terminating ComputeDataService...")
            self.cancel()
            raise
        except Exception:
            # A real failure must not be reported as Ctrl-C: log it, clean up
            # and let the original exception reach the caller.
            logger.error("Wait failed. Terminating ComputeDataService...\n%s"%traceback.format_exc())
            self.cancel()
            raise


    def get_state(self):
        """ Return the state of this CDS.

            @return: the value of the state attribute, or None if none has been
                     set (this class itself never assigns one)
        """
        return getattr(self, "state", None)


    def get_id(self):
        """ Return the id of this CDS as a string. """
        return str(self.id)


    def __del__(self):
        """ Make sure that background thread terminates"""
        self.cancel()


    ###########################################################################
    # Internal Scheduling

    def __update_scheduler_resources(self):
        """ Hand the pilots of all attached services to the scheduler.

            Collects list_pilots() of every PilotDataService and every
            PilotComputeService and passes them to the scheduler.
        """
        logger.debug("__update_scheduler_resources")
        pd = [s for i in self.pilot_data_services for s in i.list_pilots()]
        self.scheduler.set_pilot_data(pd)
        pj = [p for i in self.pilot_job_services for p in i.list_pilots()]
        logger.debug("Pilot-Jobs: " + str(pj))
        self.scheduler.set_pilot_jobs(pj)


    def _schedule_du(self, du):
        """ Schedule DU to a suitable pilot data

            Currently one level of scheduling is used:
                1.) Add all resources managed by the contained PDS
                2.) Select one resource

            @param du: the L{DataUnit} to place
            @return: the pilot data selected by the scheduler; callers treat
                     None as "no placement possible right now"
        """
        logger.debug("Schedule PD")
        self.__update_scheduler_resources()
        selected_pilot_data = self.scheduler.schedule_pilot_data(du.data_unit_description)
        return selected_pilot_data


    def _scheduler_thread(self):
        """ Background loop: take Data Units from the queue and place them.

            A DU for which the scheduler returns no pilot data is put back on
            the queue. The loop pauses (up to 5 seconds, ending early on
            cancel) when the queue is empty or when a full pass over the queue
            placed nothing, so unplaceable DUs do not cause a busy loop.
            The loop ends once the stop event is set.
        """
        # NOTE(review): an exception raised while placing a DU ends this
        # thread; confirm whether such errors should be caught and the DU
        # marked as failed instead.
        unplaced_streak = 0  # consecutive DUs that had to be requeued
        while not self.stop.isSet():
            try:
                #logger.debug("Scheduler Thread: " + str(self.__class__) + " Pilot Data")
                du = self.du_queue.get(True, 1)
            except Queue.Empty:
                du = None

            if du is not None:
                # check whether this is a real du object
                if isinstance(du, DataUnit):
                    pd = self._schedule_du(du)
                    if pd is not None:
                        logger.debug("Initiate Transfer to PD.")
                        du.add_pilot_data(pd)
                        logger.debug("Transfer to PD finished.")
                        du._update_state(State.Running)
                        self.du_queue.task_done()
                        unplaced_streak = 0
                    else:
                        self.du_queue.task_done()
                        self.du_queue.put(du)
                        unplaced_streak += 1
                else:
                    # Foreign objects are dropped, but still acknowledged so
                    # the queue's unfinished-task count stays consistent.
                    self.du_queue.task_done()

            if self.du_queue.empty() or unplaced_streak >= self.du_queue.qsize():
                # Event.wait instead of time.sleep: returns immediately when
                # cancel() sets the stop event.
                self.stop.wait(5)
                unplaced_streak = 0

        logger.debug("Re-Scheduler terminated")
308