Clover coverage report - PLT Utilities Test Coverage (plt-20120304-r5436)
Coverage timestamp: Sat Mar 3 2012 22:01:56 CST
file stats: LOC: 852   Methods: 63
NCLOC: 308   Classes: 7
 
 Source file Conditionals Statements Methods TOTAL
ConcurrentUtil.java 50% 40.3% 34.9% 40%
coverage coverage
 1    /*BEGIN_COPYRIGHT_BLOCK*
 2   
 3    PLT Utilities BSD License
 4   
 5    Copyright (c) 2007-2010 JavaPLT group at Rice University
 6    All rights reserved.
 7   
 8    Developed by: Java Programming Languages Team
 9    Rice University
 10    http://www.cs.rice.edu/~javaplt/
 11   
 12    Redistribution and use in source and binary forms, with or without modification, are permitted
 13    provided that the following conditions are met:
 14   
 15    - Redistributions of source code must retain the above copyright notice, this list of conditions
 16    and the following disclaimer.
 17    - Redistributions in binary form must reproduce the above copyright notice, this list of
 18    conditions and the following disclaimer in the documentation and/or other materials provided
 19    with the distribution.
 20    - Neither the name of the JavaPLT group, Rice University, nor the names of the library's
 21    contributors may be used to endorse or promote products derived from this software without
 22    specific prior written permission.
 23   
 24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
 25    IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 26    FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND
 27    CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 28    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 29    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
 30    IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 31    OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 32   
 33    *END_COPYRIGHT_BLOCK*/
 34   
 35    package edu.rice.cs.plt.concurrent;
 36   
 37    import java.io.*;
 38    import java.rmi.Remote;
 39    import java.rmi.RemoteException;
 40    import java.rmi.server.UnicastRemoteObject;
 41    import java.util.ArrayList;
 42    import java.util.HashMap;
 43    import java.util.List;
 44    import java.util.Properties;
 45    import java.util.Map;
 46    import java.util.concurrent.Callable;
 47    import java.util.concurrent.CancellationException;
 48    import java.util.concurrent.ExecutionException;
 49    import java.util.concurrent.Executor;
 50    import java.util.concurrent.Future;
 51    import java.util.concurrent.TimeUnit;
 52    import java.util.concurrent.TimeoutException;
 53    import edu.rice.cs.plt.lambda.*;
 54    import edu.rice.cs.plt.io.IOUtil;
 55    import edu.rice.cs.plt.io.VoidOutputStream;
 56   
 57    import static edu.rice.cs.plt.debug.DebugUtil.error;
 58    import static edu.rice.cs.plt.debug.DebugUtil.debug;
 59   
 60    /** Utility methods for executing code in concurrent threads or processes. */
 61    public final class ConcurrentUtil {
 62   
 63  0 private ConcurrentUtil() {}
 64   
 65    /** A runnable that simply sleeps for the specified amount of time (milliseconds), or until an interrupt occurs. */
 66    public static final Runnable1<Long> SLEEPING_RUNNABLE = new SleepingRunnable();
 67   
 68    private static final class SleepingRunnable implements Runnable1<Long>, Serializable {
 69  10 public void run(Long delay) {
 70  10 try { Thread.sleep(delay); }
 71    catch (InterruptedException e) { /* discard */ }
 72    }
 73    }
 74   
 75    /** Sleep for the given amount of time (milliseconds), or until an interrupt occurs. */
 76  10 public static void sleep(long delay) { SLEEPING_RUNNABLE.run(delay); }
 77   
 78    /** A runnable that performs useless computation for the specified amount of time (milliseconds). */
 79    public static final Runnable1<Long> WORKING_RUNNABLE = new WorkingRunnable();
 80   
 81    private static final class WorkingRunnable implements Runnable1<Long>, Serializable {
 82    private long junk = 1;
 83  2 public void run(Long delay) {
 84  2 long finished = System.currentTimeMillis() + delay;
 85  2 while (System.currentTimeMillis() < finished) {
 86  1 if (Thread.interrupted()) break;
 87    // if the bound on i is too small, strange JIT effects seem to mess up interrupt detection...
 88  86340000 for (int i = 0; i < 10000; i++) { junk = junk * (delay+1); }
 89    }
 90    }
 91    }
 92   
 93    /** Perform useless computation for the given amount of time (milliseconds), or until an interrupt occurs. */
 94  2 public static void work(long delay) { WORKING_RUNNABLE.run(delay); }
 95   
 96    /**
 97    * Get the expected value of {@link System#nanoTime} after the given period has passed. Negative values
 98    * for {@code time} return a time from the past. Note that, since an overflow wrap-around may occur
 99    * at any time in the system's nanosecond clock, comparisons between the current time and this method's
 100    * result are non-trivial. For example, to test whether the result time {@code future} has passed:
 101    * {@code System.nanoTime() - future > 0}. (This checks that the current time is within a Long.MAX_VALUE range
 102    * <em>after</em> {@code future}, regardless of the absolute numeric values. We can infer that (most likely)
 103    * {@code future} is less than 292 years in the past, or (unlikely) future is more than 292 years in the future.)
 104    */
 105  0 public static long futureTimeNanos(long time, TimeUnit unit) {
 106  0 return System.nanoTime() + unit.toNanos(time);
 107    }
 108   
 109    /**
 110    * Get the expected value of {@link System#currentTimeMillis} after the given period of time has passed.
 111    * Negative values for {@code time} return a time from the past. While the notes about overflow in
 112    * {@link #futureTimeNanos} apply in principal here, an overflow of the 64-bit millisecond clock happens
 113    * once every 600 million years, with the year 1970 at 0. So it's safe to use simple operators to make
 114    * comparisons between the current time and this method's result.
 115    */
 116  0 public static long futureTimeMillis(long time, TimeUnit unit) {
 117  0 return System.currentTimeMillis() + unit.toMillis(time);
 118    }
 119   
 120    /**
 121    * If the given time (based on {@link System#currentTimeMillis}) has passed, throw a TimeoutException.
 122    * Otherwise, invoke {@link Object#wait(long)} on the given object, which may return due to
 123    * a {@code notify()} call, the timeout being reached, or a spurious wake-up. To distinguish between
 124    * these possibilities, clients should wrap this call in a while loop:
 125    * {@code long t = futureTimeMillis(...); while (!condition) waitUntilMillis(lock, t);}
 126    * This loop either completes if the condition is satisfied or throws an appropriate exception
 127    * due to an interrupt or timeout.
 128    * @param obj Object whose {@code wait()} method will be invoked. Must be locked by the current thread.
 129    * @param futureTime A millisecond time value based on {@code System.currentTimeMillis()} after which
 130    * this method should no longer invoke {@code obj.wait()}.
 131    * @throws InterruptedException If the wait is interrupted.
 132    * @throws TimeoutException If, at invocation time, {@code futureTime} is in the past.
 133    * @see #futureTimeMillis
 134    */
 135  0 public static void waitUntilMillis(Object obj, long futureTime) throws InterruptedException, TimeoutException {
 136  0 long delta = futureTime - System.currentTimeMillis();
 137  0 if (delta > 0) { obj.wait(delta); }
 138  0 else { throw new TimeoutException(); }
 139    }
 140   
 141    /**
 142    * If the given time (based on {@link System#nanoTime}) has passed, throw a TimeoutException.
 143    * Otherwise, invoke {@link Object#wait(long, int)} on the given object, which may return due to
 144    * a {@code notify()} call, the timeout being reached, or a spurious wake-up. To distinguish between
 145    * these possibilities, clients should wrap this call in a while loop:
 146    * {@code long t = futureTimeNanos(...); while (!condition) waitUntilNanos(lock, t);}
 147    * This loop either completes if the condition is satisfied or throws an appropriate exception
 148    * due to an interrupt or timeout.
 149    * @param obj Object whose {@code wait()} method will be invoked. Must be locked by the current thread.
 150    * @param futureTime A nanosecond time value based on {@code System.nanoTime()} after which
 151    * this method should no longer invoke {@code obj.wait()}.
 152    * @throws InterruptedException If the wait is interrupted.
 153    * @throws TimeoutException If, at invocation time, {@code futureTime} is in the past.
 154    * @see #futureTimeNanos
 155    */
 156  0 public static void waitUntilNanos(Object obj, long futureTime) throws InterruptedException, TimeoutException {
 157  0 long delta = futureTime - System.nanoTime();
 158  0 if (delta > 0) { TimeUnit.NANOSECONDS.timedWait(obj, delta); }
 159  0 else { throw new TimeoutException(); }
 160    }
 161   
 162    /** Wrap a thunk in a Callable interface. The {@code call()} method will not throw checked exceptions. */
 163  0 public static <T> Callable<T> asCallable(Thunk<? extends T> thunk) {
 164  0 return new ThunkCallable<T>(thunk);
 165    }
 166   
 167    private static final class ThunkCallable<T> implements Callable<T>, Serializable {
 168    private final Thunk<? extends T> _thunk;
 169  0 public ThunkCallable(Thunk<? extends T> thunk) { _thunk = thunk; }
 170  0 public T call() { return _thunk.value(); }
 171    }
 172   
 173    /**
 174    * Wrap a Future in a TaskController interface (which is also a Thunk). The state of the controller
 175    * corresponds to the state of the Future; since Futures have no notion of "starting," the result is
 176    * automatically "running" &mdash; it is never in a "paused" state.
 177    */
 178  0 public static <T> TaskController<T> asTaskController(Future<? extends T> future) {
 179  0 TaskController<T> result = new FutureTaskController<T>(LambdaUtil.valueLambda(future));
 180  0 result.start();
 181  0 return result;
 182    }
 183   
 184    /**
 185    * Wrap a Future produced by a Thunk in a TaskController interface (which is also a Thunk). "Starting"
 186    * the resulting controller corresponds to invoking {@code futureThunk}; subsequently, the state of
 187    * the controller corresponds to the state of the Future.
 188    */
 189  0 public static <T> TaskController<T> asTaskController(Thunk<? extends Future<? extends T>> futureThunk) {
 190  0 return new FutureTaskController<T>(futureThunk);
 191    }
 192   
 193    /**
 194    * A simple Executor that creates a new thread for each task; threads are identified by the name
 195    * {@code THREAD_EXECUTOR-n} for some n.
 196    */
 197    public static final Executor THREAD_EXECUTOR = new Executor() {
 198    private int count = 0;
 199  8 public void execute(Runnable r) {
 200  8 new Thread(r, "THREAD_EXECUTOR-" + (++count)).start();
 201    }
 202    };
 203   
 204    /** A trivial Executor that simply runs each task directly. {@code execute()} blocks until the task completes. */
 205    public static final Executor DIRECT_EXECUTOR = new Executor() {
 206  4 public void execute(Runnable r) { r.run(); }
 207    };
 208   
 209    /**
 210    * Execute the given task in a separate thread, and provide access to its result. This is a
 211    * convenience method that sets {@code start} to {@code true}.
 212    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 213    * wrapped in a {@link WrappedException}.
 214    * @see ExecutorTaskController
 215    */
 216  1 public static TaskController<Void> runInThread(Runnable task) {
 217  1 return computeWithExecutor(LambdaUtil.asThunk(task), THREAD_EXECUTOR, true);
 218    }
 219   
 220    /**
 221    * Execute the given task in a separate thread, and provide access to its result.
 222    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 223    * wrapped in a {@link WrappedException}.
 224    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 225    * {@link TaskController#start} on the returned controller.
 226    * @see ExecutorTaskController
 227    */
 228  0 public static TaskController<Void> runInThread(Runnable task, boolean start) {
 229  0 return computeWithExecutor(LambdaUtil.asThunk(task), THREAD_EXECUTOR, start);
 230    }
 231   
 232    /**
 233    * Execute the given task in a separate thread, and provide access to its result. This is a
 234    * convenience method that sets {@code start} to {@code true}.
 235    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 236    * wrapped in a {@link WrappedException}.
 237    * @see ExecutorTaskController
 238    */
 239  1 public static <R> TaskController<R> computeInThread(Thunk<? extends R> task) {
 240  1 return computeWithExecutor(task, THREAD_EXECUTOR, true);
 241    }
 242   
 243    /**
 244    * Execute the given task in a separate thread, and provide access to its result.
 245    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 246    * wrapped in a {@link WrappedException}.
 247    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 248    * {@link TaskController#start} on the returned controller.
 249    * @see ExecutorTaskController
 250    */
 251  1 public static <R> TaskController<R> computeInThread(Thunk<? extends R> task, boolean start) {
 252  1 return computeWithExecutor(task, THREAD_EXECUTOR, start);
 253    }
 254   
 255    /**
 256    * Execute the given task in a separate thread, and provide access to its results. This is a convenience method
 257    * that sets {@code start} to {@code true} and {@code ignoreIntermediate} to {@code false}.
 258    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 259    * wrapped in a {@link WrappedException}.
 260    * @see ExecutorIncrementalTaskController
 261    */
 262  1 public static <I, R> IncrementalTaskController<I, R> computeInThread(IncrementalTask<? extends I, ? extends R> task) {
 263  1 return computeWithExecutor(task, THREAD_EXECUTOR, true, false);
 264    }
 265   
 266    /**
 267    * Execute the given task in a separate thread, and provide access to its results. This is a convenience method
 268    * that sets {@code ignoreIntermediate} to {@code false}.
 269    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 270    * wrapped in a {@link WrappedException}.
 271    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 272    * {@link TaskController#start} on the returned controller.
 273    * @see ExecutorIncrementalTaskController
 274    */
 275  0 public static <I, R>
 276    IncrementalTaskController<I, R> computeInThread(IncrementalTask<? extends I, ? extends R> task, boolean start) {
 277  0 return computeWithExecutor(task, THREAD_EXECUTOR, start, false);
 278    }
 279   
 280    /**
 281    * Execute the given task in a separate thread, and provide access to its results.
 282    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 283    * wrapped in a {@link WrappedException}.
 284    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 285    * {@link TaskController#start} on the returned controller.
 286    * @param ignoreIntermediate If {@code true}, all intermediate results will be immediately discarded.
 287    * @see ExecutorIncrementalTaskController
 288    */
 289  0 public static <I, R>
 290    IncrementalTaskController<I, R> computeInThread(IncrementalTask<? extends I, ? extends R> task,
 291    boolean start, boolean ignoreIntermediate) {
 292  0 return computeWithExecutor(task, THREAD_EXECUTOR, start, ignoreIntermediate);
 293    }
 294   
 295    /**
 296    * Execute the given task with {@code exec} and provide access to its result. This is a convenience
 297    * method that sets {@code start} to {@code true}.
 298    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 299    * wrapped in a {@link WrappedException}.
 300    * @param exec An executor which is given the task to run when the controller is started.
 301    * @see ExecutorTaskController
 302    */
 303  0 public static <R> TaskController<R> computeWithExecutor(Thunk<? extends R> task, Executor exec) {
 304  0 return computeWithExecutor(task, exec, true);
 305    }
 306   
 307    /**
 308    * Execute the given task with {@code exec} and provide access to its result.
 309    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 310    * wrapped in a {@link WrappedException}.
 311    * @param exec An executor which is given the task to run when the controller is started.
 312    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 313    * {@link TaskController#start} on the returned controller.
 314    * @see ExecutorTaskController
 315    */
 316  3 public static <R> TaskController<R> computeWithExecutor(Thunk<? extends R> task, Executor exec, boolean start) {
 317  3 ExecutorTaskController<R> result = new ExecutorTaskController<R>(exec, task);
 318  2 if (start) { result.start(); }
 319  3 return result;
 320    }
 321   
 322    /**
 323    * Execute the given task with {@code exec} and provide access to its result. This is a convenience method
 324    * that sets {@code start} to {@code true} and {@code ignoreIntermediate} to {@code false}.
 325    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 326    * wrapped in a {@link WrappedException}.
 327    * @param exec An executor which is given the task to run when the controller is started.
 328    * @see ExecutorIncrementalTaskController
 329    */
 330  0 public static <I, R>
 331    IncrementalTaskController<I, R> computeWithExecutor(IncrementalTask<? extends I, ? extends R> task,
 332    Executor exec) {
 333  0 return computeWithExecutor(task, exec, true, false);
 334    }
 335   
 336    /**
 337    * Execute the given task with {@code exec} and provide access to its result. This is a convenience method
 338    * that sets {@code ignoreIntermediate} to {@code false}.
 339    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 340    * wrapped in a {@link WrappedException}.
 341    * @param exec An executor which is given the task to run when the controller is started.
 342    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 343    * {@link TaskController#start} on the returned controller.
 344    * @see ExecutorIncrementalTaskController
 345    */
 346  0 public static <I, R>
 347    IncrementalTaskController<I, R> computeWithExecutor(IncrementalTask<? extends I, ? extends R> task,
 348    Executor exec, boolean start) {
 349  0 return computeWithExecutor(task, exec, start, false);
 350    }
 351   
 352    /**
 353    * Execute the given task with {@code exec} and provide access to its result.
 354    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 355    * wrapped in a {@link WrappedException}.
 356    * @param exec An executor which is given the task to run when the controller is started.
 357    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 358    * {@link TaskController#start} on the returned controller.
 359    * @param ignoreIntermediate If {@code true}, all intermediate results will be immediately discarded.
 360    * @see ExecutorIncrementalTaskController
 361    */
 362  1 public static <I, R>
 363    IncrementalTaskController<I, R> computeWithExecutor(IncrementalTask<? extends I, ? extends R> task,
 364    Executor exec, boolean start, boolean ignoreIntermediate) {
 365  1 IncrementalTaskController<I, R> result =
 366    new ExecutorIncrementalTaskController<I, R>(exec, task, ignoreIntermediate);
 367  1 if (start) { result.start(); }
 368  1 return result;
 369    }
 370   
 371   
 372    /**
 373    * <p>Execute the given task in a separate process and provide access to its result. The task and the
 374    * return value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 375    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 376    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 377    * are responsible for process termination.</p>
 378    *
 379    * <p>This is a convenience method that uses {@link JVMBuilder#DEFAULT} and sets {@code start} to {@code true}.</p>
 380    * @param task A task to perform. Will be abruptly terminated with the process if canceled while running.
 381    * @see ProcessTaskController
 382    */
 383  3 public static <R> TaskController<R> computeInProcess(Thunk<? extends R> task) {
 384  3 return computeInProcess(task, JVMBuilder.DEFAULT, true);
 385    }
 386   
 387    /**
 388    * <p>Execute the given task in a separate process and provide access to its result. The task and the return
 389    * value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 390    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 391    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 392    * are responsible for process termination.</p>
 393    *
 394    * <p>This is a convenience method that uses {@link JVMBuilder#DEFAULT}.</p>
 395    *
 396    * @param task A task to perform. Will be abruptly terminated with the process if canceled while running.
 397    * @param start If {@code true}, the task will be started before returning; otherwise, the client should
 398    * invoke {@link TaskController#start} on the returned controller.
 399    * @see ProcessTaskController
 400    */
 401  0 public static <R> TaskController<R> computeInProcess(Thunk<? extends R> task, boolean start) {
 402  0 return computeInProcess(task, JVMBuilder.DEFAULT, start);
 403    }
 404   
 405    /**
 406    * <p>Execute the given task in a separate process and provide access to its result. The task and the return
 407    * value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 408    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 409    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 410    * are responsible for process termination.</p>
 411    *
 412    * @param task A task to perform. Will be abruptly terminated with the process if canceled while running.
 413    * @param jvmBuilder A JVMBuilder set up with the necessary subprocess parameters. The class path must include
 414    * the task's class, ConcurrentUtil, and their dependencies. If the current JVM has
 415    * property values for {@code plt.*}, those values will be added to {@code jvmBuilder}
 416    * (unless they're already set to something else).
 417    * @see ProcessTaskController
 418    */
 419  0 public static <R> TaskController<R> computeInProcess(Thunk<? extends R> task, JVMBuilder jvmBuilder) {
 420  0 return computeInProcess(task, jvmBuilder, true);
 421    }
 422   
 423    /**
 424    * <p>Execute the given task in a separate process and provide access to its result. The task and the
 425    * return value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 426    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 427    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 428    * are responsible for process termination.</p>
 429    *
 430    * @param task A task to perform. Will be abruptly terminated with the process if canceled while running.
 431    * @param jvmBuilder A JVMBuilder set up with the necessary subprocess parameters. The class path must include
 432    * the task's class, ConcurrentUtil, and their dependencies. If the current JVM has
 433    * property values for {@code plt.*}, those values will be added to {@code jvmBuilder}
 434    * (unless they're already set to something else).
 435    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 436    * {@link TaskController#start} on the returned controller.
 437    * @see ProcessTaskController
 438    */
 439  3 public static <R> TaskController<R> computeInProcess(Thunk<? extends R> task, JVMBuilder jvmBuilder,
 440    boolean start) {
 441  3 jvmBuilder = jvmBuilder.addDefaultProperties(getProperties("plt."));
 442  3 ProcessTaskController<R> controller = new ProcessTaskController<R>(jvmBuilder, THREAD_EXECUTOR, task);
 443  3 if (start) { controller.start(); }
 444  3 return controller;
 445    }
 446   
 447   
 448    /**
 449    * <p>Execute the given task in a separate process and provide access to its result. The task and the
 450    * return value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 451    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 452    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 453    * are responsible for process termination.</p>
 454    *
 455    * <p>This is a convenience method that uses {@link JVMBuilder#DEFAULT} and sets {@code start} to {@code true}
 456    * and {@code ignoreIntermediate} to {@code false}.</p>
 457    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 458    * wrapped in a {@link WrappedException}.
 459    * @see ProcessIncrementalTaskController
 460    */
 461  1 public static <I, R>
 462    IncrementalTaskController<I, R> computeInProcess(IncrementalTask<? extends I, ? extends R> task) {
 463  1 return computeInProcess(task, JVMBuilder.DEFAULT, true, false);
 464    }
 465   
 466    /**
 467    * <p>Execute the given task in a separate process and provide access to its result. The task and the return
 468    * value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 469    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 470    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 471    * are responsible for process termination.</p>
 472    *
 473    * <p>This is a convenience method that uses {@link JVMBuilder#DEFAULT} and sets {@code ignoreIntermediate}
 474    * to {@code false}.</p>
 475    *
 476    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 477    * wrapped in a {@link WrappedException}.
 478    * @param start If {@code true}, the task will be started before returning; otherwise, the client should
 479    * invoke {@link TaskController#start} on the returned controller.
 480    * @see ProcessIncrementalTaskController
 481    */
 482  0 public static <I, R>
 483    IncrementalTaskController<I, R> computeInProcess(IncrementalTask<? extends I, ? extends R> task, boolean start) {
 484  0 return computeInProcess(task, JVMBuilder.DEFAULT, start, false);
 485    }
 486   
 487    /**
 488    * <p>Execute the given task in a separate process and provide access to its result. The task and the return
 489    * value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 490    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 491    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 492    * are responsible for process termination.</p>
 493    *
 494    * <p>This is a convenience method that sets {@code start} to {@code true} and {@code ignoreIntermediate}
 495    * to {@code false}.</p>
 496    *
 497    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 498    * wrapped in a {@link WrappedException}.
 499    * @param jvmBuilder A JVMBuilder set up with the necessary subprocess parameters. The class path must include
 500    * the task's class, ConcurrentUtil, and their dependencies. If the current JVM has
 501    * property values for {@code plt.*}, those values will be added to {@code jvmBuilder}
 502    * (unless they're already set to something else).
 503    * @see ProcessIncrementalTaskController
 504    */
 505  0 public static <I, R>
 506    IncrementalTaskController<I, R> computeInProcess(IncrementalTask<? extends I, ? extends R> task,
 507    JVMBuilder jvmBuilder) {
 508  0 return computeInProcess(task, jvmBuilder, true, false);
 509    }
 510   
 511    /**
 512    * <p>Execute the given task in a separate process and provide access to its result. The task and the
 513    * return value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 514    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 515    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 516    * are responsible for process termination.</p>
 517    *
 518    * <p>This is a convenience method that sets {@code ignoreIntermediate} to {@code false}.</p>
 519    *
 520    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 521    * wrapped in a {@link WrappedException}.
 522    * @param jvmBuilder A JVMBuilder set up with the necessary subprocess parameters. The class path must include
 523    * the task's class, ConcurrentUtil, and their dependencies. If the current JVM has
 524    * property values for {@code plt.*}, those values will be added to {@code jvmBuilder}
 525    * (unless they're already set to something else).
 526    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 527    * {@link TaskController#start} on the returned controller.
 528    * @see ProcessIncrementalTaskController
 529    */
 530  0 public static <I, R>
 531    IncrementalTaskController<I, R> computeInProcess(IncrementalTask<? extends I, ? extends R> task,
 532    JVMBuilder jvmBuilder, boolean start) {
 533  0 return computeInProcess(task, jvmBuilder, start, false);
 534    }
 535   
 536    /**
 537    * <p>Execute the given task in a separate process and provide access to its result. The task and the
 538    * return value must be serializable. Typically, the subprocess terminates when the TaskController enters a
 539    * finished state. However, if {@code task} spawns additional threads and no exceptions are thrown by the
 540    * controller's {@code value()} method, the subprocess may remain alive indefinitely; the remaining threads
 541    * are responsible for process termination.</p>
 542    *
 543    * @param task A task to perform. Should respond to an interrupt by throwing an {@link InterruptedException}
 544    * wrapped in a {@link WrappedException}.
 545    * @param jvmBuilder A JVMBuilder set up with the necessary subprocess parameters. The class path must include
 546    * the task's class, ConcurrentUtil, and their dependencies. If the current JVM has
 547    * property values for {@code plt.*}, those values will be added to {@code jvmBuilder}
 548    * (unless they're already set to something else).
 549    * @param start If {@code true}, the task will be started before returning; otherwise, the client should invoke
 550    * {@link TaskController#start} on the returned controller.
 551    * @param ignoreIntermediate If {@code true}, all intermediate results will be immediately discarded.
 552    * @see ProcessIncrementalTaskController
 553    */
 554  1 public static <I, R>
 555    IncrementalTaskController<I, R> computeInProcess(IncrementalTask<? extends I, ? extends R> task,
 556    JVMBuilder jvmBuilder, boolean start,
 557    boolean ignoreIntermediate) {
 558  1 jvmBuilder = jvmBuilder.addDefaultProperties(getProperties("plt."));
 559  1 ProcessIncrementalTaskController<I, R> controller =
 560    new ProcessIncrementalTaskController<I, R>(jvmBuilder, THREAD_EXECUTOR, task, ignoreIntermediate);
 561  1 if (start) { controller.start(); }
 562  1 return controller;
 563    }
 564   
 565   
 566   
 567    /**
 568    * Export the given RMI object in a new process and return the exported stub. If any exception occurs,
 569    * the process is destroyed. This convenience method uses {@link JVMBuilder#DEFAULT} to start the new process.
 570    * @param factory A thunk to evaluate in the remote JVM, producing an object that can be exported via
 571    * {@link UnicastRemoteObject#exportObject(Remote, int)}. The factory must be serializable.
 572    * @return An RMI proxy that can be cast to the remote interface type of the object returned by
 573    * {@code factory}. (See {@link Remote} for the definition of "remote interface.")
 574    * @throws IOException If a problem occurs in starting the new process or serializing {@code factory}.
 575    * @throws ExecutionException If an exception occurs in {@code factory} or while exporting the result.
 576    * @throws InterruptedException If this thread is interrupted while waiting for the result to be produced.
 577    */
 578  1 public static Remote exportInProcess(Thunk<? extends Remote> factory)
 579    throws InterruptedException, ExecutionException, IOException {
 580  1 return exportInProcess(factory, JVMBuilder.DEFAULT, null);
 581    }
 582   
 583    /**
 584    * Export the given RMI object in a new process and return the exported stub. If any exception occurs,
 585    * the process is destroyed.
 586    * @param factory A thunk to evaluate in the remote JVM, producing an object that can be exported via
 587    * {@link UnicastRemoteObject#exportObject(Remote, int)}. The factory must be serializable.
 588    * @param jvmBuilder A JVMBuilder set up with the necessary subprocess parameters. The class path must include
 589    * the factory's class, ConcurrentUtil, and their dependencies.
 590    * @return An RMI proxy that can be cast to the remote interface type of the object returned by
 591    * {@code factory}. (See {@link Remote} for the definition of "remote interface.")
 592    * @throws IOException If a problem occurs in starting the new process or serializing {@code factory}.
 593    * @throws ExecutionException If an exception occurs in {@code factory} or while exporting the result.
 594    * @throws InterruptedException If this thread is interrupted while waiting for the result to be produced.
 595    */
 596  3 public static Remote exportInProcess(Thunk<? extends Remote> factory, JVMBuilder jvmBuilder)
 597    throws InterruptedException, ExecutionException, IOException {
 598  3 return exportInProcess(factory, jvmBuilder, null);
 599    }
 600   
 601    /**
 602    * Export the given RMI object in a new process and return the exported stub. If any exception occurs,
 603    * the process is destroyed.
 604    * @param factory A thunk to evaluate in the remote JVM, producing an object that can be exported via
 605    * {@link UnicastRemoteObject#exportObject(Remote, int)}. The factory must be serializable.
 606    * @param jvmBuilder A JVMBuilder set up with the necessary subprocess parameters. The class path must include
 607    * the factory's class, ConcurrentUtil, and their dependencies. If the current JVM has
 608    * property values for {@code plt.*}, those values will be added to {@code jvmBuilder}
 609    * (unless they're already set to something else).
 610    * @param onExit Code to execute when the process exits, assuming a result is successfully returned. May be
 611    * {@code null}, indicating that nothing should be run. If an exception occurs here, the process is
 612    * destroyed immediately and this listener will not be invoked.
 613    * @return An RMI proxy that can be cast to the remote interface type of the object returned by
 614    * {@code factory}. (See {@link Remote} for the definition of "remote interface.")
 615    * @throws IOException If a problem occurs in starting the new process or serializing {@code factory}.
 616    * @throws ExecutionException If an exception occurs in {@code factory} or while exporting the result.
 617    * @throws InterruptedException If this thread is interrupted while waiting for the result to be produced.
 618    */
 619  4 public static Remote exportInProcess(Thunk<? extends Remote> factory, JVMBuilder jvmBuilder,
 620    Runnable1<? super Process> onExit)
 621    throws InterruptedException, ExecutionException, IOException {
 622  4 Thunk<Remote> task = new ExportRemoteTask(factory);
 623    // no need to spawn a thread if we don't need to wait for the process to quit
 624  4 Executor exec = (onExit == null) ? DIRECT_EXECUTOR : THREAD_EXECUTOR;
 625    // use localhost to avoid issues with changing IPs and firewalls
 626  4 jvmBuilder = jvmBuilder.addDefaultProperty("java.rmi.server.hostname", "127.0.0.1");
 627  4 jvmBuilder = jvmBuilder.addDefaultProperties(getProperties("plt."));
 628  4 try { return new ProcessTaskController<Remote>(jvmBuilder, exec, task, onExit).get(); }
 629    // an interrupt on this thread translates into a "cancel" because DIRECT_EXECUTOR runs the task on this thread
 630  0 catch (CancellationException e) { throw new InterruptedException(); }
 631    catch (WrappedException e) {
 632  0 if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); }
 633  0 else { throw e; }
 634    }
 635    }
 636   
 637    private static class ExportRemoteTask implements Thunk<Remote>, Serializable {
 638    private final Thunk<? extends Remote> _factory;
 639    // The result must be stored statically to prevent garbage-collection. (It's not clear whether
 640    // the lack of a strong reference from the RMI code is specified behavior or a bug...)
 641    private static final List<Remote> _cache = new ArrayList<Remote>(1);
 642  4 public ExportRemoteTask(Thunk<? extends Remote> factory) { _factory = factory; }
 643  4 public Remote value() {
 644  4 Remote server = _factory.value();
 645  4 _cache.add(server);
 646  4 try { return UnicastRemoteObject.exportObject(server, 0); }
 647  0 catch (RemoteException e) { throw new WrappedException(e); }
 648    }
 649    }
 650   
 651   
 652    /** Test whether the given process has terminated. */
 653  0 public static boolean processIsTerminated(Process p) {
 654  0 try { p.exitValue(); return true; }
 655  0 catch (IllegalThreadStateException e) { return false; }
 656    }
 657   
 658    /**
 659    * Create a daemon thread that will invoke {@link Process#waitFor} on the given process, then run the given
 660    * listener. If the thread is interrupted while blocked on {@code waitFor()}, the listener will not be run.
 661    */
 662  0 public static void onProcessExit(final Process p, final Runnable1<? super Process> listener) {
 663  0 Thread t = new Thread("ConcurrentUtil.onProcessExit") {
 664  0 public void run() {
 665  0 try { p.waitFor(); listener.run(p); }
 666    catch (InterruptedException e) { /* terminate early */ }
 667    }
 668    };
 669  0 t.setDaemon(true);
 670  0 t.start();
 671    }
 672   
 673    /**
 674    * Create two threads to continually discard the contents of the given process's output and error streams.
 675    * If, instead, the streams are simply ignored, the system buffers may fill up, causing the process to block (see
 676    * the class documentation for {@link Process}; experimentation under Java 5 shows the buffer size to be
 677    * approximately 4 KB).
 678    */
 679  0 public static void discardProcessOutput(Process p) {
 680  0 copyProcessOut(p, VoidOutputStream.INSTANCE);
 681  0 copyProcessErr(p, VoidOutputStream.INSTANCE);
 682    }
 683   
 684    /**
 685    * Create two threads to continually copy the contents of the given process's output and error streams to
 686    * the given destinations. This is a convenience method that invokes {@link #copyProcessOut(Process, OutputStream)}
 687    * and {@link #copyProcessErr(Process, OutputStream)}.
 688    */
 689  0 public static void copyProcessOutput(Process p, OutputStream out, OutputStream err) {
 690  0 copyProcessOut(p, out);
 691  0 copyProcessErr(p, err);
 692    }
 693   
 694    /**
 695    * Create a thread that will continually discard the contents of the given process's standard output.
 696    * If, instead, the stream is simply ignored, the system buffer may fill up, causing the process to block (see
 697    * the class documentation for {@link Process}; experimentation under Java 5 shows the buffer size to be
 698    * approximately 4 KB).
 699    * @return The thread performing the discard operation, already started.
 700    */
 701  0 public static Thread discardProcessOut(Process p) { return copyProcessOut(p, VoidOutputStream.INSTANCE); }
 702   
 703    /**
 704    * Create a thread that will continually copy the contents of the given process's standard output to another
 705    * output stream. This is a convenience method that sets {@code close} to {@code true}.
 706    * @return The thread performing the copy operation, already started.
 707    */
 708  0 public static Thread copyProcessOut(Process p, OutputStream out) { return copyProcessOut(p, out, true); }
 709   
 710    /**
 711    * Create a thread that will continually copy the contents of the given process's standard output to another
 712    * output stream. Processing continues until the end-of-file is reached. If {@code close} is {@code true},
 713    * {@code out} will then be closed.
 714    * @return The thread performing the copy operation, already started.
 715    */
 716  0 public static Thread copyProcessOut(Process p, OutputStream out, boolean close) {
 717  0 Thread result = new Thread(new CopyStream(p.getInputStream(), out, close), "ConcurrentUtil.copyProcessOut");
 718  0 result.setDaemon(true); // this thread should not keep the JVM from exiting
 719  0 result.start();
 720  0 return result;
 721    }
 722   
 723    /**
 724    * Create a task providing access to the given process's standard output as a string. The result is not
 725    * available until an end-of-file is reached, but it is buffered locally to prevent blocking. A separate
 726    * non-daemon thread performs this buffering.
 727    */
 728  0 public static TaskController<String> processOutAsString(Process p) {
 729  0 return computeInThread(new StreamToString(p.getInputStream()));
 730    }
 731   
 732    /**
 733    * Create a task providing access to the given process's standard output as a string. The result is not
 734    * available until an end-of-file is reached, but it is buffered locally to prevent blocking. A task
 735    * passed to {@code exec} performs this buffering.
 736    */
 737  0 public static TaskController<String> processOutAsString(Process p, Executor exec) {
 738  0 return computeWithExecutor(new StreamToString(p.getInputStream()), exec);
 739    }
 740   
 741    /**
 742    * Create a thread that will continually discard the contents of the given process's standard output.
 743    * If, instead, the stream is simply ignored, the system buffer may fill up, causing the process to block (see
 744    * the class documentation for {@link Process}; experimentation under Java 5 shows the buffer size to be
 745    * approximately 4 KB).
 746    * @return The thread performing the discard operation, already started.
 747    */
 748  0 public static Thread discardProcessErr(Process p) { return copyProcessErr(p, VoidOutputStream.INSTANCE); }
 749   
 750    /**
 751    * Create a thread that will continually copy the contents of the given process's error output to another
 752    * output stream. This is a convenience method that sets {@code close} to {@code false}.
 753    * @return The thread performing the copy operation, already started.
 754    */
 755  0 public static Thread copyProcessErr(Process p, OutputStream err) { return copyProcessErr(p, err, false); }
 756   
 757    /**
 758    * Create a thread that will continually copy the contents of the given process's error output to another
 759    * output stream. Processing continues until the end-of-file is reached. If {@code close} is {@code true},
 760    * {@code out} will then be closed.
 761    * @return The thread performing the copy operation, already started.
 762    */
 763  0 public static Thread copyProcessErr(Process p, OutputStream err, boolean close) {
 764  0 Thread result = new Thread(new CopyStream(p.getErrorStream(), err, close), "ConcurrentUtil.copyProcessErr");
 765  0 result.setDaemon(true); // this thread should not keep the JVM from exiting
 766  0 result.start();
 767  0 return result;
 768    }
 769   
 770    /**
 771    * Create a task providing access to the given process's error output as a string. The result is not
 772    * available until an end-of-file is reached, but it is buffered locally to prevent blocking. A separate
 773    * non-daemon thread performs this buffering.
 774    */
 775  0 public static TaskController<String> processErrAsString(Process p) {
 776  0 return computeInThread(new StreamToString(p.getErrorStream()));
 777    }
 778   
 779    /**
 780    * Create a task providing access to the given process's error output as a string. The result is not
 781    * available until an end-of-file is reached, but it is buffered locally to prevent blocking. A task
 782    * passed to {@code exec} performs this buffering.
 783    */
 784  0 public static TaskController<String> processErrAsString(Process p, Executor exec) {
 785  0 return computeWithExecutor(new StreamToString(p.getErrorStream()), exec);
 786    }
 787   
 788    /** Shared code for copying and closing a stream. */
 789    private static final class CopyStream implements Runnable, Serializable {
 790    private final InputStream _in;
 791    private final OutputStream _out;
 792    private final boolean _close;
 793  0 public CopyStream(InputStream in, OutputStream out, boolean close) { _in = in; _out = out; _close = close; }
 794  0 public void run() {
 795  0 try {
 796  0 try { IOUtil.copyInputStream(_in, _out); }
 797  0 finally { if (_close) _out.close(); }
 798    }
 799  0 catch (IOException e) { error.log(e); }
 800    }
 801    }
 802   
 803    /** Shared code for reading a stream as a string. */
 804    private static final class StreamToString implements Thunk<String> {
 805    private final InputStream _stream;
 806  0 public StreamToString(InputStream stream) { _stream = stream; }
 807  0 public String value() {
 808  0 try { return IOUtil.toString(new InputStreamReader(_stream)); }
 809  0 catch (IOException e) { throw new WrappedException(e); }
 810    }
 811    }
 812   
 813   
 814    /**
 815    * Get a subset of the system properties for names that match at least one of the given prefixes.
 816    * @throws SecurityException As in {@link System#getProperties}.
 817    */
 818  8 public static Properties getProperties(String... prefixes) {
 819  8 Properties result = new Properties();
 820    // Properties should be a Map<String, String>, but it's not defined that way. Depending on the
 821    // implementation, it may even allow clients to put non-string entries.
 822  8 for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
 823  408 for (String prefix : prefixes) {
 824  408 if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(prefix)) {
 825  16 result.put(entry.getKey(), entry.getValue());
 826  16 break;
 827    }
 828    }
 829    }
 830  8 return result;
 831    }
 832   
 833    /**
 834    * Get a subset of the system properties for names that match at least one of the given prefixes.
 835    * Returns a String map; in the unusual case that a value is not a String, it is converted
 836    * via {@code toString()}.
 837    * @throws SecurityException As in {@link System#getProperties}.
 838    */
 839  0 public static Map<String, String> getPropertiesAsMap(String... prefixes) {
 840  0 Map<String, String> result = new HashMap<String, String>();
 841  0 for (Map.Entry<Object, Object> entry : System.getProperties().entrySet()) {
 842  0 for (String prefix : prefixes) {
 843  0 if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(prefix)) {
 844  0 result.put((String) entry.getKey(), entry.getValue().toString());
 845  0 break;
 846    }
 847    }
 848    }
 849  0 return result;
 850    }
 851   
 852    }