Akka Stream from Resource

With Akka Streams what’s the best way to describe a stream that handles a resource, like a file handle? How can I describe this?

def fromFile(f: File): Source[InputStream, NotUsed]

I need a generic method that can be applied to any resource; I'm not interested in the utilities from FileIO.

The actual use-case is that I need a way to convert a cats.effect.Resource into an Akka Stream.
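Roughly, the shape I'm after would be something like this (hypothetical signature, not an existing API):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import cats.effect.Resource

// Hypothetical: emit the resource's value as a single element and run
// the Resource's finalizer when the stream terminates.
def fromResource[F[_], A](resource: Resource[F, A]): Source[A, NotUsed] = ???
```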

UPDATE - I’ve also asked the question on the Gitter channel July 24, 2020 10:12 AM

Thanks,


I am tempted to use Source.unfoldResourceAsync in a way it doesn't seem to have been meant to be used, and I worry about resource safety.

import akka.stream.scaladsl.Source
import akka.{ Done, NotUsed }
import cats.effect.{ Effect, Resource }
import cats.implicits._
import monix.execution.atomic.{ Atomic, AtomicBoolean }
import scala.concurrent.Future

def fromResource[F[_]: Effect, A](r: Resource[F, A]): Source[A, NotUsed] = {
  // Interprets an F[_] effect into a Future (cats-effect 2 API)
  def unsafeToFuture[B](fb: F[B]): Future[B] =
    Effect[F].toIO(fb).unsafeToFuture()

  // Allocates the resource, pairing it with a one-shot flag and its
  // finalizer; this F is only run inside `create`, once per materialization
  val res: F[(A, AtomicBoolean, F[Unit])] = r.allocated.map {
    case (a, cancel) => (a, Atomic(true), cancel)
  }
  Source.unfoldResourceAsync[A, (A, AtomicBoolean, F[Unit])](
    create = () => unsafeToFuture(res),
    read = {
      case (a, isActive, _) =>
        // Emit the resource exactly once, then signal completion
        if (isActive.compareAndSet(expect = true, update = false))
          Future.successful(Some(a))
        else
          Future.successful(None)
    },
    close = {
      case (_, _, cancel) =>
        // Run the finalizer when the stream completes or fails
        unsafeToFuture(cancel.as(Done))
    }
  )
}

I didn't dig into what/how the Monix API works there, but I can't see how the usage of unfoldResourceAsync would be a problem, except for an extra allocation of a Future for each element in the stream, which could be a reason to aim for unfoldResource instead. That one already runs on a separate thread pool, so blocking in open and close could be OK there; not sure.
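For reference, the blocking variant looks like this — a minimal sketch of unfoldResource reading lines from a file, roughly the classic example from the Akka docs. The three callbacks run on a dispatcher meant for blocking IO, so no per-element Future is allocated:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import java.io.{ BufferedReader, FileReader }

// Blocking variant: create/read/close run on Akka's blocking-IO
// dispatcher, so it's fine for them to block.
def lines(path: String): Source[String, NotUsed] =
  Source.unfoldResource[String, BufferedReader](
    create = () => new BufferedReader(new FileReader(path)),
    read = reader => Option(reader.readLine()), // null => None ends the stream
    close = reader => reader.close()
  )
```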

It's important to note that, in general, a source can be materialized any number of times, so if that code leads to a single shared atomic, that could be a problem.
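A sketch of the safe shape, using a hypothetical `single` helper: because `create` runs once per materialization, each run of the stream gets its own flag and no state is shared between materializations:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: the AtomicBoolean is constructed inside `create`,
// so two materializations of this Source never share it.
def single[A](value: => A): Source[A, NotUsed] =
  Source.unfoldResource[A, AtomicBoolean](
    create = () => new AtomicBoolean(true),
    read = flag => if (flag.compareAndSet(true, false)) Some(value) else None,
    close = _ => ()
  )
```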

Unfortunately what I’m thinking of doesn’t work. Here’s a dirty code sample:

import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.atomic._

// Note the Atomics here are just for boxing values, nothing more
val source = Source.unfoldResourceAsync[AtomicInteger, (AtomicInteger, AtomicBoolean)](
  create = () => Future((new AtomicInteger(1), new AtomicBoolean(true))),
  read = {
    case (ref, isActive) =>
      Future {
        if (isActive.compareAndSet(true, false))
          Some(ref)
        else
          None
      }
  },
  close = {
    case (ref, _) => Future { ref.set(0); Done }
  }
)

implicit val as: ActorSystem = ActorSystem("test")

val result = source.flatMapConcat { number =>
  println(s"Received (1): ${number.get()}")
  Source.lazyFuture(() =>
    Future {
      Thread.sleep(2000)
      number.get()
    }
  )
}.runFold(-1)((_, num) => num)

val r = Await.result(result, 10.seconds)
println(s"Received (2): $r")

Await.result(as.terminate(), 10.seconds)

And the output:

Received (1): 1
Received (2): 0

(the two received values should have been equal)

As I suspected, the close isn't back-pressured, so the resource gets closed sooner than I'd like. It makes some sense for it to behave like that when there's buffering, and the Reactive Streams protocol doesn't back-pressure the final complete, but I don't think we can let resources leak out of unfoldResourceAsync like that.

Any other way I could make this work?

UPDATE: modified sample to use just the standard library and akka

Damn, I just realized that what I’m trying to do is incompatible with org.reactivestreams.Publisher.

If the resource is streamed and the Publisher must be in charge of closing it, the problem is that there's no way to force the Subscriber to only request(1). This means the Publisher has no way of knowing when the Subscriber is done processing the streamed resource, if that processing is asynchronous, because the only back-pressure mechanism is request(n).
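To illustrate the protocol limitation, a sketch against the Reactive Streams SPI (not a working stream, just the shape of the interface):

```scala
import org.reactivestreams.{ Subscriber, Subscription }

// request(n) only signals demand for more elements; there is no
// "done processing element k" signal. If onNext hands the element off
// to asynchronous processing, a Publisher that owns the resource has
// no way to know when closing it becomes safe.
val subscriber: Subscriber[AnyRef] = new Subscriber[AnyRef] {
  def onSubscribe(s: Subscription): Unit =
    s.request(1) // a demand signal, not an acknowledgement
  def onNext(elem: AnyRef): Unit =
    () // processing may continue asynchronously after this returns
  def onError(t: Throwable): Unit = ()
  def onComplete(): Unit = ()
}
```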

This makes me so sad :cry:

Sample code: https://gist.github.com/alexandru/b258f67ab1e21d61d06dcfd6ec73557a

The other problem, this time with Akka Streams, seems to be that flatMapConcat does not back-pressure, doing request(16) on my source. Or maybe that’s from Source.fromPublisher :man_shrugging:

How could I force a request(1) from that Source.fromPublisher(source).flatMapConcat?
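One knob that might help, though it only shrinks the prefetch and gives no strict request(1) guarantee, is the input-buffer attribute:

```scala
import akka.NotUsed
import akka.stream.Attributes
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher

// `publisher` is assumed to come from elsewhere; the point here is the
// attribute: an input buffer of one element reduces prefetching, but it
// still doesn't mean downstream has finished processing an element
// before the next request is issued.
def tuned(publisher: Publisher[Source[Int, NotUsed]]): Source[Int, NotUsed] =
  Source
    .fromPublisher(publisher)
    .flatMapConcat(identity)
    .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
```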

M’kay this is a related question about buffering:

Yes, you are correct: if you expect to be able to force a single element in flight in a stream at a given moment, that is not really possible, since there can be explicit buffers, implicit in-stage buffers, fan-out, cycles, etc. downstream.

In general this is not a problem, because you use Akka Streams to pass immutable elements, or, if they are mutable, you at least emit each instance only once. Resources are managed and extracted from in a single stage/operator, and the result of interacting with them is sent downstream (unfoldResource/unfoldResourceAsync, for example).
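A sketch of that pattern, with a hypothetical `bracketSource` helper: acquire, use and release all happen inside one asynchronous stage, so the resource is only closed after its Future-based processing completes and never escapes downstream:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical helper: the resource never leaves the stage, so release
// is guaranteed to run only after `use` has finished (or failed).
def bracketSource[A, B](acquire: () => A)(use: A => Future[B])(
    release: A => Unit)(implicit ec: ExecutionContext): Source[B, NotUsed] =
  Source.single(()).mapAsync(parallelism = 1) { _ =>
    val resource = acquire()
    use(resource).andThen { case _ => release(resource) }
  }
```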

If you want your source to not emit another element before the previous one has reached some other point in the stream, you cannot do this through the built-in backpressure; you would need to create some custom channel back from there to your source and have it deal with both the regular backpressure and the additional custom one-at-a-time signaling. (Not saying this is a great idea, though.)
