|
|||||||||||||||||||
| Source file | Conditionals | Statements | Methods | TOTAL | |||||||||||||||
| IncrementalTaskController.java | 31.2% | 47.5% | 36.4% | 40.9% |
|
||||||||||||||
| 1 | /*BEGIN_COPYRIGHT_BLOCK* | |
| 2 | ||
| 3 | PLT Utilities BSD License | |
| 4 | ||
| 5 | Copyright (c) 2007-2010 JavaPLT group at Rice University | |
| 6 | All rights reserved. | |
| 7 | ||
| 8 | Developed by: Java Programming Languages Team | |
| 9 | Rice University | |
| 10 | http://www.cs.rice.edu/~javaplt/ | |
| 11 | ||
| 12 | Redistribution and use in source and binary forms, with or without modification, are permitted | |
| 13 | provided that the following conditions are met: | |
| 14 | ||
| 15 | - Redistributions of source code must retain the above copyright notice, this list of conditions | |
| 16 | and the following disclaimer. | |
| 17 | - Redistributions in binary form must reproduce the above copyright notice, this list of | |
| 18 | conditions and the following disclaimer in the documentation and/or other materials provided | |
| 19 | with the distribution. | |
| 20 | - Neither the name of the JavaPLT group, Rice University, nor the names of the library's | |
| 21 | contributors may be used to endorse or promote products derived from this software without | |
| 22 | specific prior written permission. | |
| 23 | ||
| 24 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR | |
| 25 | IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND | |
| 26 | FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS AND | |
| 27 | CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
| 28 | DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, | |
| 29 | DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER | |
| 30 | IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT | |
| 31 | OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
| 32 | ||
| 33 | *END_COPYRIGHT_BLOCK*/ | |
| 34 | ||
| 35 | package edu.rice.cs.plt.concurrent; | |
| 36 | ||
| 37 | import java.util.concurrent.BlockingQueue; | |
| 38 | import java.util.concurrent.CancellationException; | |
| 39 | import java.util.concurrent.ExecutionException; | |
| 40 | import java.util.concurrent.LinkedBlockingQueue; | |
| 41 | import java.util.concurrent.TimeUnit; | |
| 42 | import java.util.concurrent.TimeoutException; | |
| 43 | import java.util.concurrent.atomic.AtomicInteger; | |
| 44 | ||
| 45 | import edu.rice.cs.plt.collect.ListenerSet; | |
| 46 | import edu.rice.cs.plt.lambda.WrappedException; | |
| 47 | ||
| 48 | /** | |
| 49 | * <p>Provides access to a concurrent task that produces incremental results. Adds | |
| 50 | * a {@link #pause} method, more responsive cancellation, and access to intermediate computation | |
| 51 | * results via {@link #steps} and {@link #intermediateQueue}.</p> | |
| 52 | * | |
| 53 | * <p>To implement a concrete instance, a subclass must provide {@link #doStart}, {@link #doPause}, | |
| 54 | * {@link #doStop}, and, optionally, {@link #discard}.</p> | |
| 55 | */ | |
| 56 | public abstract class IncrementalTaskController<I, R> extends TaskController<R> { | |
| 57 | ||
| 58 | private final boolean _ignoreIntermediate; | |
| 59 | private final AtomicInteger _steps; | |
| 60 | private final BlockingQueue<I> _intermediateQueue; | |
| 61 | private final ListenerSet<I> _intermediateListeners; | |
| 62 | ||
| 63 | /** Sets {@code ignoreIntermediate} to {@code false}. */ | |
| 64 | 0 | protected IncrementalTaskController() { this(false); } |
| 65 | ||
| 66 | /** | |
| 67 | * If {@code ignoreIntermediate}, intermediate results will not be enqueued. (They will, | |
| 68 | * however, be passed to any listeners and counted by {@link #steps}.) | |
| 69 | */ | |
| 70 | 2 | protected IncrementalTaskController(boolean ignoreIntermediate) { |
| 71 | 2 | _ignoreIntermediate = ignoreIntermediate; |
| 72 | 2 | _steps = new AtomicInteger(0); |
| 73 | 2 | _intermediateQueue = _ignoreIntermediate ? null : new LinkedBlockingQueue<I>(); |
| 74 | 2 | _intermediateListeners = new ListenerSet<I>(); |
| 75 | } | |
| 76 | ||
| 77 | /** | |
| 78 | * Get the number of intermediate steps the task has taken. If {@code storeIntermediate}, at | |
| 79 | * least this many values have been added to the {@code intermediateQueue}. | |
| 80 | */ | |
| 81 | 1 | public int steps() { return _steps.get(); } |
| 82 | ||
| 83 | /** | |
| 84 | * <p>Get the queue for storing intermediate results. Throws an exception if {@code storeIntermediate} | |
| 85 | * is {@code false}; otherwise each intermediate result is added to this queue. Clients can check | |
| 86 | * the controller's status to determine if no additional results will be enqueued. However, there | |
| 87 | * is no guarantee (in general) that a running task will produce an intermediate result before completing. | |
| 88 | * (The task being run may follow some convention for indicating termination via the queue.) While the | |
| 89 | * result is intended to be used only for read operations, given the lack of a nice interface for | |
| 90 | * separating queue reads from queue writes, the result allows write access to the queue as well.</p> | |
| 91 | * | |
| 92 | * <p>IncrementalTaskController implementations should invoke {@link #stepped} to add items to | |
| 93 | * the queue, rather than doing so directly.</p> | |
| 94 | * @throws IllegalStateException If {@code storeIntermediate} is {@code false}. | |
| 95 | */ | |
| 96 | 5 | public BlockingQueue<I> intermediateQueue() { |
| 97 | 0 | if (_ignoreIntermediate) { throw new IllegalStateException("No queue is maintained"); } |
| 98 | 5 | else { return _intermediateQueue; } |
| 99 | } | |
| 100 | ||
| 101 | /** | |
| 102 | * Access the ListenerSet responding to the availability of intermediate results. Registered listeners | |
| 103 | * will be run for each intermediate result that becomes available. | |
| 104 | */ | |
| 105 | 0 | public ListenerSet<I>.Sink intermediateListeners() { |
| 106 | 0 | return _intermediateListeners.sink(); |
| 107 | } | |
| 108 | ||
| 109 | /** | |
| 110 | * Request that the task be paused. After pausing, the task will not continue executing unless | |
| 111 | * {@link #start} is invoked. If the the task is {@code PAUSED} or {@code FINISHED}, has no effect. | |
| 112 | * @throws CancellationException If the task is {@code CANCELED}. | |
| 113 | */ | |
| 114 | 1 | public void pause() { |
| 115 | // ideally, this would be implemented as part of the state, but we can't do that without | |
| 116 | // redefining the entire hierarchy of state classes | |
| 117 | 1 | boolean success = false; |
| 118 | 1 | do { |
| 119 | 1 | State s = state.get(); |
| 120 | 1 | Object sObj = s; // workaround for Eclipse compiler limitations |
| 121 | 1 | if (sObj instanceof TaskController.RunningState) { |
| 122 | 1 | success = state.compareAndSet(s, new FreshPausingState()); |
| 123 | 1 | if (success) { doPause(); } |
| 124 | } | |
| 125 | 0 | else if (sObj instanceof TaskController.FreshStartingState) { |
| 126 | 0 | success = state.compareAndSet(s, new PausedStartingState()); |
| 127 | } | |
| 128 | 0 | else if (sObj instanceof IncrementalTaskController.StartedPausingState) { |
| 129 | 0 | success = state.compareAndSet(s, new FreshPausingState()); |
| 130 | } | |
| 131 | 0 | else if (sObj instanceof TaskController.CanceledState) { |
| 132 | 0 | throw new CancellationException("Task is canceled"); |
| 133 | } | |
| 134 | else { // ignore other fresh, paused, finished, pausing, or canceling states | |
| 135 | 0 | success = true; |
| 136 | } | |
| 137 | 1 | } while (!success); |
| 138 | } | |
| 139 | ||
| 140 | /** | |
| 141 | * Pause computation and call {@link #paused}. If pausing does not occur immediately, the {@code paused()} | |
| 142 | * call may occur in a different thread. Will only be called after {@code started()} has been invoked. | |
| 143 | * When execution should resume again, {@code doStart()} will be invoked (but only after {@code paused()} | |
| 144 | * has been called). In order to support responsive canceling, a call to {@code doStop()} may occur | |
| 145 | * concurrently, or before {@code paused()} is called. | |
| 146 | */ | |
| 147 | protected abstract void doPause(); | |
| 148 | ||
| 149 | /** | |
| 150 | * Resume computation (after a pause) and call {@link #started}. If starting does not occur immediately | |
| 151 | * (for example, blocking occurs first), the {@code started()} call may occur in a different thread. | |
| 152 | */ | |
| 153 | protected abstract void doResume(); | |
| 154 | ||
| 155 | 2 | protected void paused() { |
| 156 | 2 | boolean kept = false; |
| 157 | 2 | State current = state.get(); |
| 158 | 2 | State next = new PausedState(); |
| 159 | // cast to Object as workaround for Eclipse compiler limitation | |
| 160 | 2 | while (((Object) current) instanceof IncrementalTaskController.PausingState && !kept) { |
| 161 | // must loop because a transition between PausingStates could occur concurrently | |
| 162 | // can use weakCompareAndSet since we're already in a while loop | |
| 163 | 1 | kept = state.weakCompareAndSet(current, next); |
| 164 | 1 | if (kept) { ((PausingState) current).paused(); } |
| 165 | 0 | else { current = state.get(); } |
| 166 | } | |
| 167 | } | |
| 168 | ||
| 169 | /** Record an intermediate result. Should only be called while the controller is in a running state. */ | |
| 170 | 10 | protected void stepped(I intermediateResult) { |
| 171 | 10 | if (!_ignoreIntermediate) { |
| 172 | 10 | try { _intermediateQueue.put(intermediateResult); } |
| 173 | // shouldn't block in the current implementation, but if that changes, we should throw the | |
| 174 | // InterruptedException -- it may have been caused by a doCancel() implementation. | |
| 175 | 0 | catch (InterruptedException e) { throw new WrappedException(e); } |
| 176 | } | |
| 177 | 10 | _steps.incrementAndGet(); |
| 178 | 10 | _intermediateListeners.run(intermediateResult); |
| 179 | } | |
| 180 | ||
| 181 | /** The tasked has been started and then paused. */ | |
| 182 | protected class PausedState extends WaitingState { | |
| 183 | 1 | public final void start() { |
| 184 | 1 | if (state.compareAndSet(this, new FreshStartingState())) { doResume(); } |
| 185 | 0 | else { state.get().start(); } |
| 186 | } | |
| 187 | } | |
| 188 | ||
| 189 | /** pause() has been invoked on a RunningState, but the task has not yet paused. */ | |
| 190 | protected abstract class PausingState extends ComputingState { | |
| 191 | 0 | public Status status() { return Status.RUNNING; } |
| 192 | 0 | public boolean cancel(boolean stopRunning) { |
| 193 | 0 | if (stopRunning) { |
| 194 | 0 | if (state.compareAndSet(this, new CanceledPausingState())) { doStop(); return true; } |
| 195 | 0 | else { return state.get().cancel(stopRunning); } |
| 196 | } | |
| 197 | 0 | else { return false; } |
| 198 | } | |
| 199 | /** Operation to perform when pausing is complete */ | |
| 200 | public abstract void paused(); | |
| 201 | } | |
| 202 | ||
| 203 | /** Simple instance of PausingState. */ | |
| 204 | protected class FreshPausingState extends PausingState { | |
| 205 | 0 | public void start() { |
| 206 | 0 | if (!state.compareAndSet(this, new StartedPausingState())) { state.get().start(); } |
| 207 | } | |
| 208 | 0 | public R get() throws InterruptedException, ExecutionException { |
| 209 | 0 | start(); return state.get().get(); |
| 210 | } | |
| 211 | 0 | public R get(long timeout, TimeUnit u) throws InterruptedException, ExecutionException, TimeoutException { |
| 212 | 0 | start(); return state.get().get(timeout, u); |
| 213 | } | |
| 214 | 1 | public void paused() {} |
| 215 | } | |
| 216 | ||
| 217 | /** A PausingState that has been canceled while waiting for the pause to complete. */ | |
| 218 | protected class CanceledPausingState extends PausingState { | |
| 219 | 0 | public void start() {} // we're already committed to canceling |
| 220 | 0 | public boolean cancel(boolean stopRunning) { return stopRunning; } |
| 221 | 0 | public void paused() { state.get().cancel(true); } |
| 222 | } | |
| 223 | ||
| 224 | /** A PausingState that has been started while waiting for the pause to complete. */ | |
| 225 | protected class StartedPausingState extends PausingState { | |
| 226 | 0 | public void start() {} |
| 227 | 0 | public void paused() { state.get().start(); } |
| 228 | } | |
| 229 | ||
| 230 | /** A StartingState that has been paused while waiting for startup to complete. */ | |
| 231 | protected class PausedStartingState extends StartingState { | |
| 232 | 0 | public void start() { |
| 233 | 0 | if (!state.compareAndSet(this, new FreshStartingState())) { state.get().start(); } |
| 234 | } | |
| 235 | 0 | public void started() { IncrementalTaskController.this.pause(); } |
| 236 | } | |
| 237 | ||
| 238 | } |
|
||||||||||