Skip to content

Commit b16eda9

Browse files
committed
fix AsyncUtils
1 parent 74c0035 commit b16eda9

File tree

4 files changed

+317
-64
lines changed

4 files changed

+317
-64
lines changed

src/main/java/ch/petikoch/examples/mvvm_rxjava/utils/AsyncUtils.java

Lines changed: 40 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -19,42 +19,49 @@
1919
import rx.Single;
2020
import rx.Subscription;
2121
import rx.schedulers.Schedulers;
22-
import rx.subjects.ReplaySubject;
22+
import rx.subjects.AsyncSubject;
2323

2424
import java.util.concurrent.Callable;
2525

26+
/**
27+
* Something like <a href="https://github.com/ReactiveX/RxJavaAsyncUtil/blob/0.x/src/main/java/rx/util/async/Async.java">RxJavaAsyncUtil</a>
28+
* but with {@link Single} as return types.
29+
*/
2630
public class AsyncUtils {
2731

32+
/**
33+
* @see #executeAsync(Callable)
34+
*/
2835
// experimental
2936
public static Single<FinishedIndicator> executeAsync(Runnable runnable) {
30-
ReplaySubject<FinishedIndicator> finished = ReplaySubject.create();
31-
32-
final Subscription asyncOp = Single.<FinishedIndicator>create(singleSubscriber -> {
33-
try {
34-
runnable.run();
35-
if (!singleSubscriber.isUnsubscribed()) {
36-
singleSubscriber.onSuccess(FinishedIndicator.INSTANCE);
37-
}
38-
} catch (Exception e) {
39-
if (!singleSubscriber.isUnsubscribed()) {
40-
singleSubscriber.onError(e);
41-
}
42-
}
43-
}).subscribeOn(Schedulers.io()).subscribe(
44-
finishedIndicator -> {
45-
finished.onNext(finishedIndicator);
46-
finished.onCompleted();
47-
},
48-
finished::onError
49-
);
50-
51-
return finished.share().doOnUnsubscribe(asyncOp::unsubscribe).toSingle();
37+
return executeAsync(() -> {
38+
runnable.run();
39+
return FinishedIndicator.INSTANCE;
40+
});
5241
}
5342

43+
/**
44+
* Starts immediately the execution of the given Callable with a thread from
45+
* {@link Schedulers#io()}.
46+
*
47+
* The caller can await the execution termination through subscribing to the {@link Single} return value.
48+
* It's safe to "share" the {@link Single} return value reference and subscribe to it as many times as you want.
49+
* All subscribers get the result value (or the error) individually.
50+
*
51+
* Cancellation? The execution is automatically cancelled when all subscribers do unsubscribe while
52+
* the execution is still running. The given {@link Callable} will be interrupted.
53+
*
54+
* If there is no subscriber ever to the {@link Single} return value, the callable will be executed unobserved.
55+
* Make sure to have some kind of "exception handling" also for that case (like try-catch-logging blocks or
56+
* {@link Thread.UncaughtExceptionHandler}) to not "miss" issues.
57+
*
58+
* @param callable the code to execute
59+
* @param <T> type of result
60+
* @return Single instance delivering asynchronously the result of the callable
61+
*/
5462
// experimental
5563
public static <T> Single<T> executeAsync(Callable<T> callable) {
56-
ReplaySubject<T> finished = ReplaySubject.create();
57-
64+
AsyncSubject<T> resultSubject = AsyncSubject.create();
5865
final Subscription asyncOp = Single.<T>create(singleSubscriber -> {
5966
try {
6067
T result = callable.call();
@@ -67,46 +74,16 @@ public static <T> Single<T> executeAsync(Callable<T> callable) {
6774
}
6875
}
6976
}).subscribeOn(Schedulers.io()).subscribe(
70-
finishedIndicator -> {
71-
finished.onNext(finishedIndicator);
72-
finished.onCompleted();
77+
t -> {
78+
resultSubject.onNext(t);
79+
resultSubject.onCompleted();
7380
},
74-
finished::onError
75-
);
76-
77-
return finished.share().doOnUnsubscribe(asyncOp::unsubscribe).toSingle();
78-
}
79-
80-
// experimental
81-
public static Single<FinishedIndicator> executeLazyAsync(Runnable runnable) {
82-
return Single.<FinishedIndicator>create(singleSubscriber -> {
83-
try {
84-
runnable.run();
85-
if (!singleSubscriber.isUnsubscribed()) {
86-
singleSubscriber.onSuccess(FinishedIndicator.INSTANCE);
87-
}
88-
} catch (Exception e) {
89-
if (!singleSubscriber.isUnsubscribed()) {
90-
singleSubscriber.onError(e);
81+
throwable -> {
82+
resultSubject.onError(throwable);
83+
resultSubject.onCompleted();
9184
}
92-
}
93-
}).subscribeOn(Schedulers.io()).toObservable().share().toSingle();
94-
}
95-
96-
// experimental
97-
public static <T> Single<T> executeLazyAsync(Callable<T> callable) {
98-
return Single.<T>create(singleSubscriber -> {
99-
try {
100-
T result = callable.call();
101-
if (!singleSubscriber.isUnsubscribed()) {
102-
singleSubscriber.onSuccess(result);
103-
}
104-
} catch (Exception e) {
105-
if (!singleSubscriber.isUnsubscribed()) {
106-
singleSubscriber.onError(e);
107-
}
108-
}
109-
}).subscribeOn(Schedulers.io()).toObservable().share().toSingle();
85+
);
86+
return resultSubject.doOnUnsubscribe(asyncOp::unsubscribe).share().toSingle();
11087
}
11188
}
11289

src/test/groovy/ch/petikoch/examples/mvvm_rxjava/AwaitUtils.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package ch.petikoch.examples.mvvm_rxjava
1717

1818
class AwaitUtils {
1919

20-
static void await(Closure<Boolean> condition) {
20+
static void awaitUntil(Closure<Boolean> condition) {
2121
while (!condition.call()) {
2222
Thread.sleep(1)
2323
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright 2015 Peti Koch
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ch.petikoch.examples.mvvm_rxjava;
17+
18+
import com.google.common.base.Stopwatch;
19+
import com.google.common.base.Throwables;
20+
import org.apache.commons.lang3.builder.ToStringBuilder;
21+
import org.apache.commons.lang3.builder.ToStringStyle;
22+
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.TimeoutException;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
27+
/**
28+
* A utility class for testing multi-threaded code.
29+
*
30+
* What's the difference e.g. to a {@link java.util.concurrent.CountDownLatch}?
31+
*
32+
* In simple testcases one {@link java.util.concurrent.CountDownLatch} instance is typically
33+
* fine and perfect to coordinate 2 or more threads.
34+
* But if you have more than one {@link java.util.concurrent.CountDownLatch} in a testcase,
35+
* it's probably easier to use <b>one</b> {@link TestingClock} instead.
36+
*/
37+
public class TestingClock {
38+
39+
private final AtomicLong internalTime;
40+
41+
public TestingClock() {
42+
this(0);
43+
}
44+
45+
public TestingClock(long startTime) {
46+
internalTime = new AtomicLong(startTime);
47+
}
48+
49+
public void advanceTime() {
50+
long time = internalTime.incrementAndGet();
51+
System.out.println("Time is now: " + time + ". The time was advanced by '" + Thread.currentThread().getName() + "'.");
52+
}
53+
54+
public long getTime() {
55+
return internalTime.get();
56+
}
57+
58+
public void awaitTime(long time) {
59+
try {
60+
awaitTime(time, Long.MAX_VALUE);
61+
} catch (TimeoutException e) {
62+
throw Throwables.propagate(e);
63+
}
64+
}
65+
66+
public void awaitTime(long time, long timeoutMs) throws TimeoutException {
67+
System.out.println("Thread '" + Thread.currentThread().getName() + "' is waiting for time: " + time);
68+
69+
Stopwatch stopWatch = Stopwatch.createStarted();
70+
Stopwatch showLifeSignStopWatch = Stopwatch.createStarted();
71+
72+
while (internalTime.get() < time) {
73+
sleep_a_little_while();
74+
checkTimeout(timeoutMs, stopWatch);
75+
if (showLifeSignStopWatch.elapsed(TimeUnit.SECONDS) >= 1) {
76+
showLifeSignStopWatch = Stopwatch.createStarted();
77+
System.out.println("Thread '" + Thread.currentThread().getName() + "' is still waiting for time: " + time);
78+
}
79+
}
80+
81+
System.out.println("Time " + time + " arrived for Thread '" + Thread.currentThread().getName() + "'");
82+
}
83+
84+
private void checkTimeout(long timeoutMs, Stopwatch stopWatch) throws TimeoutException {
85+
if (stopWatch.elapsed(TimeUnit.MILLISECONDS) >= timeoutMs) {
86+
throw new TimeoutException();
87+
}
88+
}
89+
90+
private void sleep_a_little_while() {
91+
try {
92+
Thread.sleep(1);
93+
} catch (InterruptedException e) {
94+
throw Throwables.propagate(e);
95+
}
96+
}
97+
98+
@Override
99+
public String toString() {
100+
return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
101+
}
102+
103+
@Override
104+
public boolean equals(final Object o) {
105+
if (this == o) return true;
106+
if (o == null || getClass() != o.getClass()) return false;
107+
108+
final TestingClock that = (TestingClock) o;
109+
110+
return internalTime.get() == that.internalTime.get();
111+
}
112+
113+
@Override
114+
public int hashCode() {
115+
return Long.valueOf(internalTime.get()).intValue();
116+
}
117+
}

0 commit comments

Comments
 (0)