001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035
036import static java.util.logging.Level.FINER;
037import static java.util.logging.Level.SEVERE;
038import static java.util.logging.Level.WARNING;
039
040import com.google.common.annotations.Beta;
041import com.google.common.annotations.VisibleForTesting;
042import com.google.common.base.Function;
043import com.google.common.collect.FluentIterable;
044import com.google.common.collect.ImmutableList;
045import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
046import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
047import com.google.common.util.concurrent.Futures.FutureCombiner;
048
049
050
051import java.io.Closeable;
052import java.util.IdentityHashMap;
053import java.util.Map;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CancellationException;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.ExecutionException;
058import java.util.concurrent.Executor;
059import java.util.concurrent.Future;
060import java.util.concurrent.RejectedExecutionException;
061import java.util.concurrent.atomic.AtomicReference;
062import java.util.logging.Logger;
063
064
065/**
066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
067 * complete, some objects captured during the computation are closed.
068 *
069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
071 * cancellation of the operation so far. The only way to extract the value or exception from a step
072 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
073 * "value" of a successful step or the "result" (value or exception) of any step.
074 *
075 * <ol>
076 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
077 *       block or a {@link ListenableFuture}.
078 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
079 *       can be captured for later closing.
080 *   <li>There is one last step (the root of the tree), from which you can extract the final result
081 *       of the computation. After that result is available (or the computation fails), all objects
082 *       captured by any of the steps in the pipeline are closed.
083 * </ol>
084 *
085 * <h3>Starting a pipeline</h3>
086 *
087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
090 * #from(ListenableFuture)} instead.
091 *
092 * <h3>Derived steps</h3>
093 *
094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
095 * ways similar to {@link FluentFuture}s:
096 *
097 * <ul>
098 *   <li>by transforming the value from a successful input step,
099 *   <li>by catching the exception from a failed input step, or
100 *   <li>by combining the results of several input steps.
101 * </ul>
102 *
103 * Each derivation can capture the next value or any intermediate objects for later closing.
104 *
105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
106 * exception, or combine it with others, you cannot do anything else with it, including declare it
107 * to be the last step of the pipeline.
108 *
109 * <h4>Transforming</h4>
110 *
111 * To derive the next step by asynchronously applying a function to an input step's value, call
112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
113 * Executor)} on the input step.
114 *
115 * <h4>Catching</h4>
116 *
117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
119 *
120 * <h4>Combining</h4>
121 *
122 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
123 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads.
124 *
125 * <h3>Cancelling</h3>
126 *
127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
129 * successfully cancelled step will immediately start closing all objects captured for later closing
130 * by it and by its input steps.
131 *
132 * <h3>Ending a pipeline</h3>
133 *
134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
135 * close the captured objects automatically or manually.
136 *
137 * <h4>Automatically closing</h4>
138 *
139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
140 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin
141 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future
142 * completes before closing starts, rather than once it has finished.
143 *
144 * <pre>{@code
145 * FluentFuture<UserName> userName =
146 *     ClosingFuture.submit(
147 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
148 *             executor)
149 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
150 *         .transform((closer, result) -> result.get("userName"), directExecutor())
151 *         .catching(DBException.class, e -> "no user", directExecutor())
152 *         .finishToFuture();
153 * }</pre>
154 *
155 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
156 * result cursor will both be closed, even if the operation is cancelled or fails.
157 *
158 * <h4>Manually closing</h4>
159 *
160 * If you want to close the captured objects manually, after you've used the final result, call
161 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
162 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
163 *
164 * <pre>{@code
165 *     ClosingFuture.submit(
166 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
167 *             executor)
168 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
169 *     .transform((closer, result) -> result.get("userName"), directExecutor())
170 *     .catching(DBException.class, e -> "no user", directExecutor())
171 *     .finishToValueAndCloser(
172 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
173 *
174 * // later
175 * try { // get() will throw if the operation failed or was cancelled.
176 *   UserName userName = userNameValueAndCloser.get();
177 *   // do something with userName
178 * } finally {
179 *   userNameValueAndCloser.closeAsync();
180 * }
181 * }</pre>
182 *
183 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
184 * the query result cursor will both be closed, even if the operation is cancelled or fails.
185 *
186 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
187 * automatic-closing approach described above is safer.
188 *
189 * @param <V> the type of the value of this step
190 * @since 30.0
191 */
192// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
193@Beta // @Beta for one release.
194
195// TODO(dpb): GWT compatibility.
196public final class ClosingFuture<V> {
197
198  private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName());
199
200  /**
201   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
202   * done.
203   */
204  public static final class DeferredCloser {
205    private final CloseableList list;
206
207    DeferredCloser(CloseableList list) {
208      this.list = list;
209    }
210
211    /**
212     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
213     *
214     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
215     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
216     * Closeable}. (For more about the flavors, see <a
217     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
218     * build</a>.)
219     *
220     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
221     * building for Android): Ensure that any object you pass implements the interface not just in
222     * your current SDK version but also at the oldest version you support. For example, <a
223     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
224     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
225     * {@code Closeable} with a method reference like {@code cursor::close}.
226     *
227     * <p>Note that this method is still binary-compatible between flavors because the erasure of
228     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
229     *
230     * @param closeable the object to be closed (see notes above)
231     * @param closingExecutor the object will be closed on this executor
232     * @return the first argument
233     */
234    
235    public <C extends Object & AutoCloseable> C eventuallyClose(
236        C closeable, Executor closingExecutor) {
237      checkNotNull(closingExecutor);
238      if (closeable != null) {
239        list.add(closeable, closingExecutor);
240      }
241      return closeable;
242    }
243  }
244
245  /**
246   * An operation that computes a result.
247   *
248   * @param <V> the type of the result
249   */
250  @FunctionalInterface
251  public interface ClosingCallable<V extends Object> {
252    /**
253     * Computes a result, or throws an exception if unable to do so.
254     *
255     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
256     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
257     * not before this method completes), even if this method throws or the pipeline is cancelled.
258     */
259    V call(DeferredCloser closer) throws Exception;
260  }
261
262  /**
263   * An operation that computes a {@link ClosingFuture} of a result.
264   *
265   * @param <V> the type of the result
266   * @since 30.1
267   */
268  @FunctionalInterface
269  public interface AsyncClosingCallable<V extends Object> {
270    /**
271     * Computes a result, or throws an exception if unable to do so.
272     *
273     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
274     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
275     * not before this method completes), even if this method throws or the pipeline is cancelled.
276     */
277    ClosingFuture<V> call(DeferredCloser closer) throws Exception;
278  }
279
280  /**
281   * A function from an input to a result.
282   *
283   * @param <T> the type of the input to the function
284   * @param <U> the type of the result of the function
285   */
286  @FunctionalInterface
287  public interface ClosingFunction<T extends Object, U extends Object> {
288
289    /**
290     * Applies this function to an input, or throws an exception if unable to do so.
291     *
292     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
293     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
294     * not before this method completes), even if this method throws or the pipeline is cancelled.
295     */
296    U apply(DeferredCloser closer, T input) throws Exception;
297  }
298
299  /**
300   * A function from an input to a {@link ClosingFuture} of a result.
301   *
302   * @param <T> the type of the input to the function
303   * @param <U> the type of the result of the function
304   */
305  @FunctionalInterface
306  public interface AsyncClosingFunction<T extends Object, U extends Object> {
307    /**
308     * Applies this function to an input, or throws an exception if unable to do so.
309     *
310     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
311     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
312     * not before this method completes), even if this method throws or the pipeline is cancelled.
313     */
314    ClosingFuture<U> apply(DeferredCloser closer, T input) throws Exception;
315  }
316
317  /**
318   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
319   * allows the user to close all the closeable objects that were captured during it for later
320   * closing.
321   *
322   * <p>The asynchronous operation will have completed before this object is created.
323   *
324   * @param <V> the type of the value of a successful operation
325   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
326   */
327  public static final class ValueAndCloser<V> {
328
329    private final ClosingFuture<? extends V> closingFuture;
330
331    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
332      this.closingFuture = checkNotNull(closingFuture);
333    }
334
335    /**
336     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
337     * {@link Future#get()} would.
338     *
339     * <p>Because the asynchronous operation has already completed, this method is synchronous and
340     * returns immediately.
341     *
342     * @throws CancellationException if the computation was cancelled
343     * @throws ExecutionException if the computation threw an exception
344     */
345    
346    public V get() throws ExecutionException {
347      return getDone(closingFuture.future);
348    }
349
350    /**
351     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
352     * operation on the {@link Executor}s specified by calls to {@link
353     * DeferredCloser#eventuallyClose(Closeable, Executor)}.
354     *
355     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
356     * closed synchronously.
357     *
358     * <p>Idempotent: objects will be closed at most once.
359     */
360    public void closeAsync() {
361      closingFuture.close();
362    }
363  }
364
365  /**
366   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
367   * ClosingFuture} pipeline.
368   *
369   * @param <V> the type of the final value of a successful pipeline
370   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
371   */
372  @FunctionalInterface
373  public interface ValueAndCloserConsumer<V> {
374
375    /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */
376    void accept(ValueAndCloser<V> valueAndCloser);
377  }
378
379  /**
380   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
381   *
382   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
383   *     execution
384   */
385  public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) {
386    return new ClosingFuture<>(callable, executor);
387  }
388
389  /**
390   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
391   *
392   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
393   *     execution
394   * @since 30.1
395   */
396  public static <V> ClosingFuture<V> submitAsync(
397      AsyncClosingCallable<V> callable, Executor executor) {
398    return new ClosingFuture<>(callable, executor);
399  }
400
401  /**
402   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
403   *
404   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
405   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
406   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
407   */
408  public static <V> ClosingFuture<V> from(ListenableFuture<V> future) {
409    return new ClosingFuture<V>(future);
410  }
411
412  /**
413   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
414   *
415   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when
416   * the pipeline is done, even if the pipeline is canceled or fails.
417   *
418   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
419   * value in order to close it.
420   *
421   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
422   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable,
423   *     Executor)}.
424   * @param closingExecutor the future's result will be closed on this executor
425   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
426   *     underlying value may never be closed if the {@link Future} is canceled after its operation
427   *     begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
428   *     including those that pass them to this method, with {@link #submit(ClosingCallable,
429   *     Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
430   *     {@link ListenableFuture} that doesn't create values that should be closed, use {@link
431   *     ClosingFuture#from}.
432   */
433  @Deprecated
434  public static <C extends Object & AutoCloseable>
435      ClosingFuture<C> eventuallyClosing(
436          ListenableFuture<C> future, final Executor closingExecutor) {
437    checkNotNull(closingExecutor);
438    final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
439    Futures.addCallback(
440        future,
441        new FutureCallback<AutoCloseable>() {
442          @Override
443          public void onSuccess(AutoCloseable result) {
444            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
445          }
446
447          @Override
448          public void onFailure(Throwable t) {}
449        },
450        directExecutor());
451    return closingFuture;
452  }
453
454  /**
455   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
456   *
457   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
458   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
459   */
460  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
461    return new Combiner(false, futures);
462  }
463
464  /**
465   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
466   *
467   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
468   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
469   */
470  public static Combiner whenAllComplete(
471      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
472    return whenAllComplete(asList(future1, moreFutures));
473  }
474
475  /**
476   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
477   * all succeed. If any fail, the resulting pipeline will fail.
478   *
479   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
480   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
481   */
482  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
483    return new Combiner(true, futures);
484  }
485
486  /**
487   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
488   * they all succeed. If any fail, the resulting pipeline will fail.
489   *
490   * <p>Calling this method allows you to use lambdas or method references typed with the types of
491   * the input {@link ClosingFuture}s.
492   *
493   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
494   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
495   */
496  public static <V1, V2> Combiner2<V1, V2> whenAllSucceed(
497      ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
498    return new Combiner2<>(future1, future2);
499  }
500
501  /**
502   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
503   * they all succeed. If any fail, the resulting pipeline will fail.
504   *
505   * <p>Calling this method allows you to use lambdas or method references typed with the types of
506   * the input {@link ClosingFuture}s.
507   *
508   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
509   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
510   */
511  public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed(
512      ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
513    return new Combiner3<>(future1, future2, future3);
514  }
515
516  /**
517   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
518   * they all succeed. If any fail, the resulting pipeline will fail.
519   *
520   * <p>Calling this method allows you to use lambdas or method references typed with the types of
521   * the input {@link ClosingFuture}s.
522   *
523   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
524   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
525   */
526  public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed(
527      ClosingFuture<V1> future1,
528      ClosingFuture<V2> future2,
529      ClosingFuture<V3> future3,
530      ClosingFuture<V4> future4) {
531    return new Combiner4<>(future1, future2, future3, future4);
532  }
533
534  /**
535   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
536   * they all succeed. If any fail, the resulting pipeline will fail.
537   *
538   * <p>Calling this method allows you to use lambdas or method references typed with the types of
539   * the input {@link ClosingFuture}s.
540   *
541   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
542   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
543   */
544  public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
545      ClosingFuture<V1> future1,
546      ClosingFuture<V2> future2,
547      ClosingFuture<V3> future3,
548      ClosingFuture<V4> future4,
549      ClosingFuture<V5> future5) {
550    return new Combiner5<>(future1, future2, future3, future4, future5);
551  }
552
553  /**
554   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
555   * all succeed. If any fail, the resulting pipeline will fail.
556   *
557   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
558   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
559   */
560  public static Combiner whenAllSucceed(
561      ClosingFuture<?> future1,
562      ClosingFuture<?> future2,
563      ClosingFuture<?> future3,
564      ClosingFuture<?> future4,
565      ClosingFuture<?> future5,
566      ClosingFuture<?> future6,
567      ClosingFuture<?>... moreFutures) {
568    return whenAllSucceed(
569        FluentIterable.of(future1, future2, future3, future4, future5, future6)
570            .append(moreFutures));
571  }
572
573  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
574  private final CloseableList closeables = new CloseableList();
575  private final FluentFuture<V> future;
576
577  private ClosingFuture(ListenableFuture<V> future) {
578    this.future = FluentFuture.from(future);
579  }
580
581  private ClosingFuture(final ClosingCallable<V> callable, Executor executor) {
582    checkNotNull(callable);
583    TrustedListenableFutureTask<V> task =
584        TrustedListenableFutureTask.create(
585            new Callable<V>() {
586              @Override
587              public V call() throws Exception {
588                return callable.call(closeables.closer);
589              }
590
591              @Override
592              public String toString() {
593                return callable.toString();
594              }
595            });
596    executor.execute(task);
597    this.future = task;
598  }
599
600  private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) {
601    checkNotNull(callable);
602    TrustedListenableFutureTask<V> task =
603        TrustedListenableFutureTask.create(
604            new AsyncCallable<V>() {
605              @Override
606              public ListenableFuture<V> call() throws Exception {
607                CloseableList newCloseables = new CloseableList();
608                try {
609                  ClosingFuture<V> closingFuture = callable.call(newCloseables.closer);
610                  closingFuture.becomeSubsumedInto(closeables);
611                  return closingFuture.future;
612                } finally {
613                  closeables.add(newCloseables, directExecutor());
614                }
615              }
616
617              @Override
618              public String toString() {
619                return callable.toString();
620              }
621            });
622    executor.execute(task);
623    this.future = task;
624  }
625
626  /**
627   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
628   * future returns {@code null} if the step is successful or throws the same exception that would
629   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
630   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
631   *
632   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
633   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
634   * a derivation method <i>on the same instance</i>. This is important because calling {@code
635   * statusFuture} alone does not provide a way to close the pipeline.
636   */
637  public ListenableFuture<?> statusFuture() {
638    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
639  }
640
641  /**
642   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
643   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
644   * when the pipeline is done.
645   *
646   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
647   * ClosingFuture} will be equivalent to this one.
648   *
649   * <p>If the function throws an exception, that exception is used as the result of the derived
650   * {@code ClosingFuture}.
651   *
652   * <p>Example usage:
653   *
654   * <pre>{@code
655   * ClosingFuture<List<Row>> rowsFuture =
656   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
657   * }</pre>
658   *
659   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
660   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
661   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
662   *
663   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
664   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
665   * this {@code ClosingFuture}.
666   *
667   * @param function transforms the value of this step to the value of the derived step
668   * @param executor executor to run the function in
669   * @return the derived step
670   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
671   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
672   *     finished}
673   */
674  public <U> ClosingFuture<U> transform(
675      final ClosingFunction<? super V, U> function, Executor executor) {
676    checkNotNull(function);
677    AsyncFunction<V, U> applyFunction =
678        new AsyncFunction<V, U>() {
679          @Override
680          public ListenableFuture<U> apply(V input) throws Exception {
681            return closeables.applyClosingFunction(function, input);
682          }
683
684          @Override
685          public String toString() {
686            return function.toString();
687          }
688        };
689    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
690    return derive(future.transformAsync(applyFunction, executor));
691  }
692
693  /**
694   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
695   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
696   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
697   * captured by the returned {@link ClosingFuture}).
698   *
699   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
700   * returned by the function.
701   *
702   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
703   * ClosingFuture} will be equivalent to this one.
704   *
705   * <p>If the function throws an exception, that exception is used as the result of the derived
706   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
707   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
708   * closed.
709   *
710   * <p>Usage guidelines for this method:
711   *
712   * <ul>
713   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
714   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
715   *       Executor)} instead, with a function that returns the next value directly.
716   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
717   *       for every closeable object this step creates in order to capture it for later closing.
718   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
719   *       ClosingFuture} call {@link #from(ListenableFuture)}.
720   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
721   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
722   *       {@link #withoutCloser(AsyncFunction)}
723   * </ul>
724   *
725   * <p>Example usage:
726   *
727   * <pre>{@code
728   * // Result.getRowsClosingFuture() returns a ClosingFuture.
729   * ClosingFuture<List<Row>> rowsFuture =
730   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
731   *
732   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
733   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
734   * // Closeable).
735   * ClosingFuture<Integer> rowsFuture2 =
736   *     queryFuture.transformAsync(
737   *         (closer, result) -> {
738   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
739   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
740   *      },
741   *      executor);
742   *
743   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
744   * ClosingFuture<List<Row>> rowsFuture3 =
745   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
746   *
747   * }</pre>
748   *
749   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
750   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
751   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
752   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
753   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
754   * responsible for completing the returned {@code ClosingFuture}.)
755   *
756   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
757   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
758   * this {@code ClosingFuture}.
759   *
760   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
761   *     the derived step
762   * @param executor executor to run the function in
763   * @return the derived step
764   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
765   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
766   *     finished}
767   */
768  public <U> ClosingFuture<U> transformAsync(
769      final AsyncClosingFunction<? super V, U> function, Executor executor) {
770    checkNotNull(function);
771    AsyncFunction<V, U> applyFunction =
772        new AsyncFunction<V, U>() {
773          @Override
774          public ListenableFuture<U> apply(V input) throws Exception {
775            return closeables.applyAsyncClosingFunction(function, input);
776          }
777
778          @Override
779          public String toString() {
780            return function.toString();
781          }
782        };
783    return derive(future.transformAsync(applyFunction, executor));
784  }
785
786  /**
787   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
788   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
789   * {@link ListenableFuture}.
790   *
791   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
792   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
793   * meets these conditions:
794   *
795   * <ul>
796   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
797   *       DeferredCloser#eventuallyClose(Closeable, Executor)}.
798   *   <li>It returns a {@link ListenableFuture}.
799   * </ul>
800   *
801   * <p>Example usage:
802   *
803   * <pre>{@code
804   * // Result.getRowsFuture() returns a ListenableFuture.
805   * ClosingFuture<List<Row>> rowsFuture =
806   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
807   * }</pre>
808   *
809   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
810   *     ListenableFuture} with the value of a derived step
811   */
812  public static <V, U> AsyncClosingFunction<V, U> withoutCloser(
813      final AsyncFunction<V, U> function) {
814    checkNotNull(function);
815    return new AsyncClosingFunction<V, U>() {
816      @Override
817      public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception {
818        return ClosingFuture.from(function.apply(input));
819      }
820    };
821  }
822
823  /**
824   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
825   * to its exception if it is an instance of a given exception type. The function can use a {@link
826   * DeferredCloser} to capture objects to be closed when the pipeline is done.
827   *
828   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
829   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
830   * one.
831   *
832   * <p>If the function throws an exception, that exception is used as the result of the derived
833   * {@code ClosingFuture}.
834   *
835   * <p>Example usage:
836   *
837   * <pre>{@code
838   * ClosingFuture<QueryResult> queryFuture =
839   *     queryFuture.catching(
840   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
841   * }</pre>
842   *
843   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
844   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
845   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
846   *
847   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
848   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
849   * this {@code ClosingFuture}.
850   *
851   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
852   *     type is matched against this step's exception. "This step's exception" means the cause of
853   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
854   *     underlying this step or, if {@code get()} throws a different kind of exception, that
855   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
856   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
857   * @param fallback the function to be called if this step fails with the expected exception type.
858   *     The function's argument is this step's exception. "This step's exception" means the cause
859   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
860   *     underlying this step or, if {@code get()} throws a different kind of exception, that
861   *     exception itself.
862   * @param executor the executor that runs {@code fallback} if the input fails
863   */
864  public <X extends Throwable> ClosingFuture<V> catching(
865      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
866    return catchingMoreGeneric(exceptionType, fallback, executor);
867  }
868
869  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
870  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
871      Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) {
872    checkNotNull(fallback);
873    AsyncFunction<X, W> applyFallback =
874        new AsyncFunction<X, W>() {
875          @Override
876          public ListenableFuture<W> apply(X exception) throws Exception {
877            return closeables.applyClosingFunction(fallback, exception);
878          }
879
880          @Override
881          public String toString() {
882            return fallback.toString();
883          }
884        };
885    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
886    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
887  }
888
889  /**
890   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
891   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
892   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
893   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
894   *
895   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
896   * ClosingFuture} will be equivalent to the one returned by the function.
897   *
898   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
899   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
900   * one.
901   *
902   * <p>If the function throws an exception, that exception is used as the result of the derived
903   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
904   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
905   * closed.
906   *
907   * <p>Usage guidelines for this method:
908   *
909   * <ul>
910   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
911   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
912   *       ClosingFunction, Executor)} instead, with a function that returns the next value
913   *       directly.
914   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
915   *       for every closeable object this step creates in order to capture it for later closing.
916   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
917   *       ClosingFuture} call {@link #from(ListenableFuture)}.
918   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
919   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
920   *       {@link #withoutCloser(AsyncFunction)}
921   * </ul>
922   *
923   * <p>Example usage:
924   *
925   * <pre>{@code
926   * // Fall back to a secondary input stream in case of IOException.
927   * ClosingFuture<InputStream> inputFuture =
928   *     firstInputFuture.catchingAsync(
929   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
   * }</pre>
932   *
933   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
934   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
935   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
936   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
937   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
938   * responsible for completing the returned {@code ClosingFuture}.)
939   *
940   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
941   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
942   * this {@code ClosingFuture}.
943   *
944   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
945   *     type is matched against this step's exception. "This step's exception" means the cause of
946   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
947   *     underlying this step or, if {@code get()} throws a different kind of exception, that
948   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
949   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
950   * @param fallback the function to be called if this step fails with the expected exception type.
951   *     The function's argument is this step's exception. "This step's exception" means the cause
952   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
953   *     underlying this step or, if {@code get()} throws a different kind of exception, that
954   *     exception itself.
955   * @param executor the executor that runs {@code fallback} if the input fails
956   */
957  // TODO(dpb): Should this do something special if the function throws CancellationException or
958  // ExecutionException?
959  public <X extends Throwable> ClosingFuture<V> catchingAsync(
960      Class<X> exceptionType,
961      AsyncClosingFunction<? super X, ? extends V> fallback,
962      Executor executor) {
963    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
964  }
965
966  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
967  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
968      Class<X> exceptionType,
969      final AsyncClosingFunction<? super X, W> fallback,
970      Executor executor) {
971    checkNotNull(fallback);
972    AsyncFunction<X, W> asyncFunction =
973        new AsyncFunction<X, W>() {
974          @Override
975          public ListenableFuture<W> apply(X exception) throws Exception {
976            return closeables.applyAsyncClosingFunction(fallback, exception);
977          }
978
979          @Override
980          public String toString() {
981            return fallback.toString();
982          }
983        };
984    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
985  }
986
987  /**
988   * Marks this step as the last step in the {@code ClosingFuture} pipeline.
989   *
990   * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when
991   * the pipeline is cancelled.
992   *
993   * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously
994   * <b>after</b> the returned {@code Future} is done: the future completes before closing starts,
995   * rather than once it has finished.
996   *
997   * <p>After calling this method, you may not call {@link
998   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
999   * derivation method on this {@code ClosingFuture}.
1000   *
1001   * @return a {@link Future} that represents the final value or exception of the pipeline
1002   */
1003  public FluentFuture<V> finishToFuture() {
1004    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
1005      logger.log(FINER, "will close {0}", this);
1006      future.addListener(
1007          new Runnable() {
1008            @Override
1009            public void run() {
1010              checkAndUpdateState(WILL_CLOSE, CLOSING);
1011              close();
1012              checkAndUpdateState(CLOSING, CLOSED);
1013            }
1014          },
1015          directExecutor());
1016    } else {
1017      switch (state.get()) {
1018        case SUBSUMED:
1019          throw new IllegalStateException(
1020              "Cannot call finishToFuture() after deriving another step");
1021
1022        case WILL_CREATE_VALUE_AND_CLOSER:
1023          throw new IllegalStateException(
1024              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
1025
1026        case WILL_CLOSE:
1027        case CLOSING:
1028        case CLOSED:
1029          throw new IllegalStateException("Cannot call finishToFuture() twice");
1030
1031        case OPEN:
1032          throw new AssertionError();
1033      }
1034    }
1035    return future;
1036  }
1037
1038  /**
1039   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
   * {@code consumer} will be called with an object that contains the result of the operation. The
   * consumer can store the {@link ValueAndCloser} outside the consumer for later synchronous use.
1042   *
1043   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
1044   * any other derivation method on this {@code ClosingFuture}.
1045   *
1046   * @param consumer a callback whose method will be called (using {@code executor}) when this
1047   *     operation is done
1048   */
1049  public void finishToValueAndCloser(
1050      final ValueAndCloserConsumer<? super V> consumer, Executor executor) {
1051    checkNotNull(consumer);
1052    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
1053      switch (state.get()) {
1054        case SUBSUMED:
1055          throw new IllegalStateException(
1056              "Cannot call finishToValueAndCloser() after deriving another step");
1057
1058        case WILL_CLOSE:
1059        case CLOSING:
1060        case CLOSED:
1061          throw new IllegalStateException(
1062              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1063
1064        case WILL_CREATE_VALUE_AND_CLOSER:
1065          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1066
1067        case OPEN:
1068          break;
1069      }
1070      throw new AssertionError(state);
1071    }
1072    future.addListener(
1073        new Runnable() {
1074          @Override
1075          public void run() {
1076            provideValueAndCloser(consumer, ClosingFuture.this);
1077          }
1078        },
1079        executor);
1080  }
1081
1082  private static <C, V extends C> void provideValueAndCloser(
1083      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1084    consumer.accept(new ValueAndCloser<C>(closingFuture));
1085  }
1086
1087  /**
1088   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1089   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1090   * successful, and this step has not started when {@code cancel} is called, this step should never
1091   * run.
1092   *
1093   * <p>If successful, causes the objects captured by this step (if already started) and its input
1094   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1095   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1096   *
1097   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1098   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1099   *     cancelled regardless
1100   * @return {@code false} if the step could not be cancelled, typically because it has already
1101   *     completed normally; {@code true} otherwise
1102   */
1103  
1104  public boolean cancel(boolean mayInterruptIfRunning) {
1105    logger.log(FINER, "cancelling {0}", this);
1106    boolean cancelled = future.cancel(mayInterruptIfRunning);
1107    if (cancelled) {
1108      close();
1109    }
1110    return cancelled;
1111  }
1112
1113  private void close() {
1114    logger.log(FINER, "closing {0}", this);
1115    closeables.close();
1116  }
1117
1118  private <U> ClosingFuture<U> derive(FluentFuture<U> future) {
1119    ClosingFuture<U> derived = new ClosingFuture<>(future);
1120    becomeSubsumedInto(derived.closeables);
1121    return derived;
1122  }
1123
1124  private void becomeSubsumedInto(CloseableList otherCloseables) {
1125    checkAndUpdateState(OPEN, SUBSUMED);
1126    otherCloseables.add(closeables, directExecutor());
1127  }
1128
1129  /**
1130   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1131   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1132   *
1133   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1134   */
1135  public static final class Peeker {
1136    private final ImmutableList<ClosingFuture<?>> futures;
1137    private volatile boolean beingCalled;
1138
1139    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
1140      this.futures = checkNotNull(futures);
1141    }
1142
1143    /**
1144     * Returns the value of {@code closingFuture}.
1145     *
1146     * @throws ExecutionException if {@code closingFuture} is a failed step
1147     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
1148     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
1149     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)}
1150     * @throws IllegalStateException if called outside of a call to {@link
1151     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
1152     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
1153     */
1154    public final <D extends Object> D getDone(ClosingFuture<D> closingFuture)
1155        throws ExecutionException {
1156      checkState(beingCalled);
1157      checkArgument(futures.contains(closingFuture));
1158      return Futures.getDone(closingFuture.future);
1159    }
1160
1161    private <V extends Object> V call(
1162        CombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1163      beingCalled = true;
1164      CloseableList newCloseables = new CloseableList();
1165      try {
1166        return combiner.call(newCloseables.closer, this);
1167      } finally {
1168        closeables.add(newCloseables, directExecutor());
1169        beingCalled = false;
1170      }
1171    }
1172
1173    private <V extends Object> FluentFuture<V> callAsync(
1174        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1175      beingCalled = true;
1176      CloseableList newCloseables = new CloseableList();
1177      try {
1178        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
1179        closingFuture.becomeSubsumedInto(closeables);
1180        return closingFuture.future;
1181      } finally {
1182        closeables.add(newCloseables, directExecutor());
1183        beingCalled = false;
1184      }
1185    }
1186  }
1187
1188  /**
1189   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1190   *
1191   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1192   * instantiate this class.
1193   *
1194   * <p>Example:
1195   *
1196   * <pre>{@code
1197   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1198   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1199   * ListenableFuture<Integer> numberOfDifferentLines =
1200   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1201   *           .call(
1202   *               (closer, peeker) -> {
1203   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1204   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1205   *                 return countDifferentLines(file1Reader, file2Reader);
1206   *               },
1207   *               executor)
   *           .finishToFuture();
1209   * }</pre>
1210   */
1211  // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8.
1212  
1213  public static class Combiner {
1214
1215    private final CloseableList closeables = new CloseableList();
1216
1217    /**
1218     * An operation that returns a result and may throw an exception.
1219     *
1220     * @param <V> the type of the result
1221     */
1222    @FunctionalInterface
1223    public interface CombiningCallable<V extends Object> {
1224      /**
1225       * Computes a result, or throws an exception if unable to do so.
1226       *
1227       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1228       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1229       * is done (but not before this method completes), even if this method throws or the pipeline
1230       * is cancelled.
1231       *
1232       * @param peeker used to get the value of any of the input futures
1233       */
1234      V call(DeferredCloser closer, Peeker peeker) throws Exception;
1235    }
1236
1237    /**
1238     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1239     *
1240     * @param <V> the type of the result
1241     */
1242    @FunctionalInterface
1243    public interface AsyncCombiningCallable<V extends Object> {
1244      /**
1245       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
1246       *
1247       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1248       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1249       * is done (but not before this method completes), even if this method throws or the pipeline
1250       * is cancelled.
1251       *
1252       * @param peeker used to get the value of any of the input futures
1253       */
1254      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
1255    }
1256
1257    private final boolean allMustSucceed;
1258    protected final ImmutableList<ClosingFuture<?>> inputs;
1259
1260    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1261      this.allMustSucceed = allMustSucceed;
1262      this.inputs = ImmutableList.copyOf(inputs);
1263      for (ClosingFuture<?> input : inputs) {
1264        input.becomeSubsumedInto(closeables);
1265      }
1266    }
1267
1268    /**
1269     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1270     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1271     * objects to be closed when the pipeline is done.
1272     *
1273     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1274     * fail, so will the returned step.
1275     *
1276     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1277     * cancelled.
1278     *
1279     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1280     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1281     */
1282    public <V> ClosingFuture<V> call(
1283        final CombiningCallable<V> combiningCallable, Executor executor) {
1284      Callable<V> callable =
1285          new Callable<V>() {
1286            @Override
1287            public V call() throws Exception {
1288              return new Peeker(inputs).call(combiningCallable, closeables);
1289            }
1290
1291            @Override
1292            public String toString() {
1293              return combiningCallable.toString();
1294            }
1295          };
1296      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1297      derived.closeables.add(closeables, directExecutor());
1298      return derived;
1299    }
1300
1301    /**
1302     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1303     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1304     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1305     * captured by the returned {@link ClosingFuture}).
1306     *
1307     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1308     * fail, so will the returned step.
1309     *
1310     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1311     * cancelled.
1312     *
1313     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1314     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1315     *
1316     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1317     * derived step.
1318     *
1319     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1320     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1321     *
1322     * <p>Usage guidelines for this method:
1323     *
1324     * <ul>
1325     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1326     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1327     *       Executor)} instead, with a function that returns the next value directly.
1328     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1329     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1330     *       capture it for later closing.
1331     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1332     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1333     * </ul>
1334     *
1335     * <p>The same warnings about doing heavyweight operations within {@link
1336     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1337     */
1338    public <V> ClosingFuture<V> callAsync(
1339        final AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1340      AsyncCallable<V> asyncCallable =
1341          new AsyncCallable<V>() {
1342            @Override
1343            public ListenableFuture<V> call() throws Exception {
1344              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1345            }
1346
1347            @Override
1348            public String toString() {
1349              return combiningCallable.toString();
1350            }
1351          };
1352      ClosingFuture<V> derived =
1353          new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor));
1354      derived.closeables.add(closeables, directExecutor());
1355      return derived;
1356    }
1357
1358    private FutureCombiner<Object> futureCombiner() {
1359      return allMustSucceed
1360          ? Futures.whenAllSucceed(inputFutures())
1361          : Futures.whenAllComplete(inputFutures());
1362    }
1363
1364    private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE =
1365        new Function<ClosingFuture<?>, FluentFuture<?>>() {
1366          @Override
1367          public FluentFuture<?> apply(ClosingFuture<?> future) {
1368            return future.future;
1369          }
1370        };
1371
1372    private ImmutableList<FluentFuture<?>> inputFutures() {
1373      return FluentIterable.from(inputs).transform(INNER_FUTURE).toList();
1374    }
1375  }
1376
1377  /**
1378   * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
1379   * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
1380   * combination.
1381   *
1382   * @param <V1> the type returned by the first future
1383   * @param <V2> the type returned by the second future
1384   */
1385  public static final class Combiner2<V1 extends Object, V2 extends Object>
1386      extends Combiner {
1387
1388    /**
1389     * A function that returns a value when applied to the values of the two futures passed to
1390     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1391     *
1392     * @param <V1> the type returned by the first future
1393     * @param <V2> the type returned by the second future
1394     * @param <U> the type returned by the function
1395     */
1396    @FunctionalInterface
1397    public interface ClosingFunction2<
1398        V1 extends Object, V2 extends Object, U extends Object> {
1399
1400      /**
1401       * Applies this function to two inputs, or throws an exception if unable to do so.
1402       *
1403       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1404       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1405       * is done (but not before this method completes), even if this method throws or the pipeline
1406       * is cancelled.
1407       */
1408      U apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception;
1409    }
1410
1411    /**
1412     * A function that returns a {@link ClosingFuture} when applied to the values of the two futures
1413     * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1414     *
1415     * @param <V1> the type returned by the first future
1416     * @param <V2> the type returned by the second future
1417     * @param <U> the type returned by the function
1418     */
1419    @FunctionalInterface
1420    public interface AsyncClosingFunction2<
1421        V1 extends Object, V2 extends Object, U extends Object> {
1422
1423      /**
1424       * Applies this function to two inputs, or throws an exception if unable to do so.
1425       *
1426       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1427       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1428       * is done (but not before this method completes), even if this method throws or the pipeline
1429       * is cancelled.
1430       */
1431      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception;
1432    }
1433
1434    private final ClosingFuture<V1> future1;
1435    private final ClosingFuture<V2> future2;
1436
1437    private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
1438      super(true, ImmutableList.of(future1, future2));
1439      this.future1 = future1;
1440      this.future2 = future2;
1441    }
1442
1443    /**
1444     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1445     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1446     * objects to be closed when the pipeline is done.
1447     *
1448     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1449     * any of the inputs fail, so will the returned step.
1450     *
1451     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1452     *
1453     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1454     * ExecutionException} will be extracted and used as the failure of the derived step.
1455     */
1456    public <U extends Object> ClosingFuture<U> call(
1457        final ClosingFunction2<V1, V2, U> function, Executor executor) {
1458      return call(
1459          new CombiningCallable<U>() {
1460            @Override
1461            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1462              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1463            }
1464
1465            @Override
1466            public String toString() {
1467              return function.toString();
1468            }
1469          },
1470          executor);
1471    }
1472
1473    /**
1474     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1475     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1476     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1477     * captured by the returned {@link ClosingFuture}).
1478     *
1479     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1480     * any of the inputs fail, so will the returned step.
1481     *
1482     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1483     *
1484     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1485     * ExecutionException} will be extracted and used as the failure of the derived step.
1486     *
1487     * <p>If the function throws any other exception, it will be used as the failure of the derived
1488     * step.
1489     *
1490     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1491     * the closeable objects in that {@code ClosingFuture} will be closed.
1492     *
1493     * <p>Usage guidelines for this method:
1494     *
1495     * <ul>
1496     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1497     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1498     *       Executor)} instead, with a function that returns the next value directly.
1499     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1500     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1501     *       capture it for later closing.
1502     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1503     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1504     * </ul>
1505     *
1506     * <p>The same warnings about doing heavyweight operations within {@link
1507     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1508     */
1509    public <U extends Object> ClosingFuture<U> callAsync(
1510        final AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1511      return callAsync(
1512          new AsyncCombiningCallable<U>() {
1513            @Override
1514            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1515              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1516            }
1517
1518            @Override
1519            public String toString() {
1520              return function.toString();
1521            }
1522          },
1523          executor);
1524    }
1525  }
1526
1527  /**
1528   * A generic {@link Combiner} that lets you use a lambda or method reference to combine three
1529   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1530   * ClosingFuture)} to start this combination.
1531   *
1532   * @param <V1> the type returned by the first future
1533   * @param <V2> the type returned by the second future
1534   * @param <V3> the type returned by the third future
1535   */
1536  public static final class Combiner3<
1537          V1 extends Object, V2 extends Object, V3 extends Object>
1538      extends Combiner {
1539    /**
1540     * A function that returns a value when applied to the values of the three futures passed to
1541     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1542     *
1543     * @param <V1> the type returned by the first future
1544     * @param <V2> the type returned by the second future
1545     * @param <V3> the type returned by the third future
1546     * @param <U> the type returned by the function
1547     */
1548    @FunctionalInterface
1549    public interface ClosingFunction3<
1550        V1 extends Object,
1551        V2 extends Object,
1552        V3 extends Object,
1553        U extends Object> {
1554      /**
1555       * Applies this function to three inputs, or throws an exception if unable to do so.
1556       *
1557       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1558       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1559       * is done (but not before this method completes), even if this method throws or the pipeline
1560       * is cancelled.
1561       */
1562      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 v3) throws Exception;
1563    }
1564
1565    /**
1566     * A function that returns a {@link ClosingFuture} when applied to the values of the three
1567     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1568     *
1569     * @param <V1> the type returned by the first future
1570     * @param <V2> the type returned by the second future
1571     * @param <V3> the type returned by the third future
1572     * @param <U> the type returned by the function
1573     */
1574    @FunctionalInterface
1575    public interface AsyncClosingFunction3<
1576        V1 extends Object,
1577        V2 extends Object,
1578        V3 extends Object,
1579        U extends Object> {
1580      /**
1581       * Applies this function to three inputs, or throws an exception if unable to do so.
1582       *
1583       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1584       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1585       * is done (but not before this method completes), even if this method throws or the pipeline
1586       * is cancelled.
1587       */
1588      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3)
1589          throws Exception;
1590    }
1591
1592    private final ClosingFuture<V1> future1;
1593    private final ClosingFuture<V2> future2;
1594    private final ClosingFuture<V3> future3;
1595
1596    private Combiner3(
1597        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1598      super(true, ImmutableList.of(future1, future2, future3));
1599      this.future1 = future1;
1600      this.future2 = future2;
1601      this.future3 = future3;
1602    }
1603
1604    /**
1605     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1606     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1607     * objects to be closed when the pipeline is done.
1608     *
1609     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1610     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1611     *
1612     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1613     *
1614     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1615     * ExecutionException} will be extracted and used as the failure of the derived step.
1616     */
1617    public <U extends Object> ClosingFuture<U> call(
1618        final ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1619      return call(
1620          new CombiningCallable<U>() {
1621            @Override
1622            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1623              return function.apply(
1624                  closer,
1625                  peeker.getDone(future1),
1626                  peeker.getDone(future2),
1627                  peeker.getDone(future3));
1628            }
1629
1630            @Override
1631            public String toString() {
1632              return function.toString();
1633            }
1634          },
1635          executor);
1636    }
1637
1638    /**
1639     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1640     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1641     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1642     * captured by the returned {@link ClosingFuture}).
1643     *
1644     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1645     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1646     *
1647     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1648     *
1649     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1650     * ExecutionException} will be extracted and used as the failure of the derived step.
1651     *
1652     * <p>If the function throws any other exception, it will be used as the failure of the derived
1653     * step.
1654     *
1655     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1656     * the closeable objects in that {@code ClosingFuture} will be closed.
1657     *
1658     * <p>Usage guidelines for this method:
1659     *
1660     * <ul>
1661     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1662     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1663     *       Executor)} instead, with a function that returns the next value directly.
1664     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1665     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1666     *       capture it for later closing.
1667     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1668     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1669     * </ul>
1670     *
1671     * <p>The same warnings about doing heavyweight operations within {@link
1672     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1673     */
1674    public <U extends Object> ClosingFuture<U> callAsync(
1675        final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1676      return callAsync(
1677          new AsyncCombiningCallable<U>() {
1678            @Override
1679            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1680              return function.apply(
1681                  closer,
1682                  peeker.getDone(future1),
1683                  peeker.getDone(future2),
1684                  peeker.getDone(future3));
1685            }
1686
1687            @Override
1688            public String toString() {
1689              return function.toString();
1690            }
1691          },
1692          executor);
1693    }
1694  }
1695
1696  /**
1697   * A generic {@link Combiner} that lets you use a lambda or method reference to combine four
1698   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1699   * ClosingFuture)} to start this combination.
1700   *
1701   * @param <V1> the type returned by the first future
1702   * @param <V2> the type returned by the second future
1703   * @param <V3> the type returned by the third future
1704   * @param <V4> the type returned by the fourth future
1705   */
1706  public static final class Combiner4<
1707          V1 extends Object,
1708          V2 extends Object,
1709          V3 extends Object,
1710          V4 extends Object>
1711      extends Combiner {
1712    /**
1713     * A function that returns a value when applied to the values of the four futures passed to
1714     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1715     *
1716     * @param <V1> the type returned by the first future
1717     * @param <V2> the type returned by the second future
1718     * @param <V3> the type returned by the third future
1719     * @param <V4> the type returned by the fourth future
1720     * @param <U> the type returned by the function
1721     */
1722    @FunctionalInterface
1723    public interface ClosingFunction4<
1724        V1 extends Object,
1725        V2 extends Object,
1726        V3 extends Object,
1727        V4 extends Object,
1728        U extends Object> {
1729      /**
1730       * Applies this function to four inputs, or throws an exception if unable to do so.
1731       *
1732       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1733       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1734       * is done (but not before this method completes), even if this method throws or the pipeline
1735       * is cancelled.
1736       */
1737      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) throws Exception;
1738    }
1739
1740    /**
1741     * A function that returns a {@link ClosingFuture} when applied to the values of the four
1742     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1743     * ClosingFuture)}.
1744     *
1745     * @param <V1> the type returned by the first future
1746     * @param <V2> the type returned by the second future
1747     * @param <V3> the type returned by the third future
1748     * @param <V4> the type returned by the fourth future
1749     * @param <U> the type returned by the function
1750     */
1751    @FunctionalInterface
1752    public interface AsyncClosingFunction4<
1753        V1 extends Object,
1754        V2 extends Object,
1755        V3 extends Object,
1756        V4 extends Object,
1757        U extends Object> {
1758      /**
1759       * Applies this function to four inputs, or throws an exception if unable to do so.
1760       *
1761       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1762       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1763       * is done (but not before this method completes), even if this method throws or the pipeline
1764       * is cancelled.
1765       */
1766      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4)
1767          throws Exception;
1768    }
1769
1770    private final ClosingFuture<V1> future1;
1771    private final ClosingFuture<V2> future2;
1772    private final ClosingFuture<V3> future3;
1773    private final ClosingFuture<V4> future4;
1774
1775    private Combiner4(
1776        ClosingFuture<V1> future1,
1777        ClosingFuture<V2> future2,
1778        ClosingFuture<V3> future3,
1779        ClosingFuture<V4> future4) {
1780      super(true, ImmutableList.of(future1, future2, future3, future4));
1781      this.future1 = future1;
1782      this.future2 = future2;
1783      this.future3 = future3;
1784      this.future4 = future4;
1785    }
1786
1787    /**
1788     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1789     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1790     * objects to be closed when the pipeline is done.
1791     *
1792     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1793     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1794     *
1795     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1796     *
1797     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1798     * ExecutionException} will be extracted and used as the failure of the derived step.
1799     */
1800    public <U extends Object> ClosingFuture<U> call(
1801        final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1802      return call(
1803          new CombiningCallable<U>() {
1804            @Override
1805            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1806              return function.apply(
1807                  closer,
1808                  peeker.getDone(future1),
1809                  peeker.getDone(future2),
1810                  peeker.getDone(future3),
1811                  peeker.getDone(future4));
1812            }
1813
1814            @Override
1815            public String toString() {
1816              return function.toString();
1817            }
1818          },
1819          executor);
1820    }
1821
1822    /**
1823     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1824     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1825     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1826     * captured by the returned {@link ClosingFuture}).
1827     *
1828     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1829     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1830     *
1831     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1832     *
1833     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1834     * ExecutionException} will be extracted and used as the failure of the derived step.
1835     *
1836     * <p>If the function throws any other exception, it will be used as the failure of the derived
1837     * step.
1838     *
1839     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1840     * the closeable objects in that {@code ClosingFuture} will be closed.
1841     *
1842     * <p>Usage guidelines for this method:
1843     *
1844     * <ul>
1845     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1846     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1847     *       Executor)} instead, with a function that returns the next value directly.
1848     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1849     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1850     *       capture it for later closing.
1851     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1852     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1853     * </ul>
1854     *
1855     * <p>The same warnings about doing heavyweight operations within {@link
1856     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1857     */
1858    public <U extends Object> ClosingFuture<U> callAsync(
1859        final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1860      return callAsync(
1861          new AsyncCombiningCallable<U>() {
1862            @Override
1863            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1864              return function.apply(
1865                  closer,
1866                  peeker.getDone(future1),
1867                  peeker.getDone(future2),
1868                  peeker.getDone(future3),
1869                  peeker.getDone(future4));
1870            }
1871
1872            @Override
1873            public String toString() {
1874              return function.toString();
1875            }
1876          },
1877          executor);
1878    }
1879  }
1880
1881  /**
1882   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1883   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1884   * ClosingFuture, ClosingFuture)} to start this combination.
1885   *
1886   * @param <V1> the type returned by the first future
1887   * @param <V2> the type returned by the second future
1888   * @param <V3> the type returned by the third future
1889   * @param <V4> the type returned by the fourth future
1890   * @param <V5> the type returned by the fifth future
1891   */
1892  public static final class Combiner5<
1893          V1 extends Object,
1894          V2 extends Object,
1895          V3 extends Object,
1896          V4 extends Object,
1897          V5 extends Object>
1898      extends Combiner {
1899    /**
1900     * A function that returns a value when applied to the values of the five futures passed to
1901     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1902     * ClosingFuture)}.
1903     *
1904     * @param <V1> the type returned by the first future
1905     * @param <V2> the type returned by the second future
1906     * @param <V3> the type returned by the third future
1907     * @param <V4> the type returned by the fourth future
1908     * @param <V5> the type returned by the fifth future
1909     * @param <U> the type returned by the function
1910     */
1911    @FunctionalInterface
1912    public interface ClosingFunction5<
1913        V1 extends Object,
1914        V2 extends Object,
1915        V3 extends Object,
1916        V4 extends Object,
1917        V5 extends Object,
1918        U extends Object> {
1919      /**
1920       * Applies this function to five inputs, or throws an exception if unable to do so.
1921       *
1922       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1923       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1924       * is done (but not before this method completes), even if this method throws or the pipeline
1925       * is cancelled.
1926       */
1927      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5)
1928          throws Exception;
1929    }
1930
1931    /**
1932     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1933     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1934     * ClosingFuture, ClosingFuture)}.
1935     *
1936     * @param <V1> the type returned by the first future
1937     * @param <V2> the type returned by the second future
1938     * @param <V3> the type returned by the third future
1939     * @param <V4> the type returned by the fourth future
1940     * @param <V5> the type returned by the fifth future
1941     * @param <U> the type returned by the function
1942     */
1943    @FunctionalInterface
1944    public interface AsyncClosingFunction5<
1945        V1 extends Object,
1946        V2 extends Object,
1947        V3 extends Object,
1948        V4 extends Object,
1949        V5 extends Object,
1950        U extends Object> {
1951      /**
1952       * Applies this function to five inputs, or throws an exception if unable to do so.
1953       *
1954       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1955       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1956       * is done (but not before this method completes), even if this method throws or the pipeline
1957       * is cancelled.
1958       */
1959      ClosingFuture<U> apply(
1960          DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5)
1961          throws Exception;
1962    }
1963
1964    private final ClosingFuture<V1> future1;
1965    private final ClosingFuture<V2> future2;
1966    private final ClosingFuture<V3> future3;
1967    private final ClosingFuture<V4> future4;
1968    private final ClosingFuture<V5> future5;
1969
1970    private Combiner5(
1971        ClosingFuture<V1> future1,
1972        ClosingFuture<V2> future2,
1973        ClosingFuture<V3> future3,
1974        ClosingFuture<V4> future4,
1975        ClosingFuture<V5> future5) {
1976      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
1977      this.future1 = future1;
1978      this.future2 = future2;
1979      this.future3 = future3;
1980      this.future4 = future4;
1981      this.future5 = future5;
1982    }
1983
1984    /**
1985     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1986     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1987     * objects to be closed when the pipeline is done.
1988     *
1989     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1990     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
1991     * returned step.
1992     *
1993     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1994     *
1995     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1996     * ExecutionException} will be extracted and used as the failure of the derived step.
1997     */
1998    public <U extends Object> ClosingFuture<U> call(
1999        final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2000      return call(
2001          new CombiningCallable<U>() {
2002            @Override
2003            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
2004              return function.apply(
2005                  closer,
2006                  peeker.getDone(future1),
2007                  peeker.getDone(future2),
2008                  peeker.getDone(future3),
2009                  peeker.getDone(future4),
2010                  peeker.getDone(future5));
2011            }
2012
2013            @Override
2014            public String toString() {
2015              return function.toString();
2016            }
2017          },
2018          executor);
2019    }
2020
2021    /**
2022     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2023     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
2024     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
2025     * captured by the returned {@link ClosingFuture}).
2026     *
2027     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2028     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2029     * returned step.
2030     *
2031     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2032     *
2033     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2034     * ExecutionException} will be extracted and used as the failure of the derived step.
2035     *
2036     * <p>If the function throws any other exception, it will be used as the failure of the derived
2037     * step.
2038     *
2039     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
2040     * the closeable objects in that {@code ClosingFuture} will be closed.
2041     *
2042     * <p>Usage guidelines for this method:
2043     *
2044     * <ul>
2045     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
2046     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
2047     *       Executor)} instead, with a function that returns the next value directly.
2048     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
2049     *       closer.eventuallyClose()} for every closeable object this step creates in order to
2050     *       capture it for later closing.
2051     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
2052     *       ClosingFuture} call {@link #from(ListenableFuture)}.
2053     * </ul>
2054     *
2055     * <p>The same warnings about doing heavyweight operations within {@link
2056     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
2057     */
2058    public <U extends Object> ClosingFuture<U> callAsync(
2059        final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2060      return callAsync(
2061          new AsyncCombiningCallable<U>() {
2062            @Override
2063            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2064              return function.apply(
2065                  closer,
2066                  peeker.getDone(future1),
2067                  peeker.getDone(future2),
2068                  peeker.getDone(future3),
2069                  peeker.getDone(future4),
2070                  peeker.getDone(future5));
2071            }
2072
2073            @Override
2074            public String toString() {
2075              return function.toString();
2076            }
2077          },
2078          executor);
2079    }
2080  }
2081
2082  @Override
2083  public String toString() {
2084    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2085    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2086  }
2087
2088  @Override
2089  protected void finalize() {
2090    if (state.get().equals(OPEN)) {
2091      logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2092      FluentFuture<V> unused = finishToFuture();
2093    }
2094  }
2095
2096  private static void closeQuietly(final AutoCloseable closeable, Executor executor) {
2097    if (closeable == null) {
2098      return;
2099    }
2100    try {
2101      executor.execute(
2102          new Runnable() {
2103            @Override
2104            public void run() {
2105              try {
2106                closeable.close();
2107              } catch (Exception e) {
2108                logger.log(WARNING, "thrown by close()", e);
2109              }
2110            }
2111          });
2112    } catch (RejectedExecutionException e) {
2113      if (logger.isLoggable(WARNING)) {
2114        logger.log(
2115            WARNING, String.format("while submitting close to %s; will close inline", executor), e);
2116      }
2117      closeQuietly(closeable, directExecutor());
2118    }
2119  }
2120
2121  private void checkAndUpdateState(State oldState, State newState) {
2122    checkState(
2123        compareAndUpdateState(oldState, newState),
2124        "Expected state to be %s, but it was %s",
2125        oldState,
2126        newState);
2127  }
2128
2129  private boolean compareAndUpdateState(State oldState, State newState) {
2130    return state.compareAndSet(oldState, newState);
2131  }
2132
2133  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2134  private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor>
2135      implements Closeable {
2136    private final DeferredCloser closer = new DeferredCloser(this);
2137    private volatile boolean closed;
2138    private volatile CountDownLatch whenClosed;
2139
2140    <V, U> ListenableFuture<U> applyClosingFunction(
2141        ClosingFunction<? super V, U> transformation, V input) throws Exception {
2142      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2143      CloseableList newCloseables = new CloseableList();
2144      try {
2145        return immediateFuture(transformation.apply(newCloseables.closer, input));
2146      } finally {
2147        add(newCloseables, directExecutor());
2148      }
2149    }
2150
2151    <V, U> FluentFuture<U> applyAsyncClosingFunction(
2152        AsyncClosingFunction<V, U> transformation, V input) throws Exception {
2153      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2154      CloseableList newCloseables = new CloseableList();
2155      try {
2156        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2157        closingFuture.becomeSubsumedInto(newCloseables);
2158        return closingFuture.future;
2159      } finally {
2160        add(newCloseables, directExecutor());
2161      }
2162    }
2163
2164    @Override
2165    public void close() {
2166      if (closed) {
2167        return;
2168      }
2169      synchronized (this) {
2170        if (closed) {
2171          return;
2172        }
2173        closed = true;
2174      }
2175      for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) {
2176        closeQuietly(entry.getKey(), entry.getValue());
2177      }
2178      clear();
2179      if (whenClosed != null) {
2180        whenClosed.countDown();
2181      }
2182    }
2183
2184    void add(AutoCloseable closeable, Executor executor) {
2185      checkNotNull(executor);
2186      if (closeable == null) {
2187        return;
2188      }
2189      synchronized (this) {
2190        if (!closed) {
2191          put(closeable, executor);
2192          return;
2193        }
2194      }
2195      closeQuietly(closeable, executor);
2196    }
2197
2198    /**
2199     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2200     */
2201    CountDownLatch whenClosedCountDown() {
2202      if (closed) {
2203        return new CountDownLatch(0);
2204      }
2205      synchronized (this) {
2206        if (closed) {
2207          return new CountDownLatch(0);
2208        }
2209        checkState(whenClosed == null);
2210        return whenClosed = new CountDownLatch(1);
2211      }
2212    }
2213  }
2214
2215  /**
2216   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2217   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2218   */
2219  @VisibleForTesting
2220  CountDownLatch whenClosedCountDown() {
2221    return closeables.whenClosedCountDown();
2222  }
2223
2224  /** The state of a {@link CloseableList}. */
2225  enum State {
2226    /** The {@link CloseableList} has not been subsumed or closed. */
2227    OPEN,
2228
2229    /**
2230     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
2231     * into any other.
2232     */
2233    SUBSUMED,
2234
2235    /**
2236     * Some {@link ListenableFuture} has a callback attached that will close the {@link
2237     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
2238     */
2239    WILL_CLOSE,
2240
2241    /**
2242     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
2243     * {@link CloseableList} may not be subsumed.
2244     */
2245    CLOSING,
2246
2247    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
2248    CLOSED,
2249
2250    /**
2251     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
2252     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
2253     */
2254    WILL_CREATE_VALUE_AND_CLOSER,
2255  }
2256}