4.3. Coupled Ensembles

The script provides a simple workflow which submits a set of jobs (A) and a set of jobs (B), waits until they are completed, and then submits a set of jobs (C). It demonstrates the synchronization mechanisms provided by the Pilot-API. This example is useful if an executable C depends on some of the output generated by jobs A and B.

import os
import sys
import saga
import pilot
import traceback

""" This tutorial example shows another form of task set synchronization.
    It exemplifies a simple workflow which submits a set of tasks (set A) and
    another set of tasks (set B), waits until they are completed, and then
    submits a further set of tasks (set C). Both A- and B-tasks are
    'producers'. C-tasks are 'consumers' and concatenate the output of an
    A-task and a B-task.
"""

#------------------------------------------------------------------------------
# User-specific settings. Both values below MUST be filled in before running;
# they default to empty strings only so that the script remains valid Python.

# Redis password and 'user' name
REDIS_PWD   = ""  # Fill in the password to your server
USER_NAME   = ""  # Fill in your username on the resource you're running on

# The coordination server (the Redis password is embedded in the URL)
COORD       = "redis://%s@localhost:6379" % REDIS_PWD
# The host to run BigJob on
HOSTNAME    = "localhost"
# The working directory on your machine
WORKDIR     = "/home/%s/example1" % USER_NAME
# The number of jobs you want to run (used for each of the sets A, B and C)
NUMBER_JOBS = 4


#------------------------------------------------------------------------------
#
def _submit_task_set(pilotjob, set_name, article, count):
    """Submit `count` '/bin/echo' tasks labelled `set_name` to `pilotjob`.

    Each task echoes a sentence containing $TASK_SET and $TASK_NO, which are
    passed to the task through its environment. stdout/stderr are directed to
    '<set_name>-stdout.txt' and '<set_name>-stderr.txt'.

    Arguments:
        pilotjob -- object returned by PilotComputeService.create_pilot()
        set_name -- label of the task set, e.g. 'A'
        article  -- 'a' or 'an', so that the echoed sentence reads naturally
        count    -- number of tasks to submit

    Returns the list of submitted task objects, in submission order.
    """
    tasks = []
    for task_no in range(count):
        task_desc = pilot.ComputeUnitDescription()
        task_desc.executable = '/bin/echo'
        task_desc.arguments = [
            'I am %s $TASK_SET task with id $TASK_NO' % article]
        task_desc.environment = {'TASK_SET': set_name, 'TASK_NO': task_no}
        task_desc.number_of_processes = 1
        task_desc.output = '%s-stdout.txt' % set_name
        task_desc.error = '%s-stderr.txt' % set_name
        task = pilotjob.submit_compute_unit(task_desc)
        print("* Submitted '%s' task '%s' with id '%s'"
              % (set_name, task_no, task.get_id()))
        tasks.append(task)
    return tasks


if __name__ == "__main__":

    # Bind both handles up front so that the 'finally' clause can tell whether
    # they were ever created; otherwise a failure during start-up would be
    # masked by a NameError raised from the clean-up code.
    pilot_compute_service = None
    pilotjob = None

    try:
        # this describes the parameters and requirements for our pilot job
        pilot_description = pilot.PilotComputeDescription()
        pilot_description.service_url = "fork://%s" % HOSTNAME
        pilot_description.number_of_processes = NUMBER_JOBS
        pilot_description.working_directory = WORKDIR
        pilot_description.walltime = 10

        # create a new pilot job
        pilot_compute_service = pilot.PilotComputeService(COORD)
        pilotjob = pilot_compute_service.create_pilot(pilot_description)

        # submit the 'A' and 'B' tasks to the pilot job
        task_set_A = _submit_task_set(pilotjob, 'A', 'an', NUMBER_JOBS)
        task_set_B = _submit_task_set(pilotjob, 'B', 'a', NUMBER_JOBS)

        # ---------------------------------------------------------------------
        # synchronization point: block until everything submitted so far is done
        print("Waiting for 'A' and 'B' tasks to complete...")
        pilotjob.wait()
        print("Executing 'C' tasks now...")
        # ---------------------------------------------------------------------

        # submit the 'C' tasks to the pilot job. In a real workflow each 'C'
        # task would consume the output of an 'A' and a 'B' task; in this
        # example it only echoes its own identity.
        task_set_C = _submit_task_set(pilotjob, 'C', 'a', NUMBER_JOBS)

        # ---------------------------------------------------------------------
        print("Waiting for 'C' tasks to complete...")
        pilotjob.wait()
        # ---------------------------------------------------------------------

    except Exception as ex:
        print("AN ERROR OCCURED: %s" % str(ex))
        # print a stack trace in case of an exception -
        # this can be helpful for debugging the problem
        traceback.print_exc()
        # the 'finally' clause below still runs before the interpreter exits
        sys.exit(-1)

    finally:
        # always try to shut down pilots, otherwise jobs might end up
        # lingering in the queue
        print("Terminating BigJob...")
        if pilotjob is not None:
            pilotjob.cancel()
        if pilot_compute_service is not None:
            pilot_compute_service.cancel()