Use a single source multiple times

I have a requirement where a file is uploaded by the end user and has to be streamed to two different services.
I tried with preMaterialize, but that didn't work.

Is there any API in akka-stream for this, or should I build a custom graph?

Hi @scalaLearner. Welcome to the Lightbend discuss board.

You could implement this by building a graph (or extending an existing one).

If the file is uploaded with akka-http then you could add stages using the operator DSL to process the file in some way. Since akka-http uses and exposes akka-streams types to the end user, you can integrate an HTTP request stream with your custom app stream; it would only need to be materialized once.
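As a rough sketch of what "materialized once" means here (the route and names below are illustrative only, not from your app): the request entity's dataBytes is an ordinary akka-stream Source[ByteString, Any], so you can append operator stages to it and run the whole pipeline once per request.

        import akka.http.scaladsl.server.Directives._

        val route =
          extractMaterializer { implicit mat =>
            extractRequestEntity { entity =>
              val totalBytes = entity.dataBytes
                .map(_.length)          // append whatever processing stages you need
                .runFold(0L)(_ + _)     // the stream is materialized exactly once here
              onSuccess(totalBytes)(n => complete(s"received $n bytes"))
            }
          }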

If the file is uploaded by some other means you could see if there’s an existing Alpakka connector implemented for the technology you need.

If you provide more specifics about your use case I could offer more advice.

Thank you @seglo. I am new to Akka as well as Scala. I am not sure whether I am doing this right or wrong, so please correct me.

I am using the fileUpload directive from akka-http and uploading the file as multipart.

fileUpload("file") {
          case (_, filedata) =>
            val (_, b) = filedata.preMaterialize()
            val req1 = request1(b)
            val req2 = request2(b)
            val res = Future.sequence(List(req1, req2))
            onComplete(res) {
              case Success(_) =>
                complete(StatusCodes.OK, "Successfully uploaded")
              case Failure(ex) =>
                 complete(StatusCodes.InternalServerError)
            }
        }

So here I want to pass the filedata source to two different requests.

Hi @scalaLearner. I see. filedata is an akka-stream Source that must be materialized in order to process its contents. Assuming that requests 1 and 2 do not accept a Source, you will likely want to process the file stream in some way and then send the output to the requests. For example, if your file is a text file and you want to send a list of its contents split by newline characters to your other services, you could pass the materialized value of the stream (as a Future[Seq[String]]) to your request objects, and then complete the HTTP request as in your snippet.

        import akka.stream.scaladsl.{Framing, Keep, Sink}
        import akka.util.ByteString
        import scala.concurrent.Future
        import scala.util.{Failure, Success}

        // An implicit Materializer (e.g. from an implicit ActorSystem) and an
        // ExecutionContext need to be in scope for run() and Future.sequence.
        def request1(fut: Future[Seq[String]]): Future[Unit] = ???
        def request2(fut: Future[Seq[String]]): Future[Unit] = ???

        fileUpload("file") {
          case (_, filedata) =>
            val lines = filedata
              .via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
              .map(_.utf8String)
              .toMat(Sink.seq)(Keep.right)
              .run()

            val req1 = request1(lines)
            val req2 = request2(lines)
            val res = Future.sequence(List(req1, req2))
            onComplete(res) {
              case Success(_) =>
                complete(StatusCodes.OK, "Successfully uploaded")
              case Failure(_) =>
                complete(StatusCodes.InternalServerError)
            }
        }

Materialising the source keeps the content in memory. Instead, is there any way I can pass the source to two different HTTP endpoints so that they run the stream at their end?

You can use .alsoTo to branch out (or use the Graph DSL for even more advanced stream graphs: https://doc.akka.io/docs/akka/current/stream/stream-graphs.html).
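For illustration, here is a rough sketch of branching with alsoToMat, assuming each service can be fed through a Sink[ByteString, Future[Done]] (serviceASink, serviceBSink and uploadToBoth are placeholder names, not an existing API):

        import akka.Done
        import akka.stream.Materializer
        import akka.stream.scaladsl.{Keep, Sink, Source}
        import akka.util.ByteString
        import scala.concurrent.{ExecutionContext, Future}

        // Placeholder sinks that would forward the uploaded bytes to the two
        // downstream services (for example built around Http().singleRequest
        // with a streamed request entity).
        def serviceASink: Sink[ByteString, Future[Done]] = ???
        def serviceBSink: Sink[ByteString, Future[Done]] = ???

        def uploadToBoth(filedata: Source[ByteString, Any])(
            implicit mat: Materializer, ec: ExecutionContext): Future[Done] = {
          // alsoToMat attaches serviceASink as a side branch: every chunk is
          // emitted both to it and to the main flow ending in serviceBSink.
          // Keep.both exposes both materialized Futures so both uploads can
          // be awaited before completing the HTTP request.
          val (doneA, doneB) = filedata
            .alsoToMat(serviceASink)(Keep.right)
            .toMat(serviceBSink)(Keep.both)
            .run()

          Future.sequence(List(doneA, doneB)).map(_ => Done)
        }

Note that both branches share one running stream, so the slower service backpressures the upload and a failure in either branch fails the whole stream.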

Make sure to carefully think about how both failures and backpressure will affect your upload all the way to the client though.

The fileUploadAll directive, which buffers the file on disk, may also be worth checking out: it allows completing the upload from the client and then running the two uploads as separate streams, giving more isolation between them.
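A minimal sketch of that approach, assuming the per-file Source handed out by fileUploadAll is backed by the on-disk temp file and can therefore be materialized once per downstream request (request1 and request2 are placeholders that run the stream themselves):

        import akka.http.scaladsl.model.StatusCodes
        import akka.http.scaladsl.server.Directives._
        import akka.http.scaladsl.server.Route
        import akka.stream.scaladsl.Source
        import akka.util.ByteString
        import scala.concurrent.{ExecutionContext, Future}
        import scala.util.{Failure, Success}

        // Placeholder request helpers that each materialize the disk-backed source.
        def request1(data: Source[ByteString, Any]): Future[Unit] = ???
        def request2(data: Source[ByteString, Any]): Future[Unit] = ???

        def route(implicit ec: ExecutionContext): Route =
          fileUploadAll("file") { files =>
            // Each element is a (FileInfo, Source[ByteString, Any]) pair whose source
            // reads from a temp file, so the upload from the client is already complete.
            val uploads = files.map { case (_, filedata) =>
              // The two downstream uploads run as separate, independent streams.
              Future.sequence(List(request1(filedata), request2(filedata)))
            }
            onComplete(Future.sequence(uploads)) {
              case Success(_) => complete(StatusCodes.OK, "Successfully uploaded")
              case Failure(_) => complete(StatusCodes.InternalServerError)
            }
          }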


Thanks @johanandren. I will try them.