1 """
2 B{ComputeDataService Module}: A central implementation of the L{ComputeDataService}
3
4 A Meta-Scheduling service for pilots (both PilotCompute and PilotData)
5
6 """
7
8 import sys
9 import os
10 import time
11 import threading
12 import logging
13 import pdb
14 import Queue
15 import uuid
16 import traceback
17 import urlparse
18
19 from bigjob import logger
20
21 import pilot
22 from pilot.api import ComputeDataService, State
23 from pilot.impl.pilotdata_manager import DataUnit
24 from pilot.impl.pilotcompute_manager import ComputeUnit
25
26
27
28
29 from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor
30
31 """ The imported module determines the scheduler:
32
33 pilot.scheduler.data_compute_scheduler - selects random locations for PDs and CUs
34 pilot.scheduler.data_compute_affinity_scheduler - considers affinity descriptions
35
36 """
37 from pilot.scheduler.data_compute_affinity_scheduler import Scheduler
38
""" B{ComputeDataService (CDS).}

The ComputeDataService is the application's interface for submitting
ComputeUnits and PilotData/DataUnits to the Pilot-Manager in the P* Model.
"""
# Prefix of every CDS id: new ids are built as this prefix + uuid1, and the
# id of an existing CDS is located inside its URL by searching for it.
CDS_ID_PREFIX = "cds-"
47
48
def __init__(self, cds_url=None):
    """Create a ComputeDataService, or reconnect to an existing one.

    Keyword arguments:
    cds_url -- URL of an existing CDS to reconnect to (optional). When it is
               omitted, a fresh id is generated and the CDS is registered
               through CoordinationAdaptor.add_cds.

    Also starts a daemon thread running self._scheduler_thread.
    """
    # Bookkeeping for Pilot-Data / Data-Units
    self.data_units = {}
    self.pilot_data_services = []

    # Bookkeeping for Pilot-Compute / Compute-Units
    self.compute_units = {}
    self.pilot_job_services = []

    if cds_url is None:
        # New CDS: derive an id, then register under the application's base URL.
        self.id = self.CDS_ID_PREFIX + str(uuid.uuid1())
        base_url = CoordinationAdaptor.get_base_url(pilot.application_id)
        self.url = CoordinationAdaptor.add_cds(base_url, self)
    else:
        # Reconnect: the id is embedded in the given URL.
        # NOTE(review): only id/url are restored here; the unit and service
        # collections above start out empty.
        self.id = self.__get_cds_id(cds_url)
        self.url = cds_url

    # Background scheduling: submitted CUs/DUs are queued and placed by a
    # daemon thread that runs until the stop event is set.
    self.scheduler = Scheduler()
    self.cu_queue = Queue.Queue()
    self.du_queue = Queue.Queue()
    self.stop = threading.Event()
    worker = threading.Thread(target=self._scheduler_thread)
    worker.daemon = True
    self.scheduler_thread = worker
    worker.start()
79
80
def __get_cds_id(self, cds_url):
    """Extract the CDS id embedded in ``cds_url``.

    The id starts at the first occurrence of ``self.CDS_ID_PREFIX`` and runs
    up to the next "/" after it, or to the end of the URL when the id is the
    last segment.

    @param cds_url: URL of an existing CDS.
    @return: the CDS id, as a substring of ``cds_url``.
    @raise ValueError: if ``cds_url`` does not contain ``CDS_ID_PREFIX``.
    """
    start = cds_url.index(self.CDS_ID_PREFIX)
    end = cds_url.find("/", start)
    if end == -1:
        # No path segment follows the id, so it extends to the end of the URL.
        end = len(cds_url)
    return cds_url[start:end]
85
86
87
88
def add_pilot_compute_service(self, pcs):
    """Attach a PilotComputeService to this CDS.

    The updated CDS is then pushed via CoordinationAdaptor.update_cds.

    @param pcs: The PilotComputeService this ComputeDataService should use.
    """
    services = self.pilot_job_services
    services.append(pcs)
    CoordinationAdaptor.update_cds(self.url, self)
