Submitting "Embarrassingly Parallel Tasks"

  • Question

  • I am trying to figure out how to make a script to submit a job with multiple tasks to the HPC scheduler.  Running this from the command line is very desirable.  These are “embarrassingly parallel tasks”.  Here is my setup.

    main directory (with the following subdirectories)

                    Analysis – directory that contains exe’s, dll’s, and files used for all tasks (bunch of files).

                    Tasks – directory that contains hundreds to thousands of tasks (*.cmd or *.exe files)

    Ideally the scheduler would make a unique directory on each node for a task and copy the Analysis directory to that node’s unique directory (for efficiency).  Subsequent tasks could reuse the directory.  After the job is done, the scheduler would delete the directory.  There are many files in the Analysis directory.

    I could make Analysis a working directory but running multiple tasks from that directory would not work since each task would produce temporary files that are named the same for all tasks but are specific to each task.  Essentially, the tasks would try to run exe’s that output files with identical names.

    We are using the HPC scheduler with Matlab software to make a grid for Matlab tasks.  This works great but we don’t always need to run Matlab scripts.  I am trying to get a job submission script set up to work for non-Matlab jobs (outside of Matlab due to Mathworks costs).

    I have been able to submit a job with a single task.  It seems getting a job with multiple tasks would be straightforward.

    job submit /scheduler:xxx-xxx.com /workdir:\\ZTURA0XXXX\D$\Work\X\y\Analysis \\ZTURA0XXXX\D$\Work\X\y\Job\Test.cmd

    Obviously, this is only a start.  I read something about mpiexec but I am stumbling around terribly.  I also noticed some C# code in an example.  That would be a nice path to use as well, but I know so little about running code on the HPC Scheduler that I really don’t know where to begin.

    Thanks for any direction or help.


    • Edited by conrarn Thursday, September 20, 2012 6:11 PM
    Wednesday, September 19, 2012 9:05 PM

Answers

  • I just now got back on this forum and noticed this thread. I did find some time to look into this and wrote this application to submit jobs to the scheduler.

    //*************************************************************************************************************************************
    //*************************************************************************************************************************************

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Hpc.Scheduler;
    using Microsoft.Hpc.Scheduler.Properties;
    using System.Reflection;
    using System.IO;

    //*************************************************************************************************************************************
    //*************************************************************************************************************************************

    //*************************************************************************************************************************************
    //*************************************************************************************************************************************
    //
    // SubmitJobs - Application used to submit jobs to a Windows 2008 R2 HPC job scheduler.
    //
    //   VERSION     Author        DESCRIPTION
    //
    //   2013.08.13  N.M.Conrardy  Original program.
    //

    namespace SubmitJobs
    {
      class Program
      {
        //*********************************************************************************************************************************
        // Fields.

        static bool   optionHelp     = false;
        static bool   optionInvalid  = false;

        static string jobScheduler   = null;
        static string jobName        = null;
        static string taskDirectory  = null;

        static string programName = Assembly.GetExecutingAssembly().GetName().Name;
        static string version     = Assembly.GetExecutingAssembly().GetName().Version.ToString();

        static ISchedulerJob    job;
        static ISchedulerTask   task;

        static ManualResetEvent jobFinishedEvent = new ManualResetEvent(false);

        static List<string> fileList = new List<string>();

        //*********************************************************************************************************************************
        // Constructors.

        //*********************************************************************************************************************************
        // Properties.

        //*********************************************************************************************************************************
        // Methods.

        //*********************************************************************************************************************************
        static void Main(string[] args)
        {
          ProcessCommandLine(args);

          if (optionInvalid)
          {
            Console.ForegroundColor = ConsoleColor.White;

            Console.WriteLine("                ");
            Console.WriteLine("INVALID OPTIONS.");
            Console.WriteLine("                ");

            Console.ResetColor();

            DisplayHelp();

          }
          else if (optionHelp)
          {
            DisplayHelp();
          }
          else
          {
            // Get the list of tasks.

            try
            {
              string [] files = Directory.GetFileSystemEntries(taskDirectory);
     
              foreach (string file in files)
              {
                fileList.Add(file);
              }
            }
           
            catch (Exception e)
            {
              Console.Error.WriteLine("Could not read the task directory: {0}", e.Message);

              return; //abort if the task list could not be built
            }

            // Create a scheduler object to be used to establish a connection to the scheduler on the headnode.
       
            using (IScheduler scheduler = new Scheduler())
            {
              // Connect to the scheduler.
       
              Console.WriteLine("Connecting to {0} ...", jobScheduler);
       
              try
              {
                scheduler.Connect(jobScheduler);
              }
       
              catch (Exception e)
              {
                Console.Error.WriteLine("Could not connect to the scheduler: {0}", e.Message);

                return; //abort if no connection could be made
              }
             
              // Create a job to submit to the scheduler. Each file found in the task directory becomes one task in this job.
       
              job = scheduler.CreateJob();
       
              // Some of the optional job parameters to specify. If omitted, defaults are:
              //   Name = {blank}
              //   UnitType = Core
              //   Min/Max Resources = Autocalculated
              //   etc...
       
              job.Name = jobName;
              Console.WriteLine("Creating job name {0} ...", job.Name);
       
              job.UnitType = JobUnitType.Core;
       
              job.AutoCalculateMin = true;
              job.AutoCalculateMax = true;

            //job.AutoCalculateMin = false;
            //job.AutoCalculateMax = false;
            //
            //job.MinimumNumberOfCores =   1;
            //job.MaximumNumberOfCores =  16;
             
              // The commandline parameter tells the scheduler what the task should do.  CommandLine is the only mandatory parameter you must set for every task.
       
              foreach (string file in fileList)
              {
                task = job.CreateTask();

                task.Name = "NA";

                task.CommandLine = file;
       
                Console.WriteLine("Creating a task - {0}", task.CommandLine);
       
                // Add the task to the job.
       
                job.AddTask(task);
              }
       
              // Use callback to check if a job is finished
       
              job.OnJobState += new EventHandler<JobStateEventArg>(job_OnJobState);
       
              // Submit the job. You can specify your username and password in the parameters, or set them to null and you will be prompted for your credentials
       
              Console.WriteLine("Submitting job to the cluster ...");
              Console.WriteLine();
       
            //job.Priority = JobPriority.AboveNormal;

              scheduler.SubmitJob(job, null, null);
       
              Console.WriteLine("Submitted {0} tasks to the job ID = {1}.", fileList.Count, job.Id);

              // Wait for job to finish
       
            //jobFinishedEvent.WaitOne();
       
              // Close the connection
       
              scheduler.Close();
            } // Call scheduler.Dispose() to free the object when finished
          }
        }

        //*********************************************************************************************************************************
        static void job_OnJobState(object sender, JobStateEventArg e)
        {
          if (e.NewState == JobState.Finished) //the job is finished
          {
            task.Refresh(); // update the task object with updates from the scheduler

            Console.WriteLine("Job completed.");
          //Console.WriteLine("Output: " + task.Output); //print the task's output
            jobFinishedEvent.Set();
          }
          else if (e.NewState == JobState.Canceled || e.NewState == JobState.Failed)
          {
            Console.WriteLine("Job did not finish.");
            jobFinishedEvent.Set();
          }
          else if (e.NewState == JobState.Queued && e.PreviousState != JobState.Validating)
          {
            Console.WriteLine("The job is currently queued.");
            Console.WriteLine("Waiting for job to start...");
          }
        }

        //*********************************************************************************************************************************
        static void DisplayHelp()
        {
          Console.ForegroundColor = ConsoleColor.Green;
          Console.WriteLine("{0} Version {1} - NMC Tools                                                                             ", programName, version);
          Console.WriteLine("                                                                                                        ");
                                                                                                                                    
          Console.ForegroundColor = ConsoleColor.White;                                                                             
          Console.WriteLine("  SubmitJobs jobScheduler jobName taskDirectory                                                         ");
          Console.WriteLine("                                                                                                        ");
          Console.WriteLine("    jobScheduler  - name of the HPC job scheduler                                                       ");
          Console.WriteLine("    jobName       - name of the job                                                                     ");
          Console.WriteLine("    taskDirectory - directory of the tasks                                                              ");
          Console.WriteLine("                                                                                                        ");
          Console.WriteLine("  Example                                                                                               ");
          Console.WriteLine("                                                                                                        ");
          Console.WriteLine("    SubmitJobs xaxx-hxxx.abc.bd.ddd.aed NMC \\\\xxxx-xxxxxxxxx\\xxxxxxxxx\\nmconrardy\\Work\\Grid\\Tasks");
          Console.WriteLine("                                                                                                        ");
          Console.ResetColor();                                                                                     

          return;
        }

        //*********************************************************************************************************************************
        static void ProcessCommandLine(string[] args)
        {
          if (args.Length == 0)
          {
            optionHelp = true;
          }
          else if (args.Length == 3)
          {
            jobScheduler  = args[0];
            jobName       = args[1];
            taskDirectory = args[2];
          }
          else
          {
            optionInvalid = true;
          }

          return;
        }
      }
    }
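
    One thing to watch in the listing above: Directory.GetFileSystemEntries returns subdirectories as well as files, so any folder inside the task directory would be submitted as a task, and every task gets the same name "NA". A minimal sketch of a stricter file scan is shown here. TaskDiscovery, FindTaskFiles and MakeTaskName are hypothetical names introduced only for illustration, and the *.cmd / *.exe filter is an assumption taken from the question.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical helper, not part of the SubmitJobs program above.
public static class TaskDiscovery
{
    // Extensions treated as runnable tasks (assumption: the question mentions *.cmd and *.exe).
    private static readonly string[] TaskExtensions = { ".cmd", ".exe" };

    // Returns the task files directly inside taskDirectory, sorted by path.
    // Subdirectories and files with other extensions are skipped.
    public static List<string> FindTaskFiles(string taskDirectory)
    {
        return Directory.GetFiles(taskDirectory)
            .Where(path => TaskExtensions.Contains(Path.GetExtension(path), StringComparer.OrdinalIgnoreCase))
            .OrderBy(path => path, StringComparer.OrdinalIgnoreCase)
            .ToList();
    }

    // Builds a distinct, readable task name from a running index and the file name.
    public static string MakeTaskName(int index, string taskFile)
    {
        return string.Format("{0:D4}_{1}", index, Path.GetFileNameWithoutExtension(taskFile));
    }
}
```

    In the task loop, the result of FindTaskFiles could replace fileList, and task.Name could be set from MakeTaskName so that individual tasks are easier to tell apart in the job manager.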

     

    • Marked as answer by conrarn Thursday, June 5, 2014 5:24 PM
    Thursday, June 5, 2014 5:24 PM

All replies

  • I would suggest playing with the following commands:

    job new - create a job

    job add  - add tasks to job

    xcopy \\headnode\workdir\C$ C:\local_workdir  - create dir structure, cleanup etc

     or psexec.exe is very useful for remote programs/scripts execution

     or copy a script to every compute node  before you start sending your tasks and create folder structure there

    job submit - execute your tasks on compute nodes

    job clone - when the job is done, resend it with new parameters (or you may use parametric tasks)

    tasklist /S computer_name - for remote process monitoring
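
    The job new / job add / job submit sequence above could be generated rather than typed by hand. The following is only a sketch that builds the command strings and runs nothing. JobCommandBuilder and its methods are hypothetical names; /scheduler and /workdir come from the command in the question, while /jobname and /id are written from memory of the job command help and should be checked with "job new /?" and "job submit /?" on your cluster.

```csharp
using System;

// Hypothetical helper: it only builds command-line strings, it does not execute them.
public static class JobCommandBuilder
{
    // Wraps a value in double quotes when it contains a space.
    private static string Quote(string value)
    {
        return value.Contains(" ") ? "\"" + value + "\"" : value;
    }

    // Command that creates an empty job; the job ID is reported by the scheduler when it runs.
    public static string NewJob(string scheduler, string jobName)
    {
        return string.Format("job new /scheduler:{0} /jobname:{1}", scheduler, Quote(jobName));
    }

    // Command that adds one task file to an existing job, using workDir as the working directory.
    public static string AddTask(string scheduler, int jobId, string workDir, string taskFile)
    {
        return string.Format("job add {0} /scheduler:{1} /workdir:{2} {3}",
                             jobId, scheduler, Quote(workDir), Quote(taskFile));
    }

    // Command that submits the job once all tasks have been added.
    public static string SubmitJob(string scheduler, int jobId)
    {
        return string.Format("job submit /id:{0} /scheduler:{1}", jobId, scheduler);
    }
}
```

    The job ID has to be read from the output of job new before the job add lines can be built, so in practice this would be one NewJob call, then one AddTask call per file in the Tasks directory, then SubmitJob.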


    Daniel Drypczewski

    Thursday, September 27, 2012 3:30 AM
  • Thanks Daniel for the suggestion.  That is exactly what we used to do when we used our own home-grown grid engine.  There should be a tool out there that manages this stuff much better.  Mathworks uses the following to zip up the Analysis directory, create a unique directory on each blade for each core, copy the files to the unique folder, unzip the files, run the job, and then clean up after itself.

    mpiexec -l -genvlist MDCE_STORAGE_CONSTRUCTOR,MDCE_STORAGE_LOCATION,MDCE_JOB_LOCATION,MDCE_DEBUG,MDCE_DECODE_FUNCTION,MDCE_FORCE_MPI_OPTION,CCP_NODES,CCP_JOBID
            -hosts %CCP_NODES% "c:\Program Files\MATLAB\R2012a\bin\worker.bat"  -parallel
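
    For non-Matlab tasks, the copy / run / clean-up cycle described above could be approximated with a small generated wrapper script per task. This is a sketch only, not what Mathworks does: it skips the zip step and copies the Analysis directory for every task instead of reusing it. WrapperScript and Build are hypothetical names, and the script assumes the scheduler sets CCP_JOBID (which appears in the command above) and CCP_TASKID for each task, which should be verified on the cluster.

```csharp
using System;
using System.Text;

// Hypothetical helper that produces the text of a .cmd wrapper for one task.
public static class WrapperScript
{
    // analysisShare: UNC path of the shared Analysis directory.
    // taskFile:      UNC path of the task (*.cmd or *.exe) to run inside the private copy.
    public static string Build(string analysisShare, string taskFile)
    {
        var sb = new StringBuilder();
        sb.AppendLine("@echo off");
        sb.AppendLine("rem Unique scratch directory per task (CCP_JOBID and CCP_TASKID are assumed to be set).");
        sb.AppendLine(@"set ""WORK=%TEMP%\job_%CCP_JOBID%_task_%CCP_TASKID%""");
        sb.AppendLine("rem Copy the shared files to the local scratch directory; give up if the copy fails.");
        sb.AppendLine(string.Format(@"xcopy /E /I /Q /Y ""{0}"" ""%WORK%"" || exit /b 1", analysisShare));
        sb.AppendLine(@"pushd ""%WORK%""");
        sb.AppendLine(string.Format(@"call ""{0}""", taskFile));
        sb.AppendLine("rem Remember the task's exit code so it survives the clean-up.");
        sb.AppendLine("set RC=%ERRORLEVEL%");
        sb.AppendLine("popd");
        sb.AppendLine(@"rmdir /S /Q ""%WORK%""");
        sb.AppendLine("exit /b %RC%");
        return sb.ToString();
    }
}
```

    Each generated script would be saved to a share the compute nodes can read and then submitted as the task's command line in place of the task file itself. Because temporary files are written in the private copy, tasks that produce identically named files no longer collide.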

    I have been digging into mpiexec.exe but I am learning there are many versions.  In addition, the help for this app is not all that good.

    Thursday, September 27, 2012 8:15 PM
  • I would have included worker.bat but that is Mathworks code so I probably don't have the legal right to post it.

    Thursday, September 27, 2012 10:36 PM
  • mpiexec.exe -help2 or mpiexec.exe -help3 may give you more info on available options

    Yes, the command you have posted could be a clean-up task executed after your job is done, but you have to set up all the dependencies for the worker.bat script (local paths). I am thinking of something like this:

    Execute your job  -  job submit mpiexec -hosts %CCP_NODES% task.exe ->produce output and temp folders

    Clean up -  job submit mpiexec -hosts %CCP_NODES% -l -genvlist {env variables} "\\headnode\C$\Program Files\MATLAB\R2012a\bin\worker.bat"


    Daniel Drypczewski

    Friday, September 28, 2012 1:41 AM
  • I checked the helpX's and they are not much help for me.

    The script I posted is the ONLY job submitted when Mathworks runs through HPC.

    This is the Mathworks script I used to submit Matlab jobs to HPC.  I will see if I can get the gist of worker.bat to post later.

    function [job, jobManager] = SubmitJobs(jobManagerName)
      %
      % This is the main wrapper script for submitting created AP jobs to the MDCS
      %
      %  [job, jobManager] = SubmitJobs('AutopilotHPC')
      %  [job, jobManager] = SubmitJobs('local'       )
      %  [job, jobManager] = SubmitJobs('NMC_64Bit'   )
      %
    
      % Getting the jobs files
    
      dirJobFiles = dir('Jobs\*.m');
    
      if (isempty(dirJobFiles))
    
        error('Error - no files exist in Jobs directory.');
    
      end
    
      [tmp ix] = sort(datenum({dirJobFiles.date})); % sort on date
      jobFiles = dirJobFiles(ix);
    
      % Creating the array of job files.
    
      for i = 1:length(jobFiles)
    
        fcns{i} = str2func(jobFiles(i).name(1:end-2)); % strip off .m
    
      end
    
      try
    
        jobManager = findResource('scheduler', 'configuration', jobManagerName);
    
      catch
    
        error('Could not connect to the job manager.');
    
      end
    
      job = jobManager.createJob();
    
      % Setup file dependencies for job.
    
      rdir = fileparts(mfilename('fullpath'));
    
      job.FileDependencies =        ...
      {                             ...
        fullfile(rdir, 'Analysis'), ...
        fullfile(rdir, 'Jobs'    )  ...
      };
    
      % Create the job tasks and set the debugging.
    
      for i = 1:length(fcns)
    
        task(i) = job.createTask(fcns{i},0);
    
        task(i).CaptureCommandWindowOutput = true;
    
      end
    
      % Restart worker set to true.
    
      try
    
        job.RestartWorker = true;
    
      catch
    
        fprintf('This job manager does not have the ability to restart the worker.\n');
    
      end
    
      % Submit job to job manager
    
      submit(job);
    
      return;
    


    Friday, September 28, 2012 4:00 PM
  • This is essentially what is included in worker.bat.

    FOR /F "usebackq delims=" %%I IN (`""!MATLAB_BIN_DIR!MATLAB.exe"" -query -dmlworker -noFigureWindows %*`) DO (
        %%I>NUL 2>NUL
    )
    "!MATLAB_BIN_DIR!%MATLAB_ARCH%\MATLAB.exe" -dmlworker -noFigureWindows -r "%MATLAB_FUNCTION_TO_CALL%" %1 %2 %3 %4 %5 %6 %7 %8 %9

    Monday, October 1, 2012 10:56 PM
  • Well, since I am not getting any bites on this topic, does anyone know of a software package that submits jobs/tasks to the HPC scheduler?
    Wednesday, October 3, 2012 10:33 PM
  • Final try ... does anyone know of a software script/code/package that allows one to submit jobs elegantly?  I hate to spend the time creating this when there is most likely something already available to use.
    Friday, October 26, 2012 4:56 PM