Custom GraphStageLogic for NATS JetStream

Hi there,

We are heavy users of akka-stream at MyCoach Pro and have wanted to use it with NATS JetStream for a while.

We’ve built a custom source stage, but we are having some issues with it in production and want to fix it or rebuild it properly.

When building a custom source, I’m wondering what we should do when onPull is called and we have no message to push. How do we signal to downstream that there are no messages at the moment, but that there might be some in the future?

Also, the API is blocking, so we want to wrap it with Futures and are looking for guidance.

Here’s the current implementation: https://github.com/GlobalSport/akka-streams-nats/blob/develop/src/main/scala/com/mycoachsport/JetStreamSourceStage.scala

Thanks for your help!

From that stage it looks like you could possibly rely on the existing Source.unfoldResource rather than implementing your own stage.
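To illustrate the Source.unfoldResource suggestion, here is a minimal sketch. BlockingSubscription is a hypothetical stand-in for a blocking JetStream subscription (it is not the NATS client API), backed by an in-memory queue so the example is self-contained. One thing to keep in mind: returning None from the read function completes the stream, so read cannot say "nothing yet" and has to block until a message arrives.

```scala
import java.util.concurrent.LinkedBlockingQueue

import akka.NotUsed
import akka.stream.scaladsl.Source

object UnfoldResourceExample {

  // Hypothetical stand-in for a blocking subscription, NOT the real NATS API.
  final class BlockingSubscription {
    private val queue = new LinkedBlockingQueue[String]()
    def publish(msg: String): Unit = queue.put(msg)
    // Blocks the calling thread until a message is available.
    def nextMessage(): String = queue.take()
    def close(): Unit = ()
  }

  def subscriptionSource(open: () => BlockingSubscription): Source[String, NotUsed] =
    Source.unfoldResource[String, BlockingSubscription](
      open,                           // create: open the resource once
      sub => Some(sub.nextMessage()), // read: block for the next element
      sub => sub.close()              // close: release on completion or failure
    )
}
```

By default unfoldResource runs on the dispatcher Akka Streams reserves for blocking IO, so the blocking read does not starve the default dispatcher.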

For implementing your own stage, it’s fine not to take action immediately in onPull. A common scheme is to have a tryPush method that checks whether the output is ready (isAvailable(Outlet)) and whether an element is ready, and pushes the next element only if both are true. Then call that method both from onPull and from wherever elements come “in” to the source.
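A rough sketch of that tryPush scheme could look like the following. The names CallbackSource and register are made up for illustration: register is assumed to hand a callback to whatever delivers messages (for example a message handler on the client). Elements arriving from other threads enter the stage through getAsyncCallback, and the buffer is unbounded only to keep the sketch short.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.collection.mutable

// `register` is a hypothetical hook that passes the stage's callback to the
// component that delivers messages.
final class CallbackSource[T](register: (T => Unit) => Unit)
    extends GraphStage[SourceShape[T]] {

  val out: Outlet[T] = Outlet[T]("CallbackSource.out")
  override val shape: SourceShape[T] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with OutHandler {
      // Unbounded for brevity; a real stage needs a bound or flow control.
      private val buffer = mutable.Queue.empty[T]

      override def preStart(): Unit = {
        // Thread-safe entry point: the handler runs inside the stage.
        val onMessage = getAsyncCallback[T] { msg =>
          buffer.enqueue(msg)
          tryPush()
        }
        register(msg => onMessage.invoke(msg))
      }

      // Push only when downstream has pulled AND an element is waiting.
      private def tryPush(): Unit =
        if (isAvailable(out) && buffer.nonEmpty) push(out, buffer.dequeue())

      // No element yet? Do nothing; the pull stays pending until one arrives.
      override def onPull(): Unit = tryPush()

      setHandler(out, this)
    }
}
```

It would be used as Source.fromGraph(new CallbackSource[String](register)). When onPull finds the buffer empty, the demand is simply remembered by the outlet, which is how "no message now, maybe later" is expressed.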

For blocking consumers you can run the entire stage on the pool for blocking tasks instead of using futures. Involving futures complicates things a bit, since they must communicate with the stage through async callbacks (see “Custom stream processing” in the Akka documentation).
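As a small sketch of running a whole stage on the blocking pool: the helper name onBlockingDispatcher is made up, and it assumes the stage passed in is a single custom source stage that blocks inside onPull. ActorAttributes.IODispatcher refers to the dispatcher Akka Streams configures for blocking IO; a dedicated dispatcher name from your own configuration could be used instead via ActorAttributes.dispatcher.

```scala
import akka.stream.{ActorAttributes, Attributes, Graph, SourceShape}
import akka.stream.scaladsl.Source

object BlockingDispatcherExample {

  // Hypothetical helper: wraps a (blocking) source stage so that it is
  // materialized on the blocking IO dispatcher rather than the default one.
  def onBlockingDispatcher[T, M](stage: Graph[SourceShape[T], M]): Source[T, M] =
    Source
      .fromGraph(stage)
      .addAttributes(Attributes(ActorAttributes.IODispatcher))
}
```

With this, the stage can call the blocking API directly from onPull, and no futures or async callbacks are needed for the blocking part.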

We wrote an overview blog article back in 2016 that may be useful for inspiration on how to connect existing (blocking) APIs with streams: “Writing Akka Streams Connectors for existing APIs”.

Hey @johanandren, thanks a lot for your help and the pointers!

We’re reimplementing the GraphStageLogic, taking inspiration from the FileTailSource in the article you provided, since that implementation is the closest to what we want. I’ll post a link to the implementation when it is completed.

Here’s the implementation: https://github.com/GlobalSport/akka-streams-nats/commit/718b830cb5d7aa123b31dae8706892e7d1e3d30b