Skip to content

Commit 8744ab4

Browse files
committed
use snapshot vavr version
1 parent 144df8c commit 8744ab4

File tree

12 files changed

+362
-142
lines changed

12 files changed

+362
-142
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
<assertj.core.version>3.5.2</assertj.core.version>
4141
<eclipse.lifecycle.mapping.version>1.0.0</eclipse.lifecycle.mapping.version>
4242
<java.version>1.8</java.version>
43-
<vavr.version>0.9.2</vavr.version>
43+
<vavr.version>1.0.0-SNAPSHOT</vavr.version>
4444
<junit.version>4.12</junit.version>
4545
<gwt.version>2.8.0</gwt.version>
4646
<maven.bundle.version>3.2.0</maven.bundle.version>

src/main/resources/super/io/vavr/concurrent/CurrentThreadExecutorService.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
1-
/* __ __ __ __ __ ___
2-
* \ \ / / \ \ / / __/
3-
* \ \/ / /\ \ \/ / /
4-
* \____/__/ \__\____/__/.ɪᴏ
5-
* ᶜᵒᵖʸʳᶦᵍʰᵗ ᵇʸ ᵛᵃᵛʳ ⁻ ˡᶦᶜᵉⁿˢᵉᵈ ᵘⁿᵈᵉʳ ᵗʰᵉ ᵃᵖᵃᶜʰᵉ ˡᶦᶜᵉⁿˢᵉ ᵛᵉʳˢᶦᵒⁿ ᵗʷᵒ ᵈᵒᵗ ᶻᵉʳᵒ
1+
/* __ __ __ __ __ ___
2+
* \ \ / / \ \ / / __/
3+
* \ \/ / /\ \ \/ / /
4+
* \____/__/ \__\____/__/
5+
*
6+
* Copyright 2014-2017 Vavr, http://vavr.io
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
619
*/
720
package io.vavr.concurrent;
821

src/main/resources/super/io/vavr/concurrent/CurrentThreadFuture.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,21 @@
1-
/* __ __ __ __ __ ___
2-
* \ \ / / \ \ / / __/
3-
* \ \/ / /\ \ \/ / /
4-
* \____/__/ \__\____/__/.ɪᴏ
5-
* ᶜᵒᵖʸʳᶦᵍʰᵗ ᵇʸ ᵛᵃᵛʳ ⁻ ˡᶦᶜᵉⁿˢᵉᵈ ᵘⁿᵈᵉʳ ᵗʰᵉ ᵃᵖᵃᶜʰᵉ ˡᶦᶜᵉⁿˢᵉ ᵛᵉʳˢᶦᵒⁿ ᵗʷᵒ ᵈᵒᵗ ᶻᵉʳᵒ
1+
/* __ __ __ __ __ ___
2+
* \ \ / / \ \ / / __/
3+
* \ \/ / /\ \ \/ / /
4+
* \____/__/ \__\____/__/
5+
*
6+
* Copyright 2014-2017 Vavr, http://vavr.io
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
619
*/
720
package io.vavr.concurrent;
821

Lines changed: 143 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,34 @@
1-
/* __ __ __ __ __ ___
2-
* \ \ / / \ \ / / __/
3-
* \ \/ / /\ \ \/ / /
4-
* \____/__/ \__\____/__/.ɪᴏ
5-
* ᶜᵒᵖʸʳᶦᵍʰᵗ ᵇʸ ᵛᵃᵛʳ ⁻ ˡᶦᶜᵉⁿˢᵉᵈ ᵘⁿᵈᵉʳ ᵗʰᵉ ᵃᵖᵃᶜʰᵉ ˡᶦᶜᵉⁿˢᵉ ᵛᵉʳˢᶦᵒⁿ ᵗʷᵒ ᵈᵒᵗ ᶻᵉʳᵒ
1+
/* __ __ __ __ __ ___
2+
* \ \ / / \ \ / / __/
3+
* \ \/ / /\ \ \/ / /
4+
* \____/__/ \__\____/__/
5+
*
6+
* Copyright 2014-2017 Vavr, http://vavr.io
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
619
*/
720
package io.vavr.concurrent;
821

