1 """ B{ComputeDataServiceDecentral Module}: A decentral implementation of a ComputeDataService (see L{ComputeDataServiceDecentral}).
2
3 A Meta-Scheduling service for pilots (both PilotCompute and PilotData)
4 """
5
6 import sys
7 import os
8 import time
9 import threading
10 import logging
11 import pdb
12 import Queue
13 import uuid
14 import traceback
15 import urlparse
16
17 import bigjob
18 from bigjob import logger, bigjob, subjob, description
19
20 import pilot
21 from pilot.api import ComputeDataService, State
22 from pilot.api.api import PilotError
23 from pilot.impl.pilotdata_manager import PilotData, DataUnit
24 from pilot.impl.pilotcompute_manager import PilotCompute, PilotComputeService
25 from pilot.impl.pilot_manager import ComputeUnit
26
27
28
29 from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor
30
31 """ The imported scheduler module determines the scheduling policy:
32
33 pilot.scheduler.data_compute_scheduler - selects random locations for PDs and CUs
34 pilot.scheduler.data_compute_affinity_scheduler - considers affinity descriptions
35
36 """
37 from pilot.scheduler.data_compute_affinity_scheduler import Scheduler
38
""" B{ComputeDataServiceDecentral.}

The ComputeDataService is the application's interface to submit
ComputeUnits and PilotData/DataUnit to the Pilot-Manager
in the P* Model.

The decentral ComputeDataService only works with Redis: it is hard-wired to
the Redis coordination adaptor.
"""
# Prefix of every CDS id; __get_cds_id() uses it to locate the id in a CDS URL.
CDS_ID_PREFIX = "cds-"
49
""" Create a ComputeDataService (Decentral) object.

@param cds_url: Reconnect to an existing CDS (optional). When omitted, a new
    CDS id is generated and the CDS is registered with the coordination
    backend under the application's base URL.

Note: the local DU/CU/service collections always start empty; nothing in
this constructor reloads them when reconnecting via cds_url.
"""
# DataUnits created through this CDS, keyed by DataUnit id.
self.data_units = {}
self.pilot_data_services = []

# ComputeUnits submitted through this CDS, keyed by ComputeUnit id.
self.compute_units = {}
self.pilot_job_services = []

if cds_url is None:
    self.id = self.CDS_ID_PREFIX + str(uuid.uuid1())
    application_url = CoordinationAdaptor.get_base_url(pilot.application_id)
    self.url = CoordinationAdaptor.add_cds(application_url, self)
else:
    self.id = self.__get_cds_id(cds_url)
    self.url = cds_url

self.scheduler = Scheduler()
# DataUnits waiting to be placed on a PilotData; drained by _scheduler_thread.
self.du_queue = Queue.Queue()

# Set by cancel() to make the background scheduler thread exit its loop.
self.stop = threading.Event()
self.scheduler_thread = threading.Thread(target=self._scheduler_thread)
# Daemon thread, so a forgotten CDS does not keep the interpreter alive.
self.scheduler_thread.daemon = True
self.scheduler_thread.start()
logger.debug("Created ComputeDataServiceDecentral")
80
81
83 start = cds_url.index(self.CDS_ID_PREFIX)
84 end =cds_url.index("/", start)
85 return cds_url[start:end]
86
87
88
89
""" Add a PilotComputeService to this CDS.

The decentral CDS supports exactly one PilotComputeService.

@param pcs: The PilotComputeService to which this ComputeDataService will connect.
@raise PilotError: if a PilotComputeService has already been added.
"""
# Validate before mutating, so a rejected PCS is neither kept in the local
# list nor published to the coordination backend.
if self.pilot_job_services:
    logger.error("Decentral ComputeDataService only supports 1 PilotComputeService")
    raise PilotError("Decentral ComputeDataService only supports 1 PilotComputeService")
