Taking a function and turning it into a sink?

streams

(Mark Hammons) #1

I have a function of type Source[ByteString,_] => Future[Either[Fault,(String,Long)]] that uses sttp to upload data. My data is a Source[ByteString,_], and I have a pipeline. I’d like to use this function in my pipeline, but I don’t know how to turn it into a flow or a sink. Any ideas?

Edit: to make it a bit clearer, I can easily turn this function into a Flow[Source[ByteString,_],T]. What I want is to get a Flow[ByteString,T] from that.


(Mark Hammons) #2

I found a solution:

  def uploadGraph(uuid: UUID,
                  hostURI: String,
                  datum: Datum)
                 (implicit actorSystem: ActorSystem,
                  materializer: Materializer,
                  ec: ExecutionContext) = {
    GraphDSL.create() { implicit b =>
      val client = Client(hostURI)

      val (q,source) = Source
        .queue[ByteString](5, OverflowStrategy.backpressure)
        .preMaterialize()

      val flow = Flow[ByteString]
        .fold[(Future[PTry[(String,Long)]], SourceQueueWithComplete[ByteString])](
          client.uploadData(uuid, source, datum.location) -> q
        ){
          case ((futureHash,queue), s) =>
            queue.offer(s).map{
              case Enqueued => futureHash
              case QueueClosed => Future.successful(
                PFailure(
                  QueuePrematurelyClosedFault(
                    "The stream queue was closed before it should've been."
                  )
                )
              )
            }.recover{
              case f => Future.successful(
                PFailure(
                  ExceptionFault(f)
                )
              )
            }.flatten -> queue
        }.map{
          case (hash, queue) =>
          queue.complete()
          hash
        }
      val r = b.add(flow)

      FlowShape(r.in, r.out)
    }
  }

(Mark Hammons) #3

The above does not work: the queue doesn’t seem to block on backpressure, so I end up crashing it by submitting elements while it is full.


(Martynas Mickevičius) #4

The Future returned from queue.offer(...) completes only once the element has been successfully added to the queue. You could use that as the backpressure signal and replace fold with foldAsync.
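
A minimal sketch of that suggestion, assuming Akka 2.6.x (where an implicit ActorSystem also supplies the materializer). The names `consume` and `countingFlow` are hypothetical: `consume` stands in for the upload function and only counts the bytes it receives. The idea is that foldAsync does not take the next element until the Future returned for the previous one has completed, so every offer is awaited before the next one is made.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FoldAsyncQueueSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")
  import system.dispatcher

  // Hypothetical stand-in for the upload function: counts the bytes it is fed.
  def consume(src: Source[ByteString, _]): Future[Long] =
    src.runFold(0L)(_ + _.size)

  // Builds a fresh queue on every call, so each returned flow is single-use.
  def countingFlow(): Flow[ByteString, Long, NotUsed] = {
    val (queue, source) = Source
      .queue[ByteString](2, OverflowStrategy.backpressure)
      .preMaterialize()
    val result = consume(source)

    Flow[ByteString]
      .foldAsync(queue) { (q, chunk) =>
        // The next chunk is only folded in after this Future completes.
        q.offer(chunk).flatMap {
          case QueueOfferResult.Enqueued => Future.successful(q)
          case other =>
            Future.failed(new IllegalStateException(s"offer failed: $other"))
        }
      }
      .mapAsync(1) { q =>
        q.complete() // upstream is done: close the queue so `consume` can finish
        result
      }
  }

  def main(args: Array[String]): Unit = {
    val chunks = List(ByteString("ab"), ByteString("cde"), ByteString("f"))
    val total = Source(chunks).via(countingFlow()).runWith(Sink.head)
    println(Await.result(total, 5.seconds))
    system.terminate()
  }
}
```

Because the queue is pre-materialized when `countingFlow()` is called, the resulting flow should be materialized only once; call the method again for each new stream.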


(Mark Hammons) #5

I thought I had done that with map in queue.offer(s).map, but I can try again later.


(Mark Hammons) #6

I wanted to come back and say that I have reimplemented it with foldAsync in the following way:

    val client = Client(hostURI)

    val (q, source) = Source
      .queue[ByteString](16, OverflowStrategy.backpressure)
      .preMaterialize()

    val futureResult = client.uploadData(uuid, source, experimentFile)
    val flow: Flow[ByteString, PTry[String], _] = Flow[ByteString]
      .foldAsync[PTry[SourceQueueWithComplete[ByteString]]](
        PSuccess(q)
      ) {
        case (PSuccess(queue), s) =>
          queue
            .offer(s)
            .flatMap {
              case Enqueued => Future.successful(PSuccess(queue))
              case QueueClosed =>
                futureResult.map {
                  case PSuccess((_, _)) =>
                    PFailure(
                      QueuePrematurelyClosedFault(
                        "The stream queue was closed before it should've been."
                      )
                    )

                  case PFailure(f) =>
                    PFailure(
                      QueuePrematurelyClosedFault(
                        "The stream queue was closed before it should've been",
                        Some(f))
                    )

                }
              case QueueOfferResult.Failure(cause) =>
                queue.complete()
                Future.successful(
                  PFailure(
                    ExceptionFault(cause)
                  ))
              case Dropped =>
                queue.complete()
                Future.successful(
                  PFailure(
                    QueuePrematurelyClosedFault("elements were dropped!!")
                  ))
            }
            .recover {
              case f =>
                queue.complete()
                PFailure(
                  ExceptionFault(f)
                )
            }
        case (failure, _) =>
          Future.successful(failure)
      }
      .mapAsync(1) {
        case PSuccess(queue) =>
          queue.complete()
          futureResult
        case o =>
          val res = futureResult.map(r => o.flatMap(_ => r))
          res
      }

And it works as expected now. Thanks!
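
For comparison, a different technique that is not used in this thread: the `prefixAndTail` operator can expose the incoming elements as a Source, which lets a Source-consuming function be lifted into a Flow without a queue. This is only a sketch, assuming Akka 2.6.x; `lift` and `consume` are hypothetical names, and `consume` merely counts bytes in place of a real upload.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object PrefixAndTailSketch {
  implicit val system: ActorSystem = ActorSystem("sketch")

  // Hypothetical stand-in for the upload function: counts the bytes it is fed.
  def consume(src: Source[ByteString, _]): Future[Long] =
    src.runFold(0L)(_ + _.size)

  // prefixAndTail(0) emits a single (empty prefix, tail) pair, where the tail
  // is a Source carrying every upstream element; f is applied to that tail.
  def lift[T](f: Source[ByteString, _] => Future[T]): Flow[ByteString, T, NotUsed] =
    Flow[ByteString]
      .prefixAndTail(0)
      .mapAsync(1) { case (_, tail) => f(tail) }

  def main(args: Array[String]): Unit = {
    val chunks = List(ByteString("ab"), ByteString("cde"))
    val total = Source(chunks)
      .via(lift[Long](src => consume(src)))
      .runWith(Sink.head)
    println(Await.result(total, 5.seconds))
    system.terminate()
  }
}
```

In this sketch backpressure comes from the tail source itself, so no offer results need to be handled; the tail does have to be run promptly by `f`, since Akka applies a subscription timeout to unconsumed substreams.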