97
98
def remove_pilot_compute_service(self, pcs):
    """Detach a PilotComputeService from this CDS.

    The PilotComputeService itself is not canceled; it is simply no longer
    connected to this CDS. The updated CDS is then pushed via
    CoordinationAdaptor.update_cds.

    @param pcs: The PilotComputeService to remove from this ComputeDataService.
    @raise ValueError: if ``pcs`` is not attached (raised by ``list.remove``).
    """
    services = self.pilot_job_services
    services.remove(pcs)
    CoordinationAdaptor.update_cds(self.url, self)
110
111
def submit_compute_unit(self, compute_unit_description):
    """Create a ComputeUnit and queue it for background scheduling.

    @param compute_unit_description: The ComputeUnitDescription provided by the application.
    @return: the new ComputeUnit object.
    """
    unit = ComputeUnit(compute_unit_description, self)
    self.compute_units[unit.id] = unit
    # The scheduler thread takes the CU from this queue and places it on a pilot.
    self.cu_queue.put(unit)
    CoordinationAdaptor.update_cds(self.url, self)
    return unit
123
def list_pilot_compute(self):
    """List all PilotComputeServices attached to this CDS.

    @return: the list of attached PilotComputeServices (the CDS's own list,
             not a copy).
    """
    # The attribute is `pilot_job_services` (plural); the singular name does
    # not exist and raised AttributeError.
    return self.pilot_job_services
127
128
129
def add_pilot_data_service(self, pds):
    """Attach a PilotDataService to this CDS.

    The updated CDS is then pushed via CoordinationAdaptor.update_cds.

    @param pds: The PilotDataService to add.
    """
    services = self.pilot_data_services
    services.append(pds)
    CoordinationAdaptor.update_cds(self.url, self)
137
def remove_pilot_data_service(self, pds):
    """Detach a PilotDataService from this CDS.

    The updated CDS is then pushed via CoordinationAdaptor.update_cds.

    @param pds: The PilotDataService to remove.
    @raise ValueError: if ``pds`` is not attached (raised by ``list.remove``).
    """
    services = self.pilot_data_services
    services.remove(pds)
    CoordinationAdaptor.update_cds(self.url, self)
144
145
def list_pilot_data(self):
    """List all PilotDataServices attached to this CDS.

    @return: the CDS's own list of PilotDataServices (not a copy).
    """
    services = self.pilot_data_services
    return services
149
150
def list_data_units(self):
    """List all DataUnits of this CDS.

    @return: ``items()`` of the id -> DataUnit mapping, i.e. (DU id, DataUnit)
             pairs.
    """
    units_by_id = self.data_units
    return units_by_id.items()
154
155
def get_data_unit(self, du_id):
    """Look up a DataUnit of this CDS by id.

    @param du_id: id of the DataUnit.
    @return: the DataUnit, or None if no DU with that id was submitted.
    """
    # dict.get replaces the deprecated has_key (removed in Python 3) and does a
    # single lookup instead of two.
    return self.data_units.get(du_id)
160
161
def submit_data_unit(self, data_unit_description):
    """Create a DataUnit and queue it for placement on a PilotData.

    The DataUnit is created without a PilotData; the background scheduler
    thread later selects one and adds the DU to it.

    @param data_unit_description: description of the DataUnit to create.
    @return: the new DataUnit object.
    """
    unit = DataUnit(pilot_data=None,
                    data_unit_description=data_unit_description)
    self.data_units[unit.id] = unit
    self.du_queue.put(unit)
    CoordinationAdaptor.update_cds(self.url, self)
    return unit
171
172
def cancel(self):
    """Cancel the CDS.

    Signals the background scheduler thread to stop and removes this CDS via
    CoordinationAdaptor.delete_cds. This method does not itself call cancel()
    on the attached PilotComputeServices or PilotDataServices; cancel those
    separately if required.
    """
    # The scheduler loop checks this event before each pass.
    self.stop.set()
    CoordinationAdaptor.delete_cds(self.url)
