1- /* __ __ __ __ __ ___
2- * \ \ / / \ \ / / __/
3- * \ \/ / /\ \ \/ / /
4- * \____/__/ \__\____/__/.ɪᴏ
5- * ᶜᵒᵖʸʳᶦᵍʰᵗ ᵇʸ ᵛᵃᵛʳ ⁻ ˡᶦᶜᵉⁿˢᵉᵈ ᵘⁿᵈᵉʳ ᵗʰᵉ ᵃᵖᵃᶜʰᵉ ˡᶦᶜᵉⁿˢᵉ ᵛᵉʳˢᶦᵒⁿ ᵗʷᵒ ᵈᵒᵗ ᶻᵉʳᵒ
1+ /* __ __ __ __ __ ___
2+ * \ \ / / \ \ / / __/
3+ * \ \/ / /\ \ \/ / /
4+ * \____/__/ \__\____/__/
5+ *
6+ * Copyright 2014-2017 Vavr, http://vavr.io
7+ *
8+ * Licensed under the Apache License, Version 2.0 (the "License");
9+ * you may not use this file except in compliance with the License.
10+ * You may obtain a copy of the License at
11+ *
12+ * http://www.apache.org/licenses/LICENSE-2.0
13+ *
14+ * Unless required by applicable law or agreed to in writing, software
15+ * distributed under the License is distributed on an "AS IS" BASIS,
16+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+ * See the License for the specific language governing permissions and
18+ * limitations under the License.
619 */
720package io .vavr .concurrent ;
821
9- import io .vavr .CheckedFunction0 ;
22+ import io .vavr .CheckedConsumer ;
23+ import io .vavr .CheckedFunction1 ;
1024import io .vavr .collection .Queue ;
1125import io .vavr .control .Option ;
1226import io .vavr .control .Try ;
1327
1428import java .util .Objects ;
15- import java .util .concurrent .CancellationException ;
16- import java .util .concurrent .ExecutorService ;
29+ import java .util .concurrent .*;
1730import java .util .function .Consumer ;
31+ import java .util .function .Predicate ;
1832
1933/**
2034 * GWT emulated version of {@link FutureImpl} with removed uses of Object's wait and notify methods.
@@ -31,18 +45,24 @@ final class FutureImpl<T> implements Future<T> {
3145 */
3246 private final Object lock = new Object ();
3347
48+ /**
49+ * Indicates if this Future is cancelled
50+ */
51+ @ GuardedBy ("lock" )
52+ private volatile boolean cancelled ;
53+
3454 /**
3555 * Once the Future is completed, the value is defined.
3656 */
3757 @ GuardedBy ("lock" )
38- private volatile Option <Try <T >> value = Option . none () ;
58+ private volatile Option <Try <T >> value ;
3959
4060 /**
4161 * The queue of actions is filled when calling onComplete() before the Future is completed or cancelled.
4262 * Otherwise actions = null.
4363 */
4464 @ GuardedBy ("lock" )
45- private Queue <Consumer <? super Try <T >>> actions = Queue . empty () ;
65+ private Queue <Consumer <Try <T >>> actions ;
4666
4767 /**
4868 * Once a computation is started via run(), job is defined and used to control the lifecycle of the computation.
@@ -51,16 +71,68 @@ final class FutureImpl<T> implements Future<T> {
5171 * {@code value} instead.
5272 */
5373 @ GuardedBy ("lock" )
54- private java .util .concurrent .Future <?> job = null ;
74+ private java .util .concurrent .Future <?> job ;
75+
76+ // single constructor
77+ private FutureImpl (ExecutorService executorService , Option <Try <T >> value , Queue <Consumer <Try <T >>> actions , CheckedFunction1 <FutureImpl <T >, java .util .concurrent .Future <?>> jobFactory ) {
78+ this .executorService = executorService ;
79+ synchronized (lock ) {
80+ this .cancelled = false ;
81+ this .value = value ;
82+ this .actions = actions ;
83+ try {
84+ this .job = jobFactory .apply (this );
85+ } catch (Throwable x ) {
86+ tryComplete (Try .failure (x ));
87+ }
88+ }
89+ }
5590
5691 /**
57- * Creates a Future, {@link #run(CheckedFunction0)} has to be called separately .
92+ * Creates a {@code FutureImpl} that is immediately completed with the given value. No task will be started .
5893 *
5994 * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
95+ * @param value the result of this Future
6096 */
61- FutureImpl (ExecutorService executorService ) {
62- Objects .requireNonNull (executorService , "executorService is null" );
63- this .executorService = executorService ;
97+ @ SuppressWarnings ("unchecked" )
98+ static <T > FutureImpl <T > of (ExecutorService executorService , Try <? extends T > value ) {
99+ return new FutureImpl <>(executorService , Option .some (Try .narrow (value )), null , ignored -> null );
100+ }
101+
102+ /**
103+ * Creates a {@code FutureImpl} that is eventually completed.
104+ * The given {@code computation} is <em>synchronously</em> executed, no thread is started.
105+ *
106+ * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
107+ * @param computation A non-blocking computation
108+ * @param <T> value type of the Future
109+ * @return a new {@code FutureImpl} instance
110+ */
111+ static <T > FutureImpl <T > sync (ExecutorService executorService , CheckedConsumer <Predicate <Try <? extends T >>> computation ) {
112+ return new FutureImpl <>(executorService , Option .none (), Queue .empty (), future -> {
113+ computation .accept (future ::tryComplete );
114+ return null ;
115+ });
116+ }
117+
118+ /**
119+ * Creates a {@code FutureImpl} that is eventually completed.
120+ * The given {@code computation} is <em>asynchronously</em> executed, a new thread is started.
121+ *
122+ * @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
123+ * @param computation A (possibly blocking) computation
124+ * @param <T> value type of the Future
125+ * @return a new {@code FutureImpl} instance
126+ */
127+ static <T > FutureImpl <T > async (ExecutorService executorService , CheckedConsumer <Predicate <Try <? extends T >>> computation ) {
128+ // In a single-threaded context this Future may already have been completed during initialization.
129+ return new FutureImpl <>(executorService , Option .none (), Queue .empty (), future -> executorService .submit (() -> {
130+ try {
131+ computation .accept (future ::tryComplete );
132+ } catch (Throwable x ) {
133+ future .tryComplete (Try .failure (x ));
134+ }
135+ }));
64136 }
65137
66138 @ Override
@@ -69,20 +141,24 @@ public Future<T> await() {
69141 }
70142
71143 @ Override
72- public boolean cancel (boolean mayInterruptIfRunning ) {
73- synchronized (lock ) {
74- if (isCompleted ()) {
75- return false ;
76- } else {
77- return Try .of (() -> job == null || job .cancel (mayInterruptIfRunning )).onSuccess (cancelled -> {
78- if (cancelled ) {
79- value = Option .some (Try .failure (new CancellationException ()));
80- actions = null ;
81- job = null ;
82- }
83- }).getOrElse (false );
144+ public Future <T > await (long timeout , TimeUnit unit ) {
145+ return this ;
146+ }
147+
148+ @ Override
149+ public Future <T > cancel (boolean mayInterruptIfRunning ) {
150+ if (!isCompleted ()) {
151+ synchronized (lock ) {
152+ Try .of (() -> job == null || job .cancel (mayInterruptIfRunning ))
153+ .recover (ignored -> job != null && job .isCancelled ())
154+ .onSuccess (cancelled -> {
155+ if (cancelled ) {
156+ this .cancelled = tryComplete (Try .failure (new CancellationException ()));
157+ }
158+ });
84159 }
85160 }
161+ return this ;
86162 }
87163
88164 @ Override
@@ -95,11 +171,17 @@ public Option<Try<T>> getValue() {
95171 return value ;
96172 }
97173
174+ @ Override
175+ public boolean isCancelled () {
176+ return cancelled ;
177+ }
178+
98179 @ Override
99180 public boolean isCompleted () {
100181 return value .isDefined ();
101182 }
102183
184+ @ SuppressWarnings ("unchecked" )
103185 @ Override
104186 public Future <T > onComplete (Consumer <? super Try <T >> action ) {
105187 Objects .requireNonNull (action , "action is null" );
@@ -110,7 +192,7 @@ public Future<T> onComplete(Consumer<? super Try<T>> action) {
110192 if (isCompleted ()) {
111193 perform (action );
112194 } else {
113- actions = actions .enqueue (action );
195+ actions = actions .enqueue (( Consumer < Try < T >>) action );
114196 }
115197 }
116198 }
@@ -122,82 +204,54 @@ public Future<T> onComplete(Consumer<? super Try<T>> action) {
122204
123205 @ Override
124206 public String toString () {
125- return stringPrefix () + "(" + value .map (String ::valueOf ).getOrElse ("?" ) + ")" ;
207+ final Option <Try <T >> value = this .value ;
208+ final String s = (value == null || value .isEmpty ()) ? "?" : value .get ().toString ();
209+ return stringPrefix () + "(" + s + ")" ;
126210 }
127211
128212 /**
129- * Runs a computation using the underlying ExecutorService .
213+ * INTERNAL METHOD, SHOULD BE USED BY THE CONSTRUCTOR, ONLY .
130214 * <p>
131- * DEV-NOTE: Internally this method is called by the static {@code Future} factory methods.
132- *
133- * @throws IllegalStateException if the Future is pending, completed or cancelled
134- * @throws NullPointerException if {@code computation} is null.
135- */
136- void run (CheckedFunction0 <? extends T > computation ) {
137- Objects .requireNonNull (computation , "computation is null" );
138- synchronized (lock ) {
139- if (job != null ) {
140- throw new IllegalStateException ("The Future is already running." );
141- }
142- if (isCompleted ()) {
143- throw new IllegalStateException ("The Future is completed." );
144- }
145- try {
146- // if the ExecutorService runs the computation
147- // - in a different thread, the lock ensures that the job is assigned before the computation completes
148- // - in the current thread, the job is already completed and the `job` variable remains null
149- final java .util .concurrent .Future <?> tmpJob = executorService .submit (() -> complete (Try .of (computation )));
150- if (!isCompleted ()) {
151- job = tmpJob ;
152- }
153- } catch (Throwable t ) {
154- // ensures that the Future completes if the `executorService.submit()` method throws
155- if (!isCompleted ()) {
156- complete (Try .failure (t ));
157- }
158- }
159- }
160- }
161-
162- boolean tryComplete (Try <? extends T > value ) {
163- Objects .requireNonNull (value , "value is null" );
164- synchronized (lock ) {
165- if (isCompleted ()) {
166- return false ;
167- } else {
168- complete (value );
169- return true ;
170- }
171- }
172- }
173-
174- /**
175- * Completes this Future with a value.
215+ * Completes this Future with a value and performs all actions.
176216 * <p>
177- * DEV-NOTE: Internally this method is called by the {@code Future.run()} method and by {@code Promise} .
217+ * This method is idempotent. I.e. it does nothing, if this Future is already completed .
178218 *
179219 * @param value A Success containing a result or a Failure containing an Exception.
180- * @return The given {@code value} for convenience purpose.
181220 * @throws IllegalStateException if the Future is already completed or cancelled.
182221 * @throws NullPointerException if the given {@code value} is null.
183222 */
184- private void complete (Try <? extends T > value ) {
223+ private boolean tryComplete (Try <? extends T > value ) {
185224 Objects .requireNonNull (value , "value is null" );
186- final Queue <Consumer <? super Try <T >>> actions ;
187- // it is essential to make the completed state public *before* performing the actions
188- synchronized (lock ) {
189- if (isCompleted ()) {
190- throw new IllegalStateException ("The Future is completed." );
225+ if (isCompleted ()) {
226+ return false ;
227+ } else {
228+ final Queue <Consumer <Try <T >>> actions ;
229+ // it is essential to make the completed state public *before* performing the actions
230+ synchronized (lock ) {
231+ if (isCompleted ()) {
232+ actions = null ;
233+ } else {
234+ // the job isn't set to null, see isCancelled()
235+ actions = this .actions ;
236+ this .value = Option .some (Try .narrow (value ));
237+ this .actions = null ;
238+ this .job = null ;
239+ }
240+ }
241+ if (actions != null ) {
242+ actions .forEach (this ::perform );
243+ return true ;
244+ } else {
245+ return false ;
191246 }
192- actions = this .actions ;
193- this .value = Option .some (Try .narrow (value ));
194- this .actions = null ;
195- this .job = null ;
196247 }
197- actions .forEach (this ::perform );
198248 }
199249
200250 private void perform (Consumer <? super Try <T >> action ) {
201- Try .run (() -> executorService .execute (() -> action .accept (value .get ())));
251+ try {
252+ executorService .execute (() -> action .accept (value .get ()));
253+ } catch (Throwable x ) {
254+ // ignored // TODO: tell UncaughtExceptionHandler?
255+ }
202256 }
203257}
0 commit comments