GraphDSL, UniqueKillSwitch, CompletionStage

(Joa23) #1

I’m working on dynamically assembling Akka streams from a beautiful GUI. I’m trying to create a runnable graph that materializes both a CompletionStage and a kill switch. I got the CompletionStage working by pre-creating all the sinks that the user might add to the graph via the GUI, and later looking up the freshly created SinkShapes that are actually passed into the builder (a dirty hack, but it seems to work).

for (int i = 0; i < sinkNodes.size(); i++) {
    String sinkNodeName = sinkNodes.get(i);
    Sink sink = ...
    map.put(sinkNodeName, i);

(GraphDSL.Builder<List<Pair<UniqueKillSwitch, CompletionStage<Done>>>> builder, List<SinkShape<Tuple>> outs) -> {...

int index = map.get(sinkNodeName);
// assuming order of sinks and SinkShapes remains identical 
SinkShape<Tuple> dst = outs.get(index);

However, I’m having trouble with the KillSwitch that I add to my Source:

Source<Tuple, CompletionStage<Done>> source = ...
// Add kill switch
Graph<FlowShape<Tuple, Tuple>, UniqueKillSwitch> single = KillSwitches.single();
Source<Tuple, UniqueKillSwitch> sourceWithKillSwitch =
    sourceWithCheckpoint.viaMat(single, Keep.right());
SourceShape<Tuple> shape = builder.add(sourceWithKillSwitch);

When running my stream I only get a CompletableFuture back, not the expected Pair<UniqueKillSwitch, CompletionStage<Done>>.
I assume that a toMat(sink, Keep.both()) is missing somewhere in my code, but I’m unclear where to add it when using the GraphDSL. I did extensive googling but could not find anything. Any pointers are highly appreciated.

(Gergő Törcsvári) #2

I have never used the Java API, but I hope I can help. Regarding your second code block:
If you pass a pre-created component into GraphDSL.create, the graph you build can materialize that component’s materialized value.

So if you create the kill switch before calling create, pass it into create as a parameter, and use it inside the builder block, it could work.
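
A rough Java sketch of that idea, under a few assumptions: Akka 2.6 or later (where run accepts an ActorSystem; on 2.5 you would pass a Materializer instead), a single sink rather than the list of sinks from the question, and Integer elements from Source.repeat standing in for the real Tuple source. The class name KillSwitchGraphSketch and the build() helper are made up for illustration. The point is that the kill switch and the sink are handed to GraphDSL.create together with Keep.both(), so the graph materializes the Pair instead of only the CompletionStage.

```java
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.KillSwitches;
import akka.stream.SourceShape;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class KillSwitchGraphSketch {

  // Hypothetical helper: builds a closed graph whose materialized value is
  // the pair (kill switch, completion of the sink).
  static RunnableGraph<Pair<UniqueKillSwitch, CompletionStage<Done>>> build() {
    // Both components are created BEFORE GraphDSL.create and passed in,
    // so their materialized values are visible to the combiner.
    Graph<FlowShape<Integer, Integer>, UniqueKillSwitch> killSwitch = KillSwitches.single();
    Sink<Integer, CompletionStage<Done>> sink = Sink.ignore();

    return RunnableGraph.fromGraph(
        GraphDSL.create(
            killSwitch,
            sink,
            Keep.both(), // combine the two materialized values into a Pair
            (builder, ks, out) -> {
              // Placeholder source; it is added inside the block, so its
              // materialized value is not part of the result.
              SourceShape<Integer> src = builder.add(Source.repeat(1));
              builder.from(src).via(ks).to(out);
              return ClosedShape.getInstance();
            }));
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("sketch");
    Pair<UniqueKillSwitch, CompletionStage<Done>> mat = build().run(system);

    // Shutting down the switch completes the otherwise infinite stream.
    mat.first().shutdown();
    mat.second().toCompletableFuture().get(5, TimeUnit.SECONDS);

    system.terminate();
  }
}
```

With more than two pre-created components there is no need for toMat at all: the combiner function passed to create plays the role that Keep.both() plays in toMat.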