Make sure elements fanned-in belong to same element fanned-out

Cross-posting this here and will link the answer.

Let’s say we have a graph that looks like this:

          broadcast ~> flowA ~> fanIn
source ~> broadcast ~> flowB ~> fanIn ~> sink
          broadcast ~> flowC ~> fanIn

Flows flowA, flowB, and flowC all perform a transformation on the incoming elements. fanIn performs some combining action on the results of all three flows.

The challenge is that Flows A/B/C do not emit elements at the same rate. For some elements of the source, flowA has nothing to emit, while flowB and C continue emitting.

Now, at fanIn I want to be sure that the elements received on all three ports “belong” to the same element emitted from the source, i.e. that they are the results of transforming the same element.

How would one go about this?

My current solution is to have Flows A/B/C emit Options. Each flow emits a Some if it can perform a transformation, and a None if it cannot. This way the number and rate of emitted elements stay the same on all three flows, and I can guarantee that the received elements belong to the same source element. I’m looking for a better-performing solution that, if possible, does not require unnecessary object creation and wrapping.
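As a rough illustration of the Option approach described above, here is a minimal sketch that uses plain Scala collections in place of the actual stream stages. It only models the element-by-element alignment, not back-pressure or asynchrony, and transformA, transformB, transformC and combine are hypothetical placeholders, not anything from the real graph.

```scala
object AlignedOptions {
  // Hypothetical stand-ins for flowA/B/C: each returns Some when it can
  // transform the element and None when it has nothing to emit.
  def transformA(n: Int): Option[Int] = if (n % 2 == 0) Some(n * 10) else None
  def transformB(n: Int): Option[Int] = Some(n + 1)
  def transformC(n: Int): Option[Int] = if (n > 2) Some(n * n) else None

  // Hypothetical stand-in for fanIn: sums whatever the branches produced.
  def combine(a: Option[Int], b: Option[Int], c: Option[Int]): Int =
    a.getOrElse(0) + b.getOrElse(0) + c.getOrElse(0)

  // Every branch yields exactly one Option per source element, so zipping
  // position by position always pairs results of the same source element.
  def run(source: Seq[Int]): Seq[Int] = {
    val as = source.map(transformA)
    val bs = source.map(transformB)
    val cs = source.map(transformC)
    as.zip(bs).zip(cs).map { case ((a, b), c) => combine(a, b, c) }
  }
}
```

In the real graph, the zipping step would presumably be a zipping fan-in stage, which waits for one element on every inlet before emitting.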

Returning a None will not really create a new object…
Wrapping with Some directly (rather than Option, which performs a null check) can also perform slightly better.
I don’t think you can really avoid having some kind of empty element, but if your return type already has an invalid/null value, you can use that in place of None. (For example, if the elements are objects with a long id, you can create an invalid element with id = -1 and filter it out.)
I think there is no silver bullet here.

BUT: I think this is not a problem. You will not lose significant performance, and your code will probably have other, much larger bottlenecks, so let it go :D
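To make the two points above concrete, here is a small sketch, assuming a hypothetical Item type with a long id. It shows a shared sentinel element with id = -1 being filtered out downstream, and that None is a single shared instance.

```scala
object EmptyMarkers {
  // Hypothetical element type with a long id, as in the example above.
  final case class Item(id: Long, payload: String)

  // One shared sentinel instance marks "nothing to emit".
  val Invalid: Item = Item(-1L, "")

  // Hypothetical transformation: emits the sentinel instead of None
  // when there is nothing to transform.
  def transform(item: Item): Item =
    if (item.payload.nonEmpty) item.copy(payload = item.payload.toUpperCase)
    else Invalid

  // Downstream, drop sentinel elements by their invalid id.
  def validOnly(items: Seq[Item]): Seq[Item] =
    items.filter(_.id != Invalid.id)

  // None is a singleton object, so both references point to the same instance.
  val noneIsShared: Boolean = (None: Option[Int]) eq (None: Option[String])
}
```

The trade-off is that the sentinel only works if id = -1 can never occur in real data, whereas Option makes the empty case explicit in the type.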

None will not really create a new object

Great point! That escaped me.

Another solution I’ve come up with, for certain cases:

Some flows categorize incoming elements into subcategories. Sometimes the incoming element cannot be categorized. In that case I make a component that has an outlet for every category, and one outlet that emits only Unit every time an incoming element cannot be categorized, somewhat like this:

categorizer ~> categoryA  ~> fanIn
categorizer ~> categoryB  ~> fanIn
categorizer ~> noCategory ~> fanIn

fanIn emits whether or not the incoming element could be categorized, thus keeping the number of received and emitted elements equal.
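The invariant here (exactly one output per input, categorized or not) can be sketched in plain Scala. This is only a model: it represents the separate outlets as cases of a sealed trait rather than as real stream outlets, and the category rules are made up for illustration.

```scala
object Categorizer {
  // Each case models one outlet of the categorizer component.
  sealed trait Routed
  final case class CategoryA(value: Int) extends Routed
  final case class CategoryB(value: Int) extends Routed
  // Plays the role of the outlet that only emits Unit.
  case object NoCategory extends Routed

  // Hypothetical rules: negatives cannot be categorized,
  // evens go to category A, odds go to category B.
  def categorize(n: Int): Routed =
    if (n < 0) NoCategory
    else if (n % 2 == 0) CategoryA(n)
    else CategoryB(n)

  // Exactly one result per input, so the counts always match.
  def route(source: Seq[Int]): Seq[Routed] = source.map(categorize)
}
```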

In my actual graph there are a lot of flows that would just emit None most of the time, and every time such a flow splits into further sub-flows for transformation, there are more and more Nones. (It’s neural network pre-processing.) That’s why I wanted to confirm whether I’m missing something. But from what you are saying I’m on the right track, thanks.

I’m not sure about your exact use case, but maybe this can help:
Now you have this:

          broadcast ~> flowA ~> zip
source ~> broadcast ~> flowB ~> zip ~> sink
          broadcast ~> flowC ~> zip

If you refactor it a bit you can have this:

          balance ~> flowA ~> flowB ~> flowC ~> merge
source ~> balance ~> flowA ~> flowB ~> flowC ~> merge ~> sink
          balance ~> flowA ~> flowB ~> flowC ~> merge

Possible problem: you lose the order of the elements.
Possible gain: you can pass along something like a (Data, List[Classification]), where prepending with a :: oldClasses performs extremely well (you still need to wrap it in a tuple), and only the rare “classification happened” cases create/wrap new objects.
Plus you can fine-tune the parallelism a little more easily.
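A minimal sketch of that accumulation idea, using plain functions in place of the chained flows. The Classification type, the Int data and the three stage rules are all hypothetical. Each stage returns its input untouched unless it has a classification to add.

```scala
object ClassificationChain {
  // Hypothetical classification label; the data is a plain Int here.
  final case class Classification(label: String)

  type Tagged = (Int, List[Classification])

  // Each stage passes the tuple through unchanged unless it has something
  // to add; only then are a new list cell and a new tuple allocated.
  def stageA(t: Tagged): Tagged =
    if (t._1 % 2 == 0) (t._1, Classification("even") :: t._2) else t

  def stageB(t: Tagged): Tagged =
    if (t._1 > 100) (t._1, Classification("large") :: t._2) else t

  def stageC(t: Tagged): Tagged =
    if (t._1 < 0) (t._1, Classification("negative") :: t._2) else t

  // flowA ~> flowB ~> flowC from the diagram, as function composition.
  val pipeline: Tagged => Tagged = t => stageC(stageB(stageA(t)))

  def classify(n: Int): Tagged = pipeline((n, Nil))
}
```

Because every element carries its own results, nothing has to be re-aligned at the end, which is why a plain merge can replace the zip in this layout.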