@@ -21,47 +21,21 @@ import org.scalatest.time.{ Millis, Seconds, Span }
2121
2222import scala .concurrent .Future
2323import scala .language .implicitConversions
24- import scala .util .{ Failure , Success }
2524
2625trait FuturesSpec extends ScalaFutures {
2726
2827 implicit val defaultPatience : PatienceConfig = PatienceConfig (timeout = Span (60 , Seconds ), interval = Span (5 , Millis ))
2928
30- implicit def observableToFuture [TResult ](observable : Observable [TResult ]): Future [Seq [TResult ]] =
31- observable.toFuture()
32-
33- implicit def observableToFutureConcept [T ](observable : Observable [T ]): FutureConcept [Seq [T ]] = {
34- val future : Future [Seq [T ]] = observable
35- new FutureConcept [Seq [T ]] {
36- def eitherValue : Option [Either [Throwable , Seq [T ]]] = {
37- future.value.map {
38- case Success (o) => Right (o)
39- case Failure (e) => Left (e)
40- }
41- }
42- def isExpired : Boolean = false
29+ implicit def observableToFuture [T ](observable : Observable [T ]): Future [Seq [T ]] =
30+ observable.collect().toFuture()
4331
44- // Scala Futures themselves don't support the notion of a timeout
45- def isCanceled : Boolean = false // Scala Futures don't seem to be cancelable either
46- }
47- }
48-
49- implicit def observableToFuture [TResult ](observable : SingleObservable [TResult ]): Future [TResult ] =
32+ implicit def singleObservableToFuture [T ](observable : SingleObservable [T ]): Future [T ] =
5033 observable.toFuture()
51- implicit def observableToFutureConcept [T ](observable : SingleObservable [T ]): FutureConcept [T ] = {
52- val future : Future [T ] = observable.toFuture()
53- new FutureConcept [T ] {
54- def eitherValue : Option [Either [Throwable , T ]] = {
55- future.value.map {
56- case Success (o) => Right (o)
57- case Failure (e) => Left (e)
58- }
59- }
60- def isExpired : Boolean = false
6134
62- // Scala Futures themselves don't support the notion of a timeout
63- def isCanceled : Boolean = false // Scala Futures don't seem to be cancelable either
64- }
65- }
35+ implicit def observableToFutureConcept [T ](observable : Observable [T ]): FutureConcept [Seq [T ]] =
36+ convertScalaFuture(observable.collect().toFuture())
37+
38+ implicit def singleObservableToFutureConcept [T ](observable : SingleObservable [T ]): FutureConcept [T ] =
39+ convertScalaFuture(observable.toFuture())
6640
6741}
0 commit comments