Combine prefixAndTail(1) with Sink.lazySink for SubFlow created by .splitAfter

I am currently developing an akka-stream/alpakka application with the following general logic:

  1. Given a Flow, split it into a SubFlow using the splitAfter method.
  2. For each of the SubFlows from step 1, use prefixAndTail(1) to create a key based on the first element of that SubFlow.
  3. Attach each SubFlow to a Sink that is given that key as a parameter.

So, assuming we have a function defined as follows which gives us a Sink

def mySink(key: String): Sink[ByteString, Future[Done]]
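For concreteness, a hypothetical implementation of such a sink could write each substream to a file named after the key (FileIO.toPath is the stock Akka file sink; the path and the Done conversion are illustrative assumptions, not part of the actual application):

```scala
import java.nio.file.Paths

import akka.Done
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch: writes the bytes of one substream to a key-derived file.
def mySink(key: String): Sink[ByteString, Future[Done]] =
  FileIO
    .toPath(Paths.get(s"/tmp/$key.dat")) // Sink[ByteString, Future[IOResult]]
    .mapMaterializedValue(_.map(_ => Done)(ExecutionContext.parasitic))
// (ExecutionContext.parasitic is Scala 2.13; on 2.12 any ExecutionContext will do.)
```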

Essentially what I want to do is something like this

val source = ???
val subFlows = source.splitAfter(...)
val logic = subFlows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
  val computedFlow: Flow[ByteString, ByteString, NotUsed] = ???
  rest.via(computedFlow) // flatMapConcat needs a Source back, not a bare Flow
}
val finished = logic.prefixAndTail(1).to(
  Sink.lazySink { (head) => 
    val key = computeKey(head)
    mySink(key)
  }
)

This code evidently doesn't compile, since Sink.lazySink's thunk takes zero parameters. I tried other variations, however I haven't gotten anywhere. The documentation for Sink.lazySink says that you can combine it with prefixAndTail(1) to get the first element out of the flow, presumably so you can use that first element as an input to the final Sink, which in my case is mySink(key) (analogous to the prefixAndTail(1).flatMapConcat combination on a Flow). The problem is that .to doesn't accept parameters (only a raw Sink), so you end up with the following problem

val finished = logic.prefixAndTail(1).to(
  Sink.lazySink { () => 
    // How do I get the first element of the stream here
    // so I can compute the key?
    val key = computeKey(???)
    mySink(key)
  }
)
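One way to sidestep Sink.lazySink entirely (a sketch under the assumptions above, not a final design) is to compute the key from the prefix inside each substream, prepend the prefix back so no data is lost, and materialize the inner stream against the keyed sink manually:

```scala
// Sketch: `source`, `computeKey` and `mySink` are the hypothetical pieces
// from above; an implicit Materializer (e.g. the ActorSystem) is assumed.
val graph =
  source
    .splitAfter(...)                  // the original split condition
    .prefixAndTail(1)
    .mapAsync(parallelism = 1) { case (head, tail) =>
      val key = computeKey(head.head) // head is a one-element Seq
      // Re-prepend the prefix and run the inner stream manually;
      // the resulting Future[Done] keeps mapAsync's backpressure honest.
      Source(head).concat(tail).runWith(mySink(key))
    }
    .to(Sink.ignore)
```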

I tried computing the key in the logic Flow, however then you have to compose Sinks of two different types (one keyed by the head of the subflow, the other being the original Sink that accepts a ByteString), which doesn't really work

val logic = subFlows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
  ...
  val computedFlow: Flow[ByteString, ByteString, NotUsed] = ???
  val key = computeKey(head.head) // head is a one-element Seq
  rest.via(computedFlow).map(something => (something, key))
}
val finished = logic.to(
  Sink.lazySink { () => 
    // Here I have to return a Sink[(ByteString, String), _] but
    // mySink only returns a Sink[ByteString, Future[Done]].
    // I essentially need something like Sink[String].flatMap(key => mySink(key)),
    // but this doesn't work because Sinks don't have an output
  }
)

So I am kind of stuck. Since I am working with a SubFlow I have a limited set of methods, and I can't seem to find a way to pass the key (computed from the first element of each SubFlow) as a parameter to mySink(key: String).

Sink.lazyInit appears to be close to what I want; I managed to get the following to compile

val logic = subFlows.prefixAndTail(1).flatMapConcat { case (head, rest) =>
  ...
  val computedFlow: Flow[ByteString, ByteString, NotUsed] = ???
  val key = computeKey(head.head) // head is a one-element Seq
  rest.via(computedFlow).map(something => (something, key))
}
val finished = logic.to(
  Sink.lazyInit({ case (_, key) =>
    Future.successful(
      mySink(key).contramap[(ByteString, String)] { case (content, _) => content }
    )
  }, ???)
)

Although the problem is that Sink.lazyInit is deprecated, so I would prefer to use the prefixAndTail(1).to(Sink.lazySink) combination (as stated in the deprecation message). Also, I am unsure what the point of requiring a Future is, and whether Future.successful is appropriate or whether I should use the standard Future.apply that requires an ExecutionContext.
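For what it's worth, the Future is there so that the sink can be created asynchronously (for example after an external lookup). When the sink can be built synchronously, Future.successful is appropriate and involves no ExecutionContext, since nothing actually runs asynchronously. A sketch along the lines of the compiling snippet, with the fallback filled in for illustration:

```scala
// Future.successful wraps an already-computed value and schedules nothing,
// so no ExecutionContext is needed. Future.apply would only make sense if
// constructing the sink itself were an expensive or asynchronous operation.
Sink.lazyInit[(ByteString, String), Future[Done]](
  { case (_, key) =>
    Future.successful(
      mySink(key).contramap[(ByteString, String)] { case (content, _) => content }
    )
  },
  // fallback materialized value, used if the stream completes with no elements
  () => Future.successful(Done)
)
```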

Transform a CSV file into multiple CSV files using Akka Stream - #6 by ennru seems to be related, however I haven't managed to figure out the "extract what you need to create the sink" part.

Hi @mdedetrich,

here’s a basic example that breaks down the steps:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future

object SplitAfterPrefixTest extends App {
  implicit val system = ActorSystem()
  import system.dispatcher

  val input = Seq(
    "A", "say", "a", "word", "STOP",
    "B", "be", "ready", "STOP"
  )

  def handleLine(prefix: Seq[String], stream: Source[String, Any]): Source[Any, Any] = {
    prefix.head match {
      case "A" =>
        stream
          .map(_.toUpperCase())
          .runFold("")(_ + " " + _) // runs the substream to completion manually
          .onComplete(res => println(s"Result for $prefix was \"$res\""))
      case "B" =>
        stream
          .map(_.reverse.toLowerCase)
          .runFold("")(_ + " " + _) // runs the substream to completion manually
          .onComplete(res => println(s"Result for $prefix was \"$res\""))
    }

    Source.empty // flatMapConcat only needs a Source back; nothing more to emit
  }

  val handleChunk: Sink[String, Future[Any]] =
    Flow[String]
      .prefixAndTail(1)
      .flatMapConcat((handleLine _).tupled) // getting only a single element
      .toMat(Sink.ignore)(Keep.right)

  Source(input)
    .splitAfter(_ == "STOP")
    .to(handleChunk)
    .run()

  Thread.sleep(1000)
  system.terminate()
}

The basic issue is probably that prefixAndTail also creates something that resembles a subflow but isn't exactly one, so you need to make sure to run the streams you get manually.

A problem with that particular solution is that it’s a bit hard to figure out when the stream is done. Is that a feature that you need?
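If completion does matter, one option (a sketch under the same assumptions as the example above) is to emit the per-chunk result future back into the stream instead of discarding it, so the Future materialized by handleChunk only completes once the substream has finished:

```scala
// Variant of handleLine whose returned Source completes with the inner
// stream; an implicit ExecutionContext (e.g. system.dispatcher) is assumed.
def handleLine(prefix: Seq[String], stream: Source[String, Any]): Source[Any, Any] = {
  val result: Future[String] = prefix.head match {
    case "A" => stream.map(_.toUpperCase).runFold("")(_ + " " + _)
    case "B" => stream.map(_.reverse.toLowerCase).runFold("")(_ + " " + _)
  }
  result.onComplete(res => println(s"Result for $prefix was \"$res\""))
  // Source.future completes only when the inner stream is done, so
  // completion now propagates to the outer stream instead of being lost.
  Source.future(result)
}
```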

Johannes

Hi @jrudolph thanks for the response!

Regarding the handleLine function in your solution, maybe I am misunderstanding something, but this looks like a workaround/hack? Essentially you materialize the stream that is the result of prefixAndTail(1) prematurely and just return an empty Source using Source.empty, to get around the fact that flatMapConcat doesn't work when you return a Sink (rather, it only works when you return another Source).

At least to me it looks like there isn't an ergonomic/idiomatic way to handle this problem, apart from manually recreating a Sink using wireTap/mapMaterializedValue as in akka - Combine prefixAndTail(1) with Sink.lazySink for SubFlow created by .splitAfter - Stack Overflow, or, in your case, prematurely running the stream and then returning Source.empty to satisfy the compiler.

The idea is that the stream will be permanently running, so in production it's not an issue, but it's annoying when it comes to testing because, as you said, it's not possible to figure out when the stream finishes; currently the tests just have a timeout longer than the entire running time of the stream.

Is there an alternative to using splitAfter? As far as I understand from the akka documentation this is the canonical way to handle such cases.