1 """ B{PilotCompute Module}: Implementation of L{PilotComputeService}, L{PilotCompute}, L{ComputeUnit}.
2 """
3
4 import sys
5 import getopt
6 import time
7 import pdb
8 import os
9 import traceback
10 import logging
11 import uuid
12
13
14 """ import bigjob classes """
15 from bigjob import logger
16 from bigjob import bigjob, subjob, description
17
18 """ import API objects """
19 from pilot.api import PilotCompute, PilotComputeService, ComputeUnit, State
20
21 """ Configure coordination backend here """
22
23 from pilot.coordination.redis_adaptor import RedisCoordinationAdaptor as CoordinationAdaptor
24
25
26 """ This variable defines the coordination system that is used by BigJob
27 e.g.
28 advert://localhost (SAGA/Advert SQLITE)
29 advert://advert.cct.lsu.edu:8080 (SAGA/Advert POSTGRESQL)
30 redis://localhost:6379 (Redis at localhost)
31 tcp://localhost (ZMQ)
32 """
33 COORDINATION_URL = "redis://localhost"
34
35
37 """ B{PilotComputeService (PCS).}
38
39 Factory for L{PilotCompute}s.
40
41 """
42 PJS_ID_PREFIX="pcs-"
43
44
46 """ Create a PilotJobService object.
47
48 Keyword arguments:
49 pcs_url -- Don't create a new service, but connect to an existing one (optional; reconnecting is currently not supported)
50 """
51 self.pilot_computes=[]
52 self.coordination_url=coordination_url
53 self.coordination_queue=""
54 if pcs_url==None:
55 self.id = self.PJS_ID_PREFIX+str(uuid.uuid1())
56 self.url = os.path.join(self.coordination_url, "pcs", self.id)
57 self.coordination_queue = "PilotComputeServiceQueue-" + str(self.id)
58 logger.debug("Created Pilot Compute Service: %s"%self.url)
59 else:
60 logger.error("Reconnect to PilotComputeService currently not supported.")
61
62
63
def create_pilot(self, rm=None, pilot_compute_description=None, pj_type=None, context=None):
    """ Start a new pilot and register it with this PilotComputeService.

        Keyword arguments:
        pilot_compute_description -- description of the pilot to start
        rm, pj_type, context -- accepted, but not read by this method

        Return value:
        The newly created PilotCompute object
    """
    # Turn the pilot description into BigJob terms and launch the BigJob.
    bigjob_description = self.__translate_pj_bj_description(pilot_compute_description)
    started_bigjob = self.__start_bigjob(bigjob_description)

    # Wrap the running BigJob and remember the wrapper, so that it is part of
    # the list of pilots kept by this service.
    pilot = PilotCompute(self, started_bigjob, pilot_compute_description)
    self.pilot_computes.append(pilot)
    return pilot
78
79
81 """ List managed L{PilotCompute}s.
82
83 Return value:
84 A list of L{PilotCompute} objects
85
86 """
87 return self.pilot_computes
88
89
91 """ Cancel the PilotComputeService.
92
93 This also cancels all the PilotJobs that were under control of this PJS.
94
95 Keyword arguments:
96 None
97
98 Return value:
99 Result of operation
100 """
101 for i in self.pilot_computes:
102 i.cancel()
103
104
106 status_string = "{\n"
107 for i in self.pilot_computes:
108 status_string = status_string + " PJ: %s, State: %s;"%(i.get_url(), i.get_state())
109 status_string = status_string + "\n}"
110 return status_string
111
112
113
114
def __translate_pj_bj_description(self, pilot_compute_description):
    """ private method - translates a pilot compute description into the
        dictionary that is used to start the BigJob

        Keyword arguments:
        pilot_compute_description -- mapping with the attributes of the pilot

        Return value:
        A new dict holding
          - every key/value pair of pilot_compute_description,
          - "resource_url", taken from "service_url" when that key is given,
          - "queue" and "allocation", None unless given in the description,
          - "pilot_compute_description", the description object itself.
    """
    # These two entries must always exist; the copy loop below replaces the
    # None placeholders when the description provides real values.
    resource_description = {"queue": None, "allocation": None}

    # Membership is tested with `in` rather than has_key(): has_key() no
    # longer exists on Python 3 dicts, while `in` works on both versions.
    if "service_url" in pilot_compute_description:
        resource_description["resource_url"] = pilot_compute_description["service_url"]

    # Pass every attribute of the description through unchanged.
    for key in pilot_compute_description.keys():
        resource_description[key] = pilot_compute_description[key]

    # Stored last so that no key of the description can overwrite it.
    resource_description["pilot_compute_description"] = pilot_compute_description
    return resource_description