9-
import io.vavr.CheckedFunction0;
22+
import io.vavr.CheckedConsumer;
23+
import io.vavr.CheckedFunction1;
1024
import io.vavr.collection.Queue;
1125
import io.vavr.control.Option;
1226
import io.vavr.control.Try;
1327

1428
import java.util.Objects;
15-
import java.util.concurrent.CancellationException;
16-
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.*;
1730
import java.util.function.Consumer;
31+
import java.util.function.Predicate;
1832

1933
/**
2034
* GWT emulated version of {@link FutureImpl} with removed uses of Object's wait and notify methods.
@@ -31,18 +45,24 @@ final class FutureImpl<T> implements Future<T> {
3145
*/
3246
private final Object lock = new Object();
3347

48+
/**
49+
* Indicates if this Future is cancelled
50+
*/
51+
@GuardedBy("lock")
52+
private volatile boolean cancelled;
53+
3454
/**
3555
* Once the Future is completed, the value is defined.
3656
*/
3757
@GuardedBy("lock")
38-
private volatile Option<Try<T>> value = Option.none();
58+
private volatile Option<Try<T>> value;
3959

4060
/**
4161
* The queue of actions is filled when calling onComplete() before the Future is completed or cancelled.
4262
* Otherwise actions = null.
4363
*/
4464
@GuardedBy("lock")
45-
private Queue<Consumer<? super Try<T>>> actions = Queue.empty();
65+
private Queue<Consumer<Try<T>>> actions;
4666

4767
/**
4868
* Once a computation is started via run(), job is defined and used to control the lifecycle of the computation.
@@ -51,16 +71,68 @@ final class FutureImpl<T> implements Future<T> {
5171
* {@code value} instead.
5272
*/
5373
@GuardedBy("lock")
54-
private java.util.concurrent.Future<?> job = null;
74+
private java.util.concurrent.Future<?> job;
75+
76+
// single constructor
77+
private FutureImpl(ExecutorService executorService, Option<Try<T>> value, Queue<Consumer<Try<T>>> actions, CheckedFunction1<FutureImpl<T>, java.util.concurrent.Future<?>> jobFactory) {
78+
this.executorService = executorService;
79+
synchronized (lock) {
80+
this.cancelled = false;
81+
this.value = value;
82+
this.actions = actions;
83+
try {
84+
this.job = jobFactory.apply(this);
85+
} catch(Throwable x) {
86+
tryComplete(Try.failure(x));
87+
}
88+
}
89+
}
5590

