Taking a function and turning it into a sink?

I have a function of type Source[ByteString,_] => Future[Either[Fault,(String,Long)]] that uses sttp to upload data. My data is a Source[ByteString,_] flowing through an existing pipeline. I’d like to use this function in that pipeline, but I don’t know how to turn it into a flow or a sink. Any ideas?

Edit: to make this clearer, I can easily turn this function into a Flow[Source[ByteString,_],T]. What I want is a Flow[ByteString,T] built from it.
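
For illustration, the "easy" wrapping could look roughly like the sketch below. It assumes Akka 2.6 (where an implicit ActorSystem also supplies the Materializer), and `upload` is a hypothetical stand-in for the real sttp call that only counts bytes. Since each stream element is itself a whole Source, mapAsync can call the function directly. That does not help when the elements are individual ByteStrings.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SourceFlowSketch {
  // Hypothetical stand-in for the sttp upload: it only counts the bytes it receives.
  def upload(data: Source[ByteString, _])(implicit system: ActorSystem): Future[Long] =
    data.runFold(0L)(_ + _.size)

  def run(): Long = {
    implicit val system: ActorSystem = ActorSystem("source-flow-sketch")
    try {
      // Every element is a complete Source, so the function is applied once per element.
      val flow: Flow[Source[ByteString, _], Long, NotUsed] =
        Flow[Source[ByteString, _]].mapAsync(1)(upload(_))

      val payload: Source[ByteString, _] =
        Source(List(ByteString("ab"), ByteString("cde")))

      Await.result(Source.single(payload).via(flow).runWith(Sink.head), 10.seconds)
    } finally system.terminate()
  }
}
```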

I found a solution:

  def uploadGraph(uuid: UUID,
                  hostURI: String,
                  datum: Datum)
                 (implicit actorSystem: ActorSystem,
                  materializer: Materializer,
                  ec: ExecutionContext) = {
    GraphDSL.create() { implicit b =>
      val client = Client(hostURI)

      val (q,source) = Source
        .queue[ByteString](5, OverflowStrategy.backpressure)
        .preMaterialize()

      val flow = Flow[ByteString]
        .fold[(Future[PTry[(String,Long)]], SourceQueueWithComplete[ByteString])](
          client.uploadData(uuid, source, datum.location) -> q
        ){
          case ((futureHash,queue), s) =>
            queue.offer(s).map{
              case Enqueued => futureHash
              case QueueClosed => Future.successful(
                PFailure(
                  QueuePrematurelyClosedFault(
                    "The stream queue was closed before it should've been."
                  )
                )
              )
            }.recover{
              case f => Future.successful(
                PFailure(
                  ExceptionFault(f)
                )
              )
            }.flatten -> queue
        }.map{
          case (hash, queue) =>
            queue.complete()
            hash
        }
      val r = b.add(flow)

      FlowShape(r.in, r.out)
    }
  }

The above does not work: the queue doesn’t seem to block on backpressure, so I end up crashing it by offering elements while it is full.

The Future returned from queue.offer(...) will be completed only when the element has been successfully put to the queue. You could use that as a backpressure signal and replace fold with foldAsync.
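
A minimal sketch of that idea, assuming Akka 2.6 (an implicit ActorSystem provides the Materializer). The `upload` function here is a hypothetical stand-in that only counts bytes, and the buffer size of 2 is arbitrary. The point is that foldAsync does not pull the next element until the Future from the previous offer has completed, so the queue never sees a second offer while one is still pending.

```scala
import akka.actor.ActorSystem
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FoldAsyncSketch {
  // Hypothetical stand-in for the upload function: it only counts the bytes it receives.
  def upload(data: Source[ByteString, _])(implicit system: ActorSystem): Future[Long] =
    data.runFold(0L)(_ + _.size)

  def run(): Long = {
    implicit val system: ActorSystem = ActorSystem("fold-async-sketch")
    import system.dispatcher
    try {
      // Materialize the queue up front so the upload can start consuming from it.
      val (queue, source) = Source
        .queue[ByteString](2, OverflowStrategy.backpressure)
        .preMaterialize()
      val result = upload(source)

      val flow = Flow[ByteString]
        // The accumulator records whether every offer so far was enqueued.
        .foldAsync(true) { (ok, bytes) =>
          if (!ok) Future.successful(false)
          else queue.offer(bytes).map(_ == QueueOfferResult.Enqueued)
        }
        // Upstream is done: close the queue and wait for the upload result.
        .mapAsync(1) { _ =>
          queue.complete()
          result
        }

      val input = Source(List.fill(100)(ByteString("abc")))
      Await.result(input.via(flow).runWith(Sink.head), 10.seconds)
    } finally system.terminate()
  }
}
```

With plain fold, the function returns immediately and the next element is offered while the previous offer may still be pending, which is what overflows the queue.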

I thought I had done that with the map in queue.offer(s).map, but I can try again later.

I wanted to come back and say that I have reimplemented it with foldAsync as follows:

    val client = Client(hostURI)

    val (q, source) = Source
      .queue[ByteString](16, OverflowStrategy.backpressure)
      .preMaterialize()

    // Start the upload right away; it consumes `source` as elements are offered to the queue.
    val futureResult = client.uploadData(uuid, source, experimentFile)
    val flow: Flow[ByteString, PTry[String], _] = Flow[ByteString]
      .foldAsync[PTry[SourceQueueWithComplete[ByteString]]](
        PSuccess(q)
      ) {
        case (PSuccess(queue), s) =>
          queue
            .offer(s)
            .flatMap {
              case Enqueued => Future.successful(PSuccess(queue))
              case QueueClosed =>
                futureResult.map {
                  case PSuccess((_, _)) =>
                    PFailure(
                      QueuePrematurelyClosedFault(
                        "The stream queue was closed before it should've been."
                      )
                    )

                  case PFailure(f) =>
                    PFailure(
                      QueuePrematurelyClosedFault(
                        "The stream queue was closed before it should've been",
                        Some(f))
                    )

                }
              case QueueOfferResult.Failure(cause) =>
                queue.complete()
                Future.successful(
                  PFailure(
                    ExceptionFault(cause)
                  ))
              case Dropped =>
                queue.complete()
                Future.successful(
                  PFailure(
                    QueuePrematurelyClosedFault("elements were dropped!!")
                  ))
            }
            .recover {
              case f =>
                queue.complete()
                PFailure(
                  ExceptionFault(f)
                )
            }
        case (failure, _) =>
          Future.successful(failure)
      }
      .mapAsync(1) {
        case PSuccess(queue) =>
          queue.complete()
          futureResult
        case o =>
          futureResult.map(r => o.flatMap(_ => r))
      }

And it works as expected now. Thanks!
