Package pilot :: Package api :: Package compute :: Module api

Source Code for Module pilot.api.compute.api

  1  """ Pilot Compute related entities.""" 
  2   
class State(object):
    """ B{State}.

    States used for L{PilotCompute}, L{PilotData}, L{ComputeUnit},
    L{DataUnit} and L{ComputeDataService}.

    Every state is a plain string constant whose value is spelled exactly
    like the attribute that holds it.
    """

    # State not (yet) known
    Unknown = "Unknown"

    # States before an outcome has been reached
    New = "New"
    Running = "Running"

    # Outcome states
    Done = "Done"
    Canceled = "Canceled"
    Failed = "Failed"

    Pending = "Pending"
15 16 17
class _MissingPilotComputeAttribute(AttributeError, KeyError):
    """ Raised when a PilotComputeDescription attribute has no dict entry.

    It is an AttributeError so that hasattr(), getattr() with a default and
    the copy module behave normally, and still a KeyError so that callers
    written against the previous behaviour keep working.
    """


class PilotComputeDescription(dict):
    """ B{PilotComputeDescription (PCD).}

    A PilotComputeDescription is based on the attributes defined on
    the SAGA Job Description.

    The PilotComputeDescription is used by the application to specify
    what kind of PilotJobs it requires.

    Example::
        pilot_compute_description = {
                            "service_url": 'fork://localhost',
                            "number_of_processes": 1,
                            "working_directory": "/tmp/pilot-compute/",
                            'affinity_datacenter_label': "eu-de-south",
                            'affinity_machine_label': "mymachine-1"
                           }

    B{Attention}: The PilotComputeDescription is mapped 1:1 to the underlying SAGA-Python (Bliss)
    job description, which is used for launching the pilot. Depending on the resource, it is required
    to add additional attributes, e.g. some XSEDE/Torque resources require the specification
    of both number_of_process and processes_per_node:

    Example::
        pilot_compute_description = {
                            "service_url": 'pbs+ssh://india.futuregrid.org',
                            "number_of_processes": 8,
                            "processes_per_node":8,
                            "working_directory": "/N/u/luckow",
                            'affinity_datacenter_label': "us-east-indiana",
                            'affinity_machine_label': "india"
                           }

    Attribute access is an alias for item access: C{pcd.queue = "batch"}
    stores C{pcd["queue"]}, and reading C{pcd.queue} returns that entry.
    """

    # Class members
    __slots__ = (
        # Pilot / Agent description
        'service_url',          # SAGA-Python URL for respective resource manager, e.g. fork://localhost
        'project',
        'working_directory',
        # I/O
        'input',
        'error',
        'output',
        'file_transfer',

        # Parallelism
        'number_of_processes',  # Total number of processes to start
        # NOTE(review): the class docstring uses "processes_per_node" for
        # this setting -- confirm which key the backend expects.
        'processes_per_host',   # Nr of processes per host

        # Requirements
        'wall_time_limit',
        'queue',

        # Affinity
        'affinity_datacenter_label',    # pilot jobs sharing the same label are located in the same data center
        'affinity_machine_label',       # pilot jobs sharing the same label are located on the same machine
    )

    def __init__(self, *args, **kwargs):
        """ Create a description, optionally seeded like a regular dict.

        Keyword arguments:
        args   -- Optional mapping or iterable of key/value pairs, as
                  accepted by dict().
        kwargs -- Optional entries given as keywords.
        """
        # The previous implementation tried to assign a __doc__ to
        # self.service_url here; that lookup hit the empty dict and raised
        # KeyError, so no description could ever be constructed.
        dict.__init__(self, *args, **kwargs)

    def __setattr__(self, attr, value):
        """ Store the attribute as the dict entry with the same name. """
        self[attr] = value

    def __getattr__(self, attr):
        """ Return the dict entry named attr.

        Only called when regular attribute lookup fails, which includes
        the declared slots because they are never assigned directly.

        Raises an exception that is both an AttributeError and a KeyError
        when no such entry exists.
        """
        try:
            return self[attr]
        except KeyError:
            raise _MissingPilotComputeAttribute(attr)