136
137
def __start_bigjob(self, bj_dict):
    """ private method - starts a bigjob on the resource described by bj_dict

        Keyword arguments:
        bj_dict -- dict with the keys "resource_url", "number_of_processes",
                   "queue", "allocation" and "pilot_compute_description";
                   "processes_per_node" (default "1"), "walltime"
                   (default 3600), "working_directory" and "file_transfer"
                   (both default None) are optional

        Return value:
        The started bigjob object
    """
    resource_url = bj_dict["resource_url"]
    logger.debug("start bigjob at: " + resource_url)
    pilot_backend = bigjob(self.coordination_url)

    # When the caller gave no value, the default is also written back into
    # bj_dict, not just used for the call below.
    processes_per_node = bj_dict.setdefault("processes_per_node", "1")

    pilot_backend.start_pilot_job(
        lrms_url=resource_url,
        number_nodes=bj_dict["number_of_processes"],
        queue=bj_dict["queue"],
        project=bj_dict["allocation"],
        working_directory=bj_dict.get("working_directory"),
        walltime=bj_dict.get("walltime", 3600),
        processes_per_node=processes_per_node,
        filetransfers=bj_dict.get("file_transfer"),
        external_queue=self.coordination_queue,
        pilot_compute_description=bj_dict["pilot_compute_description"],
    )
    return pilot_backend
174
175
176
178 """ B{Pilot Compute (PC).}
179
180 This is the object that is returned by the PilotComputeService when a
181 new PilotCompute is created based on a PilotComputeDescription.
182
183 The PilotCompute object can be used by the application to keep track
184 of active pilots.
185
186 A PilotCompute has state, can be queried and cancelled.
187
188 Properties:
189
190 - state:
191 The state of the pilot.
192
193 - id:
194 The id may be 'None' if the Pilot is not yet in Running state.
195 The returned ID can be used to connect to the CP instance later
196 on, for example from within a different application instance.
197 type: string (url)
198
199 - pilot_compute_description:
200 The PilotComputeDescription used to create this pilot.
201
202 """
def __init__(self, pilot_compute_service=None,
             bigjob_object=None,
             pilot_compute_description=None,
             pilot_url=None):
    """ Create/reconnect to a Pilot Compute.

        Keyword arguments:
        pilot_compute_service -- service that created this pilot
        bigjob_object -- bigjob that backs this pilot
        pilot_compute_description -- description the pilot was created from
        pilot_url -- restore from this URL; when given, a bigjob is
                     re-created from the URL and the three arguments
                     above are not used
    """
    self.__subjobs = []
    self.__pilot_compute_service = None

    # Both attributes are defined on every path. Previously they were missing
    # after a reconnect, so merely reading them raised AttributeError.
    self.pilot_compute_description = None
    self.coordination_queue = None

    if pilot_url is None:
        logger.debug("Create PilotCompute for BigJob: " + str(bigjob_object))
        self.pilot_compute_description = pilot_compute_description
        self.__pilot_compute_service = pilot_compute_service
        self.__bigjob = bigjob_object
    else:
        logger.debug("Reconnect to an existing Pilot Compute")
        self.__bigjob = bigjob(pilot_url=pilot_url)

    # Only a pilot that was created through a service shares its queue.
    if self.__pilot_compute_service is not None:
        self.coordination_queue = self.__pilot_compute_service.coordination_queue
233
234
236 """ Terminates the pilot """
237 self.__bigjob.cancel()
238
239
241 """ Returns the state of the pilot """
242 return self.__bigjob.get_state()
243
244
246 """ Wait for the Pilot Compute to enter a final state (Done, Cancel or Failed)
247
248 It is not an error to call wait() in a final state -- the call simply
249 returns immediately.
250 """
251 self.__bigjob.wait()
252
253
255 """ list managed L{ComputeUnit}s.
256
257 Return value:
258 A list of L{ComputeUnit} objects
259
260 The returned list can include units which have been submitted to
261 this pilot.
262 """
263 sj_list = self.__bigjob.list_subjobs()
264 cu_list = []
265 for i in sj_list:
266 cu_list.append(ComputeUnit(cu_url=i))
267 return cu_list
268
269
271 """ Get unique URL referencing the Pilot Compute
272 This URL can be used to reconnect to the Pilot Compute
273 """
274 return self.__bigjob.get_url()
275
276
278 """ Returns the number of free slots available within the pilot """
279 return self.__bigjob.get_free_nodes()
280
281
283 """ returns a dict that contains the details of the Pilot Compute,
284 - job state
285 - description
286 - ...
287 """
288 return self.__bigjob.get_details()
289
290
292 """
293 Submit a CU to this pilot.
294
295 @param compute_unit_description: The L{ComputeUnitDescription} or dictionary describing
296 the compute task
297
298 @return: L{ComputeUnit} object
299
300 The CUD is (possibly translated and) passed on to the PDS scheduler,
301 which will attempt to instantiate the described workload process on
302 the managed set of Pilot Computes.
303
304 On success, the returned CU is in Pending state (or moved into any
305 state downstream from Pending).
306
307 The call will honor all attributes set on the CUD. Attributes which
308 are not explicitly set are interpreted as having default values (see
309 documentation of CUD), or, where default values are not specified,
310 are ignored.
311 """
312 cu = ComputeUnit(compute_unit_description)
313 return self._submit_cu(cu)
314
315
317 return str(self.__bigjob)
318
319
320
321
322
def _submit_cu(self, compute_unit):
    """ Submits compute unit to Bigjob

        Keyword arguments:
        compute_unit -- the L{ComputeUnit} to run; its subjob_description is
                        what gets submitted

        Return value:
        The same compute_unit, after it was handed the new subjob
    """
    logger.debug("Submit CU to big-job")

    new_subjob = subjob()
    new_subjob.submit_job(self.__bigjob.pilot_url, compute_unit.subjob_description)

    # Keep the subjob on the pilot and pass it to the CU as well.
    self.__subjobs.append(new_subjob)
    compute_unit._update_subjob(new_subjob)
    return compute_unit
