Clover coverage report - PLT Utilities Test Coverage (plt-20120304-r5436)
Coverage timestamp: Sat Mar 3 2012 22:01:56 CST
file stats: LOC: 238   Methods: 22
NCLOC: 118   Classes: 7
 
 Source file Conditionals Statements Methods TOTAL
IncrementalTaskController.java 31.2% 47.5% 36.4% 40.9%
coverage coverage
 1    /*BEGIN_COPYRIGHT_BLOCK*
 2   
 3    PLT Utilities BSD License
 4   
 5    Copyright (c) 2007-2010 JavaPLT group at Rice University
 6    All rights reserved.
 7   
 8    Developed by: Java Programming Languages Team
 9    Rice University
 10    http://www.cs.rice.edu/~javaplt/
 11   
 12    Redistribution and use in source and binary forms, with or without modification, are permitted
 13    provided that the following conditions are met:
 14   
 15    - Redistributions of source code must retain the above copyright notice, this list of conditions
 16    and the following disclaimer.
 17    - Redistributions in binary form must reproduce the above copyright notice, this list of
 18    conditions and the following disclaimer in the documentation and/or other materials provided
 19    with the distribution.
 20    - Neither the name of the JavaPLT group, Rice University, nor the names of the library's
 21    contributors may be used to endorse or promote products derived from this software without
 22    specific prior written permission.
 23   
 24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
 25    IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 26    FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND
 27    CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 28    DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 29    DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
 30    IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 31    OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 32   
 33    *END_COPYRIGHT_BLOCK*/
 34   
 35    package edu.rice.cs.plt.concurrent;
 36   
 37    import java.util.concurrent.BlockingQueue;
 38    import java.util.concurrent.CancellationException;
 39    import java.util.concurrent.ExecutionException;
 40    import java.util.concurrent.LinkedBlockingQueue;
 41    import java.util.concurrent.TimeUnit;
 42    import java.util.concurrent.TimeoutException;
 43    import java.util.concurrent.atomic.AtomicInteger;
 44   
 45    import edu.rice.cs.plt.collect.ListenerSet;
 46    import edu.rice.cs.plt.lambda.WrappedException;
 47   
 48    /**
 49    * <p>Provides access to a concurrent task that produces incremental results. Adds
 50    * a {@link #pause} method, more responsive cancellation, and access to intermediate computation
 51    * results via {@link #steps} and {@link #intermediateQueue}.</p>
 52    *
 53    * <p>To implement a concrete instance, a subclass must provide {@link #doStart}, {@link #doPause},
 54    * {@link #doStop}, and, optionally, {@link #discard}.</p>
 55    */
 56    public abstract class IncrementalTaskController<I, R> extends TaskController<R> {
 57   
 58    private final boolean _ignoreIntermediate;
 59    private final AtomicInteger _steps;
 60    private final BlockingQueue<I> _intermediateQueue;
 61    private final ListenerSet<I> _intermediateListeners;
 62   
 63    /** Sets {@code ignoreIntermediate} to {@code false}. */
 64  0 protected IncrementalTaskController() { this(false); }
 65   
 66    /**
 67    * If {@code ignoreIntermediate}, intermediate results will not be enqueued. (They will,
 68    * however, be passed to any listeners and counted by {@link #steps}.)
 69    */
 70  2 protected IncrementalTaskController(boolean ignoreIntermediate) {
 71  2 _ignoreIntermediate = ignoreIntermediate;
 72  2 _steps = new AtomicInteger(0);
 73  2 _intermediateQueue = _ignoreIntermediate ? null : new LinkedBlockingQueue<I>();
 74  2 _intermediateListeners = new ListenerSet<I>();
 75    }
 76   
 77    /**
 78    * Get the number of intermediate steps the task has taken. If {@code storeIntermediate}, at
 79    * least this many values have been added to the {@code intermediateQueue}.
 80    */
 81  1 public int steps() { return _steps.get(); }
 82   
 83    /**
 84    * <p>Get the queue for storing intermediate results. Throws an exception if {@code storeIntermediate}
 85    * is {@code false}; otherwise each intermediate result is added to this queue. Clients can check
 86    * the controller's status to determine if no additional results will be enqueued. However, there
 87    * is no guarantee (in general) that a running task will produce an intermediate result before completing.
 88    * (The task being run may follow some convention for indicating termination via the queue.) While the
 89    * result is intended to be used only for read operations, given the lack of a nice interface for
 90    * separating queue reads from queue writes, the result allows write access to the queue as well.</p>
 91    *
 92    * <p>IncrementalTaskController implementations should invoke {@link #stepped} to add items to
 93    * the queue, rather than doing so directly.</p>
 94    * @throws IllegalStateException If {@code storeIntermediate} is {@code false}.
 95    */
 96  5 public BlockingQueue<I> intermediateQueue() {
 97  0 if (_ignoreIntermediate) { throw new IllegalStateException("No queue is maintained"); }
 98  5 else { return _intermediateQueue; }
 99    }
 100   
 101    /**
 102    * Access the ListenerSet responding to the availability of intermediate results. Registered listeners
 103    * will be run for each intermediate result that becomes available.
 104    */
 105  0 public ListenerSet<I>.Sink intermediateListeners() {
 106  0 return _intermediateListeners.sink();
 107    }
 108   
 109    /**
 110    * Request that the task be paused. After pausing, the task will not continue executing unless
 111    * {@link #start} is invoked. If the the task is {@code PAUSED} or {@code FINISHED}, has no effect.
 112    * @throws CancellationException If the task is {@code CANCELED}.
 113    */
 114  1 public void pause() {
 115    // ideally, this would be implemented as part of the state, but we can't do that without
 116    // redefining the entire hierarchy of state classes
 117  1 boolean success = false;
 118  1 do {
 119  1 State s = state.get();
 120  1 Object sObj = s; // workaround for Eclipse compiler limitations
 121  1 if (sObj instanceof TaskController.RunningState) {
 122  1 success = state.compareAndSet(s, new FreshPausingState());
 123  1 if (success) { doPause(); }
 124    }
 125  0 else if (sObj instanceof TaskController.FreshStartingState) {
 126  0 success = state.compareAndSet(s, new PausedStartingState());
 127    }
 128  0 else if (sObj instanceof IncrementalTaskController.StartedPausingState) {
 129  0 success = state.compareAndSet(s, new FreshPausingState());
 130    }
 131  0 else if (sObj instanceof TaskController.CanceledState) {
 132  0 throw new CancellationException("Task is canceled");
 133    }
 134    else { // ignore other fresh, paused, finished, pausing, or canceling states
 135  0 success = true;
 136    }
 137  1 } while (!success);
 138    }
 139   
 140    /**
 141    * Pause computation and call {@link #paused}. If pausing does not occur immediately, the {@code paused()}
 142    * call may occur in a different thread. Will only be called after {@code started()} has been invoked.
 143    * When execution should resume again, {@code doStart()} will be invoked (but only after {@code paused()}
 144    * has been called). In order to support responsive canceling, a call to {@code doStop()} may occur
 145    * concurrently, or before {@code paused()} is called.
 146    */
 147    protected abstract void doPause();
 148   
 149    /**
 150    * Resume computation (after a pause) and call {@link #started}. If starting does not occur immediately
 151    * (for example, blocking occurs first), the {@code started()} call may occur in a different thread.
 152    */
 153    protected abstract void doResume();
 154   
 155  2 protected void paused() {
 156  2 boolean kept = false;
 157  2 State current = state.get();
 158  2 State next = new PausedState();
 159    // cast to Object as workaround for Eclipse compiler limitation
 160  2 while (((Object) current) instanceof IncrementalTaskController.PausingState && !kept) {
 161    // must loop because a transition between PausingStates could occur concurrently
 162    // can use weakCompareAndSet since we're already in a while loop
 163  1 kept = state.weakCompareAndSet(current, next);
 164  1 if (kept) { ((PausingState) current).paused(); }
 165  0 else { current = state.get(); }
 166    }
 167    }
 168   
 169    /** Record an intermediate result. Should only be called while the controller is in a running state. */
 170  10 protected void stepped(I intermediateResult) {
 171  10 if (!_ignoreIntermediate) {
 172  10 try { _intermediateQueue.put(intermediateResult); }
 173    // shouldn't block in the current implementation, but if that changes, we should throw the
 174    // InterruptedException -- it may have been caused by a doCancel() implementation.
 175  0 catch (InterruptedException e) { throw new WrappedException(e); }
 176    }
 177  10 _steps.incrementAndGet();
 178  10 _intermediateListeners.run(intermediateResult);
 179    }
 180   
 181    /** The tasked has been started and then paused. */
 182    protected class PausedState extends WaitingState {
 183  1 public final void start() {
 184  1 if (state.compareAndSet(this, new FreshStartingState())) { doResume(); }
 185  0 else { state.get().start(); }
 186    }
 187    }
 188   
 189    /** pause() has been invoked on a RunningState, but the task has not yet paused. */
 190    protected abstract class PausingState extends ComputingState {
 191  0 public Status status() { return Status.RUNNING; }
 192  0 public boolean cancel(boolean stopRunning) {
 193  0 if (stopRunning) {
 194  0 if (state.compareAndSet(this, new CanceledPausingState())) { doStop(); return true; }
 195  0 else { return state.get().cancel(stopRunning); }
 196    }
 197  0 else { return false; }
 198    }
 199    /** Operation to perform when pausing is complete */
 200    public abstract void paused();
 201    }
 202   
 203    /** Simple instance of PausingState. */
 204    protected class FreshPausingState extends PausingState {
 205  0 public void start() {
 206  0 if (!state.compareAndSet(this, new StartedPausingState())) { state.get().start(); }
 207    }
 208  0 public R get() throws InterruptedException, ExecutionException {
 209  0 start(); return state.get().get();
 210    }
 211  0 public R get(long timeout, TimeUnit u) throws InterruptedException, ExecutionException, TimeoutException {
 212  0 start(); return state.get().get(timeout, u);
 213    }
 214  1 public void paused() {}
 215    }
 216   
 217    /** A PausingState that has been canceled while waiting for the pause to complete. */
 218    protected class CanceledPausingState extends PausingState {
 219  0 public void start() {} // we're already committed to canceling
 220  0 public boolean cancel(boolean stopRunning) { return stopRunning; }
 221  0 public void paused() { state.get().cancel(true); }
 222    }
 223   
 224    /** A PausingState that has been started while waiting for the pause to complete. */
 225    protected class StartedPausingState extends PausingState {
 226  0 public void start() {}
 227  0 public void paused() { state.get().start(); }
 228    }
 229   
 230    /** A StartingState that has been paused while waiting for startup to complete. */
 231    protected class PausedStartingState extends StartingState {
 232  0 public void start() {
 233  0 if (!state.compareAndSet(this, new FreshStartingState())) { state.get().start(); }
 234    }
 235  0 public void started() { IncrementalTaskController.this.pause(); }
 236    }
 237   
 238    }