@@ -38,7 +38,7 @@ final class FutureImpl<T> implements Future<T> {
3838 /**
3939 * Used to start new threads.
4040 */
41- private final ExecutorService executorService ;
41+ private final Executor executor ;
4242
4343 /**
4444 * Used to synchronize state changes.
@@ -64,24 +64,15 @@ final class FutureImpl<T> implements Future<T> {
6464 @ GuardedBy ("lock" )
6565 private Queue <Consumer <Try <T >>> actions ;
6666
67- /**
68- * Once a computation is started via run(), job is defined and used to control the lifecycle of the computation.
69- * <p>
70- * The {@code java.util.concurrent.Future} is not intended to store the result of the computation, it is stored in
71- * {@code value} instead.
72- */
73- @ GuardedBy ("lock" )
74- private java .util .concurrent .Future <?> job ;
75-
7667 // single constructor
77- private FutureImpl (ExecutorService executorService , Option <Try <T >> value , Queue <Consumer <Try <T >>> actions , CheckedFunction1 < FutureImpl < T >, java . util . concurrent . Future <?>> jobFactory ) {
78- this .executorService = executorService ;
68+ private FutureImpl (Executor executor , Option <Try <T >> value , Queue <Consumer <Try <T >>> actions , Computation < T > computation ) {
69+ this .executor = executor ;
7970 synchronized (lock ) {
8071 this .cancelled = false ;
8172 this .value = value ;
8273 this .actions = actions ;
8374 try {
84- this . job = jobFactory . apply (this );
75+ computation . execute (this :: tryComplete , this :: updateThread );
8576 } catch (Throwable x ) {
8677 tryComplete (Try .failure (x ));
8778 }
@@ -91,46 +82,44 @@ private FutureImpl(ExecutorService executorService, Option<Try<T>> value, Queue<
9182 /**
9283 * Creates a {@code FutureImpl} that is immediately completed with the given value. No task will be started.
9384 *
94- * @param executorService An {@link ExecutorService } to run and control the computation and to perform the actions.
85+ * @param executor An {@link Executor } to run and control the computation and to perform the actions.
9586 * @param value the result of this Future
9687 */
9788 @ SuppressWarnings ("unchecked" )
98- static <T > FutureImpl <T > of (ExecutorService executorService , Try <? extends T > value ) {
99- return new FutureImpl <>(executorService , Option .some (Try .narrow (value )), null , ignored -> null );
89+ static <T > FutureImpl <T > of (Executor executor , Try <? extends T > value ) {
90+ return new FutureImpl <>(executor , Option .some (Try .narrow (value )), null , ( tryComplete , updateThread ) -> {} );
10091 }
10192
10293 /**
10394 * Creates a {@code FutureImpl} that is eventually completed.
10495 * The given {@code computation} is <em>synchronously</em> executed, no thread is started.
10596 *
106- * @param executorService An {@link ExecutorService } to run and control the computation and to perform the actions.
97+ * @param executor An {@link Executor } to run and control the computation and to perform the actions.
10798 * @param computation A non-blocking computation
10899 * @param <T> value type of the Future
109100 * @return a new {@code FutureImpl} instance
110101 */
111- static <T > FutureImpl <T > sync (ExecutorService executorService , CheckedConsumer <Predicate <Try <? extends T >>> computation ) {
112- return new FutureImpl <>(executorService , Option .none (), Queue .empty (), future -> {
113- computation .accept (future ::tryComplete );
114- return null ;
102+ static <T > FutureImpl <T > sync (Executor executor , CheckedConsumer <Predicate <Try <? extends T >>> computation ) {
103+ return new FutureImpl <>(executor , Option .none (), Queue .empty (), (tryComplete , updateThread ) -> {
104+ computation .accept (tryComplete );
115105 });
116106 }
117107
118108 /**
119109 * Creates a {@code FutureImpl} that is eventually completed.
120110 * The given {@code computation} is <em>asynchronously</em> executed, a new thread is started.
121111 *
122- * @param executorService An {@link ExecutorService } to run and control the computation and to perform the actions.
112+ * @param executor An {@link Executor } to run and control the computation and to perform the actions.
123113 * @param computation A (possibly blocking) computation
124114 * @param <T> value type of the Future
125115 * @return a new {@code FutureImpl} instance
126116 */
127- static <T > FutureImpl <T > async (ExecutorService executorService , CheckedConsumer <Predicate <Try <? extends T >>> computation ) {
128- // In a single-threaded context this Future may already have been completed during initialization.
129- return new FutureImpl <>(executorService , Option .none (), Queue .empty (), future -> executorService .submit (() -> {
117+ static <T > FutureImpl <T > async (Executor executor , CheckedConsumer <Predicate <Try <? extends T >>> computation ) {
118+ return new FutureImpl <>(executor , Option .none (), Queue .empty (), (tryComplete , updateThread ) -> executor .execute (() -> {
130119 try {
131- computation .accept (future :: tryComplete );
120+ computation .accept (tryComplete );
132121 } catch (Throwable x ) {
133- future . tryComplete (Try .failure (x ));
122+ tryComplete . test (Try .failure (x ));
134123 }
135124 }));
136125 }
@@ -148,22 +137,17 @@ public Future<T> await(long timeout, TimeUnit unit) {
148137 @ Override
149138 public Future <T > cancel (boolean mayInterruptIfRunning ) {
150139 if (!isCompleted ()) {
151- synchronized (lock ) {
152- Try .of (() -> job == null || job .cancel (mayInterruptIfRunning ))
153- .recover (ignored -> job != null && job .isCancelled ())
154- .onSuccess (cancelled -> {
155- if (cancelled ) {
156- this .cancelled = tryComplete (Try .failure (new CancellationException ()));
157- }
158- });
159- }
140+ this .cancelled = tryComplete (Try .failure (new CancellationException ()));
160141 }
161142 return this ;
162143 }
163144
145+ private void updateThread () {
146+ }
147+
164148 @ Override
165- public ExecutorService executorService () {
166- return executorService ;
149+ public Executor executor () {
150+ return executor ;
167151 }
168152
169153 @ Override
@@ -235,7 +219,6 @@ private boolean tryComplete(Try<? extends T> value) {
235219 actions = this .actions ;
236220 this .value = Option .some (Try .narrow (value ));
237221 this .actions = null ;
238- this .job = null ;
239222 }
240223 }
241224 if (actions != null ) {
@@ -249,9 +232,13 @@ private boolean tryComplete(Try<? extends T> value) {
249232
250233 private void perform (Consumer <? super Try <T >> action ) {
251234 try {
252- executorService .execute (() -> action .accept (value .get ()));
235+ executor .execute (() -> action .accept (value .get ()));
253236 } catch (Throwable x ) {
254237 // ignored // TODO: tell UncaughtExceptionHandler?
255238 }
256239 }
240+
241+ private interface Computation <T > {
242+ void execute (Predicate <Try <? extends T >> tryComplete , Runnable updateThread ) throws Throwable ;
243+ }
257244}
0 commit comments