180
def wait(self):
    """Block until the work submitted to this CDS has been handled.

    First waits until the CU queue and then the DU queue have been fully
    processed by the scheduler thread, then calls wait() on every DataUnit
    and afterwards on every ComputeUnit of this CDS. Per the unit
    semantics, this is intended to return once DUs are placed (Running) and
    CUs are completed (Done), or a fault/cancellation occurred.

    On Ctrl-C the CDS is canceled and KeyboardInterrupt is re-raised. Any
    other error also cancels the CDS and is re-raised unchanged.
    """
    try:
        logger.debug("### START WAIT ###")
        self.cu_queue.join()
        logger.debug("CU queue empty")
        self.du_queue.join()
        logger.debug("DU queue empty")

        for du in self.data_units.values():
            du.wait()
        logger.debug("DUs done")

        for cu in self.compute_units.values():
            cu.wait()
        logger.debug("CUs done")

        logger.debug("### END WAIT ###")
    except KeyboardInterrupt:
        logger.debug("Ctrl-c detected. Terminating ComputeDataService...")
        self.cancel()
        raise
    except Exception:
        # Surface the real error instead of disguising it as a Ctrl-C.
        logger.exception("Error while waiting. Terminating ComputeDataService...")
        self.cancel()
        raise
206
207
def get_state(self):
    """@return: State of the ComputeDataService.

    NOTE(review): no visible code in this class assigns ``self.state``;
    unless a base class provides it, this raises AttributeError.
    """
    current = self.state
    return current
211
212
def get_id(self):
    """@return: id of the ComputeDataService, converted to ``str``."""
    identifier = self.id
    return str(identifier)
216
217
def __del__(self):
    """Make sure the background scheduler thread terminates.

    Cancellation is skipped when __init__ did not get far enough to create
    the stop event and URL: cancel() would only raise AttributeError on such
    a half-built object, and exceptions from __del__ are merely printed.
    """
    if hasattr(self, "stop") and hasattr(self, "url"):
        self.cancel()
221
222
223
231
def _schedule_du(self, du):
    """Choose a PilotData for the DataUnit ``du``.

    Single-level scheduling: the scheduler's resources are refreshed via
    __update_scheduler_resources (per the original notes: all resources
    managed by the attached PDS), then the scheduler selects one PilotData
    for the DU's description.

    @return: the scheduler's choice; the scheduler thread treats None as
             "nothing suitable yet, retry later".
    """
    logger.debug("Schedule PD")
    self.__update_scheduler_resources()
    return self.scheduler.schedule_pilot_data(du.data_unit_description)
243
def _schedule_cu(self, cu):
    """Choose a PilotJob for the ComputeUnit ``cu``.

    Refreshes the scheduler's resources via __update_scheduler_resources,
    then asks the scheduler for a PilotJob matching the CU's description.

    @return: the scheduler's choice; the scheduler thread treats None as
             "no resource found, retry later".
    """
    logger.debug("Schedule CU")
    self.__update_scheduler_resources()
    return self.scheduler.schedule_pilot_job(cu.compute_unit_description)