5691
/**
57-
* Creates a Future, {@link #run(CheckedFunction0)} has to be called separately.
92+
* Creates a {@code FutureImpl} that is immediately completed with the given value. No task will be started.
5893
*
5994
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
95+
* @param value the result of this Future
6096
*/
61-
FutureImpl(ExecutorService executorService) {
62-
Objects.requireNonNull(executorService, "executorService is null");
63-
this.executorService = executorService;
97+
@SuppressWarnings("unchecked")
98+
static <T> FutureImpl<T> of(ExecutorService executorService, Try<? extends T> value) {
99+
return new FutureImpl<>(executorService, Option.some(Try.narrow(value)), null, ignored -> null);
100+
}
101+
102+
/**
103+
* Creates a {@code FutureImpl} that is eventually completed.
104+
* The given {@code computation} is <em>synchronously</em> executed, no thread is started.
105+
*
106+
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
107+
* @param computation A non-blocking computation
108+
* @param <T> value type of the Future
109+
* @return a new {@code FutureImpl} instance
110+
*/
111+
static <T> FutureImpl<T> sync(ExecutorService executorService, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
112+
return new FutureImpl<>(executorService, Option.none(), Queue.empty(), future -> {
113+
computation.accept(future::tryComplete);
114+
return null;
115+
});
116+
}
117+
118+
/**
119+
* Creates a {@code FutureImpl} that is eventually completed.
120+
* The given {@code computation} is <em>asynchronously</em> executed, a new thread is started.
121+
*
122+
* @param executorService An {@link ExecutorService} to run and control the computation and to perform the actions.
123+
* @param computation A (possibly blocking) computation
124+
* @param <T> value type of the Future
125+
* @return a new {@code FutureImpl} instance
126+
*/
127+
static <T> FutureImpl<T> async(ExecutorService executorService, CheckedConsumer<Predicate<Try<? extends T>>> computation) {
128+
// In a single-threaded context this Future may already have been completed during initialization.
129+
return new FutureImpl<>(executorService, Option.none(), Queue.empty(), future -> executorService.submit(() -> {
130+
try {
131+
computation.accept(future::tryComplete);
132+
} catch (Throwable x) {
133+
future.tryComplete(Try.failure(x));
134+
}
135+
}));
64136
}
65137

66138
@Override
@@ -69,20 +141,24 @@ public Future<T> await() {
69141
}
70142

71143
@Override
72-
public boolean cancel(boolean mayInterruptIfRunning) {
73-
synchronized (lock) {
74-
if (isCompleted()) {
75-
return false;
76-
} else {
77-
return Try.of(() -> job == null || job.cancel(mayInterruptIfRunning)).onSuccess(cancelled -> {
78-
if (cancelled) {
79-
value = Option.some(Try.failure(new CancellationException()));
80-
actions = null;
81-
job = null;
82-
}
83-
}).getOrElse(false);
144+
public Future<T> await(long timeout, TimeUnit unit) {
145+
return this;
146+
}
147+
148+
@Override
149+
public Future<T> cancel(boolean mayInterruptIfRunning) {
150+
if (!isCompleted()) {
151+
synchronized (lock) {
152+
Try.of(() -> job == null || job.cancel(mayInterruptIfRunning))
153+
.recover(ignored -> job != null && job.isCancelled())
154+
.onSuccess(cancelled -> {
155+
if (cancelled) {
156+
this.cancelled = tryComplete(Try.failure(new CancellationException()));
157+
}
158+
});
84159
}
85160
}
161+
return this;
86162
}
87163

88164
@Override
@@ -95,11 +171,17 @@ public Option<Try<T>> getValue() {
95171
return value;
96172
}
97173

174+
@Override
175+
public boolean isCancelled() {
176+
return cancelled;
177+
}
178+
98179
@Override
99180
public boolean isCompleted() {
100181
return value.isDefined();
101182
}
102183

184+
@SuppressWarnings("unchecked")
103185
@Override
104186
public Future<T> onComplete(Consumer<? super Try<T>> action) {
105187
Objects.requireNonNull(action, "action is null");
@@ -110,7 +192,7 @@ public Future<T> onComplete(Consumer<? super Try<T>> action) {
110192
if (isCompleted()) {
111193
perform(action);
112194
} else {
113-
actions = actions.enqueue(action);
195+
actions = actions.enqueue((Consumer<Try<T>>) action);
114196
}
115197
}
116198
}
@@ -122,82 +204,54 @@ public Future<T> onComplete(Consumer<? super Try<T>> action) {
122204

123205
@Override
124206
public String toString() {
125-
return stringPrefix() + "(" + value.map(String::valueOf).getOrElse("?") + ")";
207+
final Option<Try<T>> value = this.value;
208+
final String s = (value == null || value.isEmpty()) ? "?" : value.get().toString();
209+
return stringPrefix() + "(" + s + ")";
126210
}
127211

128212
/**
129-
* Runs a computation using the underlying ExecutorService.
213+
* INTERNAL METHOD, SHOULD BE USED BY THE CONSTRUCTOR, ONLY.
130214
* <p>
131-
* DEV-NOTE: Internally this method is called by the static {@code Future} factory methods.
132-
*
133-
* @throws IllegalStateException if the Future is pending, completed or cancelled
134-
* @throws NullPointerException if {@code computation} is null.
135-
*/
136-
void run(CheckedFunction0<? extends T> computation) {
137-
Objects.requireNonNull(computation, "computation is null");
138-
synchronized (lock) {
139-
if (job != null) {
140-
throw new IllegalStateException("The Future is already running.");
141-
}
142-
if (isCompleted()) {
143-
throw new IllegalStateException("The Future is completed.");
144-
}
145-
try {
146-
// if the ExecutorService runs the computation
147-
// - in a different thread, the lock ensures that the job is assigned before the computation completes
148-
// - in the current thread, the job is already completed and the `job` variable remains null
149-
final java.util.concurrent.Future<?> tmpJob = executorService.submit(() -> complete(Try.of(computation)));
150-
if (!isCompleted()) {
151-
job = tmpJob;
152-
}
153-
} catch (Throwable t) {
154-
// ensures that the Future completes if the `executorService.submit()` method throws
155-
if (!isCompleted()) {
156-
complete(Try.failure(t));
157-
}
158-
}
159-
}
160-
}
161-
162-
boolean tryComplete(Try<? extends T> value) {
163-
Objects.requireNonNull(value, "value is null");
164-
synchronized (lock) {
165-
if (isCompleted()) {
166-
return false;
167-
} else {
168-
complete(value);
169-
return true;
170-
}
171-
}
172-
}
173-
174-
/**
175-
* Completes this Future with a value.
215+
* Completes this Future with a value and performs all actions.
176216
* <p>
177-
* DEV-NOTE: Internally this method is called by the {@code Future.run()} method and by {@code Promise}.
217+
* This method is idempotent. I.e. it does nothing, if this Future is already completed.
178218
*
179219
* @param value A Success containing a result or a Failure containing an Exception.
180-
* @return The given {@code value} for convenience purpose.
181220
* @throws IllegalStateException if the Future is already completed or cancelled.
182221
* @throws NullPointerException if the given {@code value} is null.
183222
*/
184-
private void complete(Try<? extends T> value) {
223+
private boolean tryComplete(Try<? extends T> value) {
185224
Objects.requireNonNull(value, "value is null");
186-
final Queue<Consumer<? super Try<T>>> actions;
187-
// it is essential to make the completed state public *before* performing the actions
188-
synchronized (lock) {
189-
if (isCompleted()) {
190-
throw new IllegalStateException("The Future is completed.");
225+
if (isCompleted()) {
226+
return false;
227+
} else {
228+
final Queue<Consumer<Try<T>>> actions;
229+
// it is essential to make the completed state public *before* performing the actions
230+
synchronized (lock) {
231+
if (isCompleted()) {
232+
actions = null;
233+
} else {
234+
// the job isn't set to null, see isCancelled()
235+
actions = this.actions;
236+
this.value = Option.some(Try.narrow(value));
237+
this.actions = null;
238+
this.job = null;
239+
}
240+
}
241+
if (actions != null) {
242+
actions.forEach(this::perform);
243+
return true;
244+
} else {
245+
return false;
191246
}
192-
actions = this.actions;
193-
this.value = Option.some(Try.narrow(value));
194-
this.actions = null;
195-
this.job = null;
196247
}
197-
actions.forEach(this::perform);
198248
}
199249

200250
private void perform(Consumer<? super Try<T>> action) {
201-
Try.run(() -> executorService.execute(() -> action.accept(value.get())));
251+
try {
252+
executorService.execute(() -> action.accept(value.get()));
253+
} catch(Throwable x) {
254+
// ignored // TODO: tell UncaughtExceptionHandler?
255+
}
202256
}
203257
}

0 commit comments

Comments
 (0)