package noodle.sky.xgrid; import java.io.File; import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; /** * Run a job with the xgrid command-line client. Here is an example: *
*
* XGridJob job = new XGridJob("host.domain", "script.sh");
* job.addFile(new File("script.sh"));
* job.submit();
* while (!job.resultsAvailable()) {
* Thread.sleep(500);
* }
* String results = job.getResults();
* job.dispose();
*
*
*
* @author tom riley
*/
public class XGridJob
{
private String command;
private String host;
private File tmpDir;
private Pattern idPattern = Pattern.compile("^\\D+(\\d+)\\D+\n$");
private String id;
/**
* Construct a new instance of XGridJob.
*
* @param host controller host
* @param command command to run on the grid
* @throws IOException if something goes wrong
*/
public XGridJob(String host, String command) throws IOException {
this.command = command;
this.host = host;
if (command == null)
throw new NullPointerException("command");
if (host == null)
throw new NullPointerException("host");
tmpDir = createTempDir();
}
/**
* Add a file to be passed with the job.
*
* @param fname file name
* @param contents files contents
* @throws IOException if an IO error occurs
*/
public void addFile(String fname, String contents) throws IOException {
FileUtils.writeStringToFile(new File(tmpDir, fname), contents, null);
}
/**
* Add a file to be passed with the job.
*
* @param file file to send
* @throws IOException if an IO error occurs
*/
public void addFile(File file) throws IOException {
FileUtils.copyFile(file, new File(tmpDir, file.getName()));
}
/**
* Submit the job. Returns immediately.
*
* @throws IOException if an IO error occurs
*/
public void submit() throws IOException {
String out = process(tmpDir, new String[]{"xgrid", "-h", host, "-job", "submit", "-in", tmpDir.getAbsolutePath(), command});
Matcher m = idPattern.matcher(out);
if (m.matches()) {
id = m.group(1);
}
}
/**
* Find out whether results are available. Poll this function to
* know when to call {@link #getResults()}.
*
* @return true if results available, false if not
* @throws IOException if an IO error occurs
*/
public boolean resultsAvailable() throws IOException {
String attributes = process(tmpDir, new String[]{"xgrid", "-h", host, "-job", "attributes", "-id", id});
return (attributes.indexOf("jobStatus = Finished") >= 0);
}
/**
* Get the job output.
*
* @return Text written to STDOUT by job.
* @throws IOException if an IO error occurs
*/
public String getResults() throws IOException {
String results = process(tmpDir, new String[]{"xgrid", "-h", host, "-job", "results", "-id", id});
return results;
}
/**
* Delete the job from the controller. Call after getting results.
*
* @throws IOException if an IO error occurs
*/
public void dispose() throws IOException {
try {
process(tmpDir, new String[]{"xgrid", "-h", host, "-job", "delete", "-id", id});
} finally {
if (tmpDir != null) {
FileUtils.deleteDirectory(tmpDir);
}
}
}
private File createTempDir() throws IOException {
File tempDir = File.createTempFile("xgridclient", "");
tempDir.delete();
if (!tempDir.mkdir()) {
throw new RuntimeException("Failed to create temporary directory " + tempDir.getAbsolutePath());
}
return tempDir;
}
private String process(File workingDir, String args[]) throws IOException {
ProcessBuilder pb = new ProcessBuilder(args);
if (workingDir != null)
pb.directory(workingDir);
Process p = pb.start();
String out = IOUtils.toString(p.getInputStream());
String err = IOUtils.toString(p.getErrorStream());
try {
int exitCode = p.waitFor();
if (exitCode != 0) {
throw new RuntimeException(StringUtils.join(args, ' ') + " returned " + exitCode);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return out;
}
}