self.pilot_job_services.append(pcs)
CoordinationAdaptor.update_cds(self.url, self)
101
102
""" Remove a PilotComputeService from this CDS.

Note that it won't cancel the PilotComputeService; it will just no
longer be connected to this CDS. The updated CDS is published to the
coordination backend.

@param pcs: The PilotComputeService to remove.
@raise ValueError: if pcs is not attached to this CDS (from list.remove).
"""
self.pilot_job_services.remove(pcs)
CoordinationAdaptor.update_cds(self.url, self)
121
122
124 """ Submit a CU to this Compute Data Service.
125 @param compute_unit_description: The L{ComputeUnitDescription} from the application
126 @return: L{ComputeUnit} object
127 """
128 cu = ComputeUnit(compute_unit_description, self)
129 self.compute_units[cu.id]=cu
130 self.__submit_cu(cu)
131 return cu
132
133
135 """ List all pilot compute of CDS """
136 return self.pilot_job_service
137
138
139
140
142 """ Submits compute unit to Bigjob """
143
144 if len(self.pilot_job_services)!=1:
145 raise PilotError("No PilotComputeService found. Please start a PCS before submitting ComputeUnits.")
146
147 self.__wait_for_du(compute_unit)
148
149 sj = subjob()
150 self.pcs_coordination_namespace=self.pilot_job_services[0].coordination_queue
151 logger.debug("Submit CU to big-job via external queue: %s"%self.pcs_coordination_namespace)
152 sj.submit_job(self.pcs_coordination_namespace, compute_unit.subjob_description)
153 compute_unit._update_subjob(sj)
154 return compute_unit
155
156
158 """ wait for Data Units that are required for Compute Unit """
159 cu_description = compute_unit.compute_unit_description
160 if cu_description.has_key("input_data") and len(cu_description["input_data"])>0:
161 for input_du_url in cu_description["input_data"]:
162 for du in self.data_units.values():
163 if input_du_url == du.get_url():
164 logger.debug("Wait for DU: %s"%du.get_url())
165 du.wait()
166
167
168
169
""" Attach a PilotDataService to this CDS and publish the updated CDS.

@param pds: The PilotDataService to add.
"""
data_services = self.pilot_data_services
data_services.append(pds)
CoordinationAdaptor.update_cds(self.url, self)
177
""" Detach a PilotDataService from this CDS and publish the updated CDS.

@param pds: The PilotDataService to remove (ValueError if not attached).
"""
data_services = self.pilot_data_services
data_services.remove(pds)
CoordinationAdaptor.update_cds(self.url, self)
184
185
187 """ List all pilot data of CDS """
188 return self.pilot_data_services
189
190
192 """ List all DUs of CDS """
193 return self.data_units.items()
194
195
197 if self.data_units.has_key(du_id):
198 return self.data_units[du_id]
199 return None
200
201
203 """ creates a data unit object and binds it to a physical resource (a pilotdata) """
204 du = DataUnit(pilot_data=None,
205 data_unit_description=data_unit_description)
206 self.data_units[du.id]=du
207 self.du_queue.put(du)
208
209 CoordinationAdaptor.update_cds(self.url, self)
210 return du
211
212
213
214
215
""" Cancel the CDS.

Signals the background scheduler thread to leave its loop and removes this
CDS from the coordination backend. Associated PilotData and PilotCompute
objects are NOT cancelled here.
"""
self.stop.set()
CoordinationAdaptor.delete_cds(self.url)
223
224
""" Waits for CUs and DUs. Return after all DU's have been placed (i.e. in state Running)
and all CU's have been completed (i.e. in state Done) or if a fault has occurred or
the user has cancelled a CU or DU.

If waiting is interrupted (Ctrl-C) or any error is raised, the CDS is
cancelled and the original exception is re-raised unchanged.
"""
try:
    logger.debug("### START WAIT ###")
    for du in self.data_units.values():
        du.wait()
    logger.debug("Wait for DUs finished")

    for cu in self.compute_units.values():
        cu.wait()
    logger.debug("CUs done")

    logger.debug("### END WAIT ###")
except KeyboardInterrupt:
    logger.debug("Ctrl-c detected. Terminating ComputeDataService...")
    self.cancel()
    raise
except Exception:
    # Previously a bare except turned every error into a new KeyboardInterrupt,
    # hiding the real failure; now clean up and propagate the original error.
    logger.exception("Error while waiting. Terminating ComputeDataService...")
    self.cancel()
    raise
245
246
249
250
253
254
""" On garbage collection, cancel the CDS so the background scheduler thread stops. """
self.cancel()
258
259
260
261
269
270
272 """ Schedule DU to a suitable pilot data
273
274 Currently one level of scheduling is used:
275 1.) Add all resources managed by the contained PDS
276 2.) Select one resource
277 """
278 logger.debug("Schedule PD")
279 self.__update_scheduler_resources()
280 selected_pilot_data = self.scheduler.schedule_pilot_data(du.data_unit_description)
281 return selected_pilot_data
282
283
""" Background loop that places queued DataUnits on PilotData.

Runs until cancel() sets self.stop. Each DU taken from du_queue is offered
to the scheduler. If a PilotData is chosen, the DU's data is added to it and
the DU is marked Running. Otherwise the DU is put back on the queue and
retried after a back-off.
"""
# Seconds to back off before retrying a DU for which no PilotData was found.
retry_delay = 5
while not self.stop.isSet():
    try:
        # Block for at most 1s so the stop flag is re-checked regularly; no
        # extra sleep is needed when the queue is empty.
        du = self.du_queue.get(True, 1)
    except Queue.Empty:
        continue

    requeue = False
    try:
        if isinstance(du, DataUnit):
            pd = self._schedule_du(du)
            if pd is not None:
                logger.debug("Initiate Transfer to PD.")
                du.add_pilot_data(pd)
                logger.debug("Transfer to PD finished.")
                du._update_state(State.Running)
            else:
                requeue = True
    except Exception:
        # Previously an exception here ended this thread, so no later DU was
        # ever placed. Log it and keep serving the other DUs.
        # NOTE(review): the failed DU's state is left unchanged, so waiting
        # on it may still block -- consider marking it failed.
        logger.exception("Failed to place DU on a PilotData; it will not be retried.")
    finally:
        self.du_queue.task_done()

    if requeue:
        # No suitable PilotData yet: re-queue and back off. Without this pause
        # the just-re-queued DU was fetched again immediately and the loop
        # spun on the scheduler. stop.wait() (unlike time.sleep) returns as
        # soon as cancel() is called.
        self.du_queue.put(du)
        self.stop.wait(retry_delay)

logger.debug("Re-Scheduler terminated")
308