
  
2.9 Parallel Applications in Condor: Condor-PVM

Condor has a PVM submit universe which allows users to submit PVM jobs to the Condor pool. In this section, we first discuss the differences between running under normal PVM and running PVM in the Condor environment. We then give some hints, via an example program, on how to write PVM programs well suited to the Condor environment. Finally, we illustrate how to submit PVM jobs to Condor by examining a sample submit-description file for a PVM job.

NOTE: Condor-PVM is an optional Condor module. To check whether it has been installed at your site, enter the following command:

        ls -l `condor_config_val PVMD`
(notice the use of backticks in the above command). If this shows the file ``condor_pvmd'' on your system, Condor-PVM is installed. If not, ask your site administrator to download Condor-PVM from http://www.cs.wisc.edu/condor in the ``contrib'' section of the ``Downloads'' page and install it.

2.9.1 What does Condor-PVM do?

Condor-PVM provides a framework to run parallel applications written for PVM in Condor's opportunistic environment. This means that you no longer need a set of dedicated machines to run PVM applications; Condor can dynamically construct PVM virtual machines out of non-dedicated desktop machines on your network that would otherwise have been idle. In Condor-PVM, Condor acts as the resource manager for the PVM daemon. Whenever your PVM program asks for nodes (machines), the request is re-mapped to Condor. Condor then finds a machine in the Condor pool via the usual mechanisms and adds it to the PVM virtual machine. If a machine needs to leave the pool, your PVM program is notified of that as well, via the normal PVM mechanisms.
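
To give a feel for what this looks like from the application's side, here is a minimal sketch (ours, not part of the manual's sample code) of requesting one more machine. The architecture-class string ``0'' refers to the first machine class declared in the submit-description file, and the HOSTADD tag is assumed to have been registered earlier with pvm_notify():

    #include "pvm3.h"

    #define HOSTADD 16  /* tag registered with pvm_notify(PvmHostAdd, ...) */

    /* Hypothetical sketch: ask for one more machine from architecture
     * class 0 and block until Condor grants it.  Under Condor-PVM the
     * pvm_addhosts() request is re-mapped to a Condor resource request. */
    void request_one_host(void)
    {
        char *archclass = "0";  /* class 0 from the submit-description file */
        int  info;

        pvm_addhosts(&archclass, 1, &info);
        pvm_recv(-1, HOSTADD);  /* wait for the HOSTADD notification */
    }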

2.9.2 The Master-Worker Paradigm

There are several different parallel programming paradigms. One of the more common is the master-worker (or pool of tasks) arrangement. In the master-worker model, one node acts as the controlling master for the parallel application and sends pieces of work out to worker nodes. A worker node does some computation and sends the result back to the master node. The master has a pool of work that needs to be done, and simply assigns the next piece of work to the next worker that becomes available.

Not all parallel programming paradigms lend themselves to an opportunistic environment. In such an environment, any of the nodes could be preempted and therefore disappear at any moment. The master-worker model, on the other hand, can work well. The idea is that the master keeps track of which piece of work it sends to each worker. If the master node is then informed that a worker has disappeared, it puts the piece of work it assigned to that worker back into the pool of tasks, and sends it out again to the next available worker. If the master notices that the number of workers has dropped below an acceptable level, it can request more workers (via pvm_addhosts()). Or perhaps the master will request a replacement node every time it is notified that a worker has gone away. The point is that in this paradigm, the number of workers is not important (although more is better!) and changes in the size of the virtual machine can be handled naturally.
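
To make this bookkeeping concrete, here is a minimal sketch (our illustration, with hypothetical names; not part of Condor or PVM) of how a master might track which work unit each worker holds and return a unit to the pool when its worker disappears:

    #define MAX_WORKERS 64

    /* assigned[i] holds the work unit currently given to worker i,
     * or -1 if worker i is idle. */
    static int assigned[MAX_WORKERS];

    /* Called when the master is notified that worker i has exited.
     * requeue() is a hypothetical helper that puts a work unit back
     * into the pool of tasks. */
    void worker_died(int i, void (*requeue)(int unit))
    {
        if (assigned[i] != -1) {
            requeue(assigned[i]);  /* recycle the unfinished work unit */
            assigned[i] = -1;
        }
        /* optionally request a replacement worker via pvm_addhosts() */
    }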

Condor-PVM is designed to run PVM applications which follow the master-worker paradigm. Condor runs the master application on the machine where the job was submitted and will not preempt it. Workers are pulled in from the Condor pool as they become available.

2.9.3 Binary Compatibility

Condor-PVM does not define a new API (application program interface); programs simply use the existing resource management PVM calls such as pvm_addhosts() and pvm_notify(). Because of this, some master-worker PVM applications are ready to run under Condor-PVM with no changes at all. Whether or not Condor-PVM is used, it is good master-worker design to handle the case of a worker node disappearing, and therefore many programmers have already built their master program with all the necessary fault-tolerance logic.
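
For example, a master that registers for task-exit notifications uses the standard PVM call, unchanged under Condor-PVM (a sketch; the tag value is our own choice):

    #include "pvm3.h"

    #define TASKEXIT 15  /* arbitrary tag for PvmTaskExit messages */

    /* Ask PVM to send us a TASKEXIT-tagged message if the task
     * identified by tid ever exits; the call is identical under
     * regular PVM and Condor-PVM. */
    void watch_worker(int tid)
    {
        pvm_notify(PvmTaskExit, TASKEXIT, 1, &tid);
    }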

In fact, regular PVM and Condor-PVM are binary compatible with each other. The same binary which runs under regular PVM will run under Condor, and vice-versa. There is no need to re-link for Condor-PVM. This permits easy application development (develop your PVM application interactively with the regular PVM console, XPVM, etc) as well as binary sharing between Condor and some dedicated MPP systems.

2.9.4 Runtime differences between Condor-PVM and regular PVM

This release of Condor-PVM is based on PVM 3.3.11. The vast majority of the PVM library functions under Condor maintain the same semantics as in PVM 3.3.11, including messaging operations, group operations, and pvm_catchout().

The changes and new features of PVM when running in the Condor environment are summarized below:

2.9.5 A Sample PVM program for Condor-PVM

Normal PVM applications assume dedicated machines. When running a PVM application under Condor, however, the environment is opportunistic: machines can be suspended, and even removed from the PVM virtual machine, during the lifetime of the application.

Here we include an extensively commented skeleton of a sample PVM program, master_sum.c, which we hope will help you write PVM code better suited to a non-dedicated, opportunistic environment like Condor.

/* 
 * master_sum.c
 *
 * master program to perform parallel addition - takes a number n 
 * as input and returns the result of the sum 0..(n-1).  Addition 
 * is performed in parallel by k tasks, where k is also taken as 
 * input.  The numbers 0..(n-1) are stored in an array, and each 
 * worker adds a portion of the array, and returns the sum to the 
 * master.  The Master adds these sums and prints final sum.  
 *
 * To make the program fault-tolerant, the master has to monitor 
 * the tasks that exited without sending the result back.  The 
 * master creates some new tasks to do the work of those tasks 
 * that have exited. 
 */

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "pvm3.h"

#define MAX_TASKS  32 /* maximum number of worker tasks */

#define NOTIFY_NUM 5  /* number of notification types in codes[] */

/* message tags for the notifications requested via pvm_notify() */
#define HOSTDELETE 12
#define HOSTSUSPEND 13
#define HOSTRESUME 14
#define TASKEXIT 15
#define HOSTADD 16

/* message tags for master/worker data exchange (values are arbitrary,
 * but must match those used by the worker program worker_sum.c) */
#define ROUND_TAG     20
#define NUM_NUM_TAG   21
#define START_NUM_TAG 22
#define RESULT_TAG    23
    
/* send the pertask and start number to the worker task i */
void send_data_to_worker(int i, int *tid, int *num, int pertask, 
            FILE *fp, int round)
{
     int status;
     int start_val;
     
     /* send the round number */
     pvm_initsend(PvmDataDefault); /* XDR format */
     pvm_pkint(&round, 1, 1);      /* the round number */
     status = pvm_send(tid[i], ROUND_TAG);

     /* send the number of numbers to add */
     pvm_initsend(PvmDataDefault);
     pvm_pkint(&pertask, 1, 1);
     status = pvm_send(tid[i], NUM_NUM_TAG);

     /* send the initial number for this task */
     pvm_initsend(PvmDataDefault);
     start_val = i * pertask;
     pvm_pkint(&start_val, 1, 1);
     status = pvm_send(tid[i], START_NUM_TAG);

     fprintf(fp, "Round %d: Send data %d to worker task %d, "
           "tid = %x, status %d\n", round, start_val, i, tid[i], status);
}

/* 
 * to see if more hosts are needed 
 * 1 = yes; 0 = no 
 */
int need_more_hosts(int i)
{
     int nhost, narch;
     struct pvmhostinfo *hostp;  /* pvm_config() points this at pvmd-owned data */

     /* get the current configuration */
     pvm_config(&nhost, &narch, &hostp);
     
     if (nhost > i)
        return 0;
     else 
        return 1;
}

/* 
 * Add a new host until success, assuming that request for 
 * PvmAddHost notification has already been sent 
 */
void add_a_host(FILE *fp)
{
     int done = 0;
     int buf_id;
     int success = 0;
     int tid;
     int msg_len, msg_tag, msg_src;
     char *hosts="0";  /* any host in arch class 0 */
     int infos[1];

     while (done != 1) {
        /* 
         * add one host - no specific machine named.
         * pvm_addhosts() completes asynchronously, so we must
         * receive the HOSTADD notification before going on.
         */
        pvm_addhosts(&hosts, 1, infos);
      
        /* receive hostadd notification from anyone */
        buf_id = pvm_recv(-1, HOSTADD);
      
        if (buf_id < 0) {
            fprintf(fp, "Error with buf_id = %d\n", buf_id);
            done = 0;
            continue;
        }
        done = 1;
     
        pvm_bufinfo(buf_id, &msg_len , &msg_tag, &msg_src);
        pvm_upkint(&tid, 1, 1);

        pvm_notify(PvmHostDelete, HOSTDELETE, 1, &tid);

        fprintf(fp, "Received HOSTADD: ");
        fprintf(fp, "Host %x added from %x\n", tid, msg_src);
        fflush(fp);
    }
}

/* 
 * Spawn a worker task until success.  
 * Return its tid, and the tid of its host. 
 */
void spawn_a_worker(int i, int* tid, int * host_tid, FILE *fp)
{
     int numt = 0;
     int status;

     while (numt != 1){
          /* spawn a worker on a host belonging to arch class 0 */
          numt = pvm_spawn ("worker_sum", NULL, PvmTaskArch, "0", 1, &tid[i]);

          fprintf(fp, "master spawned %d task tid[%d] = %x\n",numt,i,tid[i]);
          fflush(fp);
         
          /* if the spawn is successful */
          if (numt == 1) {
               /* remember which host this task is running on */
               host_tid[i] = pvm_tidtohost(tid[i]);

               /* notify when the task exits */
               status = pvm_notify(PvmTaskExit, TASKEXIT, 1, &tid[i]);
               
               fprintf(fp, "Notify status for exit = %d\n", status);
               
               if (pvm_pstat(tid[i]) != PvmOk) numt = 0;
          }
          
          if (numt != 1) {
               fprintf(fp, "!! Failed to spawn task[%d]\n", i);
               
               /* 
                * currently Condor-pvm allows only one task running on 
                * a host
                */
                while (need_more_hosts(i) == 1)
                    add_a_host(fp);
          }
     }
}


int main()
{
    int n;                  /* will add the <n> numbers 0 .. (n-1) */
    int ntasks;             /* need <ntask> workers to do the addition. */
    int pertask;            /* numbers to add per task */
    int tid[MAX_TASKS];     /* tids of tasks */ 
    int deltid[MAX_TASKS];  /* tids monitored for deletion */
    int sum[MAX_TASKS];     /* hold the reported sum */
    int num[MAX_TASKS];     /* the initial numbers the workers should add */
    int host_tid[MAX_TASKS];/* the tids of the host that the *
                             * tasks <0..ntasks> are running on*/
    
    int i, numt, nhost, narch, status;
    int result;
    int mytid;    /* task id of master */
    int mypid;    /* process id of master */
    int buf_id;   /* id of recv buffer */
    int msg_tag, msg_src, msg_len;
    int int_val;  

    int infos[MAX_TASKS];
    char * hosts[MAX_TASKS];
    struct pvmhostinfo *hostp;  /* pvm_config() points this at pvmd-owned data */

    FILE *fp;
    char outfile_name[100];

    char *codes[NOTIFY_NUM] = {"HostDelete", "HostSuspend", 
            "HostResume", "TaskExit", "HostAdd"};
    
    int count;   /* number of iterations of the main loop */
    int round_val;
    int correct = 0;
    int wrong = 0;

    mypid = getpid();

    sprintf(outfile_name, "out_sum.%d", mypid);
    fp = fopen(outfile_name, "w"); 

    /* redirect all child tasks' output to fp */
    pvm_catchout(fp);  

    if (pvm_parent() == PvmNoParent){
        fprintf(fp, "I have no parent!\n");
        fflush(fp);
    }

    /* will add <n> numbers 0..(n-1) */
    fprintf(fp, "How many numbers? ");
    fflush(fp);
    scanf("%d", &n);
    fprintf(fp, "%d\n", n);
    fflush(fp);

    /* will spawn ntasks workers to perform addition */
    fprintf(fp, "How many tasks? ");
    fflush(fp);
    scanf("%d", &ntasks);
    fprintf(fp, "%d\n\n", ntasks);
    fflush(fp);

    /* will iterate count loops */
    fprintf(fp, "How many loops? ");
    fflush(fp);
    scanf("%d", &count);
    fprintf(fp, "%d\n", count);
    fflush(fp);

    /* set the hosts to be in arch class 0 */
    for (i = 0; i< ntasks; i++) hosts[i] = "0";

    /* numbers to be added by each worker */
    pertask = n/ntasks;

    /* get the master's TID */
    mytid = pvm_mytid();
    fprintf(fp, "mytid = %x; mypid = %d\n", mytid, mypid);

    /* get the current configuration */
    pvm_config(&nhost, &narch, &hostp);

    fprintf(fp, "current number of hosts = %d\n", nhost);
    fflush(fp);

    /* 
     * notify request for host addition, with tag HOSTADD, 
     * no tids to monitor.  
     *
     * -1 turns the notification request on;
     * 0 turns it off;
     * a positive integer n will generate at most n 
     * notifications.
     */     
    pvm_notify(PvmHostAdd, HOSTADD, -1, NULL);

    /* add more hosts - no specific machine named */
    i = ntasks - nhost;
    if (i > 0) {
        status = pvm_addhosts(hosts, i , infos);
      
        fprintf(fp, "master: addhost status = %d\n", status);
        fflush(fp);
    }
     
    /* wait for a HOSTADD notification for each host still needed */
    for (i = nhost; i < ntasks; i++) {
        /* receive notification from anyone, with HostAdd tag */
        buf_id = pvm_recv(-1, HOSTADD);

        if (buf_id < 0) {
           fprintf(fp, "Error with buf_id = %d\n", buf_id);
        } else {
           fprintf(fp, "Success with buf_id = %d\n", buf_id);
        }

        pvm_bufinfo(buf_id, &msg_len , &msg_tag, &msg_src);
        if (msg_tag==HOSTADD) {
            pvm_upkint(&int_val, 1, 1);

            fprintf(fp, "Received HOSTADD: ");
            fprintf(fp, "Host %x added from %x\n", int_val, msg_src);
           fflush(fp);
        } else {
           fprintf(fp, "Received unexpected message with tag: %d\n", msg_tag);
        }
    }

    /* get current configuration */
    pvm_config(&nhost, &narch, &hostp);

    /* collect the host tids so that we can monitor them */
    for (i = 0; i < nhost && i < MAX_TASKS; i++)
        deltid[i] = hostp[i].hi_tid;

    /* request notification of all exceptional host conditions */
    status = pvm_notify(PvmHostDelete, HOSTDELETE, nhost, deltid);
    fprintf(fp, "Notify status for delete = %d\n", status);
     
    status = pvm_notify(PvmHostSuspend, HOSTSUSPEND, nhost, deltid);
    fprintf(fp, "Notify status for suspend = %d\n", status);
     
    status = pvm_notify(PvmHostResume, HOSTRESUME, nhost, deltid);
    fprintf(fp, "Notify status for resume = %d\n", status);

    /* spawn <ntasks> */
    for (i = 0; i < ntasks ; i++) {
        /* spawn the i-th task, with notifications. */
        spawn_a_worker(i, tid, host_tid, fp);
    }

    /* add the result <count> times */
    while (count > 0) {
        /* 
         * if array length was not perfectly divisible by ntasks, 
         *    some numbers are remaining. Add these yourself 
         */
        result = 0;
        for ( i = ntasks * pertask ; i < n ; i++)
           result += i;
     
        /* initialize the sum array with -1 */
        for (i = 0; i< ntasks; i++) 
            sum[i] = -1;
 
        /* send array partitions to each task */
        for (i = 0; i < ntasks ; i++) {
           send_data_to_worker(i, tid, num, pertask, fp, count);
        }

        /* 
        * Wait for results.  If a task exited without 
        * sending back the result, start another task to do
        * its job. 
        */
        for (i = 0; i< ntasks; ) {   
            buf_id = pvm_recv(-1, -1);
            pvm_bufinfo(buf_id, &msg_len , &msg_tag, &msg_src);
            fprintf(fp, "Receive: task %x returns mesg tag %d, ``
                ``buf_id = %d\n", msg_src, msg_tag, buf_id);
            fflush(fp);
           
            /* is a result returned by a worker */
            if(msg_tag == RESULT_TAG)  {
                int j;
                
                pvm_upkint(&round_val, 1, 1);
                fprintf(fp, "  round_val = %d\n", round_val);
                fflush(fp);
             
                if (round_val != count) continue;

                pvm_upkint(&int_val, 1, 1);
                for (j=0; (j<ntasks) && (tid[j] != msg_src); j++)
                    ;
                fprintf(fp, "  Data from task %d, tid = %x : %d\n", 
                    j, msg_src, int_val);
                fflush(fp);
                
                if (sum[j] == -1) {
                    sum[j] = int_val; /* store the sum */
                    i++;
                }
           } else if (msg_tag == TASKEXIT) {
                /* A task has exited. */
                /* Find out which task has exited. */ 
                int which_tid, j;          
                pvm_upkint(&which_tid, 1, 1);
                for (j=0; (j<ntasks) && (tid[j] != which_tid); j++)
                 ;
                fprintf(fp, "  from tid %x : task %d, tid =  %x, ``
                    ``exited.\n", 
                    msg_src, j, which_tid);
                fflush(fp);
                /* 
                 * If a task exited before sending back the message,
                 * create another task to do the same job.
                 */
                if (j < ntasks && sum[j] == -1) {
                     /* spawn the j-th task */
                     spawn_a_worker(j, tid, host_tid, fp);
                     
                     /* send unfinished work to the new task */
                     send_data_to_worker(j, tid, num, pertask, fp, count);
                }
            } else if (msg_tag == HOSTDELETE) {
                /* 
                 * A host has been deleted.  Check whether the task
                 * running on it has finished its work.  If not,
                 * create a new worker task to redo the work on
                 * another host.
                 */
                int which_tid, j;
                    
                /* get which host has been suspended/deleted */
                pvm_upkint(&which_tid, 1, 1);
                    
                fprintf(fp, "  from tid %x : %x %s\n", msg_src, which_tid, 
                    codes[msg_tag - HOSTDELETE]);
                fflush(fp);
                    
                /* 
                 * If the task on that host has not finished its
                 * work, then create new task to do the work.
                 */
                for (j = 0; j < ntasks; j++) {
                     if (host_tid[j] == which_tid && sum[j] == -1) {
                          fprintf(fp, "host_tid[%d] = %x, ``
                            ``need new task\n",
                              j, host_tid[j]);
                          fflush(fp);
                          
                          /* spawn a replacement for the j-th task */
                          spawn_a_worker(j, tid, host_tid, fp);
                          
                          /* send the unfinished work to the new task */
                          send_data_to_worker(j,tid,num,pertask,fp,count);
                     }
                }
            } else {
                /* print out some other notifications or messages */
                int which_tid;
                pvm_upkint(&which_tid, 1, 1);
        
                fprintf(fp, "  from tid %x : %x %s\n", msg_src,
                        which_tid,   codes[msg_tag - HOSTDELETE]);
                fflush(fp);
            }
        }        
      
        /* add up the sum */
        for (i=0; i<ntasks; i++)
           result += sum[i];
          
        fprintf(fp, "Sum from  0 to %d is %d\n", n-1 , result);
        fflush(fp);
          
        /* check correctness */
        if (result == (n-1)*n/2) {
           correct++;
           fprintf(fp, "*** Result Correct! ***\n");
        } else {
           wrong++;
           fprintf(fp, "*** Result WRONG! ***\n");
        }

        fflush(fp);
        count--;
    }
     
     fprintf(fp, "correct = %d; wrong = %d\n", correct, wrong);
     fflush(fp);

     pvm_exit();
     exit(0);
}
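
The corresponding worker program, worker_sum.c, is not shown here. As a rough guide, a minimal worker consistent with the message protocol used by the master above might look like the following sketch (our reconstruction; the original worker_sum.c may differ):

    /* worker_sum.c -- hypothetical sketch of the worker matching the
     * master's protocol: receive the round number, the count of numbers
     * to add, and the start value; add start .. start+count-1; and send
     * the partial sum back tagged RESULT_TAG. */
    #include "pvm3.h"

    /* tag values must match those assumed in master_sum.c above */
    #define ROUND_TAG     20
    #define NUM_NUM_TAG   21
    #define START_NUM_TAG 22
    #define RESULT_TAG    23

    int main()
    {
        int master = pvm_parent();  /* tid of the spawning master */
        int round, pertask, start, i, sum;

        /* runs until its host is removed or the master exits */
        for (;;) {
            pvm_recv(master, ROUND_TAG);
            pvm_upkint(&round, 1, 1);
            pvm_recv(master, NUM_NUM_TAG);
            pvm_upkint(&pertask, 1, 1);
            pvm_recv(master, START_NUM_TAG);
            pvm_upkint(&start, 1, 1);

            for (sum = 0, i = start; i < start + pertask; i++)
                sum += i;

            /* reply: round number first, then the partial sum,
             * packed in a single RESULT_TAG message */
            pvm_initsend(PvmDataDefault);
            pvm_pkint(&round, 1, 1);
            pvm_pkint(&sum, 1, 1);
            pvm_send(master, RESULT_TAG);
        }
        return 0;
    }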

  
2.9.6 Sample PVM submit file

As with submitting jobs in any other universe, to submit a PVM job the user specifies the requirements and options in a submit-description file and runs condor_submit. Figure 2.2 shows an example of a submit-description file for a PVM job. This job has a master PVM program called master_pvm.


  
Figure 2.2: A sample submit file for PVM jobs.
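A rough reconstruction of such a submit file, based on the description that follows, is shown below (the requirements expressions and the particular machine counts are illustrative guesses, not the original figure):

    # Hypothetical reconstruction of Figure 2.2 -- not the original file.
    universe      = PVM
    executable    = master_pvm

    # Class 0: INTEL machines running SOLARIS251
    requirements  = (Arch == "INTEL") && (OpSys == "SOLARIS251")
    machine_count = 2..4
    queue

    # Class 1: SUN4x (SPARC) machines running SOLARIS251
    requirements  = (Arch == "SUN4x") && (OpSys == "SOLARIS251")
    machine_count = 1..3
    queue

Such a file would be submitted in the usual way, with condor_submit.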

In this sample submit file, the command universe = PVM specifies that the job should be submitted into the PVM universe.

The command executable = master_pvm tells Condor that the PVM master program is master_pvm. This program will be started on the submitting machine; the workers are spawned by the master program during execution.

This submit file also tells Condor that the PVM virtual machine consists of two different classes of machine architecture. Class 0 contains machines with the INTEL architecture running SOLARIS251; class 1 contains machines with the SUN4x (SPARC) architecture running SOLARIS251.

By using machine_count = <min>..<max>, the submit file tells Condor that before the PVM program starts, there should be at least <min> machines of the given class, and it asks Condor to provide as many as <max> machines. During execution, the application can acquire more machines of each class by calling pvm_addhosts() with a string specifying the desired architecture class (see the sample program in this section for details).

A queue command must follow the specification of each class.

