Dynamically assemble graphs

Hi There,
We basically built a nice-looking Akka Streams GUI where people can point and click together any kind of complex stream. I have this represented in our own graph data structure and now want to convert it into an Akka Streams graph. My challenge is that when building the stream graph using RunnableGraph.fromGraph(GraphDSL.create(... I only get NotUsed as the materialized value; however, I need a CompletionStage or, if I have multiple sinks, a list of completion stages.

I cannot pre-create my sinks and pass them in, as I would not know which sink wires up with which other element. I tried names and attributes but can't access those, as only SinkShapes are available in the builder function.

I’m sure I’m missing something.
Any pointers on how I could get the completion stages?
Thanks.

If you want a completion stage, you'd typically run the stream with a sink such as Sink.seq, which materializes a CompletionStage holding the collected elements. Make sure you put a limit on the number of elements you collect, since Sink.seq buffers them all in memory.

final int MAX_ALLOWED_SIZE = 100;

// OK. The CompletionStage will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
final CompletionStage<List<String>> limited =
  mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat);

// OK. Collect only up to the max-th element, then cancel upstream
final CompletionStage<List<String>> truncated =
  mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat);

https://doc.akka.io/docs/akka/2.5/stream/stream-cookbook.html

If you want something more dynamic, you would typically use BroadcastHub / MergeHub to dynamically work with streams, but I’m not sure this is what you’re asking.

I guess you are using the Java API?

There are more create variants in GraphDSL that let you access and propagate materialized values, e.g. see create3 in GraphDSL.

See this section of the documentation. Especially, the examples containing topHeadSink and GraphDSL.create(list, ...) seem relevant to your question.
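As a rough sketch of the GraphDSL.create(list, ...) approach, assuming Akka 2.5.x with the Java DSL: the sinks are passed in as a list, the builder function receives their SinkShapes in the same order, and the graph materializes a List of CompletionStages in that order too. That means a GUI node can be mapped to its sink's index in the list. The class name DynamicSinks, the methods build and runAndCollect, the sample source (1, 2, 3) and the single Broadcast wiring are made up for illustration; a real converter would wire each SinkShape according to its own graph data structure.

```java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.ClosedShape;
import akka.stream.Materializer;
import akka.stream.SinkShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;

// Hypothetical helper, not part of Akka.
final class DynamicSinks {

  // Builds a closed graph with `sinkCount` sinks (assumed to be >= 1).
  // The materialized list has one CompletionStage per sink, in list order.
  static RunnableGraph<List<CompletionStage<List<Integer>>>> build(int sinkCount) {
    final List<Sink<Integer, CompletionStage<List<Integer>>>> sinks = new ArrayList<>();
    for (int i = 0; i < sinkCount; i++) {
      sinks.add(Sink.<Integer>seq());
    }

    return RunnableGraph.fromGraph(
        GraphDSL.create(
            sinks,
            (GraphDSL.Builder<List<CompletionStage<List<Integer>>>> builder,
                List<SinkShape<Integer>> outs) -> {
              // outs.get(i) is the shape of sinks.get(i)
              final UniformFanOutShape<Integer, Integer> bcast =
                  builder.add(Broadcast.<Integer>create(outs.size()));

              // Illustrative wiring only: one source fanned out to every sink
              builder.from(builder.add(Source.from(Arrays.asList(1, 2, 3)))).viaFanOut(bcast);
              for (SinkShape<Integer> out : outs) {
                builder.from(bcast).to(out);
              }
              return ClosedShape.getInstance();
            }));
  }

  // Runs the graph and blocks until every sink has completed.
  static List<List<Integer>> runAndCollect(int sinkCount) {
    final ActorSystem system = ActorSystem.create("dynamic-sinks");
    try {
      final Materializer mat = ActorMaterializer.create(system);
      final List<List<Integer>> results = new ArrayList<>();
      for (CompletionStage<List<Integer>> stage : build(sinkCount).run(mat)) {
        results.add(stage.toCompletableFuture().join());
      }
      return results;
    } finally {
      system.terminate();
    }
  }
}
```

Calling DynamicSinks.runAndCollect(2) should yield two lists, one per sink. Blocking with join() is only for the demonstration; in an application you would normally compose the CompletionStages instead.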

Johannes