92 93 94
class PilotCompute(object):
    """ B{PilotCompute (PC)}.

    Handle that the PilotComputeService returns when a new PilotCompute
    (aka Pilot-Job) is created based on a PilotComputeDescription.

    The application can use the PilotCompute object to keep track of
    PilotComputes that are active.

    A PilotCompute has state, can be queried, can be cancelled and be
    re-initialized.

    This class only defines the interface: the constructor always raises
    and the remaining methods do nothing and return None.
    """

    # Class members
    __slots__ = (
        'id',               # Reference to this PJ
        'description',      # Description of PilotCompute
        'context',          # SAGA context
        'rm',               # Resource Manager URL
        'pj_type',          # Type of TROY backend
        'state',            # State of the PilotCompute
        'state_detail',     # Adaptor specific state of the PilotCompute
        'callback',         # Callback object
        'wall_time_left'    # Remaining wallclock time left
    )

    def __init__(self):
        """ Refuse instantiation of the abstract class.

        Raises:
            NotImplementedError -- always
        """
        reason = "Abstract super class, please use PilotCompute implementation class in pilot namespace"
        raise NotImplementedError(reason)

    def cancel(self):
        """ Remove the PilotCompute from the PilotCompute Service.

            Keyword arguments:
            None
        """

    def reinitialize(self, pilotjob_description):
        """ Re-Initialize the PilotCompute to the (new) PilotComputeDescription.

            Keyword arguments:
            pilotjob_description -- A PilotComputeDescription
        """

    def set_callback(self, member, cb):
        """ Set a callback function for a member.

            Keyword arguments:
            member -- The member to set the callback for (state / state_detail).
            cb -- The callback object to call.
        """

    def unset_callback(self, member):
        """ Unset a callback function from a member

            Keyword arguments:
            member -- The member to unset the callback from.
        """

    def get_state(self):
        """ Interface placeholder; does nothing and returns None here. """
159 # 160 # Pilot ComputeService 161 #
class PilotComputeService(object):
    """ B{PilotComputeService (PCS).}

    The PilotComputeService is responsible for creating and managing
    the PilotComputes.

    It is the application's interface to the Pilot-Manager in the
    P* Model.

    This class only defines the interface: the constructor always raises
    and the remaining methods do nothing and return None.
    """

    # Class members
    __slots__ = (
        'id',           # Reference to this PJS
        'state',        # Status of the PJS
        'pilot_jobs'    # List of PJs under this PJS
    )

    def __init__(self, pjs_id=None):
        """ Create a PilotComputeService object.

            Keyword arguments:
            pjs_id -- Don't create a new, but connect to an existing (optional)

            Raises:
            NotImplementedError -- always
        """
        reason = "Abstract super class, please use PilotComputeService implementation class in pilot namespace"
        raise NotImplementedError(reason)

    def create_pilot(self, rm, pilotcompute_description, pj_type=None, context=None):
        """ Add a PilotCompute to the PilotComputeService

            Keyword arguments:
            rm -- Contact string for the resource manager
            pilotcompute_description -- PilotCompute Description
            pj_type -- backend type (optional)
            context -- Security context (optional)

            Return value:
            A PilotCompute handle
        """

    def cancel(self):
        """ Cancel the PilotComputeService.

            This also cancels all the PilotJobs that were under control of this PJS.

            Keyword arguments:
            None

            Return value:
            Result of operation
        """
217 218 219 # 220 # ComputeUnitService 221 #
class ComputeUnitService(object):
    """ B{ComputeUnitService (CUS).}

    Please use ComputeDataService!

    The ComputeUnitService is the application's interface to submit
    ComputeUnits to the Pilot-Manager in the P* Model.

    It can provide the application with a list of ComputeUnits that are
    managed by the Pilot-Manager.

    The ComputeUnitService is linked to a PilotComputeService for the actual
    execution of the ComputeUnits.

    This class only defines the interface: the constructor always raises
    and the remaining methods do nothing and return None.
    """

    def __init__(self, wus_id=None):
        """ Create a Work Service object.

            Keyword arguments:
            wus_id -- Reconnect to an existing WUS (optional).

            Raises:
            NotImplementedError -- always
        """
        reason = "Abstract super class, please use ComputeUnitService implementation class in pilot namespace"
        raise NotImplementedError(reason)

    def add_pilot_job_service(self, pjs):
        """ Add a PilotComputeService to this WUS.

            Keyword arguments:
            pjs -- The PilotCompute Service(s) to which this
                   Work Unit Service will connect.

            Return:
            Result
        """

    def remove_pilot_job_service(self, pjs):
        """ Remove a PilotComputeService from this WUS.

            Note that it won't cancel the PilotComputeService, it will just no
            longer be connected to this WUS.

            Keyword arguments:
            pjs -- The PilotCompute Service(s) to remove from this
                   Work Unit Service.

            Return:
            Result
        """

    def submit_compute_unit(self, wud):
        """ Submit a WU to this Work Unit Service.

            Keyword argument:
            wud -- The WorkUnitDescription from the application

            Return:
            WorkUnit object
        """

    def cancel(self):
        """ Cancel the WUS.

            Cancelling the WUS also cancels all the WUs submitted to it.

            Keyword arguments:
            None

            Return:
            Result
        """

    def get_state(self):
        """ Interface placeholder; does nothing and returns None here. """

    def get_id(self):
        """ Interface placeholder; does nothing and returns None here. """
307 308 309 310 # 311 # ComputeUnitDescription 312 #
class _MissingComputeUnitAttribute(AttributeError, KeyError):
    """ Raised when a ComputeUnitDescription attribute has no dict entry.

    It is an AttributeError so that hasattr(), getattr() with a default and
    the copy module behave normally, and still a KeyError so that callers
    written against the previous behaviour keep working.
    """


