1 """ Affinity-aware scheduler that evaluates affinity labels and input/output data flow
2
3
4 """
5 import random
6 import logging
7 from bigjob import logger
8
10
14
18
19
23
24
# Fragment: picks a pilot data object for a data unit, preferring pilots
# whose affinity labels match those of the data unit description.
# NOTE(review): the enclosing `def` line and all indentation are missing from
# this view, so the nesting described in these comments is inferred from
# statement order only -- confirm against the original file.
# Reads `self.pilot_data` and `data_unit_description`. The visible return
# statements yield either random.choice(...) over the candidate list or None.
26 logger.debug("Schedule to PD - # Avail pilots: %d"%len(self.pilot_data))
27 candidate_pilot_data = []
# Affinity matching is only attempted when the data unit description carries
# both the datacenter and the machine label.
# NOTE(review): `has_key` exists only on Python 2 dicts; presumably the
# descriptions are dicts -- confirm before running this under Python 3.
28 if data_unit_description.has_key("affinity_datacenter_label") and data_unit_description.has_key("affinity_machine_label"):
29 for i in self.pilot_data:
30 pilot_data_description = i.pilot_data_description
# Pilots that do not declare both labels are not considered for a match.
31 if pilot_data_description.has_key("affinity_machine_label") and pilot_data_description.has_key("affinity_datacenter_label"):
# A pilot becomes a candidate only if both labels are equal to the data unit's.
32 if data_unit_description["affinity_datacenter_label"] == pilot_data_description["affinity_datacenter_label"]\
33 and data_unit_description["affinity_machine_label"] == pilot_data_description["affinity_machine_label"]:
34 candidate_pilot_data.append(i)
35
# Empty candidate list: fall back to the full `self.pilot_data` collection.
36 if len(candidate_pilot_data) == 0:
37
38
39 logger.debug("No pilot data w/ affinity found... Looking for alternative pilot.")
40 candidate_pilot_data = self.pilot_data
41
# Choose one of the candidates with random.choice when any are available.
42 if len(candidate_pilot_data)>0:
43 return random.choice(candidate_pilot_data)
44
45 return None
46
47
48
# NOTE(review): a second `return None` follows the one above; if both sit at
# the same nesting level this one is unreachable -- cannot tell from this view.
49 return None
50
51
# Fragment: picks a pilot job for a compute unit from `self.pilot_jobs`.
# NOTE(review): the enclosing `def` line and all indentation are missing from
# this view, so the nesting described in these comments is inferred from
# statement order only -- confirm against the original file.
53 """ Enforces affinity description: if no PJ is available with the right
54 affinity, WU can't be scheduled.
55
56 TODO: incorporate potential data movements to co-locate PD/WU
57
58 """
# NOTE(review): the docstring above says a unit cannot be scheduled without a
# pilot job of the right affinity, yet the code below contains a fallback loop
# that accepts any "Running" pilot job with enough free nodes when the
# candidate list is empty -- confirm which behavior is intended.
59 logger.debug("Schedule to PJ - # Avail PJs: %d"%len(self.pilot_jobs))
60 candidate_pilot_jobs = []
# Required process count defaults to 1 and is overridden by the integer value
# of the "number_of_processes" entry when the description provides one.
61 required_number_of_processes=1
# NOTE(review): `has_key` exists only on Python 2 dicts; presumably the
# description is a dict -- confirm before running this under Python 3.
62 if compute_unit_description.has_key("number_of_processes"):
63 required_number_of_processes = int(compute_unit_description["number_of_processes"])
64
# Affinity matching is only attempted when the compute unit description
# carries both the datacenter and the machine label.
65 if compute_unit_description.has_key("affinity_datacenter_label") and compute_unit_description.has_key("affinity_machine_label"):
66 for i in self.pilot_jobs:
67 free_nodes = i.get_free_nodes()
68 logger.debug("BJ: %r State: %s Free nodes: %d"%(i, i.get_state(), free_nodes))
# Only pilot jobs in state "Running" with at least the required number of
# free nodes are compared against the affinity labels.
69 if i.get_state()=="Running" and free_nodes >= required_number_of_processes:
70 pilot_job_description = i.pilot_compute_description
# NOTE(review): the pilot's labels are indexed directly with no presence
# check; presumably a description lacking either key would raise here -- verify.
71 if pilot_job_description["affinity_datacenter_label"] == compute_unit_description["affinity_datacenter_label"]\
72 and pilot_job_description["affinity_machine_label"] == compute_unit_description["affinity_machine_label"]:
73 candidate_pilot_jobs.append(i)
74
75
# Empty candidate list: collect every "Running" pilot job that has enough
# free nodes, without comparing affinity labels.
76 if len(candidate_pilot_jobs) == 0:
77
78
79 logger.debug("No pilot compute w/ affinity found... Looking for alternative pilot.")
80 for i in self.pilot_jobs:
81 logger.debug("BJ: %r State: %s"%(i, i.get_state()))
82 free_nodes = i.get_free_nodes()
83 if i.get_state()=="Running" and free_nodes >= required_number_of_processes:
84 candidate_pilot_jobs.append(i)
85
86
87
88 logger.debug("Candidate PJs: %r"%(candidate_pilot_jobs))
# Choose one of the candidates with random.choice; otherwise return None.
89 if len(candidate_pilot_jobs)>0:
90 return random.choice(candidate_pilot_jobs)
91
92 return None
93
# Fragment: walks the entries stored under the "input_pilot_data" key of
# `work_unit_description`.
# NOTE(review): the enclosing `def` line is missing from this view and the
# body may continue beyond the visible lines -- confirm against the original.
95 pilot_data_dependencies = work_unit_description["input_pilot_data"]
96 for i in pilot_data_dependencies:
# NOTE(review): `PilotData` is not among the imports visible at the top of
# this file, and neither `pd` nor `ps` is used in the visible lines --
# presumably unfinished code; verify.
97 pd = PilotData.pilot
98 ps = i.get_pilot_data()
99