Package pilot :: Package scheduler :: Module data_compute_affinity_scheduler

Source Code for Module pilot.scheduler.data_compute_affinity_scheduler

  1  """ Affinity-aware scheduler that evaluates affinity labels and input/output data flow 
  2       
  3   
  4  """ 
  5  import random 
  6  import logging 
  7  from bigjob import logger 
  8   
class Scheduler:
    """Affinity-aware scheduler for data units and compute units.

    A unit whose description carries both ``affinity_datacenter_label`` and
    ``affinity_machine_label`` is preferentially placed on a pilot whose
    description carries the same two labels. If no such pilot exists (or the
    unit has no affinity labels), any eligible pilot is picked at random.
    """

    # Both labels must be present and equal for a pilot to count as "affine".
    _AFFINITY_KEYS = ("affinity_datacenter_label", "affinity_machine_label")

    def __init__(self):
        self.pilot_data = []  # candidate pilot data for data units
        self.pilot_jobs = []  # candidate pilot jobs for compute units

    def set_pilot_data(self, pilot_data):
        """Set the pilot data resources used for scheduling data units."""
        self.pilot_data = pilot_data

    def set_pilot_jobs(self, pilot_jobs):
        """Set the pilot job resources used for scheduling compute units."""
        self.pilot_jobs = pilot_jobs

    def _has_affinity(self, description):
        """Return True if *description* defines every affinity label."""
        return all(key in description for key in self._AFFINITY_KEYS)

    def _same_affinity(self, unit_description, pilot_description):
        """Return True if the pilot carries the unit's affinity labels.

        A pilot description missing either label never matches (instead of
        raising ``KeyError``). *unit_description* must already define both
        labels.
        """
        if pilot_description is None:
            return False
        for key in self._AFFINITY_KEYS:
            if key not in pilot_description:
                return False
            if unit_description[key] != pilot_description[key]:
                return False
        return True

    def _is_available(self, pilot_job, required_number_of_processes):
        """Return True if *pilot_job* is Running with enough free nodes."""
        free_nodes = pilot_job.get_free_nodes()
        state = pilot_job.get_state()
        logger.debug("BJ: %r State: %s Free nodes: %s",
                     pilot_job, state, free_nodes)
        return state == "Running" and free_nodes >= required_number_of_processes

    def schedule_pilot_data(self, data_unit_description=None):
        """Select a pilot data for a data unit.

        :param data_unit_description: dict-like description of the data unit;
            ``None`` is treated like a description without affinity labels.
        :returns: a randomly chosen pilot data whose affinity labels match the
            description; if none matches (or the description has no affinity
            labels), a randomly chosen pilot data among all registered ones;
            ``None`` if no pilot data is registered.
        """
        logger.debug("Schedule to PD - # Avail pilots: %d", len(self.pilot_data))
        if data_unit_description is None:
            data_unit_description = {}

        candidate_pilot_data = []
        if self._has_affinity(data_unit_description):
            candidate_pilot_data = [
                pd for pd in self.pilot_data
                if self._same_affinity(data_unit_description,
                                       pd.pilot_data_description)
            ]

        if not candidate_pilot_data:
            # No PD with requested affinity found: move the data unit into a
            # "possibly" remote pilot data.
            logger.debug("No pilot data w/ affinity found... "
                         "Looking for alternative pilot.")
            candidate_pilot_data = self.pilot_data

        if candidate_pilot_data:
            return random.choice(candidate_pilot_data)
        return None

    def schedule_pilot_job(self, compute_unit_description=None):
        """Select a pilot job for a compute unit.

        Only pilot jobs in state ``"Running"`` with at least
        ``number_of_processes`` (default 1) free nodes are eligible. Among
        those, pilots whose affinity labels match the description are
        preferred; if none matches (or the description has no affinity
        labels), any eligible pilot job is used instead.

        TODO: incorporate potential data movements to co-locate PD/WU.

        :param compute_unit_description: dict-like description of the compute
            unit; ``None`` is treated as an empty description.
        :returns: a randomly chosen eligible pilot job, or ``None`` if no
            pilot job is eligible.
        """
        logger.debug("Schedule to PJ - # Avail PJs: %d", len(self.pilot_jobs))
        if compute_unit_description is None:
            compute_unit_description = {}

        required_number_of_processes = 1
        if "number_of_processes" in compute_unit_description:
            required_number_of_processes = int(
                compute_unit_description["number_of_processes"])

        candidate_pilot_jobs = []
        if self._has_affinity(compute_unit_description):
            candidate_pilot_jobs = [
                pj for pj in self.pilot_jobs
                if self._is_available(pj, required_number_of_processes)
                and self._same_affinity(compute_unit_description,
                                        pj.pilot_compute_description)
            ]

        if not candidate_pilot_jobs:
            # No PJ with requested affinity found: move the compute unit into
            # a "possibly" remote pilot job.
            logger.debug("No pilot compute w/ affinity found... "
                         "Looking for alternative pilot.")
            candidate_pilot_jobs = [
                pj for pj in self.pilot_jobs
                if self._is_available(pj, required_number_of_processes)
            ]

        logger.debug("Candidate PJs: %r", candidate_pilot_jobs)
        if candidate_pilot_jobs:
            return random.choice(candidate_pilot_jobs)
        return None

    def __check_pilot_data_dependency(self, work_unit_description):
        """Placeholder for data-dependency-aware placement (not implemented).

        Presumably meant to inspect ``work_unit_description["input_pilot_data"]``
        to co-locate work units with their input data (see the TODO in
        ``schedule_pilot_job``). It is not called anywhere in this class.

        :raises NotImplementedError: always.
        """
        # The previous draft referenced an undefined name (``PilotData``) and
        # produced no result; fail explicitly rather than with a NameError.
        raise NotImplementedError(
            "pilot data dependency checking is not implemented")