Dynamically assemble graphs

Hi There,
We basically built a nice looking Akka Stream GUI, where people can point and click together any kind of complex stream. I have this represented in our own graph data structure and now want to convert that into an Akka stream graph. My challenge is when building the stream graph usingRunnableGraph.fromGraph(GraphDSL.create(... I only get NotUsed as completion stage, however, I need CompletionStage or if I have multiple sinks a list of completion stages.

I can not pre-create my sinks and pass in as I would not know which sink wires up with wich another element. I tried name or attribute but can’t access those as only SinkShapes are available in the builder function.

I’m sure I’m missing something.
Any pointer how I could get the completion stages?
Thanks.

1 Like

If you want a completion stage, you’d typically materialize a graph and then call a Sink.seq on it. Make sure you put a limit on the number of CompletionStage you are getting back.

final int MAX_ALLOWED_SIZE = 100;

// OK. Future will fail with a `StreamLimitReachedException`
// if the number of incoming elements is larger than max
final CompletionStage<List<String>> strings =
  mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat);

// OK. Collect up until max-th elements only, then cancel upstream
final CompletionStage<List<String>> strings =
  mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq(), mat);

https://doc.akka.io/docs/akka/2.5/stream/stream-cookbook.html

If you want something more dynamic, you would typically use BroadcastHub / MergeHub to dynamically work with streams, but I’m not sure this is what you’re asking.

I guess you are using the Java API?

There are more constructors in GraphDSL that allow to access and propagate materialized values. E.g. see create3 https://doc.akka.io/japi/akka/2.5.19/akka/stream/javadsl/GraphDSL.html#create3-akka.stream.Graph-akka.stream.Graph-akka.stream.Graph-akka.japi.function.Function3-akka.japi.function.Function4-

See this section of the documentation. Especially, the examples containing topHeadSink and GraphDSL.create(list, ...) seem relevant to your question.

Johannes