Hello,
I’m trying to integrate an Observable within a Graph, using a FlowShape.
The idea is to have this kind of pipeline:
                                             Observable
                                                  +
                                                  |
                                                  |
+-----------------------+             +-----------v-----------+         +-----------------------+
|                       |   String    |                       |    T    |                       |
|   Source[String, _]   +------------->  Flow[String, T, _]   +--------->      Sink[T, _]       |
|                       |             |                       |         |                       |
+-----------------------+             +-----------------------+         +-----------------------+
Each String is the input to an Observable query, but I'm having a hard time figuring out how the Observable, the Observer, the InHandler and the OutHandler fit together.
The Observable comes from the MongoDB Scala driver.
Here is some sample code:
final class WithObservableStage[T](teamCollection: MongoCollection[T])
  extends GraphStage[FlowShape[String, T]] {

  val in: Inlet[String] = Inlet[String]("WithObservableStage.in")
  val out: Outlet[T] = Outlet[T]("WithObservableStage.out")

  override def shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private val observer: Observer[T] = new Observer[T] {
      override def onNext(result: T): Unit = (doc: T) => {
        getAsyncCallback[T](emit(out, _)).invoke(doc)
      }
      override def onError(e: Throwable): Unit = (e: Throwable) => {
        getAsyncCallback((t: Throwable) => fail(out, t)).invoke(e)
      }
      override def onComplete(): Unit = () => getAsyncCallback((_: Unit) => complete(out)).invoke(())
    }

    override def preStart(): Unit = pull(in)

    setHandlers(in, out, new InHandler with OutHandler {
      override def onPush(): Unit = {
        val id = grab(in)
        teamCollection
          .find(Filters.eq("id", id))
          .first()
          .subscribe(observer)
      }
      override def onUpstreamFinish(): Unit = observer.onComplete()
      override def onUpstreamFailure(t: Throwable): Unit = observer.onError(t)
      override def onPull(): Unit = ()
    })
  }
}
When I execute this code, I get a timeout, because the observer is never completed.
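One possible contributor to the timeout, reduced to plain Scala with no Akka or MongoDB involved: each Observer method body above is a function literal (`(doc: T) => { ... }`, `() => ...`), so calling the method only builds a function and discards it. The sketch below uses a hypothetical `SimpleObserver` trait and `DiscardedLambdaDemo` object as stand-ins (neither is part of the driver) to show the difference. It is a minimal illustration of that one pattern, not a fix for the whole stage.

```scala
// Hypothetical stand-in for the driver's Observer, so the sketch runs on its own.
trait SimpleObserver[T] {
  def onNext(result: T): Unit
  def onComplete(): Unit
}

object DiscardedLambdaDemo {
  // Records which callbacks actually executed their side effect.
  private val events = scala.collection.mutable.ListBuffer.empty[String]

  private def record(event: String): Unit = {
    events += event
    ()
  }

  // Same shape as the observer in the question: every body is a function
  // literal, which is created and then thrown away because the method
  // returns Unit. The code inside the literal never runs.
  val broken: SimpleObserver[String] = new SimpleObserver[String] {
    override def onNext(result: String): Unit = (doc: String) => {
      record(s"broken onNext: $doc")
    }
    override def onComplete(): Unit = () => record("broken onComplete")
  }

  // Bodies written directly: the side effect happens on each call.
  val working: SimpleObserver[String] = new SimpleObserver[String] {
    override def onNext(result: String): Unit = record(s"working onNext: $result")
    override def onComplete(): Unit = record("working onComplete")
  }

  // Calls both observers the same way and returns what was recorded.
  def run(): List[String] = {
    events.clear()
    broken.onNext("a")
    broken.onComplete()
    working.onNext("a")
    working.onComplete()
    events.toList
  }
}
```

Calling `DiscardedLambdaDemo.run()` records only the two "working" events. If the same thing happens in the stage, `emit`, `fail` and `complete` are never reached, which would match the observer never completing. Separately, and as an assumption worth checking against the Akka documentation, `getAsyncCallback` is usually obtained once inside the stage logic and only its `invoke` is called from the driver's thread.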
I’m kinda new to this, so maybe this approach does not make sense at all.
Thank you very much,
Nicolas