249
def _scheduler_thread(self):
    """Background loop that places queued DataUnits and ComputeUnits.

    Runs until ``self.stop`` is set. Each pass:
      1. takes at most one item from ``du_queue`` (waiting up to 1s). For a
         DataUnit it asks _schedule_du for a PilotData; if one is found, the
         DU is added to it and marked Running, otherwise the DU is re-queued;
      2. takes at most one item from ``cu_queue`` (waiting up to 1s). For a
         ComputeUnit it waits for the CU's input DUs, asks _schedule_cu for a
         PilotJob and submits the CU to it, otherwise the CU is re-queued;
      3. pauses for up to 5s when both queues are empty.

    Every dequeued item is marked ``task_done`` exactly once, even when its
    processing raises, so ``Queue.join`` callers are not blocked forever by a
    failed item. A failure is logged and the item is dropped; the loop keeps
    running.
    """
    while not self.stop.is_set():
        try:
            du = self.du_queue.get(True, 1)
        except Queue.Empty:
            pass
        else:
            try:
                if isinstance(du, DataUnit):
                    pd = self._schedule_du(du)
                    if pd is not None:
                        logger.debug("Initiate Transfer to PD.")
                        du.add_pilot_data(pd)
                        logger.debug("Transfer to PD finished.")
                        du._update_state(State.Running)
                    else:
                        # No PilotData available yet: retry later. Re-queue
                        # *before* task_done() so the unfinished-task count
                        # never momentarily reaches zero and releases join().
                        self.du_queue.put(du)
            except Exception:
                logger.exception("Scheduling of DataUnit failed; dropping it.")
            finally:
                self.du_queue.task_done()

        try:
            cu = self.cu_queue.get(True, 1)
        except Queue.Empty:
            pass
        else:
            try:
                if isinstance(cu, ComputeUnit):
                    self.__wait_for_du(cu)
                    pj = self._schedule_cu(cu)
                    if pj is not None:
                        cu = self.__expand_working_directory(cu, pj)
                        pj._submit_cu(cu)
                    else:
                        logger.debug("No resource found.")
                        # Same ordering as above: re-queue before task_done().
                        self.cu_queue.put(cu)
            except Exception:
                logger.exception("Scheduling of ComputeUnit failed; dropping it.")
            finally:
                self.cu_queue.task_done()

        if self.cu_queue.empty() and self.du_queue.empty():
            # Event.wait instead of time.sleep: returns as soon as the stop
            # event is set, so cancellation is not delayed by the pause.
            self.stop.wait(5)

    logger.debug("Re-Scheduler terminated")
298
299
def __wait_for_du(self, compute_unit):
    """Wait for the DataUnits that ``compute_unit`` declares as input.

    The CU description's "input_data" entry lists DU URLs. Every DataUnit of
    this CDS whose ``get_url()`` is among them is waited on. Entries that
    match no known DU are ignored, and a missing, empty or None "input_data"
    means there is nothing to wait for.
    """
    cu_description = compute_unit.compute_unit_description
    # `get` instead of the deprecated has_key; `or []` also tolerates None.
    input_urls = cu_description.get("input_data") or []
    if not input_urls:
        return
    for du in self.data_units.values():
        du_url = du.get_url()
        # Plain `in` (== comparison) keeps the original matching semantics
        # and also works if entries are unhashable.
        if du_url in input_urls:
            logger.debug("Wait for DU: %s" % du_url)
            du.wait()
309
def __expand_working_directory(self, compute_unit, pilot_job):
    """DEPRECATED: formerly expanded pilotdata:// URLs in the CU description.

    The intent was to rewrite a URL such as

        pilotdata://localhost/434bfc5c-23fd-11e1-a43f-00264a13ca4c

    into the corresponding local path on the machine running the pilot job,
    e.g. /tmp/pilotstore/434bfc5c-23fd-11e1-a43f-00264a13ca4c.

    No expansion is performed any more: ``compute_unit`` is returned
    unchanged and ``pilot_job`` is ignored.
    """
    return compute_unit
365
366
def __stage_du_to_pj(self, pilotdata, pilotjob):
    """Placeholder for staging required files to the pilot job's machine.

    Not implemented: does nothing and returns None.
    """
    return None
372
375
376
377
378
379
class ComputeUnitService(object):
    """Unsupported stand-alone service; use ComputeDataService instead."""

    def __init__(self):
        """Always raises NotImplementedError."""
        raise NotImplementedError("Please use ComputeDataService.")
383
384
class DataUnitService(object):
    """Unsupported stand-alone service; use ComputeDataService instead."""

    def __init__(self):
        """Always raises NotImplementedError."""
        raise NotImplementedError("Please use ComputeDataService.")
388