class ComputeUnitDescription(dict):
    """ B{ComputeUnitDescription (CUD).}

    The ComputeUnitDescription is a task description based on
    SAGA Job Description.

    It offers the application to describe a L{ComputeUnit} in an abstract
    way that is dealt with by the Pilot-Manager. It can contain
    references to dependent L{DataUnit}s. ComputeUnitDescription are submitted
    to the L{ComputeDataService}.

    Format::

        compute_unit_description =
            {
                'executable': <path to executable>,
                'arguments': <arguments>,                 # Arguments
                'environment': <environment>,             # environment variables

                # Working directory
                # Recommendation: Do not set working directory!
                # if None working directory is sandbox directory of this CU (automatically
                # created by BigJob)
                'working_directory': <working directory>,

                # I/O
                'input': <stdin>,
                'error': <stderr>,
                'output': <stdout>,

                # Parallelism

                # Defines how many CPU cores are reserved for the application process.
                'number_of_processes': <Total number of processes to start>,

                # Defines how the application process is launched:
                #     "single": ./a.out
                #     "mpi": mpirun -np <number_of_processes> ./a.out
                # In the MPI case BJ generates an appropriate machinefile
                'spmd_variation': <Type and startup mechanism. Supported Values: [single, mpi]>,


                # Data - input/output data flow for ComputeUnit
                'input_data': [<data unit url>, ... ],
                'output_data': [<data unit url>, ... ]
            }

    Example::
        compute_unit_description = {
                "executable": "/bin/cat",
                "arguments": ["test.txt"],
                "number_of_processes": 1,
                "output": "stdout.txt",
                "error": "stderr.txt",
                "input_data" : [data_unit.get_url()], # this stages the content of the data unit to the working directory of the compute unit
                "affinity_datacenter_label": "eu-de-south",
                "affinity_machine_label": "mymachine-1"
            }

    ComputeUnitDescription objects are loosely typed. A dictionary containing the respective keys
    can be passed instead to the L{ComputeDataService}.

    Attribute access is an alias for item access: C{cud.executable = "/bin/cat"}
    stores C{cud["executable"]}, and reading C{cud.executable} returns that entry.
    """

    def __init__(self, *args, **kwargs):
        """ Create a description, optionally seeded like a regular dict.

        Keyword arguments:
        args   -- Optional mapping or iterable of key/value pairs, as
                  accepted by dict().
        kwargs -- Optional entries given as keywords.
        """
        # Forward to dict so an existing plain dictionary description can be
        # turned into a ComputeUnitDescription directly.
        dict.__init__(self, *args, **kwargs)

    def __setattr__(self, attr, value):
        """ Store the attribute as the dict entry with the same name. """
        self[attr] = value

    def __getattr__(self, attr):
        """ Return the dict entry named attr.

        Only called when regular attribute lookup fails.

        Raises an exception that is both an AttributeError and a KeyError
        when no such entry exists.
        """
        try:
            return self[attr]
        except KeyError:
            raise _MissingComputeUnitAttribute(attr)
384 385 386 # 387 # ComputeUnit(WU) 388 #
class ComputeUnit(object):
    """ B{ComputeUnit (CU).}

    Handle that the ComputeUnitService returns when a new ComputeUnit is
    created based on a ComputeUnitDescription.

    The application can use the ComputeUnit object to keep track of
    ComputeUnits that are active.

    A ComputeUnit has state, can be queried and can be cancelled.

    This class only defines the interface: cancel() always raises and the
    remaining methods do nothing and return None.
    """

    # Class members
    # NOTE(review): no 'state' slot is declared although get_state() and the
    # callback documentation refer to a state member -- confirm that
    # implementation classes provide it.
    __slots__ = (
        'id',               # Reference to this WU
        'description',      # Description of this WU
        'state_detail',     # Detailed (application specific) state of this WU
        'callback'          # Callback object
    )

    def cancel(self):
        """ Cancel the CU.

            Raises:
            NotImplementedError -- always
        """
        reason = "Abstract super class, please use ComputeUnit implementation class in pilot namespace"
        raise NotImplementedError(reason)

    def set_callback(self, member, cb):
        """ Set a callback function for a member.

            Keyword arguments:
            member -- The member to set the callback for (state / state_detail).
            cb -- The callback object to call.
        """

    def unset_callback(self, member):
        """ Unset a callback function from a member

            Keyword arguments:
            member -- The member to unset the callback from.
        """

    def get_state(self):
        """ Interface placeholder; does nothing and returns None here. """

    def wait(self):
        """ Wait until in Done state
            (or Failed state)
        """
443 # 444 # Callback (Abstract) Class 445 #
class Callback(object):
    """ Callback class.

    Specifies the structure for callback classes.

    Callbacks can be set for WorkUnits on the state or state_detail members.
    """

    def cb(self, wu, member, value):
        """ This is the method that needs to be implemented by the application

            The version defined here does nothing and returns None.

            Keyword arguments:
            wu -- The WU that is calling back.
            member -- The member that triggered the callback.
            value -- The new (detailed) state.

            Return:
            Keep -- Keep or remove the callback
        """
466