Clover coverage report - PLT Utilities Test Coverage (plt-20120304-r5436)
Coverage timestamp: Sat Mar 3 2012 22:01:56 CST
file stats: LOC: 393   Methods: 26
NCLOC: 262   Classes: 12
 
 Source file Conditionals Statements Methods TOTAL
ProcessIncrementalTaskController.java 60% 68.2% 65.4% 66.7%
coverage coverage
 1    /*BEGIN_COPYRIGHT_BLOCK*
 2   
 3    PLT Utilities BSD License
 4   
 5    Copyright (c) 2007-2010 JavaPLT group at Rice University
 6    All rights reserved.
 7   
 8    Developed by: Java Programming Languages Team
 9    Rice University
 10    http://www.cs.rice.edu/~javaplt/
 11   
 12    Redistribution and use in source and binary forms, with or without modification, are permitted
 13    provided that the following conditions are met:
 14   
 15    - Redistributions of source code must retain the above copyright notice, this list of conditions
 16    and the following disclaimer.
 17    - Redistributions in binary form must reproduce the above copyright notice, this list of
 18    conditions and the following disclaimer in the documentation and/or other materials provided
 19    with the distribution.
 20    - Neither the name of the JavaPLT group, Rice University, nor the names of the library's
 21    contributors may be used to endorse or promote products derived from this software without
 22    specific prior written permission.
 23   
 24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
 25    IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 26    FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND
 27    CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 28    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 29    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
 30    IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 31    OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 32   
 33    *END_COPYRIGHT_BLOCK*/
 34   
 35    package edu.rice.cs.plt.concurrent;
 36   
 37    import java.io.*;
 38    import java.util.concurrent.ArrayBlockingQueue;
 39    import java.util.concurrent.BlockingQueue;
 40    import java.util.concurrent.Executor;
 41   
 42    import edu.rice.cs.plt.io.IOUtil;
 43    import edu.rice.cs.plt.iter.IterUtil;
 44    import edu.rice.cs.plt.lambda.Runnable1;
 45    import edu.rice.cs.plt.lambda.WrappedException;
 46   
 47    import static edu.rice.cs.plt.debug.DebugUtil.error;
 48   
 49    /**
 50    * A TaskController that executes an IncrementalTask in another Java process. The task and its results must
 51    * be serializable. If the task completes successfully, the remote process is allowed to run indefinitely
 52    * (the process terminates automatically for simple tasks, but if additional non-daemon threads are spawned,
 53    * users are responsible for cleaning up the process). If the task is unsuccessful, the remote process
 54    * is immediately destroyed. A local task, scheduled by an Executor, manages the startup of and exchange
 55    * of information with the remote process. This local task is submitted
 56    * (via {@link Executor#execute}) when {@code start()} is invoked (if the executor blocks, so will
 57    * {@code start()}); its status is changed to "running" when it actually begins executing; if canceled
 58    * in the interim, the status will still be "paused" until the task begins its scheduled execution, but
 59    * no process will ever be spawned.
 60    */
 61    public class ProcessIncrementalTaskController<I, R> extends IncrementalTaskController<I, R> {
 62   
 63    // fields will be changed to null by discard(), but no need for volatile because it's only for garbage collection
 64    private JVMBuilder _jvmBuilder;
 65    private Executor _executor;
 66    private IncrementalTask<? extends I, ? extends R> _task;
 67    // may be null, indicating nothing should be done on exit
 68    private Runnable1<? super Process> _onExit;
 69    // must be volatile because it starts uninitialized
 70    private volatile Thread _t;
 71    private volatile ObjectOutputStream _commandSink;
 72   
 73    /**
 74    * Create, but do not start, aProcessIncrementalTaskController.
 75    * @param jvmBuilder A JVMBuilder for the remote process; must have this class, {@code task}'s class,
 76    * and their dependencies on its class path.
 77    * @param executor An executor for scheduling a local task that manages interaction with the remote
 78    * process. The local task completes once a value has been returned.
 79    * @param task A computation to perform; the task and its return value must be serializable.
 80    * @param ignoreIntermediate Whether intermediate values will be discarded (but counted) rather than enqueued.
 81    */
 82  1 public ProcessIncrementalTaskController(JVMBuilder jvmBuilder, Executor executor,
 83    IncrementalTask<? extends I, ? extends R> task,
 84    boolean ignoreIntermediate) {
 85  1 super(ignoreIntermediate);
 86  1 _jvmBuilder = jvmBuilder;
 87  1 _executor = executor;
 88  1 _task = task;
 89  1 _onExit = null;
 90  1 _t = null;
 91  1 _commandSink = null;
 92    }
 93   
 94    /**
 95    * Create, but do not start, a ProcessIncrementalTaskController. This constructor allows code to be executed
 96    * when the remote process terminates: if the computation terminates successfully, {@code onExit} will
 97    * be run after the process finishes (which may occur in the indefinite future if the task spawns
 98    * additional non-daemon threads).
 99    * @param jvmBuilder A JVMBuilder for the remote process; must have this class, {@code task}'s class,
 100    * and their dependencies on its class path.
 101    * @param executor An executor for scheduling a local task that manages interaction with the remote
 102    * process. If {@code onExit} is defined, the local task completes after the process
 103    * terminates and {@code onExit} has run; otherwise, the local task completes once a
 104    * value has been returned.
 105    * @param task A computation to perform; the task and its return value must be serializable.
 106    * @param ignoreIntermediate Whether intermediate values will be discarded (but counted) rather than enqueued.
 107    * @param onExit An action to perform when the process has quit, or {@code null} for no action.
 108    */
 109  0 public ProcessIncrementalTaskController(JVMBuilder jvmBuilder,
 110    Executor executor,
 111    IncrementalTask<? extends I, ? extends R> task,
 112    boolean ignoreIntermediate,
 113    Runnable1<? super Process> onExit) {
 114  0 super(ignoreIntermediate);
 115  0 _jvmBuilder = jvmBuilder;
 116  0 _executor = executor;
 117  0 _task = task;
 118  0 _onExit = onExit;
 119  0 _t = null;
 120  0 _commandSink = null;
 121    }
 122   
 123   
 124  1 protected void doStart() {
 125  1 _executor.execute(new Runnable() {
 126  1 public void run() {
 127  1 _t = Thread.currentThread();
 128  1 try {
 129    // stop if the task was canceled before starting
 130  0 if (Thread.interrupted()) { throw new InterruptedException(); }
 131  1 Process p = _jvmBuilder.start(Runner.class.getName(), IterUtil.<String>empty());
 132  1 try {
 133  1 InputStream in = p.getInputStream();
 134    // skip prefix
 135  1 int matching = 0;
 136  1 while (matching < Runner.PREFIX.length) {
 137  4 int read = in.read();
 138  0 if (read == -1) { throw new EOFException("Data prefix not found"); }
 139  4 else if ((byte) read == Runner.PREFIX[matching]) { matching++; } // cast handles negatives
 140  0 else if ((byte) read == Runner.PREFIX[0]) { matching = 1; } // cast handles negatives
 141  0 else { matching = 0; }
 142    }
 143    // prefix has been matched
 144  1 ObjectInputStream objIn = new ObjectInputStream(in);
 145  1 try {
 146  1 ObjectOutputStream objOut = new ObjectOutputStream(p.getOutputStream());
 147  1 try {
 148  1 objOut.writeObject(_task);
 149  1 objOut.writeObject(Command.RUN);
 150  1 objOut.flush();
 151  1 _commandSink = objOut;
 152   
 153  1 Result r;
 154  1 do {
 155  9 r = (Result) objIn.readObject();
 156  9 r.handle(ProcessIncrementalTaskController.this);
 157  9 } while (!(r instanceof FinishResult));
 158  1 if (r instanceof CleanFinishResult) {
 159    // let the process run if we finished cleanly
 160  1 Runnable1<? super Process> onExit = _onExit; // keep local copy so it can be discarded
 161  0 if (onExit != null) { p.waitFor(); onExit.run(p); }
 162    }
 163  0 else { p.destroy(); }
 164    }
 165  1 finally { objOut.close(); }
 166    }
 167  1 finally { objIn.close(); }
 168    }
 169    catch (EOFException e) {
 170  0 p.destroy();
 171  0 throw new IOException("Unable to run process; class path may need to be adjusted");
 172    }
 173    // destroy the process on an exception, but let it run if we completed cleanly
 174  0 catch (Throwable e) { p.destroy(); throw e; }
 175    }
 176    catch (InterruptedException e) { /* ignore -- indicates error occurred in another thread */ }
 177    catch (InterruptedIOException e) { /* ignore -- indicates error occurred in another thread */ }
 178  0 catch (RuntimeException e) { finishedWithImplementationException(e); }
 179  0 catch (Throwable t) { finishedWithImplementationException(new WrappedException(t)); }
 180    }
 181    });
 182    }
 183   
 184  0 protected void doStop() { writeCommand(Command.CANCEL); }
 185  0 protected void doPause() { writeCommand(Command.PAUSE); }
 186  0 protected void doResume() { writeCommand(Command.RUN); }
 187   
 188  0 private void writeCommand(Command c) {
 189  0 try { _commandSink.writeObject(c); _commandSink.flush(); }
 190    catch (IOException e) {
 191  0 finishedWithImplementationException(new WrappedException(e));
 192  0 _t.interrupt();
 193    }
 194    }
 195   
 196  1 protected void discard() {
 197  1 _jvmBuilder = null;
 198  1 _executor = null;
 199  1 _task = null;
 200  1 _onExit = null;
 201  1 _t = null;
 202  1 _commandSink = null;
 203    }
 204   
 205    /** A serializable command to be passed from the local to the remote process. */
 206    private static enum Command { RUN, PAUSE, CANCEL; }
 207   
 208    /** A serializable result of computation to be passed from the remote to the local process. */
 209    private static abstract class Result implements Serializable {
 210    public abstract <I, R> void handle(ProcessIncrementalTaskController<I, R> c);
 211    }
 212   
 213    private static class StartedResult extends Result {
 214  2 public <I, R> void handle(ProcessIncrementalTaskController<I, R> c) { c.started(); }
 215    }
 216   
 217    private static class PausedResult extends Result {
 218  1 public <I, R> void handle(ProcessIncrementalTaskController<I, R> c) { c.paused(); }
 219    }
 220   
 221    private static class StepResult extends Result {
 222    private final Object _value;
 223  5 public StepResult(Object value) { _value = value; }
 224    /** Clients are responsible for ensuring the value has the appropriate type. */
 225  5 @SuppressWarnings("unchecked") public <I, R> void handle(ProcessIncrementalTaskController<I, R> c) {
 226  5 c.stepped((I) _value);
 227    }
 228    }
 229   
 230    private static abstract class FinishResult extends Result {}
 231   
 232    private static class CleanFinishResult extends FinishResult {
 233    private final Object _value;
 234  1 public CleanFinishResult(Object value) { _value = value; }
 235    /** Clients are responsible for ensuring the value has the appropriate type. */
 236  1 @SuppressWarnings("unchecked") public <I, R> void handle(ProcessIncrementalTaskController<I, R> c) {
 237  1 c.finishedCleanly((R) _value);
 238    }
 239    }
 240   
 241    private static class TaskExceptionResult extends FinishResult {
 242    private final Exception _e;
 243  0 public TaskExceptionResult(Exception e) { _e = e; }
 244  0 public <I, R> void handle(ProcessIncrementalTaskController<I, R> c) { c.finishedWithTaskException(_e); }
 245    }
 246   
 247    private static class ImplementationExceptionResult extends FinishResult {
 248    private final RuntimeException _e;
 249  1 public ImplementationExceptionResult(Throwable t) {
 250  0 if (t instanceof RuntimeException) { _e = (RuntimeException) t; }
 251  1 else { _e = new WrappedException(t); }
 252    }
 253  0 public <I, R> void handle(ProcessIncrementalTaskController<I, R> c) { c.finishedWithImplementationException(_e); }
 254    }
 255   
 256    private static class CanceledResult extends FinishResult {
 257  0 public <I, R> void handle(ProcessIncrementalTaskController<I, R> c) { c.stopped(); }
 258    }
 259   
 260    /**
 261    * Reads a serialized thunk from the input stream, followed by a sequence of Commands. Writes to
 262    * {@code System.out} the byte array {@link #PREFIX}, followed by a sequence of Results, terminated
 263    * with FinishResult. Once running begins, no other output is written to {@code System.out} or
 264    * {@code System.err}.
 265    */
 266    private static class Runner {
 267    /**
 268    * A byte sequence marking the beginning of the result data. Allows java commands to output
 269    * text before {@code main()} is invoked without corrupting the data stream. (This occurs, for example,
 270    * with flag "-Xrunjdwp".) In order to avoid false positives, this prefix uses non-printing ASCII values.
 271    * To simplify the matching algorithm, each digit is guaranteed to be unique -- if a particular byte
 272    * fails to match, the DFA can only jump to either the initial state or the state after a single match.
 273    */
 274    public static final byte[] PREFIX = { 0x00, 0x7f, 0x03, -0x80 };
 275   
 276    private final IncrementalTask<?, ?> _task;
 277    private final ObjectOutputStream _objOut;
 278    private final ObjectInputStream _objIn;
 279    private final CompletionMonitor _continueMonitor;
 280    private final BlockingQueue<Result> _results;
 281    private final Thread _taskThread; // task run in a different thread to allow interrupting without corrupting i/o
 282    private final Thread _objInReader; // reads run in a different thread to allow asynchronous pausing
 283   
 284  1 public Runner(IncrementalTask<?, ?> task, ObjectOutputStream objOut, ObjectInputStream objIn) {
 285  1 _task = task;
 286  1 _objOut = objOut;
 287  1 _objIn = objIn;
 288  1 _continueMonitor = new CompletionMonitor(false);
 289  1 _results = new ArrayBlockingQueue<Result>(256);
 290   
 291  1 _taskThread = new Thread("task runner") {
 292  1 public void run() {
 293  1 try {
 294  1 try {
 295  1 while (!_task.isResolved()) {
 296  5 authorizeContinue();
 297  5 _results.put(new StepResult(_task.step()));
 298    }
 299  1 authorizeContinue();
 300  1 _results.put(new CleanFinishResult(_task.value()));
 301    }
 302  0 catch (InterruptedException e) { _results.put(new CanceledResult()); }
 303    catch (WrappedException e) {
 304  0 if (e.getCause() instanceof InterruptedException) { _results.put(new CanceledResult()); }
 305  0 else { _results.put(new TaskExceptionResult(e)); }
 306    }
 307  0 catch (RuntimeException e) { _results.put(new TaskExceptionResult(e)); }
 308  0 catch (Throwable t) { _results.put(new ImplementationExceptionResult(t)); }
 309    }
 310    catch (InterruptedException e) { /* interrupted while trying to put a result -- must have quit already */ }
 311    }
 312    };
 313   
 314  1 _objInReader = new Thread("objIn reader") {
 315  1 public void run() {
 316  1 try {
 317  1 try {
 318  2 while (!Thread.interrupted()) {
 319  2 Command c = (Command) _objIn.readObject();
 320  1 switch (c) {
 321  1 case RUN: _continueMonitor.signal(); break;
 322  0 case PAUSE: _continueMonitor.reset(); break;
 323  0 case CANCEL: _taskThread.interrupt(); break;
 324    }
 325    }
 326    }
 327    catch (InterruptedIOException e) { /* task has completed cleanly; stop processing */ }
 328  1 catch (Throwable t) { _results.put(new ImplementationExceptionResult(t)); }
 329    }
 330    catch (InterruptedException e) { /* interrupted while trying to put a result -- must have quit already */ }
 331    }
 332    };
 333    }
 334   
 335  1 public void run() throws IOException, InterruptedException {
 336    // assumes a StartedResult has been written to objOut already; but waits until a
 337    // "run" command is received before actually running the task
 338  1 _objInReader.start();
 339  1 _taskThread.start();
 340  1 try {
 341  1 Result r;
 342  1 do {
 343  8 r = _results.take();
 344  8 _objOut.writeObject(r);
 345  8 _objOut.flush();
 346  8 } while (!(r instanceof FinishResult));
 347    }
 348    finally {
 349    // interrupt threads in case they haven't already stopped
 350  1 _objInReader.interrupt();
 351  1 _taskThread.interrupt();
 352    }
 353    }
 354   
 355  6 private void authorizeContinue() throws InterruptedException {
 356  0 if (Thread.interrupted()) { throw new InterruptedException(); }
 357  6 if (!_continueMonitor.isSignaled()) {
 358  1 _results.put(new PausedResult());
 359  1 _continueMonitor.ensureSignaled();
 360  1 _results.put(new StartedResult());
 361    }
 362    }
 363   
 364  1 public static void main(String... args) {
 365  1 OutputStream out = System.out;
 366  1 IOUtil.attemptClose(System.err); // in case other objects already have a handle on it, try to close the stream
 367  1 IOUtil.ignoreSystemOut();
 368  1 IOUtil.ignoreSystemErr();
 369  1 try {
 370  1 out.write(PREFIX);
 371  1 out.flush();
 372  1 ObjectOutputStream objOut = new ObjectOutputStream(out);
 373  1 try {
 374  1 objOut.writeObject(new StartedResult());
 375  1 objOut.flush();
 376  1 ObjectInputStream objIn = new ObjectInputStream(System.in);
 377  1 try {
 378  1 IncrementalTask<?, ?> task = (IncrementalTask<?, ?>) objIn.readObject();
 379  1 Runner runner = new Runner(task, objOut, objIn);
 380  1 runner.run();
 381    }
 382  1 finally { objIn.close(); }
 383    }
 384  0 catch (RuntimeException e) { objOut.writeObject(new ImplementationExceptionResult(e)); }
 385  0 catch (Throwable t) { objOut.writeObject(new ImplementationExceptionResult(t)); }
 386  1 finally { objOut.close(); }
 387    }
 388  0 catch (IOException e) { error.log("Error writing to System.out", e); }
 389    }
 390   
 391    }
 392   
 393    }