331
332
333
334
336 """ B{ComputeUnit (CU).}
337
338 This is the object that is returned by the ComputeDataService when a
339 new ComputeUnit is submitted based on a ComputeUnitDescription.
340
341 The ComputeUnit object can be used by the application to keep track
342 of ComputeUnits that are active.
343
344 A ComputeUnit has state, can be queried and can be cancelled.
345
346 """
347
348 CU_ID_PREFIX="cu-"
349
def __init__(self, compute_unit_description=None, compute_data_service=None, cu_url=None):
    """ Create a new Compute Unit or reconnect to an existing one.

        Keyword arguments:
        compute_unit_description -- description of the task (new CUs)
        compute_data_service -- if given, its url is used as the prefix of
                                the URL of a new CU
        cu_url -- reconnect to the subjob referenced by this URL instead of
                  creating a new CU
    """
    # Every attribute is defined up front. Previously a reconnected CU had
    # no id/state/description attributes at all, and a new CU created
    # without a service had no url, so reading them raised AttributeError.
    self.id = None
    self.url = None
    self.state = None
    self.compute_unit_description = compute_unit_description
    self.subjob_description = None

    if cu_url is None:
        self.id = self.CU_ID_PREFIX + str(uuid.uuid1())
        if compute_data_service is not None:
            self.url = compute_data_service.url + "/" + self.id
            logger.debug("Created CU: %s"%self.url)
        self.state = State.New
        self.__subjob = None
        self.subjob_description = self.__translate_cu_sj_description(compute_unit_description)
    else:
        # The URL the caller used to address this CU.
        self.url = cu_url
        self.__subjob = subjob(subjob_url=cu_url)
363
364
366 """Returns unique identifier of Compute Unit. Deprecated: Please use get_url() instead."""
367 return self.id
368
369
371 """Returns URL of Compute Unit. This URL can be used to reconnect to this Compute Unit later on."""
372 if self.__subjob!=None:
373 return self.__subjob.get_url()
374 else:
375 return self.get_id()
376
377
379 """Returns dict with Compute Unit Details (e.g. job description, timings)"""
380 if self.__subjob!=None:
381 return self.__subjob.get_details()
382 else:
383 return None
384
385
387 """Returns current state of Compute Unit"""
388 if self.__subjob != None:
389 self.state = self.__subjob.get_state()
390 return self.state
391
392
403
404
406 """Terminates Compute Unit"""
407 if self.__subjob != None:
408 return self.__subjob.cancel()
409 return None
410
413
414
416 self.compute_unit_description = compute_unit_description
417 self.subjob_description = self.__translate_cu_sj_description(compute_unit_description)
418
420 self.__subjob = subjob
421
422
def __translate_cu_sj_description(self, compute_unit_description):
    """ private method - translates a compute unit description into a
        subjob description

        Keyword arguments:
        compute_unit_description -- mapping with the attributes of the CU

        Return value:
        A new description object. Attributes that are copied only when
        present: executable, arguments, environment, working_directory,
        output, error, file_transfer, input_data, output_data.
        spmd_variation defaults to "single". number_of_processes is read
        from "number_of_processes", else from "total_cpu_count", else 1,
        and is converted with int().
    """
    jd = description()

    def copy_if_present(*keys):
        # Copy each key that the CU description defines onto the attribute of
        # the same name. Membership uses `in`, because has_key() no longer
        # exists on Python 3 dicts.
        for key in keys:
            if key in compute_unit_description:
                setattr(jd, key, compute_unit_description[key])

    copy_if_present("executable")

    if "spmd_variation" in compute_unit_description:
        jd.spmd_variation = compute_unit_description["spmd_variation"]
    else:
        jd.spmd_variation = "single"

    copy_if_present("arguments", "environment")

    # "number_of_processes" wins over "total_cpu_count" when both are given.
    if "number_of_processes" in compute_unit_description:
        jd.number_of_processes = int(compute_unit_description["number_of_processes"])
    elif "total_cpu_count" in compute_unit_description:
        jd.number_of_processes = int(compute_unit_description["total_cpu_count"])
    else:
        jd.number_of_processes = 1

    copy_if_present("working_directory", "output", "error",
                    "file_transfer", "input_data", "output_data")
    return jd
457