MGH Biostatistics




The Biopara Parallel System

 


 

Author: Peter Lazar





Introduction



 

The power of the R statistical package and its ability to process and format huge datasets have long left users wishing for more computing power. The spread of parallel computation has produced many packages that let users on a budget set up and maintain a cluster of heterogeneous machines. Such clusters can challenge the computing power of even traditional supercomputers. Parallel computing is thus a viable option for users seeking more statistical computing power.

Alas, not all applications are set up to handle parallel computation. Among these inherently single-processor applications is the very popular and powerful statistical package R. With this in mind, we designed a message-passing system that allows R to work as a parallel application for coarsely parallel computations. The system is called "Biopara". It is written entirely in native R code and relies only on utilities present on nearly all modern machines: java and ssh. This makes the system portable and cross-platform, so it is easy to deploy in foreign environments.

The system maintains user environments across the entire cluster. Each user has a separate environment, so users cannot overlap or overwrite each other's variables. These environments persist across system reboots and node failures. The entire system is failure-resistant: it detects jobs stalled by worker node failure or excessive system load and restarts them. As a consequence, it is also inherently load balancing. Biopara is an excellent option for users who want to expand their statistical computing power through cluster computation.


 



Overall Design



 

We have experimented with the traditional methods of parallelization. These usually involve a ring of idle workers that are woken up and handed jobs on demand. Each worker executes a single command (usually launched through rsh or ssh). It then passes the data to the monitor, which passes it back to the calling client for reassembly.

The trouble with this setup is that each invocation requires R to be launched and then torn down. The overhead becomes readily apparent when thousands of calls are placed, and it can outstrip any speedup gained from parallelization. For this reason, we chose a design in which the worker nodes stay awake.

Clients use file passing to send a request, along with their current environment, to a central orchestrating server. This server handles all environment manipulation and drives the worker ring. Workers use sockets (R socketConnection objects) to communicate results back to clients.

R's dump utility creates text representations of R objects, which pass easily as text over connections. This allows arbitrary structures to be passed between different instances of R. The parse(text=variable) facility of R allows one to pack up function calls as strings and send them across the network to be evaluated remotely. Function output, and even errors, can then be packed up just as easily and sent back to the client. Hence, most of the activity takes place entirely in memory.
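The text round trip described above can be sketched in a few lines of base R. This is a minimal single-process illustration that leaves out the transport (files and sockets). The helper names object_to_text and evaluate_call_text are hypothetical and are not part of biopara.

```r
# Illustrative names only (not biopara's internal functions).

# Turn a named object into R source text using dump() and a text connection.
object_to_text <- function(name, envir = parent.frame()) {
  con <- textConnection("dumped", "w", local = TRUE)  # dump() output lands in 'dumped'
  dump(name, file = con, envir = envir)
  close(con)
  paste(dumped, collapse = "\n")
}

# "Worker" side: rebuild the shipped objects in a fresh environment, then
# evaluate the call string there. Errors are returned as text, like results.
evaluate_call_text <- function(env_text, call_text) {
  worker_env <- new.env()
  eval(parse(text = env_text), envir = worker_env)
  tryCatch(eval(parse(text = call_text), envir = worker_env),
           error = function(e) paste("Error:", conditionMessage(e)))
}

# "Client" side: ship 'a' as text and ask for a computation on it.
a <- c(1, 2, 3)
payload <- object_to_text("a")
result <- evaluate_call_text(payload, "sum(a) * 2")
print(result)  # [1] 12
failed <- evaluate_call_text(payload, "undefined_fn(a)")
print(failed)  # an "Error: ..." string instead of a crash
```

In the real system, the payload and the call string would travel over the file-passing and socket channels rather than through a direct function call.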

Another major improvement is the use of existing systems to implement the machinery of execution. Most parallel applications require one of the parallel management suites to be installed. This is unnecessary and forces the user to install and tinker with other application suites. These suites then run in the background, consume CPU cycles, and may open security holes. We use java and ssh instead, which already exist on practically every system in circulation today.

Most of the existing parallel systems do not deal with the issue of user environments and multi-user applications. We take great trouble to make sure each user is isolated from every other user on the system. Variables and functions exist only in each specific user’s environment. These environments are managed by a central server.
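One way to picture this isolation is a registry holding one R environment per username. Every call string is evaluated only inside its owner's environment. This is a hypothetical sketch, not biopara's actual implementation, and the names user_envs, get_user_env and run_for_user are illustrative.

```r
# Registry of per-user environments (illustrative, not biopara's internals).
user_envs <- new.env()

get_user_env <- function(user) {
  if (!exists(user, envir = user_envs, inherits = FALSE)) {
    # Parent is the package search path, not the global workspace, so one
    # user's session cannot see variables left in the worker's global env.
    assign(user, new.env(parent = parent.env(globalenv())), envir = user_envs)
  }
  get(user, envir = user_envs, inherits = FALSE)
}

# Evaluate a call string inside the named user's environment only.
run_for_user <- function(user, call_text) {
  eval(parse(text = call_text), envir = get_user_env(user))
}

invisible(run_for_user("alice", "x <- 10"))
invisible(run_for_user("bob", "x <- 99"))
print(run_for_user("alice", "x + 1"))                                # [1] 11
print(exists("x", envir = get_user_env("carol"), inherits = FALSE))  # [1] FALSE
```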

Most parallel systems do not deal with node failure during longer computations. This matters for any computation that takes hours or even days to complete: if a network or local node failure occurs, the entire task could be lost. We programmed the master to keep records of each run and to track the running time of each job it has passed to a worker. The first result to return serves as a litmus test for the general runtime of the job. Any node that exceeds this runtime by a certain interval is counted as lost or down, and its task is rerouted to a responsive node. Hence, we have inherent load balancing.
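The timeout rule can be expressed as a small function. This sketch assumes the master knows each job's dispatch time and whether the job has returned. The grace parameter stands in for the "certain interval" above, and find_stalled_jobs is a hypothetical name.

```r
# Returns the indices of jobs that have not returned and have been running
# longer than the first job's runtime plus a grace interval.
find_stalled_jobs <- function(start_times, done, first_runtime, grace, now) {
  elapsed <- now - start_times
  which(!done & elapsed > first_runtime + grace)
}

# Four jobs dispatched at t = 0, 0, 5 and 5 seconds; jobs 1 and 3 have returned.
# The first result took 20 s, and the (assumed) grace interval is 10 s.
start <- c(0, 0, 5, 5)
done  <- c(TRUE, FALSE, TRUE, FALSE)
stalled <- find_stalled_jobs(start, done, first_runtime = 20, grace = 10, now = 32)
print(stalled)  # [1] 2

# Job 2 has run 32 s (more than 30 s) and would be resent to a responsive node;
# job 4 has run only 27 s and is left alone.
```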

We believe that we have built a parallel system that addresses many issues that most current parallel systems treat only superficially.

 

 



Using Parallel Matlab

 





 

The biostatistics cluster maintains a ring of 20 Matlab servers running MathWorks' DML parallel system. These servers have a rundml front-end monitor that watches the shared directory "/home/source/matlab". It looks for files named "pararelayusername.mat" and writes output files named "parroutrelayusername.mat". None of these files should remain after a successful run.

The following examples assume that you use the biopara system directly from a terminal on the cluster. Remote usage works almost identically. The only difference is that the first argument must be your relative path to the shared directory '/home/source/matlab' on rescluster2.mgh.harvard.edu. This directory is available as a Samba share, but access requires the firewall to validate your local machine's IP address. On Windows, you can type "net use w: \\rescluster2\matlab" to map drive w: to the share.

Here is how to get started:

1. Log in to the cluster using ssh (if it asks for an ssh "key", ignore it and hit enter)

ssh -l username rescluster2.mgh.harvard.edu

 

2. Go to /home/source/matlab and use "mkdir" to create a personal directory to work from. Make sure its permissions are world readable and writable:

chmod 777 yourdirectory

 

Throughout your use of Matlab, you will create function files and data files for the cluster to process. To avoid collisions, please keep your work in your personal directory. One caveat is that the servers run as another user. Any function or data file that you submit to the parallel system must therefore have its permissions set to be world readable:

chmod 777 yourfile

 

3. Start matlab

/opt/matlab_r14/bin/matlab -nodisplay

 

If you are running from the cluster, use Matlab in command-line mode with the -nodisplay option. Otherwise it is extremely slow, to the point of being unusable.

4. Make sure that "/home/source/matlab" is on the Matlab path, and add your personal directory as well:

addpath('/home/source/matlab')
addpath('/home/source/matlab/yourdirectory')

 

5. Run dml!

rundml is transparent in its operation: you use it as if it were just another Matlab function. The overall syntax of the command is:

x=rundml('/home/source/matlab' , nruns,{'myfunc(a,b)'});

 

This will run the function myfunc(a,b) 'nruns' times, using '/home/source/matlab' as the shared working directory. Here is what each argument means:

Working Directory = Your path to the shared medium

This is either ‘/home/source/matlab’ for use directly from the cluster or the relative path to the \\rescluster2\matlab share for remote usage.

nruns = Number of runs

The number of times to run your function (this is ignored in multi-parameter mode; more on this below). Each run happens on a different node, up to 22 runs. Beyond that, the nodes queue up multiple jobs and the entire task executes in batches of 22. If you have a task that takes x seconds and you do y runs of it, you will probably finish in roughly (y/n)*x seconds plus a small overhead, where n is the number of servers in the ring.
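As a rough model, assume every run takes about x seconds and n servers are idle. Because work proceeds in whole batches, the total time T for y runs is approximately:

```latex
T \approx \left\lceil \frac{y}{n} \right\rceil x + t_{\mathrm{overhead}}
```

For example, y = 50 runs of x = 60 seconds on n = 22 idle servers need ceil(50/22) = 3 batches. That is about 180 seconds, plus overhead.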

Function = Cell array containing strings

This is where you specify the function to execute, as a cell array of strings containing function calls. This is what a call to myfunc1(a,b,c) would look like:

{'myfunc1(a,b,c)'}

 

Each call must be enclosed in quotes, and the whole set enclosed in braces {} to make it a cell array. If the cell array contains more than one semicolon-separated string, the system ignores the nruns variable and performs one run for each string. This is useful for passing different parameters to the same function. Technically, you could list any number of different functions here, and each will trigger one run:

{'myfunc1(a,b,c)';'myfunc1(b,c,a)'; 'myotherfunc(b,''g'')'}

 

This will trigger 3 runs. Note the doubled single quotes around myotherfunc's second argument. The items inside the cell array are already strings, so you must double every single quote that would ordinarily appear inside your function call.

Longer data structures are not printed to the screen in full. Instead, they are displayed in a short notation, such as [1x38 char] for an error message.

Alternate special operations for setup and monitoring:

rundml accepts two special commands: numservers and reset.

'reset': This will clear your global environment for your username on all nodes. It also empties the repository of your environment files on the cluster's head node.

'numservers': This will return the number of idle workers, which is useful for seeing the load on the DML system.

You must enclose these special commands in braces, just as you would a function call. For example:


rundml('/home/source/matlab', 0, {'reset'}) <- Resets all nodes

 



Using the Biopara R Parallel System

 





 

I have a ring of 22 R-workers running on the cluster. These servers poll the directory "/home/source/R". Here is a quick tutorial on how to use the R system.

Launch R:

/opt/R/bin/R

 

You will need to source the biopara definition to use it:

source("/home/source/R/biopara.r")

 

This defines biopara in your environment. The overall syntax of biopara is:

biopara(targetmachine, localmachine, numruns, list("yourfunction(a,b,c)"))

 

This will contact host "targetmachine", identifying yourself as "localmachine", and perform "numruns" copies of "yourfunction(a,b,c)".

targetmachine=list("hostname", host-port, "username-on-remote-system")

 

The target machine is the host running the biopara master. For us, this will be list("rescluster2.mgh.harvard.edu", 38000). The hostname is the hostname of the master machine, and host-port is the port that the biopara master was started with. The third element, a username, is optional; it is used with the ssh validation system. The client will attempt to connect to the master and log in to validate the user, using the current username on the client machine. If your username on the master computer is different, supply it as the third element. The ssh session will ask for a password. It is used to write a single file: ~biopara/biopara/local-machine-name.allow, where local-machine-name is the host name given in the second argument to biopara.

localmachine=list("local-hostname", local-port)

local-hostname is the fully qualified hostname or IP address of the local machine, given as a string. local-port can be any free port on the local machine; the client opens it automatically when it connects to the master. If the local machine name is wrong, the master will not return the connection and the client will not continue.

numruns = The number of runs

The number of times to run the function call. If your list contains more than one item, numruns is ignored and 1 run will execute for each item in the list. This is called multi-parameter mode and is described below.

Function = A list of strings containing function calls

This is a list of strings containing function calls. Even a single function call must be wrapped in a list, for compatibility with multi-parameter mode. Usually these are of the form

list("myfunc(1,2,3)")

 

This will generate "numruns" executions of myfunc. If there is a single item in the list, the system generates numruns copies of the function call and performs each one on one of the worker nodes. If there is more than one comma-separated string inside the list, biopara runs in multi-parameter mode and generates a single run for each string in the list. Here is an example:

list("myfunc(1,2,3)","myfunc(3,2,1)", "myotherfunc(4,5)")

 

This will ignore the numruns and generate 3 runs. This is useful for running a function with different variables or a number of different functions.
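Long multi-parameter lists are easier to generate than to type. Below is a minimal sketch, assuming a hypothetical function myfunc that takes two numeric arguments. expand.grid() builds every combination of parameter values, and sprintf() turns each combination into a call string.

```r
# Hypothetical parameter grid for a function myfunc(a, b).
params <- expand.grid(a = c(1, 2, 3), b = c(10, 20))

# One call string per row of the grid: 6 strings in total.
calls <- as.list(sprintf("myfunc(%g, %g)", params$a, params$b))
print(length(calls))  # [1] 6
print(calls[[1]])     # [1] "myfunc(1, 10)"

# With more than one item in the list, numruns is ignored and one run is made
# per string. Uncomment on a machine that can reach the biopara master:
# y <- biopara(list("rescluster2.mgh.harvard.edu", 38000),
#              list("132.183.156.114", 39000), 1, calls)
```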

A typical run returns a list with one item per run, and each item contains whatever your function returns. If your function returns a list, you will get a list of lists. Here is an example of biopara in operation:

y <- biopara(list("rescluster2.mgh.harvard.edu", 38000), list("132.183.156.114", 39000), 22, list("doit(3)"))

 

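This makes 22 runs of doit(3) and puts the 22 results into y. The function doit must already be defined on the workers, for example by sending it with setenv (described below).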

For functions that return more than one output, the result preserves all of the subelements, even though printing the answer list usually displays only the top element of each item. You can unpack the answer and reach the underlying elements by indexing with double square brackets [[ ]]. To unpack the third item from an answer list "out":

myitem <- out[[3]]

myitem will then contain the full object, and you can reference any element within it.
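The indexing can be shown without a master. The sketch below uses a made-up list with the same shape as a biopara answer: one item per run, each item itself a list. The element names estimate and n are hypothetical.

```r
# Stand-in for a biopara answer list: one item per run, each item being
# whatever the function returned (here a list with two named elements).
# The values are made up for illustration.
out <- lapply(1:3, function(i) list(estimate = i * 1.5, n = 10 * i))

myitem <- out[[3]]          # full object returned by run 3
print(myitem$estimate)      # [1] 4.5

# Collect one element across all runs into a numeric vector.
estimates <- vapply(out, function(run) run$estimate, numeric(1))
print(estimates)            # [1] 1.5 3.0 4.5
```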

Alternate special operations for setup and monitoring:

 

There are a few special function names that issue system commands to the workers. They must be sent on their own, because the client ignores them when they appear alongside other function calls. Here are the commands:

"last" : This will display the usage history of the system.

"hosts" : This will reply with a list of socket descriptions to the worker ring.

"numservers" : This will return the number of responding workers

"reset" : This will clear your environment on all worker nodes.

"setenv" : This will take your desktop environment and broadcast it to all of the workers. For finer control, you can replace the numruns argument with a list of strings naming the variables to send to the workers. For comparison, a vector of 10 million 8-digit integers occupies about 38.1 MB in R's memory.
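Before broadcasting with setenv, it can help to check how large your objects are. This sketch uses base R's object.size(). The figures reflect in-memory storage (4 bytes per integer, 8 per double); the text actually sent by dump() can be larger. The variable names are examples.

```r
# 10 million 8-digit integers, and the same values stored as doubles.
x_int <- seq_len(1e7) + 9999999L   # 10000000 .. 19999999
x_dbl <- as.numeric(x_int)

print(format(object.size(x_int), units = "Mb"))  # about 38.1 Mb (4 bytes each)
print(format(object.size(x_dbl), units = "Mb"))  # about 76.3 Mb (8 bytes each)

# Size of every object in the workspace, largest first, to help decide which
# names to pass to setenv instead of the whole environment.
sizes <- sapply(ls(), function(nm) as.numeric(object.size(get(nm))))
print(sort(sizes, decreasing = TRUE))
```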

Here are some examples of these special commands as sent from host 132.183.156.114 and their results:

biopara(list("rescluster2.mgh.harvard.edu", 38000), list("132.183.156.114", 39000), 22, list("reset")) <- Resets your environment
biopara(list("rescluster2.mgh.harvard.edu", 38000), list("132.183.156.114", 39000), 22, list("setenv")) <- Transmits your environment
biopara(list("rescluster2.mgh.harvard.edu", 38000), list("132.183.156.114", 39000), list("a", "b"), list("setenv")) <- Transmits only a and b
biopara(list("rescluster2.mgh.harvard.edu", 38000), list("132.183.156.114", 39000), 22, list("last")) <- Returns history
biopara(list("rescluster2.mgh.harvard.edu", 38000), list("132.183.156.114", 39000), 22, list("hosts")) <- Names all sockets in the connection

A warning! If you are tweaking a function and running it in parallel, you may notice that it seems to return results from a previous definition of your function. Each server is simply a long-running instance of R that keeps the definition it was sent earlier. It does not recheck the definition of a function each time it runs. If you redefine a function, perform a reset followed by a setenv.
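The effect can be reproduced locally. In this sketch, an environment named worker_env stands in for a worker that received doit through setenv. It only models the behaviour and is not biopara code.

```r
# Local model of the stale-definition problem (not biopara code): the worker
# holds its own copy of a function, so redefining the function in your
# session does not change the copy the worker already has.
worker_env <- new.env()

doit <- function(n) n * 2
assign("doit", doit, envir = worker_env)         # like a setenv broadcast

doit <- function(n) n * 100                      # you redefine it locally
print(eval(quote(doit(3)), envir = worker_env))  # [1] 6  (old definition)

# Equivalent of reset followed by setenv: clear the copy, then resend it.
rm(list = ls(worker_env), envir = worker_env)
assign("doit", doit, envir = worker_env)
print(eval(quote(doit(3)), envir = worker_env))  # [1] 300
```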

 



Compiled Personal Matlab Servers




 

There is another system available that lets a user create a private ring of servers tailored to a specific function. Its added advantage is that all of the code to be run is compiled with the Matlab compiler into native C++ code. This eliminates the need for Matlab licenses on all 22 servers and speeds up code that responds well to compilation. A user must first call create, followed by any number of runs. Finally, the user must call destroy, or else the server ring will run indefinitely. The function is called cmpserve, and it implements both the client and server behavior of the compiled system. The syntax is the following:

cmpserve(mode, functionname, data)

 

where functionname is a string containing the name of your function, and mode is one of the following:

create: This will find your function and wrap a file passing server around it then compile the resulting m file and create a ring of servers

debug: This will only generate the server's m-file, named username-function-cmpserver.m, which can be used for debugging.

destroy: This tears down the ring and deletes all files. This must be done manually! Otherwise, the ring will remain up indefinitely.

reset: This clears all the existing environments and resets the servers.

relaunch: Makes sure all of the servers are running

check: Prints out visual warnings if any nodes are detected down

checkcount: Prints out number of nodes that are running

checkrepair: Checks to make sure that all servers are running and will issue a command to relaunch all servers that did not respond.

setenv: Works like setenv in biopara, except that you must save the variables to be used into an intermediary file and pass that filename as a string in the data field:

cmpserve('setenv', 'permtest', '/home/source/matlab/in.mat')

run: To run a function, you must pass a cell array of cell arrays into the data field. Each "row" (inner cell array) should contain as many elements as your function has arguments, and the number of rows determines the number of runs. For a function that takes 3 arguments, you would pass one row of 3 elements per desired run. The elements of each row are passed to the function as its arguments, in order.

customcreate/customdebug: To index into data structures in your function calls and to control the number of outputs, you can do a custom build as follows. I have reserved two special characters: @ for outputs and # for inputs. A normal create generates a function that takes standard inputs and returns outputs exactly as the function defines them through nargin and nargout. A custom build of the function permtest that requests three outputs and accesses a matrix 't' plus 2 other arguments would be as follows:

cmpserve('customcreate', 'permtest', '[@,@,@]=permtest(t(#),#,#)')

The server will parse [@,@,@]=permtest(t(#),#,#) and assign

[x1,x2,x3] = permtest(t(argsin{1}),argsin{2},argsin{3})

Note that argsin{1} is now an index into your array t.

 

Caution!

Try/catch blocks, as far as I can tell, simply don't work. If you don't setenv and define 't' in the example above, the entire ring will crash silently with a runtime error on each worker node. It will give no indication of having crashed; your function will simply never return. A good idea is to ssh to one of the compute nodes and run the executable yourself to see whether any runtime errors occur. The executable can be found as username-functionname-cmpserver in /home/source/matlab.

There is also an issue with the FLEXlm license manager. When a user compiles a Matlab function, the license manager keeps the compiler assigned to that user for 10-15 minutes. If you get an error saying that the compiler is out of licenses, simply try again after 10-15 minutes.

The compiler is very finicky about the functions it will accept for compilation. All compiler errors are printed directly to the screen as the function is being compiled. I recommend a destroy after an unsuccessful compilation. Functions to avoid are: eval, random, normrand, movefile, addpath, system, copyfile, and fileattrib.

 



Questions?



 

Please contact Peter Lazar at plazar@amber.mgh.harvard.edu if you have any questions or problems using the biopara system.