
Source Code for Module pilot.impl.pilot_manager

  1  """ 
  2  B{ComputeDataService Module}: A central implementation of the L{ComputeDataService} 
  3   
  4  A Meta-Scheduling service for pilots (both PilotCompute and PilotData) 
  5   
  6  """ 
  7   
  8  import sys 
  9  import os 
 10  import time 
 11  import threading 
 12  import logging 
 13  import pdb 
 14  import Queue 
 15  import uuid 
 16  import traceback 
 17  import urlparse 
 18   
 19  from bigjob import logger 
 20   
 21  import pilot 
 22  from pilot.api import ComputeDataService, State 
 23  from pilot.impl.pilotdata_manager import DataUnit 
 24  from pilot.impl.pilotcompute_manager import ComputeUnit 
 25   
 26   
 27  #from pilot.coordination.advert import AdvertCoordinationAdaptor as CoordinationAdaptor 
 28  #from pilot.coordination.nocoord_adaptor import NoCoordinationAdaptor as CoordinationAdaptor 
 29  from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor 
 30   
 31  """ Loaded Module determines scheduler: 
 32       
 33      bigdata.scheduler.data_compute_scheduler - selects random locations for PD and CUs 
 34      bigdata.scheduler.data_compute_affinity_scheduler - considers affinity descriptions 
 35       
 36  """ 
 37  from pilot.scheduler.data_compute_affinity_scheduler import Scheduler 
 38   
class ComputeDataService(ComputeDataService):
    """ B{ComputeDataService (CDS).}

        The ComputeDataService is the application's interface for submitting
        ComputeUnits and DataUnits (PilotData) to the Pilot-Manager
        in the P* Model.
    """
    CDS_ID_PREFIX = "cds-"

    def __init__(self, cds_url=None):
        """ Create a ComputeDataService object.

            Keyword arguments:
            cds_url -- Reconnect to an existing CDS (optional).
        """
        # Pilot Data
        self.data_units = {}
        self.pilot_data_services = []

        # Pilot Job
        self.compute_units = {}
        self.pilot_job_services = []

        if cds_url is None:
            self.id = self.CDS_ID_PREFIX + str(uuid.uuid1())
            application_url = CoordinationAdaptor.get_base_url(pilot.application_id)
            self.url = CoordinationAdaptor.add_cds(application_url, self)
        else:
            self.id = self.__get_cds_id(cds_url)
            self.url = cds_url

        # Background Thread for scheduling
        self.scheduler = Scheduler()
        self.cu_queue = Queue.Queue()
        self.du_queue = Queue.Queue()
        self.stop = threading.Event()
        self.scheduler_thread = threading.Thread(target=self._scheduler_thread)
        self.scheduler_thread.daemon = True
        self.scheduler_thread.start()

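    # Example (hypothetical URL layout): for a cds_url such as
    # ".../cds-1234abcd-.../...", __get_cds_id returns the "cds-1234abcd-..."
    # segment, i.e. the substring from CDS_ID_PREFIX up to the next "/".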
    def __get_cds_id(self, cds_url):
        start = cds_url.index(self.CDS_ID_PREFIX)
        end = cds_url.index("/", start)
        return cds_url[start:end]


    ###########################################################################
    # Pilot Compute
    def add_pilot_compute_service(self, pcs):
        """ Add a PilotComputeService to this CDS.

            @param pcs: The PilotComputeService to which this ComputeDataService will connect.
        """
        self.pilot_job_services.append(pcs)
        CoordinationAdaptor.update_cds(self.url, self)

    def remove_pilot_compute_service(self, pcs):
        """ Remove a PilotComputeService from this CDS.

            Note that this does not cancel the PilotComputeService; it is simply
            no longer connected to this CDS.

            @param pcs: The PilotComputeService to remove from this ComputeDataService.
        """
        self.pilot_job_services.remove(pcs)
        CoordinationAdaptor.update_cds(self.url, self)

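    # A compute_unit_description is a dict-like ComputeUnitDescription. The keys
    # shown below are illustrative assumptions (only "input_data" is referenced
    # directly in this module, by __wait_for_du); consult the Pilot-API
    # documentation for the authoritative field list.
    #
    #   compute_unit_description = {
    #       "executable": "/bin/cat",
    #       "arguments": ["test.txt"],
    #       "number_of_processes": 1,
    #       "output": "stdout.txt",
    #       "error": "stderr.txt",
    #       "input_data": [du.get_url()],   # DUs this CU depends on
    #   }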
    def submit_compute_unit(self, compute_unit_description):
        """ Submit a CU to this Compute Data Service.

            @param compute_unit_description: The ComputeUnitDescription from the application
            @return: ComputeUnit object
        """
        cu = ComputeUnit(compute_unit_description, self)
        self.compute_units[cu.id] = cu
        self.cu_queue.put(cu)
        CoordinationAdaptor.update_cds(self.url, self)
        return cu

    def list_pilot_compute(self):
        """ List all pilot compute (PilotComputeServices) of this CDS """
        return self.pilot_job_services

    ###########################################################################
    # Pilot Data
    def add_pilot_data_service(self, pds):
        """ Add a PilotDataService

            @param pds: The PilotDataService to add.
        """
        self.pilot_data_services.append(pds)
        CoordinationAdaptor.update_cds(self.url, self)

    def remove_pilot_data_service(self, pds):
        """ Remove a PilotDataService

            @param pds: The PilotDataService to remove
        """
        self.pilot_data_services.remove(pds)
        CoordinationAdaptor.update_cds(self.url, self)

    def list_pilot_data(self):
        """ List all pilot data of CDS """
        return self.pilot_data_services

    def list_data_units(self):
        """ List all DUs of CDS """
        return self.data_units.items()

    def get_data_unit(self, du_id):
        """ Return the DataUnit with the given id, or None if it is unknown. """
        if du_id in self.data_units:
            return self.data_units[du_id]
        return None

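    # A data_unit_description is a dict-like DataUnitDescription. The keys below
    # are illustrative assumptions (the affinity labels mirror those used by the
    # affinity scheduler and the pilot descriptions referenced in this module);
    # consult the Pilot-API documentation for the authoritative field list.
    #
    #   data_unit_description = {
    #       "file_urls": ["file://localhost/tmp/test.txt"],
    #       "affinity_datacenter_label": "eu-de-south",
    #       "affinity_machine_label": "mymachine-1",
    #   }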
    def submit_data_unit(self, data_unit_description):
        """ Create a DataUnit object and queue it for binding to a physical
            resource (a PilotData) by the scheduler thread.
        """
        du = DataUnit(pilot_data=None,
                      data_unit_description=data_unit_description)
        self.data_units[du.id] = du
        self.du_queue.put(du)
        # queue currently not persisted
        CoordinationAdaptor.update_cds(self.url, self)
        return du

    def cancel(self):
        """ Cancel the CDS.

            All associated PD and PC objects are canceled.
        """
        # terminate background thread
        self.stop.set()
        CoordinationAdaptor.delete_cds(self.url)
    def wait(self):
        """ Wait for CUs and DUs. Returns after all DUs have been placed (i.e. are in state Running)
            and all CUs have completed (i.e. are in state Done), or if a fault has occurred or
            the user has cancelled a CU or DU.
        """
        try:
            logger.debug("### START WAIT ###")
            self.cu_queue.join()
            logger.debug("CU queue empty")
            self.du_queue.join()
            logger.debug("DU queue empty")

            for i in self.data_units.values():
                i.wait()
            logger.debug("DUs done")

            for i in self.compute_units.values():
                i.wait()
            logger.debug("CUs done")

            logger.debug("### END WAIT ###")
        except:
            logger.debug("Ctrl-c detected. Terminating ComputeDataService...")
            self.cancel()
            raise KeyboardInterrupt

    def get_state(self):
        "@return: State of the ComputeDataService"
        return self.state

    def get_id(self):
        "@return: id of ComputeDataService"
        return str(self.id)

    def __del__(self):
        """ Make sure that background thread terminates """
        self.cancel()

    ###########################################################################
    # Internal Scheduling
    def __update_scheduler_resources(self):
        logger.debug("__update_scheduler_resources")
        pd = [s for i in self.pilot_data_services for s in i.list_pilots()]
        self.scheduler.set_pilot_data(pd)
        pj = [p for i in self.pilot_job_services for p in i.list_pilots()]
        logger.debug("Pilot-Jobs: " + str(pj))
        self.scheduler.set_pilot_jobs(pj)

    def _schedule_du(self, du):
        """ Schedule DU to a suitable pilot data

            Currently one level of scheduling is used:
            1.) Add all resources managed by the contained PDS
            2.) Select one resource
        """
        logger.debug("Schedule PD")
        self.__update_scheduler_resources()
        selected_pilot_data = self.scheduler.schedule_pilot_data(du.data_unit_description)
        return selected_pilot_data

    def _schedule_cu(self, cu):
        logger.debug("Schedule CU")
        self.__update_scheduler_resources()
        selected_pilot_job = self.scheduler.schedule_pilot_job(cu.compute_unit_description)
        return selected_pilot_job

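    # The background scheduler loop below is the core of the CDS: it drains the
    # DU queue (binding each DataUnit to a PilotData and marking it Running) and
    # the CU queue (waiting for the CU's input DUs, expanding its working
    # directory and submitting it to a Pilot-Job). Items for which no resource
    # is found are re-queued; the loop sleeps briefly when both queues are empty
    # and exits once stop is set by cancel().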
    def _scheduler_thread(self):
        while not self.stop.isSet():
            try:
                #logger.debug("Scheduler Thread: " + str(self.__class__) + " Pilot Data")
                du = self.du_queue.get(True, 1)
                # check whether this is a real du object
                if isinstance(du, DataUnit):
                    pd = self._schedule_du(du)
                    if pd is not None:
                        logger.debug("Initiate Transfer to PD.")
                        du.add_pilot_data(pd)
                        logger.debug("Transfer to PD finished.")
                        du._update_state(State.Running)
                        self.du_queue.task_done()
                    else:
                        self.du_queue.task_done()
                        self.du_queue.put(du)
            except Queue.Empty:
                pass

            try:
                #logger.debug("Scheduler Thread: " + str(self.__class__) + " Pilot Job")
                cu = self.cu_queue.get(True, 1)
                if isinstance(cu, ComputeUnit):
                    self.__wait_for_du(cu)
                    pj = self._schedule_cu(cu)
                    if pj is not None:
                        cu = self.__expand_working_directory(cu, pj)
                        pj._submit_cu(cu)
                        self.cu_queue.task_done()
                    else:
                        logger.debug("No resource found.")
                        self.cu_queue.task_done()
                        self.cu_queue.put(cu)
            except Queue.Empty:
                pass
            except:
                exc_type, exc_value, exc_traceback = sys.exc_info()
                logger.error("*** print_tb:")
                traceback.print_tb(exc_traceback, limit=1, file=sys.stderr)
                logger.error("*** print_exception:")
                traceback.print_exception(exc_type, exc_value, exc_traceback,
                                          limit=2, file=sys.stderr)

            if self.cu_queue.empty() and self.du_queue.empty():
                time.sleep(5)

        logger.debug("Re-Scheduler terminated")

    def __wait_for_du(self, compute_unit):
        """ wait for Data Units that are required for Compute Unit """
        cu_description = compute_unit.compute_unit_description
        if cu_description.has_key("input_data") and len(cu_description["input_data"]) > 0:
            for input_du_url in cu_description["input_data"]:
                for du in self.data_units.values():
                    if input_du_url == du.get_url():
                        logger.debug("Wait for DU: %s" % du.get_url())
                        du.wait()

    def __expand_working_directory(self, compute_unit, pilot_job):
        """
            DEPRECATED capability!
            Expand pilotdata:// url specified in the compute_unit_description
            to a local url on the machine of the PJ

            pilotdata://localhost/434bfc5c-23fd-11e1-a43f-00264a13ca4c

            to

            /tmp/pilotstore//434bfc5c-23fd-11e1-a43f-00264a13ca4c on machine running pilot_job
        """
        #=======================================================================
        # if compute_unit.compute_unit_description.has_key("working_directory"):
        #     working_directory=compute_unit.compute_unit_description["working_directory"]
        #     if working_directory.find(DataUnit.DU_ID_PREFIX)!=-1:
        #         pilot_data_url = working_directory
        #         pj_description = pilot_job.pilot_compute_description
        #         pj_dc_affinity = pj_description["affinity_datacenter_label"]
        #         pj_machine_affinity = pj_description["affinity_machine_label"]
        #         pd = [s for i in self.pilot_data_services for s in i.list_pilots()]
        #
        #         # find all pilot stores with the same affinity
        #         candidate_pd = []
        #         for i in pd:
        #             pd_description = i.pilot_data_description
        #             pd_dc_affinity = pd_description["affinity_datacenter_label"]
        #             pd_machine_affinity = pd_description["affinity_machine_label"]
        #             if pd_dc_affinity == pj_dc_affinity and pd_machine_affinity == pj_machine_affinity:
        #                 candidate_pd.append(i)
        #
        #         # check whether required pilot_data is part of pilot_data
        #         target_pd = None
        #         target_du = None
        #         for pd in candidate_pd:
        #             for du in pd.list_data_units():
        #                 logger.debug("DU URL:%s"%(du.url))
        #                 if du.url == pilot_data_url:
        #                     logger.debug("Found PD %s at %s"%(du.url, pd.service_url))
        #                     target_pd = pd
        #                     target_du = du
        #                     break
        #         if target_du == None:
        #             self.__stage_du_to_pj(pilot_data_url, pilot_job)
        #
        #         if target_pd!=None:
        #             pd_url = target_pd.url_for_du(target_du)
        #             components = urlparse.urlparse(pd_url)
        #             compute_unit.compute_unit_description["working_directory"] = components.path
        #             compute_unit._update_compute_unit_description(compute_unit.compute_unit_description)
        #             logger.debug("__expand_working_directory %s: Set working directory to %s"%(pilot_data_url, compute_unit.compute_unit_description["working_directory"]))
        #         return compute_unit
        #
        #=======================================================================
        return compute_unit

    def __stage_du_to_pj(self, pilotdata, pilotjob):
        """
            Stage required files to the machine of the pilot job.
        """
        pass

    def __find_pd_at_pj_resource(self, pilotjob):
        pass


###############################################################################
# Unimplemented entities

class ComputeUnitService(object):
    def __init__(self):
        raise NotImplementedError("Please use ComputeDataService.")


class DataUnitService(object):
    def __init__(self):
        raise NotImplementedError("Please use ComputeDataService.")