mapVia as a combination of map and via in streams

I’ve hit a number of scenarios where I have a map followed by via just so that I can form the types expected by the via’s Flow. This adds more code than I’d prefer.

My thinking is that a ‘mapVia’ style of operation might be useful ie an input type as per ‘map’ and an output type as per ‘via’.

I might play around with this, but I was wondering if there had already been some thinking on it.

Cheers
C

This does not sound like it would pull it’s weight.

A map is a map, a via is a via, I really struggle to see how much code you’d save by such operator?

Actually, I understand that a map is actually implemented using a via, but I hear you. Just asking if there had been some prior thinking in this area.

That’s because every operator can be expressed as a via

class Source { 
  def anything = this.via(AnythingImpl)
}

that’s how any flows/sources/sinks are composed, yeah :blush:

I owe you a concrete example of the problem.

I think that the case for a mapVia style scenario is where you don’t have convenient control over the types with the Flow as it is in another library. A good example of this is with Alpakka’s MQTT flow:

...
  .map { 
    case (payload, span) =>
      (MqttMessage(mqttEventTopic, payload), span)
  }
  .via(MqttFlow(settings, 8, MqttQoS.atLeastOnce))

In this example, I’d like to pass through the span. I can’t as the library doesn’t accommodate me. This is a variant on the problem I originally described i.e. just moving the types around, but I feel that it is a useful scenario to illustrate the problem I’m having.

As a potential solution, let’s imagine that via can implement my mapVia by taking a partial function and returning a Source (warning - untested code):

...
  .via { 
    case (payload, span) =>
      Source
        .single(MqttMessage(mqttEventTopic, payload))
        .via(
          MqttFlow(settings, 8, MqttQoS.atLeastOnce)
            .map { mqttMessage =>
              (mqttMessage, span)
            }
          )
  }

Hmmm. Maybe all that I need here is a flatMapConcat

MQTT is just one example. Here’s another:

  .map {
    case ((someOtherData, decryptedData), span) =>
      ((getSecret("path-to-secret"), ByteString(decryptedData)), (someOtherData, span))
  }
  .via(IdentityStreams.encrypter)

In this one, the encrypter flow requires a secret and unencrypted data so that it may encrypt. Its signature is:

Flow[((GetSecret, ByteString), A), (ByteString, A), NotUsed]

So, because I control the encrypter, I’ve made it accommodate the ability to carry through elements. However, I must still reorder the types as the encrypter is general-purpose and part of another library.

I’ve come across a number of these scenarios. As stated, this problem I’m finding is having an additional map when I have no convenient control over the `Flow’s types.

The original MQTT example was really nice! In my opinion it would worth a .viaWithPassThrought[A,PT,B](f: Flow[A,B, _]). (And it could be a Flow[(A,PT), (B,PT), _]) (I can’t really express how much I like it :smiley: )

Thanks. I don’t want to make this just about passing through additional elements though. My real issue is having to conveniently manage the presentation of types when calling via.

Correct me if I’m wrong but I think this is the old unwrap/wrap, focus/unfocus, watchamacallit-issue. Discussed previously here:

I think the problem so far is that nobody figured out how to implement it. The biggest issue is that there is no guarantee that your wrapped Flow[In, Out] emits all elements passed into it or even that they come in the same order as you sent into it, so how long do you hold onto the context before throwing it away?

flatMapConcat could be a solution but isn’t great because it has to materialize a new substream for every element.

1 Like

You are entirely correct. I swear that I did look prior to posting! Thanks for pulling the issues together; it is great to see that I’m not on my own here.

Now, what to do about it. :-)