Clover coverage report - PLT Utilities Test Coverage (plt-20120304-r5436)
Coverage timestamp: Sat Mar 3 2012 22:01:56 CST
file stats: LOC: 439   Methods: 61
NCLOC: 202   Classes: 16
 
 Source file Conditionals Statements Methods TOTAL
TaskController.java 35% 43.6% 39.3% 41.2%
coverage coverage
 1    /*BEGIN_COPYRIGHT_BLOCK*
 2   
 3    PLT Utilities BSD License
 4   
 5    Copyright (c) 2007-2010 JavaPLT group at Rice University
 6    All rights reserved.
 7   
 8    Developed by: Java Programming Languages Team
 9    Rice University
 10    http://www.cs.rice.edu/~javaplt/
 11   
 12    Redistribution and use in source and binary forms, with or without modification, are permitted
 13    provided that the following conditions are met:
 14   
 15    - Redistributions of source code must retain the above copyright notice, this list of conditions
 16    and the following disclaimer.
 17    - Redistributions in binary form must reproduce the above copyright notice, this list of
 18    conditions and the following disclaimer in the documentation and/or other materials provided
 19    with the distribution.
 20    - Neither the name of the JavaPLT group, Rice University, nor the names of the library's
 21    contributors may be used to endorse or promote products derived from this software without
 22    specific prior written permission.
 23   
 24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
 25    IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 26    FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND
 27    CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 28    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 29    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
 30    IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 31    OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 32   
 33    *END_COPYRIGHT_BLOCK*/
 34   
 35    package edu.rice.cs.plt.concurrent;
 36   
 37    import java.util.concurrent.CancellationException;
 38    import java.util.concurrent.ExecutionException;
 39    import java.util.concurrent.Future;
 40    import java.util.concurrent.TimeUnit;
 41    import java.util.concurrent.TimeoutException;
 42    import java.util.concurrent.atomic.AtomicReference;
 43   
 44    import edu.rice.cs.plt.collect.ListenerSet;
 45    import edu.rice.cs.plt.collect.SnapshotSynchronizedSet;
 46    import edu.rice.cs.plt.lambda.ResolvingThunk;
 47    import edu.rice.cs.plt.lambda.Runnable1;
 48    import edu.rice.cs.plt.lambda.WrappedException;
 49    import edu.rice.cs.plt.tuple.Option;
 50   
 51    /**
 52    * <p>Provides access to a concurrent task that produces a value. Extends standard Future behavior
 53    * with an initial non-running state; also provides a variety of additional methods.</p>
 54    *
 55    * <p>To implement a concrete instance, a subclass must provide {@link #doStart}, {@link #doStop},
 56    * and, optionally, {@link #discard}. The {@link #state} field and related {@link State} class hierarchy
 57    * are protected, rather than private, in order to facilitate subclasses that modify the set and
 58    * behavior of internal states; these should be treated as private by most subclasses.</p>
 59    */
 60    public abstract class TaskController<R> implements ResolvingThunk<R>, Future<R> {
 61   
 62    /** Current internal state; should only be modified with {@link AtomicReference#compareAndSet}. */
 63    protected final AtomicReference<State> state;
 64    private final CompletionMonitor _done; // signal means get() will not block on current state
 65    private volatile ListenerSet<R> _finishListeners;
 66   
 67  12 protected TaskController() {
 68  12 state = new AtomicReference<State>(new FreshState());
 69  12 _done = new CompletionMonitor();
 70    // use a SnapshotSynchronizedSet for listeners, which is thread-safe and supports efficient writes
 71  12 _finishListeners = new ListenerSet<R>(SnapshotSynchronizedSet.<Runnable1<? super R>>makeLinkedHash());
 72    }
 73   
 74    /** Get the current status. */
 75  7 public Status status() { return state.get().status(); }
 76   
 77    /** Check whether computation has completed &mdash; the state is either {@code FINISHED} or {@code CANCELED}. */
 78  0 public boolean isDone() { Status s = status(); return s == Status.FINISHED || s == Status.CANCELED; }
 79    /** Check whether the task has produced a value &mdash; the status is {@code FINISHED}. */
 80  0 public boolean hasValue() { return status() == Status.FINISHED; }
 81    /** Check whether the task has been canceled before completion &mdash; the status is {@code CANCELED}. */
 82  0 public boolean isCanceled() { return status() == Status.CANCELED; }
 83    /** Check whether the task has been canceled before completion &mdash; the status is {@code CANCELED}. */
 84  0 public boolean isCancelled() { return status() == Status.CANCELED; }
 85   
 86    /**
 87    * Returns {@code true} if the status is {@code FINISHED} <em>and</em> {@code value()} will return a result
 88    * (rather than throwing an exception).
 89    */
 90  0 public boolean isResolved() {
 91    // cast to workaround limitation in Eclipse compiler
 92  0 return ((Object) state.get()) instanceof TaskController.CleanlyFinishedState;
 93    }
 94   
 95    /**
 96    * Request that the task be run. If the task is {@code RUNNING} or {@code FINISHED}, has no effect.
 97    * There may be a delay between this invocation's return and a state change to {@code RUNNING}.
 98    * @throws CancellationException If the task is {@code CANCELED}.
 99    */
 100  9 public void start() { state.get().start(); }
 101   
 102    /**
 103    * Request that the task be abandoned, and that any resources associated with it be disposed.
 104    * This convenience method sets {@code stopRunning} to {@code true}.
 105    * @return {@code true} if the task has not yet finished.
 106    */
 107  0 public boolean cancel() { return state.get().cancel(true); }
 108   
 109    /**
 110    * Request that the task be abandoned, and that any resources associated with it be disposed.
 111    * {@code PAUSED} tasks are always canceled immediately. If {@code stopRunning}, a {@code RUNNING}
 112    * task will also be asked to terminate, but there may be a delay between this invocation's return
 113    * and a state change to {@code CANCELED}; and if the task completes successfully in the interim,
 114    * the cancel request is ignored. (If {@code stopRunning} is {@code false}, {@code RUNNING} tasks are
 115    * allowed to run to completion.)
 116    * @param stopRunning Whether a task that has begun running should be canceled.
 117    * @return {@code true} if the task has not yet started, or if {@code stopRunning} is {@code true}
 118    * and the task has not yet finished.
 119    */
 120  0 public boolean cancel(boolean stopRunning) { return state.get().cancel(stopRunning); }
 121   
 122    /**
 123    * Get the result. If {@link #isDone} is false, ensure that the task is {@code RUNNING} and block until it
 124    * finishes.
 125    * @throws WrappedException Wraps any exception encountered, as documented by {@link #get()}. For simplicity,
 126    * all exceptions, including RuntimeExceptions, are wrapped.
 127    */
 128  8 public R value() {
 129  8 try { return state.get().get(); }
 130  3 catch (Exception e) { throw new WrappedException(e); }
 131    }
 132   
 133    /**
 134    * Get the result. If {@link #isDone} is false, ensure that the task is {@code RUNNING} and block until it
 135    * finishes.
 136    * @throws InterruptedException If the current thread is interrupted while waiting.
 137    * @throws ExecutionException If the running task terminated early with an exception.
 138    * @throws CancellationException If the final state is {@code CANCELED} rather than {@code FINISHED}.
 139    * @throws RuntimeException Any other exception that occurs in the controller implementation.
 140    */
 141  4 public R get() throws InterruptedException, ExecutionException { return state.get().get(); }
 142   
 143    /**
 144    * Get the result. If {@link #isDone} is false, ensure that the task is {@code RUNNING} and block until it
 145    * finishes, or until a timeout is reached.
 146    * @param timeout Maximum wait time, in milliseconds.
 147    * @throws InterruptedException If the current thread is interrupted while waiting.
 148    * @throws TimeoutException If the current thread times out while waiting.
 149    * @throws ExecutionException If the running task terminated early with an exception.
 150    * @throws CancellationException If the final state is {@code CANCELED} rather than {@code FINISHED}.
 151    * @throws RuntimeException Any other exception that occurs in the controller implementation.
 152    */
 153  0 public R get(long timeout) throws InterruptedException, ExecutionException, TimeoutException {
 154  0 return get(timeout, TimeUnit.MILLISECONDS);
 155    }
 156   
 157    /**
 158    * Get the result. If {@link #isDone} is false, ensure that the task is {@code RUNNING} and block until it
 159    * finishes, or until a timeout is reached.
 160    * @param timeout Maximum wait time, in {@code unit} units.
 161    * @param unit Units for {@code timeout}.
 162    * @throws InterruptedException If the current thread is interrupted while waiting.
 163    * @throws TimeoutException If the current thread times out while waiting.
 164    * @throws ExecutionException If the running task terminated early with an exception.
 165    * @throws CancellationException If the final state is {@code CANCELED} rather than {@code FINISHED}.
 166    * @throws RuntimeException Any other exception that occurs in the controller implementation.
 167    */
 168  0 public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
 169  0 return state.get().get(timeout, unit);
 170    }
 171   
 172    /**
 173    * Try to get the result within the given amount of time. If {@link #isDone} is false, ensure that the task
 174    * is {@code RUNNING} and block until it finishes, or until a timeout is reached.
 175    * @param timeout Maximum wait time, in milliseconds.
 176    * @return The wrapped result, if it is available before an interrupt or timeout occurs; otherwise, "none".
 177    * @throws ExecutionException If the running task terminated early with an exception.
 178    * @throws CancellationException If the final state is {@code CANCELED} rather than {@code FINISHED}.
 179    * @throws RuntimeException Any other exception that occurs in the controller implementation.
 180    */
 181  0 public Option<R> attemptGet(long timeout) throws ExecutionException {
 182  0 try { return Option.some(get(timeout, TimeUnit.MILLISECONDS)); }
 183  0 catch (InterruptedException e) { return Option.none(); }
 184  0 catch (TimeoutException e) { return Option.none(); }
 185    }
 186   
 187    /**
 188    * Try to get the result within the given amount of time. If {@link #isDone} is false, ensure that the task
 189    * is {@code RUNNING} and block until it finishes, or until a timeout is reached.
 190    * @param timeout Maximum wait time, in {@code unit} units.
 191    * @param unit Units for {@code timeout}.
 192    * @return The wrapped result, if it is available before an interrupt or timeout occurs; otherwise, "none".
 193    * @throws ExecutionException If the running task terminated early with an exception.
 194    * @throws CancellationException If the final state is {@code CANCELED} rather than {@code FINISHED}.
 195    * @throws RuntimeException Any other exception that occurs in the controller implementation.
 196    */
 197  0 public Option<R> attemptGet(long timeout, TimeUnit unit) throws ExecutionException {
 198  0 try { return Option.some(get(timeout, unit)); }
 199  0 catch (InterruptedException e) { return Option.none(); }
 200  0 catch (TimeoutException e) { return Option.none(); }
 201    }
 202   
 203    /**
 204    * Access the ListenerSet responding to the completion of computation. Registered listeners will only be
 205    * run if the task completes cleanly (without throwing an exception) and the registration occurs before
 206    * the task completes.
 207    */
 208  0 public ListenerSet<R>.Sink finishListeners() { return _finishListeners.sink(); }
 209   
 210    /**
 211    * Begin computation and call {@link #started}. If starting does not occur immediately (for example,
 212    * blocking occurs first), the {@code started()} call may occur in a different thread.
 213    */
 214    protected abstract void doStart();
 215   
 216    /**
 217    * Terminate computation and call {@link #stopped}. Never called before {@code started()} has been
 218    * invoked. If termination does not occur immediately (for example, blocking occurs first), the
 219    * {@code stopped()} call may occur in a different thread.
 220    */
 221    protected abstract void doStop();
 222   
 223    /**
 224    * Clean up after the task has completed. Called whenever the task enters a canceled or finished
 225    * state (for example, when {@code finishedCleanly()} has been called, or when {@code cancel()} is
 226    * invoked on a {@code PAUSED} controller). By default, does nothing, but can be overridden to close
 227    * connections or throw away unnecessary objects. (Where TaskControllers live far beyond their computation
 228    * life span (as simple wrappers for a value), this allows objects related to the computation to be
 229    * garbage-collected in the interim.)
 230    */
 231  0 protected void discard() {}
 232   
 233    /**
 234    * Called by by the constructor (or a thread spawned by the constructor) when startup is complete.
 235    * Must be invoked before any of the completion methods ({@code stopped()}, {@code finishedCleanly()},
 236    * etc.)
 237    */
 238  14 protected final void started() {
 239  14 boolean kept = false;
 240  14 State current = state.get();
 241  14 State next = runningState();
 242    // cast to workaround limitation in Eclipse compiler
 243  14 while (((Object) current) instanceof TaskController.StartingState && !kept) {
 244    // must loop because a FreshStartingState->CanceledStartingState transition could occur concurrently
 245    // can use weakCompareAndSet since we're already in a while loop
 246  13 kept = state.weakCompareAndSet(current, next);
 247  13 if (kept) { ((StartingState) current).started(); }
 248  0 else { current = state.get(); }
 249    }
 250    }
 251   
 252    /**
 253    * Called by an invocation of {@link #doStop} when stopping is complete. May also be invoked if
 254    * the task terminates in a canceled-like state, even if {@code stop()} was never invoked.
 255    */
 256  0 protected final void stopped() { finished(new CanceledState()); }
 257   
 258    /** Called when running has completed and produced a value. */
 259  9 protected final void finishedCleanly(R result) {
 260  9 finished(new CleanlyFinishedState(result));
 261  9 _finishListeners.run(result);
 262  9 _finishListeners.clear(); // discard listeners, since they won't be used again
 263    }
 264   
 265    /** Called when the running task terminates early with an exception. */
 266  2 protected final void finishedWithTaskException(Exception e) {
 267  2 finished(new ExecutionExceptionState(new ExecutionException(e)));
 268    }
 269   
 270    /** Called when the runner implementation encounters an exception. */
 271  1 protected final void finishedWithImplementationException(RuntimeException e) {
 272  1 finished(new InternalExceptionState(e));
 273    }
 274   
 275  12 private final void finished(State finishedState) {
 276  12 State current = state.get();
 277  12 boolean changed = false;
 278  12 while (!changed && current.status() != Status.CANCELED && current.status() != Status.FINISHED) {
 279    // can use weakCompareAndSet since we're already in a while loop
 280  12 changed = state.weakCompareAndSet(current, finishedState);
 281  12 if (changed) { _done.signal(); discard(); }
 282  0 else { current = state.get(); }
 283    }
 284    }
 285   
 286    /** Produce a running state. May be overridden to change the way RunningStates are implemented. */
 287  14 protected RunningState runningState() { return new RunningState(); }
 288   
 289   
 290    public static enum Status {
 291    /** The task is not currently running, but can be started. */
 292    PAUSED,
 293    /** The task is currently executing. */
 294    RUNNING,
 295    /** The task has completed, and the result is available. */
 296    FINISHED,
 297    /** The task has been stopped and cannot be restarted. No result is available. */
 298    CANCELED;
 299    }
 300   
 301   
 302    /** An internal state for the controller, implementing the controller's core behavior. */
 303    protected abstract class State {
 304    public abstract Status status();
 305    public abstract void start();
 306    public abstract boolean cancel(boolean stopRunning);
 307    public abstract R get() throws InterruptedException, ExecutionException;
 308    public abstract R get(long timeout, TimeUnit u) throws InterruptedException, ExecutionException, TimeoutException;
 309    }
 310   
 311    /** Any state in which the task must be started before it can complete. */
 312    protected abstract class WaitingState extends State {
 313  2 public Status status() { return Status.PAUSED; }
 314  0 public boolean cancel(boolean stopRunning) {
 315  0 if (state.compareAndSet(this, new CanceledState())) {
 316  0 _done.signal();
 317  0 discard();
 318  0 return true;
 319    }
 320  0 else { return state.get().cancel(stopRunning); }
 321    }
 322  4 public R get() throws InterruptedException, ExecutionException {
 323  4 start(); return state.get().get();
 324    }
 325  0 public R get(long timeout, TimeUnit u) throws InterruptedException, ExecutionException, TimeoutException {
 326  0 start(); return state.get().get(timeout, u);
 327    }
 328    }
 329   
 330    /** Any state in which the task has already been asked to compute a result. */
 331    protected abstract class ComputingState extends State {
 332  8 public R get() throws InterruptedException, ExecutionException {
 333  8 _done.ensureSignaled();
 334  8 return state.get().get();
 335    }
 336  0 public R get(long timeout, TimeUnit u) throws InterruptedException, ExecutionException, TimeoutException {
 337  0 _done.ensureSignaled(timeout, u);
 338  0 return state.get().get(); // will not block, so can't timeout
 339    }
 340    }
 341   
 342    /** Initial state. */
 343    protected class FreshState extends WaitingState {
 344  12 public void start() {
 345  12 if (state.compareAndSet(this, new FreshStartingState())) { doStart(); }
 346  0 else { state.get().start(); }
 347    }
 348    }
 349   
 350    /** {@code FreshState.start()} has been invoked, but startup is not yet complete. */
 351    protected abstract class StartingState extends ComputingState {
 352  0 public Status status() { return Status.PAUSED; }
 353  0 public void start() {}
 354  0 public boolean cancel(boolean stopRunning) {
 355  0 if (stopRunning) {
 356  0 if (state.compareAndSet(this, new CanceledStartingState())) { return true; }
 357  0 else { return state.get().cancel(stopRunning); }
 358    }
 359  0 else { return false; }
 360    }
 361    /** Operation to perform when starting is complete */
 362    public abstract void started();
 363    }
 364   
 365    /** Simple instance of StartingState. */
 366    protected class FreshStartingState extends StartingState {
 367  13 public void started() {}
 368    }
 369   
 370    /** A StartingState that has been canceled while waiting for startup to complete. */
 371    protected class CanceledStartingState extends StartingState {
 372  0 public boolean cancel(boolean stopRunning) { return stopRunning; }
 373  0 public void started() { state.get().cancel(true); }
 374    }
 375   
 376    /** Startup has completed and we're waiting for a result. */
 377    protected class RunningState extends ComputingState {
 378  27 public Status status() { return Status.RUNNING; }
 379  0 public void start() {}
 380  0 public boolean cancel(boolean stopRunning) {
 381  0 if (stopRunning) {
 382  0 if (state.compareAndSet(this, new CancelingState())) { doStop(); return true; }
 383  0 else { return state.get().cancel(stopRunning); }
 384    }
 385  0 else { return false; }
 386    }
 387    }
 388   
 389    /** Canceled while running; waiting for termination to complete. */
 390    protected class CancelingState extends ComputingState {
 391  0 public Status status() { return Status.RUNNING; }
 392  0 public void start() {}
 393  0 public boolean cancel(boolean stopRunning) { return stopRunning; }
 394    }
 395   
 396    /** Any state for a task that has finished. */
 397    protected abstract class FinishedState extends State {
 398  2 public Status status() { return Status.FINISHED; }
 399  0 public void start() {}
 400  0 public boolean cancel(boolean stopRunning) { return false; }
 401    }
 402   
 403    /**
 404    * Finished with a result. Only states with this type are "resolved" -- others should either
 405    * require additional computation or throw an exception.
 406    */
 407    protected class CleanlyFinishedState extends FinishedState {
 408    private R _result;
 409  9 public CleanlyFinishedState(R result) { _result = result; }
 410  9 public R get() { return _result; }
 411  0 public R get(long timeout, TimeUnit u) { return _result; }
 412    }
 413   
 414    /** Finished with an ExecutionException. */
 415    protected class ExecutionExceptionState extends FinishedState {
 416    private ExecutionException _e;
 417  2 public ExecutionExceptionState(ExecutionException e) { _e = e; }
 418  2 public R get() throws ExecutionException { throw _e; }
 419  0 public R get(long timeout, TimeUnit u) throws ExecutionException { throw _e; }
 420    }
 421   
 422    /** Finished with a RuntimeException. */
 423    protected class InternalExceptionState extends FinishedState {
 424    private RuntimeException _e;
 425  1 public InternalExceptionState(RuntimeException e) { _e = e; }
 426  1 public R get() { throw _e; }
 427  0 public R get(long timeout, TimeUnit u) throws ExecutionException { throw _e; }
 428    }
 429   
 430    /** Has been successfully canceled. */
 431    protected class CanceledState extends State {
 432  0 public Status status() { return Status.CANCELED; }
 433  0 public void start() { throw new CancellationException("Task is canceled"); }
 434  0 public boolean cancel(boolean stopRunning) { return false; }
 435  0 public R get() { throw new CancellationException("Task is canceled"); }
 436  0 public R get(long timeout, TimeUnit u) { throw new CancellationException("Task is canceled"); }
 437    }
 438   
 439    }