Integrating an Observable within a FlowShape

Hello,

I’m trying to integrate an Observable within a Graph, using a FlowShape.
The idea is to have this kind of pipeline:

                                             Observable

                                                  +
                                                  |
                                                  |
+-----------------------+             +-----------v-----------+         +-----------------------+
|                       |   String    |                       |    T    |                       |
|   Source[String, _]   +------------->   Flow[String, T, _]  +--------->   Sink[T, _]          |
|                       |             |                       |         |                       |
+-----------------------+             +-----------------------+         +-----------------------+


The String is an input to the Observable, but I’m having a hard time figuring out how the Observable, the Observer, the InHandler and the OutHandler fit together.

The Observable comes from the MongoDB Scala driver.
Here is some sample code:

final class WithObservableStage[T](teamCollection: MongoCollection[T])
  extends GraphStage[FlowShape[String, T]] {

  val in: Inlet[String] = Inlet[String]("WithObservableStage.in")
  val out: Outlet[T] = Outlet[T]("WithObservableStage.out")

  override def shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    private val observer: Observer[T] = new Observer[T] {
      override def onNext(result: T): Unit = (doc: T) => {

        getAsyncCallback[T](emit(out, _)).invoke(doc)
      }

      override def onError(e: Throwable): Unit = (e: Throwable) => {

        getAsyncCallback((t: Throwable) => fail(out, t)).invoke(e)
      }

      override def onComplete(): Unit = () => getAsyncCallback((_: Unit) => complete(out)).invoke(())
    }

    override def preStart(): Unit = pull(in)

    setHandlers(in, out, new InHandler with OutHandler {

      override def onPush(): Unit = {
        val id = grab(in)

        teamCollection
          .find(Filters.eq("id", id))
          .first()
          .subscribe(observer)
      }

      override def onUpstreamFinish(): Unit = observer.onComplete()
      override def onUpstreamFailure(t: Throwable): Unit = observer.onError(t)

      override def onPull(): Unit = ()
    })
  }
}

When I execute this code, I get a timeout, because the observer is never completed.

I’m kinda new to this, so maybe it does not make sense at all :grimacing:

Thank you very much,
Nicolas

Sorry, I didn’t make much of an effort to fully understand your problem; these are just some ideas and experiences on the topic:
The Mongo Scala driver’s Observable has a toFuture method, or something similar. So you can use mapAsync(1)(str => collection.find(Filters.eq("id", str)).toFuture) or something like that instead of a custom stage.
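To make the mapAsync idea concrete, here is a minimal sketch rather than a tested implementation against a real database. It assumes Akka 2.6+ and a mongo-scala-driver version whose `org.mongodb.scala._` implicits provide `headOption()` on an Observable. The names `MongoLookup`, `lookupFlow` and `findById` are made up for illustration. It uses `headOption()` so that an id with no matching document is simply skipped.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import org.mongodb.scala._
import org.mongodb.scala.model.Filters
import scala.concurrent.Future
import scala.reflect.ClassTag

object MongoLookup {

  // Generic part: one asynchronous lookup per incoming id, at most one in flight.
  // Ids whose lookup yields None are dropped from the stream.
  def lookupFlow[T](lookup: String => Future[Option[T]]): Flow[String, T, NotUsed] =
    Flow[String]
      .mapAsync(parallelism = 1)(lookup)
      .collect { case Some(doc) => doc }

  // Mongo-specific part (assumption: headOption() comes from the driver's
  // implicit Observable extensions imported via org.mongodb.scala._).
  def findById[T: ClassTag](collection: MongoCollection[T]): Flow[String, T, NotUsed] =
    lookupFlow[T](id => collection.find(Filters.equal("id", id)).headOption())
}
```

It would then replace the custom stage in the pipeline, e.g. `source.via(MongoLookup.findById(teamCollection)).runWith(sink)`. Backpressure is handled by mapAsync, so no InHandler/OutHandler code is needed.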

Also, the Alpakka repo has a MongoDB connector, or at least a PR for one; maybe that code can help too.

Thank you very much.

That’s a good idea. I was wondering whether there is a way to avoid materializing this through a Future?

Another option is to convert the Observable to a Reactive Streams Publisher and then create an Akka Streams Source directly from that Publisher.
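A rough sketch of that Publisher-based approach, under a few assumptions: Akka 2.6+, and a mongo-scala-driver 4.x release, where (as far as I know) `Observable[T]` already extends `org.reactivestreams.Publisher[T]`, so no conversion step is needed. With older driver versions you would have to write or import an Observable-to-Publisher adapter yourself. The names `MongoPublisherLookup`, `publisherFlow` and `findById` are hypothetical.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import org.mongodb.scala.MongoCollection
import org.mongodb.scala.model.Filters
import org.reactivestreams.Publisher
import scala.reflect.ClassTag

object MongoPublisherLookup {

  // Generic part: for each incoming id, open a sub-source from a Publisher
  // and emit its elements in order before handling the next id.
  def publisherFlow[T](lookup: String => Publisher[T]): Flow[String, T, NotUsed] =
    Flow[String].flatMapConcat(id => Source.fromPublisher(lookup(id)))

  // Mongo-specific part (assumption: driver 4.x, where the SingleObservable
  // returned by first() is itself a Reactive Streams Publisher).
  def findById[T: ClassTag](collection: MongoCollection[T]): Flow[String, T, NotUsed] =
    publisherFlow[T](id => collection.find(Filters.equal("id", id)).first())
}
```

No Future is involved here: each query result is streamed straight into the graph, and an id with no match just produces an empty sub-source.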