3. Chained Ensembles

What if you had two different executables to run? What if the second set of executables depended on data produced by the first set? Can you use one BigJob to run both sets of jobs? Yes!

The example below submits a set of /bin/echo jobs (set A) using BigJob and, for every job that finishes successfully (i.e. reaches the Done state), submits another /bin/echo job (set B) to the same Pilot-Job.

We can think of A as comprising subjobs {a1,a2,a3} and B as comprising subjobs {b1,b2,b3}. Rather than waiting for all of {a1,a2,a3} to complete before starting B, {b1} can run as soon as {a1} is complete; more generally, a B subjob can run as soon as any A subjob finishes and frees a slot, since the A subjobs need not finish in order – i.e. {a2} could finish before {a1}.

The code below demonstrates this behavior: as soon as a job in A completes and a slot becomes available, the corresponding job in B is submitted. This keeps the utilization of the BigJob high.
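Distilled to its essentials, the chaining pattern is just a polling loop. Here is a minimal sketch of the same logic that appears in the full listing below; submit_b_task is a hypothetical stand-in for whatever builds and submits the follow-on task:

def run_chained(task_set_A, submit_b_task):
    # Poll the 'A' tasks; whenever one reaches 'Done', launch its 'B' successor.
    task_set_B = []
    while len(task_set_A) > 0:
        for a_task in task_set_A[:]:   # iterate over a copy so we can remove safely
            if a_task.get_state() == "Done":
                task_set_B.append(submit_b_task(a_task))
                task_set_A.remove(a_task)
    return task_set_B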

3.1. Chained Ensemble Code

Create a new file chained_ensembles.py and paste the following code:

import os
import sys
import pilot
import traceback

""" DESCRIPTION: Tutorial 2: Chaining Tasks
Note: User must edit PILOT SETUP and TASK DESCRIPTION 1-2 sections
This example will not run if these values are not set.
"""

# ---------------- BEGIN REQUIRED PILOT SETUP -----------------

# Distributed Coordination Service - Redis server and password
REDIS_PWD   = # Fill in the password to your redis server
REDIS_URL   = "redis://%s@localhost:6379" % REDIS_PWD

# Resource Information
HOSTNAME     = # Remote Resource URL
USER_NAME    = # Username on the remote resource
SAGA_ADAPTOR = # Name of the SAGA adaptor, e.g. fork, sge, pbs, slurm, etc.
# NOTE: See complete list of BigJob supported SAGA adaptors at:
# http://saga-project.github.io/BigJob/sphinxdoc/tutorial/table.html

# Fill in queue and allocation for the given resource 
# Note: Set fields to "None" if not applicable
QUEUE        = # Add queue you want to use
PROJECT      = # Add project / allocation / account to charge

WALLTIME     = # Maximum Runtime (minutes) for the Pilot Job

WORKDIR      = "" # Path of Resource Working Directory
# This is the directory where BigJob will store its output and error files

SPMD_VARIATION = # Specify the WAYNESS on SGE clusters ONLY, e.g. '12way'

PROCESSES_PER_NODE = # Valid on PBS clusters ONLY - this is the number of processors per node. One processor core is treated as one processor on PBS; e.g. a node with 8 cores has a maximum ppn=8

PILOT_SIZE = # Number of cores required for the Pilot-Job

# Job Information
NUMBER_JOBS  = # The number of 'A' tasks to run (each triggers one 'B' task)

# Continue to USER DEFINED TASK DESCRIPTION to add 
# the required information about the individual tasks.

# ---------------- END REQUIRED PILOT SETUP -----------------
#

def main():
    try:
        # this describes the parameters and requirements for our pilot job
        pilot_description = pilot.PilotComputeDescription()
        pilot_description.service_url = "%s://%s@%s" %  (SAGA_ADAPTOR,USER_NAME,HOSTNAME)
        pilot_description.queue = QUEUE
        pilot_description.project = PROJECT
        pilot_description.number_of_processes = PILOT_SIZE
        pilot_description.working_directory = WORKDIR
        pilot_description.walltime = WALLTIME
        pilot_description.processes_per_node = PROCESSES_PER_NODE
        pilot_description.spmd_variation = SPMD_VARIATION

        # create a new pilot job
        pilot_compute_service = pilot.PilotComputeService(REDIS_URL)
        pilotjob = pilot_compute_service.create_pilot(pilot_description)

        # submit 'A' tasks to pilot job
        task_set_A = list()
        for i in range(NUMBER_JOBS):

            # -------- BEGIN USER DEFINED TASK 1 DESCRIPTION --------- #
            task_desc = pilot.ComputeUnitDescription()
            task_desc.executable = '/bin/echo'
            task_desc.arguments = ['I am an $TASK_SET task with id $TASK_NO', ]
            task_desc.environment = {'TASK_SET': 'A', 'TASK_NO': i}
            task_desc.spmd_variation = 'single'
            task_desc.number_of_processes = 1
            task_desc.output = 'A-stdout.txt'
            task_desc.error  = 'A-stderr.txt'
            # -------- END USER DEFINED TASK 1 DESCRIPTION --------- #

            # Submit task to PilotJob
            task = pilotjob.submit_compute_unit(task_desc)
            print "* Submitted 'A' task '%s' with id '%s'" % (i, task.get_id())
            task_set_A.append(task)

        # Chaining tasks: as soon as a compute unit from set A completes successfully,
        # submit a corresponding 'B' compute unit to the same Pilot-Job. Each 'B' task
        # echoes its own identity, tagged with the id of the 'A' task it follows.
        task_set_B = list()
        while len(task_set_A) > 0:
            for a_task in task_set_A[:]:  # iterate over a copy; finished tasks are removed below
                if a_task.get_state() == "Done":
                    print "One 'A' task %s finished. Launching a 'B' task." % (a_task.get_id())

                    # -------- BEGIN USER DEFINED TASK 2 DESCRIPTION --------- #
                    task_desc = pilot.ComputeUnitDescription()
                    task_desc.executable = '/bin/echo'
                    task_desc.arguments = ['I am a $TASK_SET task with id $TASK_NO', ]
                    task_desc.environment = {'TASK_SET': 'B', 'TASK_NO': a_task.get_id()}
                    task_desc.spmd_variation = 'single'
                    task_desc.number_of_processes = 1
                    task_desc.output = 'B-stdout.txt'
                    task_desc.error  = 'B-stderr.txt'
                    # -------- END USER DEFINED TASK 2 DESCRIPTION --------- #

                    # Submit task to Pilot Job
                    task = pilotjob.submit_compute_unit(task_desc)
                    print "* Submitted 'B' task '%s' with id '%s'" % (i, task.get_id())
                    task_set_B.append(task)
                    task_set_A.remove(a_task)

        return 0

    except Exception, ex:
        print "AN ERROR OCCURRED: %s" % str(ex)
        # print a stack trace in case of an exception -
        # this can be helpful for debugging the problem
        traceback.print_exc()
        return -1

    finally:
        # always try to shut down pilots, otherwise jobs might end up
        # lingering in the queue
        print ("Terminating BigJob...")
        pilotjob.cancel()
        pilot_compute_service.cancel()


if __name__ == "__main__":
    sys.exit(main())

3.1.1. How to Edit the Examples

You will need to make the same edits to chained_ensembles.py that you made in the previous example. In fact, you can copy the “REQUIRED PILOT SETUP” section that you filled out in simple_ensembles.py and paste it over the corresponding section in chained_ensembles.py. If you skipped the previous part of this tutorial, you may need to revisit it to learn how to fill out this section.

The link to the table is reiterated here for your convenience:

Valid Variable Parameters.

The important difference between this file and the previous one is that there are two separate “USER DEFINED TASK DESCRIPTION” sections, numbered 1 and 2. Again, these two sections do not require any modifications for the purposes of this tutorial. Rather than review every variable again, we will focus on the relationship between the two task descriptions.

Go to line 70, “BEGIN USER DEFINED TASK 1 DESCRIPTION.” This looks much like the task description we saw in the previous example. It is likewise contained in a for loop running from 0 to NUMBER_JOBS. We run the same executable with almost the same arguments, except that we set an additional environment variable, TASK_SET, to ‘A’. If we look at lines 90-93, we see that as soon as a task in the “A” set reaches the “Done” state, we submit the task defined in “BEGIN USER DEFINED TASK 2 DESCRIPTION” as a “B” task. This illustrates an important feature of BigJob: we can call get_state() on a task to find out whether it has completed. The second task description runs the same executable, /bin/echo, but prints instead that it is a B task, tagged with the id of the A task that triggered it.
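One caveat worth noting: the while loop polls get_state() in a tight loop, which can put unnecessary load on the Redis coordination server. A short pause between polling passes is a cheap improvement. The sketch below shows the idea; the 2-second interval is an arbitrary choice, and only the standard time module is needed:

import time

while len(task_set_A) > 0:
    for a_task in task_set_A[:]:
        if a_task.get_state() == "Done":
            # ... build and submit the 'B' task exactly as in the listing above ...
            task_set_A.remove(a_task)
    time.sleep(2)  # pause between polling passes instead of busy-waiting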

To get an idea of sample values for PILOT_SIZE (which sets the Pilot’s number_of_processes) and NUMBER_JOBS for this Chained Ensembles example, look up the machine Lonestar in the table. Note that the valid PILOT_SIZE values are multiples of 12, which tells us there are 12 cores per node on Lonestar. A sample run of this example might use NUMBER_JOBS=24 and PILOT_SIZE=24, which would reserve 2 nodes on Lonestar. Note that when running these examples locally, i.e. with SAGA_ADAPTOR set to either ssh or fork, you cannot request a Pilot larger than the number of cores on the machine on which you are running, e.g. a 2-core computer can run at most a 2-core Pilot, while a 4-core computer can run at most a 4-core Pilot.
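If you want to double-check the reservation arithmetic for your own machine, the node count is just the Pilot size divided by the cores per node, rounded up. A quick sketch, where cores_per_node = 12 is the Lonestar value assumed above:

import math

cores_per_node = 12   # assumed value for Lonestar; substitute your resource's value
PILOT_SIZE = 24       # on such machines this should be a multiple of cores_per_node
nodes_reserved = int(math.ceil(PILOT_SIZE / float(cores_per_node)))
print "A %d-core Pilot reserves %d node(s)" % (PILOT_SIZE, nodes_reserved)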

3.1.2. Run the Code

Save the file and execute it (make sure your virtualenv is activated):

python chained_ensembles.py

The output should look something like this:

* Submitted 'A' task '0' with id 'cu-27ab3846-e9a9-11e2-88eb-14109fd519a1'
* Submitted 'A' task '1' with id 'cu-27c2cca4-e9a9-11e2-88eb-14109fd519a1'
[...]
One 'A' task cu-27ab3846-e9a9-11e2-88eb-14109fd519a1 finished. Launching a 'B' task.
* Submitted 'B' task '0' with id 'cu-352139c6-e9a9-11e2-88eb-14109fd519a1'
[...]
Terminating BigJob...

3.1.3. Check the Output

Again, we will find all the relevant BigJob output in the directory that we defined as WORKDIR in the example above. Note that some of the sj- (subjob) directories will contain A-task output in their stdout files (A-stdout.txt), while others will contain B-task output (B-stdout.txt), matching the output fields set in the two task descriptions.
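If you would rather not open each directory by hand, a few lines of Python can print every task’s output in one pass. This is only a sketch: it assumes the sj- subjob directories sit one level below a BigJob directory under your WORKDIR, and that the output file names match the A-stdout.txt/B-stdout.txt values set in the task descriptions; the WORKDIR path shown is hypothetical.

import glob
import os

WORKDIR = "/path/to/your/workdir"  # hypothetical -- use the WORKDIR you set above

# Find every A/B stdout file beneath the subjob directories and print its contents.
pattern = os.path.join(WORKDIR, "*", "sj-*", "*-stdout.txt")
for stdout_file in sorted(glob.glob(pattern)):
    print "==> %s" % stdout_file
    print open(stdout_file).read()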