4. Coupled Ensembles

The script provides a simple workflow that submits a set of jobs (A) and a set of jobs (B), waits until both sets have completed, and then submits a third set of jobs (C). It demonstrates the synchronization mechanisms provided by the Pilot-API. This pattern is useful if an executable C depends on some of the output generated by jobs A and B.

4.1. Coupled Ensemble Code

Create a new file coupled_ensembles.py and paste the following code:

import os
import sys
import pilot
import traceback

""" DESCRIPTION: Tutorial 3: Coupled Ensembles
Note: User must edit PILOT SETUP and TASK DESCRIPTION 1-3 sections
This example will not run if these values are not set.
"""

# ---------------- BEGIN REQUIRED PILOT SETUP -----------------

# Distributed Coordination Service - Redis server and password
REDIS_PWD   = # Fill in the password to your redis server
REDIS_URL   = "redis://%s@localhost:6379" % REDIS_PWD

# Resource Information
HOSTNAME     = # Remote Resource URL
USER_NAME    = # Username on the remote resource
SAGA_ADAPTOR = # Name of the SAGA adaptor, e.g. fork, sge, pbs, slurm, etc.
# NOTE: See complete list of BigJob supported SAGA adaptors at:
# http://saga-project.github.io/BigJob/sphinxdoc/tutorial/table.html

# Fill in queue and allocation for the given resource 
# Note: Set fields to "None" if not applicable
QUEUE        = # Add queue you want to use
PROJECT      = # Add project / allocation / account to charge

WALLTIME     = # Maximum Runtime (minutes) for the Pilot Job

WORKDIR      = "" # Path of Resource Working Directory
# This is the directory where BigJob will store its output and error files

SPMD_VARIATION = # SGE clusters ONLY: specify the 'wayness', e.g. '12way'

PROCESSES_PER_NODE = # Valid on PBS clusters ONLY - this is the number of processors per node. One processor core is treated as one processor on PBS; e.g. a node with 8 cores has a maximum ppn=8

PILOT_SIZE = # Number of cores required for the Pilot-Job

# Job Information
NUMBER_JOBS  = # The TOTAL number of tasks to run

# Continue to USER DEFINED TASK DESCRIPTION to add 
# the required information about the individual tasks.

# ---------------- END REQUIRED PILOT SETUP -----------------
#

def main():
    # Initialize these to None so the 'finally' clause below can check
    # whether the pilot was actually created before trying to cancel it.
    pilot_compute_service = None
    pilotjob = None
    try:
        # this describes the parameters and requirements for our pilot job
        pilot_description = pilot.PilotComputeDescription()
        pilot_description.service_url = "%s://%s@%s" % (SAGA_ADAPTOR, USER_NAME, HOSTNAME)
        pilot_description.queue = QUEUE
        pilot_description.project = PROJECT
        pilot_description.number_of_processes = PILOT_SIZE
        pilot_description.working_directory = WORKDIR
        pilot_description.walltime = WALLTIME
        pilot_description.processes_per_node = PROCESSES_PER_NODE
        pilot_description.spmd_variation = SPMD_VARIATION

        # create a new pilot job
        pilot_compute_service = pilot.PilotComputeService(REDIS_URL)
        pilotjob = pilot_compute_service.create_pilot(pilot_description)

        # submit 'A' tasks to pilot job
        task_set_A = list()
        for i in range(NUMBER_JOBS):

            # -------- BEGIN USER DEFINED TASK 1 DESCRIPTION --------- #
            task_desc = pilot.ComputeUnitDescription()
            task_desc.executable = '/bin/echo'
            task_desc.arguments = ['I am an $TASK_SET task with id $TASK_NO']
            task_desc.environment = {'TASK_SET': 'A', 'TASK_NO': i}
            task_desc.spmd_variation = 'single'
            task_desc.number_of_processes = 1
            task_desc.output = 'A-stdout.txt'
            task_desc.error  = 'A-stderr.txt'
            # -------- END USER DEFINED TASK 1 DESCRIPTION --------- #

            task = pilotjob.submit_compute_unit(task_desc)
            print "* Submitted 'A' task '%s' with id '%s'" % (i, task.get_id())
            task_set_A.append(task)


        # submit 'B' tasks to pilot job
        task_set_B = list()
        for i in range(NUMBER_JOBS):

            # -------- BEGIN USER DEFINED TASK 2 DESCRIPTION --------- #
            task_desc = pilot.ComputeUnitDescription()
            task_desc.executable = '/bin/echo'
            task_desc.arguments = ['I am a $TASK_SET task with id $TASK_NO']
            task_desc.environment = {'TASK_SET': 'B', 'TASK_NO': i}
            task_desc.spmd_variation = 'single'
            task_desc.number_of_processes = 1
            task_desc.output = 'B-stdout.txt'
            task_desc.error  = 'B-stderr.txt'
            # -------- END USER DEFINED TASK 2 DESCRIPTION --------- #

            task = pilotjob.submit_compute_unit(task_desc)
            print "* Submitted 'B' task '%s' with id '%s'" % (i, task.get_id())
            task_set_B.append(task)

        # ---------------------------------------------------------------------
        print "Waiting for 'A' and 'B' tasks to complete..."
        pilotjob.wait()
	print "Executing 'C' tasks now..."
        # ---------------------------------------------------------------------

        # submit 'C' tasks to pilot job. each 'C' task takes the output of
        # an 'A' and a 'B' task and puts them together.
        task_set_C = list()
        for i in range(NUMBER_JOBS):

            # -------- BEGIN USER DEFINED TASK 3 DESCRIPTION --------- #
            task_desc = pilot.ComputeUnitDescription()
            task_desc.executable = '/bin/echo'
            task_desc.arguments = ['I am a $TASK_SET task with id $TASK_NO']
            task_desc.environment = {'TASK_SET': 'C', 'TASK_NO': i}
            task_desc.spmd_variation = 'single'
            task_desc.number_of_processes = 1
            task_desc.output = 'C-stdout.txt'
            task_desc.error  = 'C-stderr.txt'
            # -------- END USER DEFINED TASK 3 DESCRIPTION --------- #

            task = pilotjob.submit_compute_unit(task_desc)
            print "* Submitted 'C' task '%s' with id '%s'" % (i, task.get_id())
            task_set_C.append(task)

        # ---------------------------------------------------------------------
        print "Waiting for 'C' tasks to complete..."
        pilotjob.wait()
        # ---------------------------------------------------------------------

        return 0

    except Exception as ex:
        print "AN ERROR OCCURRED: %s" % str(ex)
        # print a stack trace in case of an exception -
        # this can be helpful for debugging the problem
        traceback.print_exc()
        return -1

    finally:
        # always try to shut down pilots, otherwise jobs might end up
        # lingering in the queue
        print "Terminating BigJob..."
        if pilotjob is not None:
            pilotjob.cancel()
        if pilot_compute_service is not None:
            pilot_compute_service.cancel()


if __name__ == "__main__":
    sys.exit(main())

4.1.1. How to Edit The Examples

You will need to make the necessary changes to coupled_ensembles.py as you did in the previous example. For this tutorial, you can simply copy the “REQUIRED PILOT SETUP” section that you filled out in simple_ensembles.py and paste it over the corresponding section in coupled_ensembles.py. If you skipped the previous part of this tutorial, you may need to revisit it to see how to fill out this section.

The link to the table is reiterated here for your convenience: Table of `Valid Variable Parameters <http://saga-project.github.io/BigJob/sphinxdoc/tutorial/table.html>`_.

The important difference between this file and the previous one is that there are three separate “USER DEFINED TASK DESCRIPTION” sections, numbered 1-3. Again, these sections do not require any modification for the purposes of this tutorial. Rather than reviewing every variable again, we will look at the relationship between the three task descriptions: they are identical except that each assigns a different value (A, B, or C) to the TASK_SET environment variable and names its output files accordingly, as the sketch below illustrates.
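
Since the three descriptions differ only in the set label, they could be produced by a small helper. The following is just an illustrative sketch, not part of the tutorial code: the make_task name is hypothetical, and it reuses only the ComputeUnitDescription attributes that appear in the listing above.

def make_task(task_set, task_no):
    # Hypothetical helper: build one task description for the given
    # set ('A', 'B' or 'C') and task index.
    task_desc = pilot.ComputeUnitDescription()
    task_desc.executable = '/bin/echo'
    task_desc.arguments = ['I am a $TASK_SET task with id $TASK_NO']
    task_desc.environment = {'TASK_SET': task_set, 'TASK_NO': task_no}
    task_desc.spmd_variation = 'single'
    task_desc.number_of_processes = 1
    task_desc.output = '%s-stdout.txt' % task_set
    task_desc.error = '%s-stderr.txt' % task_set
    return task_desc

With such a helper, each submission loop would reduce to task = pilotjob.submit_compute_unit(make_task('A', i)).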

NOTE that we run each task set the same number of times (i.e. NUMBER_JOBS) in the tutorial code, but this is not a requirement; it simply keeps the code short for tutorial purposes. You might, for example, want to run 16 A tasks and 16 B tasks, and then 32 C tasks that use the output from both A and B. Taking the resource Stampede from the table as an example: it has 16 cores per node (see the NUMBER_OF_PROCESSES column of the table), so we can run NUMBER_JOBS=32 with a PILOT_SIZE=32, because 32 is a multiple of 16. In this case, we are requesting 2 nodes from Stampede.
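
As a concrete sketch of that sizing arithmetic (CORES_PER_NODE and NODES_REQUESTED are illustrative names; only NUMBER_JOBS and PILOT_SIZE appear in the tutorial code):

CORES_PER_NODE  = 16                            # from the resource table (Stampede)
NUMBER_JOBS     = 32                            # tasks per set
PILOT_SIZE      = 32                            # a multiple of CORES_PER_NODE
NODES_REQUESTED = PILOT_SIZE / CORES_PER_NODE   # 32 / 16 = 2 nodes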

In this case, the important logic to draw your attention to is the wait block between the 'B' and 'C' submission loops:

print "Waiting for 'A' and 'B' tasks to complete..."
pilotjob.wait()
print "Executing 'C' tasks now..."

The first and third lines are simply print statements, but they provide valuable context. In this example, we submit both the A and B tasks to the Pilot, but instead of submitting the C tasks right away, we call wait() on the pilotjob itself. This tells BigJob to wait for all of the submitted tasks to finish before continuing. Only after all of the A and B tasks have finished does the script submit the C tasks.
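
If you ever need finer-grained synchronization than waiting on the whole pilot, you could wait on the individual task handles collected in task_set_A and task_set_B instead. The sketch below assumes that the Pilot-API's ComputeUnit objects expose a blocking wait() method (check your BigJob version); the tutorial itself only uses pilotjob.wait().

# Hypothetical alternative: wait on each task handle individually,
# assuming ComputeUnit provides a blocking wait() method.
for task in task_set_A + task_set_B:
    task.wait()   # block until this particular task has finished
print "All 'A' and 'B' tasks finished; submitting 'C' tasks..."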

4.1.2. Run the Code

Save the file and execute it (make sure your virtualenv is activated):

python coupled_ensembles.py

The output should look something like this (based on NUMBER_JOBS=32, PILOT_SIZE=32):

* Submitted 'A' task '0' with id 'cu-833b3762-e9ac-11e2-b250-14109fd519a1'
* Submitted 'A' task '1' with id 'cu-8352c0f8-e9ac-11e2-b250-14109fd519a1'
[...]
* Submitted 'A' task '31' with id 'cu-86137aee-e9ac-11e2-b250-14109fd519a1'
* Submitted 'B' task '0' with id 'cu-862ad342-e9ac-11e2-b250-14109fd519a1'
[...]
* Submitted 'B' task '31' with id 'cu-88fe4c2a-e9ac-11e2-b250-14109fd519a1'
Waiting for 'A' and 'B' tasks to complete...
* Submitted 'C' task '0' with id 'cu-ffb024ce-e9ac-11e2-b250-14109fd519a1'
[...]
* Submitted 'C' task '31' with id 'cu-0281b708-e9ad-11e2-b250-14109fd519a1'
Waiting for 'C' tasks to complete...
Terminating BigJob...

4.1.3. Check the Output

Again, you will find all of the relevant BigJob output in the directory that you defined as “WORKDIR” in the example above. Note that, depending on which task ran in it, each sj- directory will contain an A-, B-, or C-stdout.txt file.
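
If you want to gather all of the echoed lines in one place, a small script like the following can help. This is a sketch, not part of the tutorial: it assumes the sj- task directories live one level below WORKDIR, which may differ between BigJob versions, so adjust the glob pattern to match your setup.

import glob
import os

WORKDIR = ""  # the same path you used in coupled_ensembles.py

# Assumed layout: <WORKDIR>/<pilot dir>/sj-*/<SET>-stdout.txt
for path in sorted(glob.glob(os.path.join(WORKDIR, '*', 'sj-*', '*-stdout.txt'))):
    print "%s: %s" % (path, open(path).read().strip())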