Combining materialized values in Java GraphDSL.Builder

Hi,

I am trying to port an existing Java “workflow system” over to Akka Streams. Since the processing stages in the existing system are heavily stateful, I created my own GraphStages with their own GraphStageLogic. That works perfectly. Mapping the existing workflow layout over to Akka Streams using the GraphDSL.Builder in the following way (code copied directly from the Akka documentation, not my custom GraphStages) was also straightforward:

  RunnableGraph.fromGraph(
      GraphDSL.create(builder -> {
        final SourceShape<Integer> A = builder.add(Source.single(0));
        final UniformFanOutShape<Integer, Integer> B = builder.add(Broadcast.create(2));
        final UniformFanInShape<Integer, Integer> C = builder.add(Merge.create(2));
        final FlowShape<Integer, Integer> D =
            builder.add(Flow.of(Integer.class).map(i -> i + 1));
        final UniformFanOutShape<Integer, Integer> E = builder.add(Balance.create(2));
        final UniformFanInShape<Integer, Integer> F = builder.add(Merge.create(2));
        final SinkShape<Integer> G = builder.add(Sink.foreach(System.out::println));

        builder.from(F.out()).toInlet(C.in(0));
        builder.from(A).toInlet(B.in());
        builder.from(B.out(0)).toInlet(C.in(1));
        builder.from(C.out()).toInlet(F.in(0));
        builder.from(B.out(1)).via(D).toInlet(E.in());
        builder.from(E.out(0)).toInlet(F.in(1));
        builder.from(E.out(1)).to(G);
        return ClosedShape.getInstance();
      }));

All of that is already running, with very good performance :slight_smile:

What I am struggling with now is that each of my stages produces a materialized value. I would like to combine all these materialized values using my own combination method, so that the resulting graph produces that combined materialized value.
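
To illustrate what these stages look like, here is a minimal sketch (simplified, not our actual code, and assuming the standard Java GraphStageWithMaterializedValue API; error handling omitted). It counts the elements it passes through and completes its materialized CompletionStage with the count when the upstream finishes:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;

    import akka.stream.Attributes;
    import akka.stream.FlowShape;
    import akka.stream.Inlet;
    import akka.stream.Outlet;
    import akka.stream.stage.AbstractInHandler;
    import akka.stream.stage.AbstractOutHandler;
    import akka.stream.stage.GraphStageLogic;
    import akka.stream.stage.GraphStageWithMaterializedValue;
    import scala.Tuple2;

    // Passes elements through, counts them, and materializes the final count.
    public class CountingFlow<T>
        extends GraphStageWithMaterializedValue<FlowShape<T, T>, CompletionStage<Integer>> {

      public final Inlet<T> in = Inlet.create("CountingFlow.in");
      public final Outlet<T> out = Outlet.create("CountingFlow.out");
      private final FlowShape<T, T> shape = FlowShape.of(in, out);

      @Override
      public FlowShape<T, T> shape() {
        return shape;
      }

      @Override
      public Tuple2<GraphStageLogic, CompletionStage<Integer>> createLogicAndMaterializedValue(
          Attributes inheritedAttributes) {
        final CompletableFuture<Integer> promise = new CompletableFuture<>();

        final GraphStageLogic logic =
            new GraphStageLogic(shape) {
              private int count = 0; // the stage's private mutable state

              {
                setHandler(in, new AbstractInHandler() {
                  @Override
                  public void onPush() {
                    count++;
                    push(out, grab(in));
                  }

                  @Override
                  public void onUpstreamFinish() {
                    promise.complete(count); // expose the state as the mat value
                    completeStage();
                  }
                });
                setHandler(out, new AbstractOutHandler() {
                  @Override
                  public void onPull() {
                    pull(in);
                  }
                });
              }
            };

        return new Tuple2<>(logic, promise);
      }
    }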

Can anybody point me in the right direction on how to do this? Or am I thinking in the totally wrong direction and should do it totally differently?

Thanks in advance,
Lay

First, please note that such huge GraphDSLs are usually not a good sign – you should strive to simplify them and only use the GraphDSL if really necessary.

Having said that, it would be something like this:

    Graph<ClosedShape, CompletionStage<Done>> g = GraphDSL.create7(
        Source.single(0),
        Broadcast.create(2),
        Merge.create(2),
        Flow.of(Integer.class).map(i -> i + 1),
        Balance.create(2),
        Merge.create(2),
        Sink.foreach(System.out::println),
        (mA, mB, mC, mD, mE, mF, mG) -> mG,
        (builder, A, B, C, D, E, F, G) -> {
          builder.from(F.out()).toInlet(C.in(0));
          builder.from(A).toInlet(B.in());
          builder.from(B.out(0)).toInlet(C.in(1));
          builder.from(C.out()).toInlet(F.in(0));
          builder.from(B.out(1)).via(D).toInlet(E.in());
          builder.from(E.out(0)).toInlet(F.in(1));
          builder.from(E.out(1)).to(G);
          return ClosedShape.getInstance();
        });
    RunnableGraph.fromGraph(g);

Notice that, first, instead of using builder.add() we import the graphs in the create call right away; and then we use the combineMat function to combine the materialized values. The last argument is a function that receives the builder and all the shapes you have “added” by passing them to the create call.

This style allows the GraphDSL to remain completely typesafe, both in the shape types and in the types of the materialized values.
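
For completeness, running it and getting the combined materialized value would then be (materializer assumed to be in scope):

    CompletionStage<Done> done = RunnableGraph.fromGraph(g).run(materializer);
    done.thenRun(() -> System.out.println("stream completed"));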

As mentioned though, this should rather be seen as a power user API.

Hi Konrad, thanks for your reply! I am aware of the method to pass multiple input graphs to the builder together with a function to combine their materialized values. But to use this method, you have to know your graph at compile time.

I was not really clear about that in my first post: I don’t know the layout of my graph at compile time. The user of the system can build up a workflow of arbitrary processing nodes which all work on the same message type, a kind of document that is enriched, filtered, mapped etc. A workflow node in this system has 1 to N inputs and 1 to M outputs. Sinks also exist, which have only 1 to N inputs.

I am mapping these workflow nodes to a combination of FanIn, Flow and FanOut stages in Akka Streams. The FanIn and FanOut stages are optional; they are suppressed if I am only dealing with one input/output.

So I am using the following code (simplified, FanIn and FanOut suppressed etc.):

    ArrayList<GraphStageWithMaterializedValue<FlowShape<OutputMessage, OutputMessage>, CompletionStage<WorkflowResult>>> flows = ...;

    Flow<ByteString, OutputMessage, CompletionStage<WorkflowResult>> inputFlow = Flow.fromGraph(_byteStringSourceStage);

    // Create the Akka Streams Sink which takes an arbitrary sequence of ByteStrings
    _sink = Sink.fromGraph(
      GraphDSL.create(
          inputFlow,
          (builder, flow) -> {
            // Add the source node
            _byteStringSourceStage.getOutputPort().setOutlet(flow.out());

            // Add all flows
            for (GraphStageWithMaterializedValue<FlowShape<OutputMessage, OutputMessage>, CompletionStage<WorkflowResult>> f : flows) {
              builder.add(f);
            }

            // Link all ports
            for (Map.Entry<String, OutputPort> e : _outputPorts.entrySet()) {
              OutputPort out = e.getValue();
              builder.from(out.getOutlet()).toInlet(out.getPeerPort().getInlet());
            }

            return SinkShape.of(flow.in());
          }));

All of that already works perfectly, except that I am not able to get the “combined” materialized value of all processing stages.

The only two solutions that I currently have in mind (and I am not happy with either):

    switch (flows.size()) {
      case 1:
        _sink = Sink.fromGraph(GraphDSL.create(inputFlow, flows.get(0),
            (m1, m2) -> combine(m1, m2), (builder, in, f1) -> {...}));
        break;

      case 2:
        _sink = Sink.fromGraph(GraphDSL.create3(inputFlow, flows.get(0), flows.get(1),
            (m1, m2, m3) -> combine(m1, m2, m3), (builder, in, f1, f2) -> {...}));
        break;
      ...
      // But there is only a limited number of createX methods, and flows.size() could be bigger...
    }
  

Or I could iteratively build up the graph using the builder multiple times: first building only the combined graph of my initial sink and the first flow, then building a combined graph of the previous result and flow2, and so on. But that can get arbitrarily complex to handle the unconnected Inlets and Outlets of these intermediate graphs. At least I am not really sure about the numbering scheme of the open Inlets and Outlets of the resulting graph.

What I am really missing is a public method builder.add(flow, (m1, m2) -> mc) which takes the current materialized value of the builder result and combines it with the materialized value of the newly added flow. I think such an API exists internally but is not made available to the public (perhaps for some good reason :slight_smile: ).
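
To make that wish concrete, the hypothetical call would look something like this (combine being my own method – again, this API does not exist publicly):

    // Hypothetical: fold the materialized value of each newly added stage
    // into the builder's accumulated one.
    FlowShape<OutputMessage, OutputMessage> shape =
        builder.add(f, (accumulatedMat, flowMat) -> combine(accumulatedMat, flowMat));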

Regards,
Lay

Perhaps you are interested in https://github.com/akka/akka/pull/24102?

Yeah, it would seem the seq-based create is just what was needed here :) “Soon…” then, as the PR seems basically ready.

I’m happy that I’m not the only one who wanted this feature :wink:

Yes, that seems to be exactly what I need :slight_smile:

As soon as I find some time, I will pull that version from GitHub and give you some feedback!

So, I got tg44’s pull request from GitHub and compiled an Akka Streams version.

But I am still struggling to get everything to work, most likely due to my limited knowledge of the GraphDSL interface.

Basically I have 3 types of “nodes” in my workflow (again I am suppressing the FanIn and FanOut stages here):

  1. N times Sink<OutputMessage, CompletionStage<WorkflowResult>>

  2. M times Flow<OutputMessage, OutputMessage, CompletionStage<WorkflowResult>>

  3. One Flow<ByteString, OutputMessage, CompletionStage<WorkflowResult>>

Together, all these nodes form a Sink with one ByteString input in the Akka Streams sense. But when I use the new list API from tg44’s pull request, I have to set up a List for each of these 3 GraphStage types. For instance, for my sinks:

ArrayList<Graph<SinkShape<OutputMessage>, CompletionStage<WorkflowResult>>> sinkNodes = ...

Then I am calling:

  GraphDSL.create(sinkNodes,
                    (GraphDSL.Builder<List<CompletionStage<WorkflowResult>>> builder, List<SinkShape<OutputMessage>> outs) -> {
                      ...
                      return ?????????????????;                      
                    });

But what do I have to return at the end? All the examples I found – including the new test cases – return either a ClosedShape, a SinkShape, a SourceShape or a FlowShape in the Akka Streams sense. But my resulting graph here is in fact a sink with N inputs, not a Sink with only one input as in the Akka Streams sense.

I think we have reached the point where we could give better advice if we knew more about the concrete problem.

You can invent shapes too. But every time I did that, it was because of a bad initial concept. So I will help you solve this problem, but maybe the concept is wrong rather than the toolkit having missing parts :slight_smile:

I have never tried the Java API (the docs are missing there too), but it is there.

An untested example shape with M inputs:

import akka.stream._
import scala.collection.immutable

case class MSinkShape[In](
  ins: immutable.Seq[Inlet[In]]
) extends Shape {

  override val inlets: immutable.Seq[Inlet[_]] = ins
  override val outlets: immutable.Seq[Outlet[_]] = immutable.Seq.empty

  override def deepCopy(): MSinkShape[In] = MSinkShape(
    ins.map(_.carbonCopy())
  )
}

And now you can use it with return new MSinkShape(Seq(flow1.in(), flow2.in(), flow3.in(), flow4.in(), ..., flowM.in())); or something like that, with a Java → Scala collection conversion.
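
For example, the Java side could look roughly like this (untested; it assumes Scala 2.12’s JavaConverters for the List → immutable.Seq conversion, and FlowShapes flow1…flow3 already added to the builder):

import java.util.Arrays;
import java.util.List;
import scala.collection.JavaConverters;

// Inside the GraphDSL.create block: collect the open inlets and convert them
// to an immutable Scala Seq before constructing the shape.
List<Inlet<OutputMessage>> ins = Arrays.asList(flow1.in(), flow2.in(), flow3.in());
return new MSinkShape<OutputMessage>(JavaConverters.asScalaBuffer(ins).toList());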

Hi Gergő, thanks for your patience with me :slight_smile: and I am not saying the toolkit has missing parts. Most likely it is my limited knowledge of the toolkit, and even my concept might be wrong.

To clarify what I am trying to achieve, I have created another sketch. In my system the user can configure an arbitrary workflow that consists of a single input source which delivers ByteStrings. A special Flow converts these ByteStrings to Messages (you can think of them as something similar to an XML DOM document). These messages are then processed within the workflow: filtered, routed, enriched etc. In the end the documents are written to one or more Sinks which convert the Messages into files, Kafka messages or whatever. I mapped that workflow to the following Akka Streams components:

If one of my workflow nodes has more than one input or output, I map it to a combination of FanIn -> Flow -> FanOut. If the number of inputs or outputs is one, the FanIn or FanOut stage is suppressed.
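
A minimal sketch of that mapping for a node with 2 inputs and 2 outputs, inside a GraphDSL builder block (Message and processMessage are placeholders here, not my actual classes):

    final UniformFanInShape<Message, Message> fanIn = builder.add(Merge.create(2));
    final FlowShape<Message, Message> body =
        builder.add(Flow.of(Message.class).map(this::processMessage));
    final UniformFanOutShape<Message, Message> fanOut = builder.add(Broadcast.create(2));
    builder.from(fanIn.out()).via(body).toInlet(fanOut.in());
    // The node's external ports are fanIn.in(0), fanIn.in(1) and fanOut.out(0), fanOut.out(1).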

As said previously, all of that is already working perfectly. I was only struggling with the problem that I would like to get the materialized values of

a) the single Flow<ByteString,Message>
b) all the Flow<Message,Message>
c) all the Sink<Message>

stages.

I have split my graph at the boundary Source<ByteString> => Flow<ByteString,Message>. At that splitting point, my two graph components are “real” Akka Streams processing stages: the left side is a Source<ByteString>, while the right side is a Sink<ByteString>.
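
Joining the two halves then looks roughly like this (a sketch with assumed names, where Keep.right() keeps the right side’s materialized value):

    RunnableGraph<CompletionStage<WorkflowResult>> runnable =
        byteStringSource.toMat(byteStringSink, Keep.right());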

To create the Sink on the right side, I tried to use your new API. As far as I understood I can add a List<> of a certain Graph type to the builder. In my case I have two of these lists:

a) a List<Sink<Message>>
b) a List<Flow<Message,Message>>

But in both cases the result of that GraphDSL.create(List<...> ...) call is neither a Sink<Message> (with a single input) nor a Flow<Message,Message> with a single input and output. Instead, the result is an “open shape” with an arbitrary number of inputs and outputs in the List<Flow> case, and a “SinkShape” with an arbitrary number of inputs in the List<Sink> case.

What I am missing (or don’t know about) is the possibility to return an arbitrary graph shape from the GraphDSL.create() method – one that is not necessarily a strict Flow or Sink with a single input/output, but instead allows an arbitrary number of open inputs/outputs. I will look into your suggestion of creating my own shape.

I hope this clarifies what I am trying to achieve. Any comments and suggestions are highly welcome.

Regards,
Lay

Meh… I see it now. The “problem” is that you want all the materialized values, which is not really common, so this calls for the twisted solution. As I see it, in this use case you can’t really choose another way.

You can split your graph into 3 separate parts:
a) the initial Source with 1 output,
b) a huge fan-out with N outputs,
c) a huge multi-sink.

a) is pretty straightforward.
For b), if you are lucky, you can get a list of Flow<Msg,Msg> and do the N-input/M-output thingy inside your fan-out. With my PR you can get the mat values out of all the given flows.
For c), you have 2 options.

  • One is defining a new shape (as described before).
  • The other is using a fan-in shape and a Sink.ignore. (This will be hacky but works perfectly.) So you have a list of sinks; with my PR you can get those mat values back. The only trick is that you need a single dummy source like Source.empty. If you have N sinks and 1 source, this is a perfectly normal fan-in stage, plus you have all the materialized values you want. The only other thing you need is one more dummy Sink.ignore at the end, when you wire all your stages into one closed shape.

In the Java world I would maybe prefer the hacky fan-in method, because of the undocumented shape creation – see the sketch below. In the Scala world I would definitely write my multi-sink shape, because of the reusability. But deadlines and the others on the project can be a strong decision point too.
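
A rough, untested sketch of the fan-in trick in Java, assuming the seq-based GraphDSL.create from the PR and sinks of type List<Graph<SinkShape<OutputMessage>, CompletionStage<WorkflowResult>>>:

Graph<UniformFanInShape<OutputMessage, OutputMessage>, List<CompletionStage<WorkflowResult>>>
    multiSink = GraphDSL.create(
        sinks,
        (builder, sinkShapes) -> {
          // The dummy source provides the shape's single outlet; the outer
          // graph must later terminate it with a dummy Sink.ignore.
          SourceShape<OutputMessage> dummy = builder.add(Source.<OutputMessage>empty());
          List<Inlet<OutputMessage>> ins = new ArrayList<>();
          for (SinkShape<OutputMessage> s : sinkShapes) {
            ins.add(s.in()); // the embedded sinks' open inlets become the shape's inlets
          }
          return UniformFanInShape.apply(
              dummy.out(), JavaConverters.asScalaBuffer(ins)); // Scala varargs from Java
        });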

For faster execution (and if the problem allows it), you may get better performance if you use buffers. If you have a huge graph with some slow and some fast parts, and the load is not balanced but the message ratio correlates with the processing speed, some buffers can help you A LOT!

Thanks Gergő! Your input and the GitHub PR helped me a lot. I finally got it to work. As suggested, I created 3 different partial graphs: the initial Source, a UniformFanOut and finally my “own” Java MultiSink based on your Scala code snippet. And it is working :smile:

I was struggling a little bit with the Akka library I built myself based on your PR from GitHub. I don’t know the exact reason, but I had to add some more libraries (scala-java8-compat_2.12, org.reactivestreams/reactive-streams) to my project compared to using the standard Akka build from the Maven repository, otherwise I got some ClassNotFound exceptions.

One final question I still have. In your last post you said:

The “problem” is that you want all the materialized values, which is not really common, so this makes the twisted solution

Perhaps I did not understand the concept correctly. So let me try to explain what I want to achieve, and then you can perhaps bring me back on the right track: there are certain error conditions which can happen in each of my processing nodes from the sketch. In the GraphStageLogic I have an interface to notify the downstream components about such a problem (fail(Outlet, Throwable)). So I can propagate the error condition to all the downstream components. But how can I propagate the error to the upstream components? If that were possible, my leftmost Sink which converts the ByteString to Message would be able to receive the information from the downstream components and could return all that information in a single materialized value. On the right side (“downstream side”) I have multiple Sinks, so I need multiple materialized values there.

I don’t like the idea of just throwing an exception from the GraphStageLogic, because I want to use the Future returned by RunnableGraph.run() as a single point where I handle either success or failure:

      future = runnableGraph.run(ActorMaterializer.create(getContext()));
      future.whenComplete((r, t) -> {
        if (r != null) {
          System.out.println("Future completed with success!");
          ...
        } else {
          System.out.println("Future completed with throwable!");
          ...
        }
      });

Does that make sense or is there a better solution?

Regards,
Lay

Hey!

I have never handled errors in stages like this, so I can’t properly help here. (That’s why I waited for others.)

When you fail a stage, it probably fails the whole graph. (The error propagates only downstream, but the stage’s input port will be closed, and this can spread through all the upstream nodes (and of course to all of the nodes below them).)
I honestly have no idea how you can wire the exception back to the caller. The only thing I can think of right now is making a lonely sink which is just there for the exception handling. (Something like: if the upstream fails with an exception, send that exception to an actor. Or just use that sink’s mat value – if it is a Sink.ignore, it will do exactly this.) This feels not quite optimal either, so I have no idea…
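
The Sink.ignore variant would be something like this sketch (source and materializer assumed to be in scope): its materialized CompletionStage<Done> completes exceptionally when the upstream fails, so the failure travels back to the caller:

CompletionStage<Done> probe = source.runWith(Sink.ignore(), materializer);
probe.whenComplete((done, failure) -> {
  if (failure != null) {
    System.out.println("Stream failed: " + failure);
  }
});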

Hi Lay,
Any chance we can have an offline discussion about your experience using Akka as a workflow system? I’m considering a similar approach and would love to compare notes on whether picking Akka for the job makes sense. How best to contact you? GitHub? Or would you be open to sharing your experience publicly?
Thanks.
J

Hi Joa,

since that discussion might also be of interest to other people, I think it’s best to share this publicly. It also gives the Akka/Akka Streams developers a chance to comment.

In general I am pretty happy with my choice to use Akka Streams. The Akka Streams part is now working without any problems and we have moved on to porting other parts of our system, so I have not been directly dealing with the Akka-related stuff in the last weeks.

On the positive side I would say:

  • The system works pretty fast compared to the old traditional threading model we are porting from.
  • You have the rich Akka Streams ecosystem out of the box. We immediately support reading and writing from/to files, Amazon S3, Kafka etc. with minimal effort on our side.
  • The refactoring of our existing components to be able to work in a reactive stream environment (being able to push from one side and pull from the other) took some time but also improved their quality. During that refactoring we tried to minimize copying of data as much as possible by leveraging the Akka ByteString, with very positive effects (it would be good to add a similar CharString to the framework too :slight_smile:)

The following things are, in my opinion, sub-optimal for a workflow system and could be improved (but that might also be due to my limited experience with the system and not the system itself):

  • The entire stream creation process seems to be built for the use case where the stream layout is known at compile time, and the examples also only deal with that case. I struggled for a long time with the GraphDSL Builder to connect the components of an arbitrary stream, since it is in my opinion really not intuitive what happens inside the builder, when deep copies of components are made etc., and how to connect the inlets and outlets later on based on the links in the network the user has modelled in the configuration.
  • The stream components (Sources and Sinks) are in most cases tightly coupled to the data they process (and not only to the type of that data). For instance, you have to specify the exact file to read or write in the constructor of the FileSource and FileSink. As a consequence, you have to connect your stream components again and again for every new file to process. That might not be a problem for large files, but if you are processing millions of small files a day it adds up. For a generic framework like the one we are building, it would in my opinion be better to have two separate phases: 1) create the workflow layout and connect the components (which only has to be done once and is static afterwards); 2) materialize the workflow together with the data to process.
    Unfortunately these two phases are combined into one in the Akka Streams/Alpakka framework. In theory you could use the attributes specified during stream materialization to achieve this, but the existing Alpakka components do not support that, and then you would have to write your own components.

I hope that information is of some value in your decision process.

Lay


The sub-optimal points:

  • GraphDSL is sometimes messy to understand, but you have every opportunity to build your streams at runtime (creating a function which creates the particular Flow element based on some input) – as long as you know, at runtime at the latest, what type of stream you want to build. Of course, if you want to modify the processing steps based on the data itself, it is much harder. I think with some hard work we could save and recreate streams from/to JSON too. (Probably finding the correct classes and inner function-call dependencies at runtime would be the hardest part. It would be really interesting to build a graphical tool which could show/create GraphDSL representations from/to code or other sources.)
  • Most of the time this tight coupling is totally bypassed if you go one level higher. For example, the file read:
import java.nio.file.{Path, Paths}
import akka.stream.scaladsl._

//single read
val file = Paths.get("example.csv")
FileIO.fromPath(file).map(_.size).fold(0)(_ + _).to(Sink.foreach(println(_)))
//batch read
val files = List(Paths.get("example1.csv"), Paths.get("example2.csv"))
def source(p: Path) = FileIO.fromPath(p).map(_.size).fold(0)(_ + _)
Source(files).flatMapConcat(source(_)).to(Sink.foreach(println(_))) //one file at a time
Source(files).flatMapMerge(8, source(_)).to(Sink.foreach(println(_))) //8 files at a time

You can do something close to this with groupBy and/or fan-out stages to write to multiple files at once, but that is more hacky… For file rotation we have an Alpakka connector (the LogRotatorSink), but that may still not be what you want, and then sadly you have to write your own; at least there is some example code to start with.
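
For the simplest fan-out case – duplicating one stream into two files at once – something like this should work in Java (untested sketch; the paths are placeholders and materializer is assumed to be in scope):

import java.nio.file.Paths;
import java.util.Arrays;

import akka.NotUsed;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

Source<ByteString, NotUsed> records =
    Source.from(Arrays.asList(ByteString.fromString("a;1\n"), ByteString.fromString("b;2\n")));
records
    .alsoTo(FileIO.toPath(Paths.get("out1.csv"))) // side branch: first file
    .to(FileIO.toPath(Paths.get("out2.csv")))     // terminal sink: second file
    .run(materializer);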

So these problems are totally solvable, just not “off the shelf”.

Lay,
Sorry, I just saw your post as I didn’t get notifications for some reason. I’m right with you regarding the struggle of building dynamic streams on the fly. I struggled with simple things, e.g. that you need to first create the sinks and pass them in to be able to get the completion stages. The solution is quite a hack, but it seems to work.
Should we file an improvement request to help collect requirements for future versions?
Thx

First, I want to wish you all a happy new year 2019! I had a few days off, so please excuse the late response.

In your example you are processing multiple files at once on the input side. But how would you create an output file with the same/similar name for each input file without reconnecting the entire stream again and again? The LogRotatorSink is able to create new output files, but in the examples I have seen so far, the creation is only based on the size of the input or the current time, which is basically the sink’s own state and not the state of the entire stream. And that is only a special case for this sink; an Amazon S3 output sink would not support that behaviour.

A typical (telecommunications) use case for our workflow system would be to poll an input directory for new files. Whenever an input file is dropped into the input directory, it is picked up by the system and processed. This processing includes a lot of filtering, routing and mapping steps for the single records within the file. Examples of these processing steps are checks for duplicate records, calculation of statistical data, mapping into another data format and so on. These are typically more than 20 stages, and it is more a tree structure than a linear workflow, so it is a significant overhead to wire things up again and again. As a result of the processing, the system would for instance write an output file into another directory (having the same/similar filename), send some of the records into a Kafka queue of the fraud detection system and so on…

With your help I was able to map this workflow onto 3 basic components: a single Source (I am using the existing Alpakka connectors), a huge UniformFanOut shape which includes all the processing stages I mentioned before, and a “MultiSink” (which is basically a sink with multiple inputs and internally consists of a number of FanIn stages and standard Alpakka connectors). So whenever a new input is processed, I create the Alpakka Source and Sinks with the parameters of the current processing unit (in this example a file), connect the output of the Source with the input of the UniformFanOut shape, and connect the outputs of the UniformFanOut shape with the “MultiSink” which has again been created from all the newly created Alpakka sinks. Then I materialize the workflow and process it.

For the “workflow system use case” it would be much better to have a static stream layout that is only configured and wired up once and afterwards only “re-parameterized”. All my processing stages within the workflow, for instance, have to write to a special audit log that might differ between input files. I was able to solve this using attributes during stream materialization:

    LoggerAttribute loggerAttribute = new LoggerAttribute(_logger);

    List<CompletionStage<WorkflowResult>> futureList =
            runnableGraph
                    .withAttributes(Attributes.apply(loggerAttribute))
                    .run(ActorMaterializer.create(getContext()));
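
For reference, such a custom attribute is just a plain class implementing Attributes.Attribute, and the stage reads it from its inherited attributes when its logic is created (a sketch; Logger is our own class):

    public class LoggerAttribute implements Attributes.Attribute {
      public final Logger logger;

      public LoggerAttribute(Logger logger) {
        this.logger = logger;
      }
    }

    // Inside createLogicAndMaterializedValue(Attributes inheritedAttributes):
    Optional<LoggerAttribute> loggerAttribute =
        inheritedAttributes.getAttribute(LoggerAttribute.class);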

A similar concept for the standard Alpakka components would be very helpful (for my use case :slight_smile: ):

    LoggerAttribute loggerAttribute = new LoggerAttribute(_logger);
    InputFileAttribute inputFileAttribute = new InputFileAttribute(Paths.get("/in/myinput.csv"));
    OutputFileAttribute outputFileAttribute = new OutputFileAttribute("MyFileSinkName", "/out/myoutput.csv");
    OutputFileAttribute outputFileAttribute2 = new OutputFileAttribute("MyFileSinkName2", "/filtered/myoutput-filtered.csv");
    KafkaOutputAttribute kafkaAttribute = new KafkaOutputAttribute("MyKafkaSinkName", ...);

    List<CompletionStage<WorkflowResult>> futureList =
            runnableGraph
                    .withAttributes(
                            Attributes.apply(loggerAttribute,
                                             inputFileAttribute,
                                             outputFileAttribute,
                                             outputFileAttribute2,
                                             kafkaAttribute))
                    .run(ActorMaterializer.create(getContext()));

I am not suggesting exactly this attribute interface. Other interfaces, for instance an overridable method that is called when a Sink/Source is materialized, might also work. I just wanted to show that in the case of a workflow system the stream layout is independent of the data that is processed, while in the Alpakka world the standard sources and sinks combine both the stream layout and the data to process.

I also know that I am somewhat “abusing” the system, so my lengthy post should not be seen as criticism of the Alpakka system. It is working great. Instead, my post shall just point to another use case that perhaps might be of interest to other users too and could be addressed in future releases.

Regards,
Lay

I see it now! Yeah, you can’t really do that. But I think it is possible to reimplement some of these stream elements to use Attributes instead of constructor parameters. This is probably not a really good idea in general, and without looking at the (Akka) source I have no idea whether it would actually work, but it is worth considering if the graph building really is the bottleneck.

(BTW, why do you create a new materializer for every run? Is materializing these blueprints so slow that it is worth paying the actor creation overhead? Don’t you leak those actors? (EDIT: As I said before, I have not used the Java API, but in Scala land I always reuse the materializer, so it is really strange for me to see a non-reused one. I didn’t read through the whole Java/Scala API for this, but it seems like a smell at first sight. I could be wrong though.))
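
What I mean is something like this sketch (system being the application’s ActorSystem):

final ActorMaterializer materializer = ActorMaterializer.create(system);
for (RunnableGraph<List<CompletionStage<WorkflowResult>>> g : graphs) {
  g.run(materializer); // the blueprint is immutable; each run materializes a fresh instance
}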

@tg44,
it should be straightforward to traverse a graph and build a DOT file from it. Happy to build a little tool that does it. However, it looks like there is no public API to traverse the graph, and all the tools I found are quite a hack, hard-wired to internals and therefore dependent on the Akka version.