Instrument Source.queue

👋 everybody!

I’d like to gauge the saturation of the buffer maintained by the Source returned by Source.queue. Is there any known pattern (or out-of-the-box component) to achieve this?

I’m under the impression that this is not really possible and that the cleanest way to do this (without re-implementing the QueueSource graph stage) is to use Source.actorRef with bufferSize = 0 and maintain (and gauge) the queue inside it.

Going in another direction, my first attempt (which at least seems to compile) is to preMaterialize the Source and wrap the materialized value in an implementation of SourceQueueWithComplete that:

  1. increments an (atomic) counter when offer is invoked
  2. delegates everything else to the wrapped SourceQueueWithComplete
  3. exposes an onDequeue method that decrements the counter from point 1

I then map the materialized value of the pre-materialized source to the wrapper and append a map stage that calls onDequeue on the wrapper for every element that passes through (I've been advised not to use wireTap, as it may not be called in certain cases).

Does it make sense?

In code:

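import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

import akka.Done
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Source, SourceQueueWithComplete}
import com.codahale.metrics.RatioGauge // Dropwizard Metrics

object InstrumentedSource {

  final class InstrumentedSourceQueueWithComplete[T](
      delegate: SourceQueueWithComplete[T],
      bufferSize: Int,
  )(implicit executionContext: ExecutionContext)
      extends SourceQueueWithComplete[T] {

    // Everything except offer is forwarded unchanged to the wrapped queue.
    override def complete(): Unit = delegate.complete()

    override def fail(ex: Throwable): Unit = delegate.fail(ex)

    override def watchCompletion(): Future[Done] = delegate.watchCompletion()

    // Approximate number of elements currently sitting in the queue's buffer.
    private val buffered = new AtomicLong(0)

    // Called by the map stage appended in `queue` for every element leaving the buffer.
    private[InstrumentedSource] def onDequeue(): Unit = {
      val _ = buffered.decrementAndGet()
    }

    object BufferSaturationRatioGauge extends RatioGauge {
      override def getRatio: RatioGauge.Ratio =
        RatioGauge.Ratio.of(buffered.get().toDouble, bufferSize.toDouble)
    }

    lazy val bufferSaturationGauge: RatioGauge = BufferSaturationRatioGauge

    override def offer(elem: T): Future[QueueOfferResult] = {
      // Count the element before offering it: the downstream map stage may call
      // onDequeue() before the offer future's callback runs, which would
      // otherwise let the counter drop below zero.
      val _ = buffered.incrementAndGet()
      val result = delegate.offer(elem)
      result.onComplete {
        case Success(QueueOfferResult.Enqueued) => // the element is buffered: keep the count
        case _ =>
          // Dropped, QueueClosed, Failure or a failed future: undo the increment.
          val _ = buffered.decrementAndGet()
      }
      result
    }
  }

  def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy)(
      implicit executionContext: ExecutionContext,
      materializer: Materializer,
  ): Source[T, InstrumentedSourceQueueWithComplete[T]] = {
    // Materialize the queue up front so the wrapper can be built around it.
    val (queue, source) = Source.queue[T](bufferSize, overflowStrategy).preMaterialize()
    val instrumentedQueue = new InstrumentedSourceQueueWithComplete[T](queue, bufferSize)
    source.mapMaterializedValue(_ => instrumentedQueue).map { item =>
      // Downstream demand has pulled this element out of the buffer.
      instrumentedQueue.onDequeue()
      item
    }
  }

}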

One issue I noticed with the solution above is that buffered can at best be eventually consistent with the actual number of items in the queue, since the counter is updated outside the stage that owns the buffer. That's probably fine, though.

I think the solution very much depends on the problem that needs to be solved. Not exactly what you asked for, but one solution I could think of would be to have an actor keep track of the number of buffered elements.

Since you are dealing with Flows, you could tell that actor to increment the count when an element is added to the queue, and tell it to decrement the count when the element has left the buffer for processing. The actor can then support other messages (e.g. asking for the current ratio, or waiting for a certain ratio using ask) that can serve as building blocks for other application logic.
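A minimal sketch of such a counting actor, assuming Akka Typed (the 2.6-style Behaviors API). The names BufferCounter, ElementEnqueued, ElementDequeued and GetRatio are hypothetical; only the count and a ratio query are covered, and waiting for a given ratio via ask is left out.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor tracking how many elements sit in a queue's buffer.
object BufferCounter {
  sealed trait Command
  // Sent after offer(...) completed with QueueOfferResult.Enqueued.
  case object ElementEnqueued extends Command
  // Sent from a map stage placed right after the queue source.
  case object ElementDequeued extends Command
  // Replies with the current count divided by the buffer size.
  final case class GetRatio(replyTo: ActorRef[Double]) extends Command

  def apply(bufferSize: Int): Behavior[Command] = counting(bufferSize, count = 0)

  // The count lives in the behavior itself; each message yields the next behavior.
  private def counting(bufferSize: Int, count: Int): Behavior[Command] =
    Behaviors.receiveMessage {
      case ElementEnqueued => counting(bufferSize, count + 1)
      case ElementDequeued => counting(bufferSize, count - 1)
      case GetRatio(replyTo) =>
        replyTo ! (count.toDouble / bufferSize)
        Behaviors.same
    }
}
```

To wire it in, one would send ElementEnqueued when offer completes with QueueOfferResult.Enqueued and ElementDequeued from a map stage directly after the queue source. Because those two notifications come from different senders, Akka does not order them relative to each other, so the count can briefly lag (or even dip below zero).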

Whether the asynchronous nature of such a solution works for you would of course depend on the rest of the architecture.

I use a similar pattern a lot in situations where I want to limit an asynchronous operation to a certain maximum parallelism, where the initiation and completion of the asynchronous operation are observed in separate places. The actor then coordinates the number of available “tokens”.
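The token variant can be sketched the same way, again assuming Akka Typed; TokenCoordinator, Acquire and Release are hypothetical names. The actor hands out at most maxTokens permits, parks further requesters, and passes a released token straight to the oldest waiter.

```scala
import scala.collection.immutable.Queue

import akka.Done
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical coordinator limiting an asynchronous operation to maxTokens in flight.
object TokenCoordinator {
  sealed trait Command
  // Request a token; Done is sent back once one is available.
  final case class Acquire(replyTo: ActorRef[Done]) extends Command
  // Return a token when the operation is observed to have completed.
  case object Release extends Command

  def apply(maxTokens: Int): Behavior[Command] = running(maxTokens, Queue.empty)

  private def running(available: Int, waiting: Queue[ActorRef[Done]]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Acquire(replyTo) if available > 0 =>
        replyTo ! Done
        running(available - 1, waiting)
      case Acquire(replyTo) =>
        // No token left: park the requester until someone releases one.
        running(available, waiting.enqueue(replyTo))
      case Release =>
        waiting.dequeueOption match {
          case Some((next, rest)) =>
            // Hand the released token directly to the oldest waiter.
            next ! Done
            running(available, rest)
          case None =>
            running(available + 1, waiting)
        }
    }
}
```

The code that starts the operation would ask for a token before starting, and whichever place observes completion sends Release. If a holder never releases, its token is lost for good, so a real version may want a timeout.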
