Skip to content

Commit b3fca14

Browse files
committed
add scala.concurrent.java8.FutureConverter
Conversions between Future and CompletionStage, with JavaDoc. Also contains accessors for ExecutionContext converters and Promise construction (which are basically inaccessible in the Scala library because they are located in companion objects to traits).
1 parent 81c108b commit b3fca14

File tree

5 files changed

+567
-1
lines changed

5 files changed

+567
-1
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
11
target
2+
.cache
3+
.classpath
4+
.project
5+
.settings/
6+
.target/
7+
bin/

build.sbt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ test in Test := {
3333
(test in Test).value
3434
}
3535

36+
libraryDependencies ++= Seq(
37+
"junit" % "junit" % "4.10" % "test",
38+
"com.novocode" % "junit-interface" % "0.8" % "test"
39+
)
40+
41+
testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
42+
3643
sourceGenerators in Compile <+= sourceManaged in Compile map { dir =>
3744
def write(name: String, content: String) = {
3845
val f = dir / "java" / "scala" / "compat" / "java8" / s"${name}.java"
@@ -59,3 +66,24 @@ initialize := {
5966
if (Set("1.5", "1.6", "1.7") contains specVersion)
6067
sys.error("Java 8 or higher is required for this project.")
6168
}
69+
70+
lazy val JavaDoc = config("genjavadoc") extend Compile
71+
72+
inConfig(JavaDoc)(Defaults.configSettings) ++ Seq(
73+
packageDoc in Compile <<= packageDoc in JavaDoc,
74+
sources in JavaDoc <<= (target, compile in Compile, sources in Compile) map ((t, c, s) =>
75+
(t / "java" ** "*.java").get ++ s.filter(_.getName.endsWith(".java"))
76+
),
77+
javacOptions in JavaDoc := Seq(),
78+
artifactName in packageDoc in JavaDoc := ((sv, mod, art) => "" + mod.name + "_" + sv.binary + "-" + mod.revision + "-javadoc.jar"),
79+
libraryDependencies += compilerPlugin("com.typesafe.genjavadoc" %% "genjavadoc-plugin" % "0.5" cross CrossVersion.full),
80+
scalacOptions <+= target map (t => "-P:genjavadoc:out=" + (t / "java"))
81+
)
82+
83+
initialCommands :=
84+
"""|import scala.concurrent._
85+
|import ExecutionContext.Implicits.global
86+
|import java.util.concurrent.{CompletionStage,CompletableFuture}
87+
|import scala.concurrent.java8.FutureConverter._
88+
|""".stripMargin
89+
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
package scala.concurrent.java8
2+
3+
import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor, impl }
4+
import java.util.concurrent.{ CompletionStage, Executor, ExecutorService, CompletableFuture }
5+
import scala.util.{ Try, Success, Failure }
6+
import java.util.function.{ BiConsumer, Function JF, Consumer, BiFunction }
7+
8+
/**
9+
* This class contains static methods which convert between Java CompletionStage
10+
* and Scala Future. This is useful when mediating between Scala and Java
11+
* libraries with asynchronous APIs.
12+
*
13+
* Note that the bridge is implemented at the read-only side of asynchronous
14+
* handles, namely scala.concurrent.Future instead of scala.concurrent.Promise
15+
* and CompletionStage instead of CompletableFuture. This is intentional, as
16+
* the semantics of bridging the write-handles would be prone to race
17+
* conditions; if both ends (CompletableFuture and Promise) are completed
18+
* independently at the same time, they may contain different values afterwards.
19+
* For this reason, <code>toCompletableFuture()</code> is not supported on the
20+
* created CompletionStages.
21+
*
22+
* Example usage:
23+
*
24+
* {{{
25+
* import java.util.concurrent.CompletionStage;
26+
* import scala.concurrent.Future;
27+
* import static scala.concurrent.java8.FutureConverter.*;
28+
*
29+
* final CompletionStage<String> cs = ... // from an async Java API
30+
* final Future<String> f = toScala(cs);
31+
* ...
32+
* final Future<Integer> f2 = ... // from an async Scala API
33+
* final CompletionStage<Integer> cs2 = toJava(f2);
34+
* }}}
35+
*/
36+
object FutureConverter {
37+
38+
private class CF[T] extends CompletableFuture[T] with (Try[T] => Unit) {
39+
override def apply(t: Try[T]): Unit = t match {
40+
case Success(v) complete(v)
41+
case Failure(e) completeExceptionally(e)
42+
}
43+
44+
/*
45+
* Ensure that completions of this future cannot hold the Scala Future’s completer hostage.
46+
*/
47+
override def thenApply[U](fn: JF[_ >: T, _ <: U]): CompletableFuture[U] = thenApplyAsync(fn)
48+
override def thenAccept(fn: Consumer[_ >: T]): CompletableFuture[Void] = thenAcceptAsync(fn)
49+
override def thenRun(fn: Runnable): CompletableFuture[Void] = thenRunAsync(fn)
50+
override def thenCombine[U, V](cs: CompletionStage[_ <: U], fn: BiFunction[_ >: T, _ >: U, _ <: V]): CompletableFuture[V] = thenCombineAsync(cs, fn)
51+
override def thenAcceptBoth[U](cs: CompletionStage[_ <: U], fn: BiConsumer[_ >: T, _ >: U]): CompletableFuture[Void] = thenAcceptBothAsync(cs, fn)
52+
override def runAfterBoth(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterBothAsync(cs, fn)
53+
override def applyToEither[U](cs: CompletionStage[_ <: T], fn: JF[_ >: T, U]): CompletableFuture[U] = applyToEitherAsync(cs, fn)
54+
override def acceptEither(cs: CompletionStage[_ <: T], fn: Consumer[_ >: T]): CompletableFuture[Void] = acceptEitherAsync(cs, fn)
55+
override def runAfterEither(cs: CompletionStage[_], fn: Runnable): CompletableFuture[Void] = runAfterEitherAsync(cs, fn)
56+
override def thenCompose[U](fn: JF[_ >: T, _ <: CompletionStage[U]]): CompletableFuture[U] = thenComposeAsync(fn)
57+
override def whenComplete(fn: BiConsumer[_ >: T, _ >: Throwable]): CompletableFuture[T] = whenCompleteAsync(fn)
58+
override def handle[U](fn: BiFunction[_ >: T, Throwable, _ <: U]): CompletableFuture[U] = handleAsync(fn)
59+
override def exceptionally(fn: JF[Throwable, _ <: T]): CompletableFuture[T] = {
60+
val cf = new CompletableFuture[T]
61+
whenCompleteAsync(new BiConsumer[T, Throwable] {
62+
override def accept(t: T, e: Throwable): Unit = {
63+
if (e == null) cf.complete(t)
64+
else {
65+
val n: AnyRef =
66+
try {
67+
fn(e).asInstanceOf[AnyRef]
68+
} catch {
69+
case thr: Throwable cf.completeExceptionally(thr); this
70+
}
71+
if (n ne this) cf.complete(n.asInstanceOf[T])
72+
}
73+
}
74+
})
75+
cf
76+
}
77+
78+
override def toCompletableFuture(): CompletableFuture[T] =
79+
throw new UnsupportedOperationException("this CompletionStage represents a read-only Scala Future")
80+
81+
override def toString: String = super[CompletableFuture].toString
82+
}
83+
84+
/**
85+
* Returns a CompletionStage that will be completed with the same value or
86+
* exception as the given Scala Future when that completes. Since the Future is a read-only
87+
* representation, this CompletionStage does not support the
88+
* <code>toCompletableFuture</code> method. The semantics of Scala Future
89+
* demand that all callbacks are invoked asynchronously by default, therefore
90+
* the returned CompletionStage routes all calls to synchronous
91+
* transformations to their asynchronous counterparts, i.e.
92+
* <code>thenRun</code> will internally call <code>thenRunAsync</code>.
93+
*
94+
* @param f The Scala Future which may eventually supply the completion for
95+
* the returned CompletionStage
96+
* @return a CompletionStage that runs all callbacks asynchronously and does
97+
* not support the CompletableFuture interface
98+
*/
99+
def toJava[T](f: Future[T]): CompletionStage[T] = {
100+
val cf = new CF[T]
101+
implicit val ec = Future.InternalCallbackExecutor
102+
f onComplete cf
103+
cf
104+
}
105+
106+
private class P[T] extends impl.Promise.DefaultPromise[T] with BiConsumer[T, Throwable] {
107+
override def accept(v: T, e: Throwable): Unit = {
108+
if (e == null) complete(Success(v))
109+
else complete(Failure(e))
110+
}
111+
}
112+
113+
/**
114+
* Returns a Scala Future that will be completed with the same value or
115+
* exception as the given CompletionStage when that completes. Transformations
116+
* of the returned Future are executed asynchronously as specified by the
117+
* ExecutionContext that is given to the combinator methods.
118+
*
119+
* @param cs The CompletionStage which may eventually supply the completion
120+
* for the returned Scala Future
121+
* @return a Scala Future that represents the CompletionStage's completion
122+
*/
123+
def toScala[T](cs: CompletionStage[T]): Future[T] = {
124+
val p = new P[T]
125+
cs whenComplete p
126+
p.future
127+
}
128+
129+
/**
130+
* Creates an ExecutionContext from a given ExecutorService, using the given
131+
* Consumer for reporting errors. The latter can be created as in the
132+
* following example:
133+
*
134+
* {{{
135+
* final ExecutionContext ec = Converter.fromExecutorService(es, thr -> thr.printStackTrace());
136+
* }}}
137+
*
138+
* @param e an ExecutorService
139+
* @param reporter a Consumer for reporting errors during execution
140+
* @return an ExecutionContext backed by the given ExecutorService
141+
*/
142+
def fromExecutorService(e: ExecutorService, reporter: Consumer[Throwable]): ExecutionContextExecutorService =
143+
ExecutionContext.fromExecutorService(e, reporter.accept)
144+
145+
/**
146+
* Creates an ExecutionContext from a given ExecutorService, using the
147+
* default reporter for uncaught exceptions which will just call
148+
* <code>.printStackTrace()</code>.
149+
*
150+
* @param e an ExecutorService
151+
* @return an ExecutionContext backed by the given ExecutorService
152+
*/
153+
def fromExecutorService(e: ExecutorService): ExecutionContextExecutorService =
154+
ExecutionContext.fromExecutorService(e, ExecutionContext.defaultReporter)
155+
156+
/**
157+
* Creates an ExecutionContext from a given Executor, using the given
158+
* Consumer for reporting errors. The latter can be created as in the
159+
* following example:
160+
*
161+
* {{{
162+
* final ExecutionContext ec = Converter.fromExecutor(es, thr -> thr.printStackTrace());
163+
* }}}
164+
*
165+
* @param e an Executor
166+
* @param reporter a Consumer for reporting errors during execution
167+
* @return an ExecutionContext backed by the given Executor
168+
*/
169+
def fromExecutor(e: Executor, reporter: Consumer[Throwable]): ExecutionContextExecutor =
170+
ExecutionContext.fromExecutor(e, reporter.accept)
171+
172+
/**
173+
* Creates an ExecutionContext from a given Executor, using the
174+
* default reporter for uncaught exceptions which will just call
175+
* <code>.printStackTrace()</code>.
176+
*
177+
* @param e an Executor
178+
* @return an ExecutionContext backed by the given Executor
179+
*/
180+
def fromExecutor(e: Executor): ExecutionContextExecutor =
181+
ExecutionContext.fromExecutor(e, ExecutionContext.defaultReporter)
182+
183+
/**
184+
* Return the global ExecutionContext for Scala Futures.
185+
*
186+
* @return the ExecutionContext
187+
*/
188+
def globalExecutionContext: ExecutionContext = ExecutionContext.global
189+
190+
/**
191+
* Construct an empty <code>scala.concurrent.Promise</code>.
192+
*
193+
* @return a Promise which is not yet completed
194+
*/
195+
def promise[T](): Promise[T] = Promise()
196+
197+
/**
198+
* Construct an already fulfilled <code>scala.concurrent.Promise</code> which holds the given value.
199+
*
200+
* @return the fulfilled Promise
201+
*/
202+
def keptPromise[T](v: T): Promise[T] = Promise.successful(v)
203+
204+
/**
205+
* Construct an already fulfilled <code>scala.concurrent.Promise</code> which holds the given failure.
206+
*
207+
* @return the fulfilled Promise
208+
*/
209+
def failedPromise[T](ex: Throwable): Promise[T] = Promise.failed(ex)
210+
211+
}

src/test/java/scala/compat/java8/LambdaTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
package scala.compat.java8;
55

66
import scala.runtime.*;
7-
import static scala.compat.java8.TestAPI.*;
87
import static scala.compat.java8.JFunction.*;
8+
import static scala.compat.java8.TestAPI.*;
9+
910
import org.junit.Test;
1011

1112
public class LambdaTest {

0 commit comments

Comments
 (0)