Cannot get the rate of this stream to work right

I have created a pretty simple stream here:

    val source = Source.queue[(Int,Int,Int,Int,Int)](0, OverflowStrategy.dropHead).conflate((_,newest) => newest)
 
    val planeFlow = Flow.fromGraph(new PlaneFlow(file))
 
    source.viaMat(planeFlow)(Keep.left).toMat(new ImageSink(xDimension,yDimension,view))(Keep.left)

PlaneFlow is a GraphStage that reads a file and emits intensity values from it. It can be very slow depending on its inputs. What I want is this: if PlaneFlow is taking a long time, the next element it handles should be the most recent one offered to the source queue, with nothing in between. What actually happens is that even though PlaneFlow takes a second or two to do its work, the inputs received during that time are queued, so instead of seeing 2 updates at most, I see 7. Can anyone help with this?

Let me rephrase it to check that I understand correctly:
Fast source, slow sink. (The whole planeFlow together with the actual sink can be treated as “the slow sink”.)
When the sink requests an element, you want it to get the most recent one. (So far: source.buffer(1, OverflowStrategy.dropBuffer).)
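To make the “newest element only” requirement concrete, here is a minimal sketch that isolates it from PlaneFlow and ImageSink. It reuses the conflate call from the question and swaps in a mapAsync stage with a timer as a stand-in for the slow work. The object name, the 1 to 1000 range and the 200 ms delay are assumptions chosen for illustration, not anything from the original code.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical name; a sketch, not the original poster's pipeline.
object NewestOnlySketch {
  // Runs a fast in-memory source into a deliberately slow stage
  // and returns the elements that made it through.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("newest-only-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    val received = Source(1 to 1000)
      // While downstream is busy, replace the pending element with the newest one.
      .conflate[Int]((_, newest) => newest)
      // Stand-in for slow work (assumption): one element at a time, about 200 ms each.
      .mapAsync(1)(x => after(200.millis, system.scheduler)(Future.successful(x)))
      .runWith(Sink.seq)

    try Await.result(received, 30.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val received = run()
    println(s"received ${received.size} of 1000 elements, last = ${received.last}")
  }
}
```

The exact number of elements received depends on timing, so it is not asserted here. What should hold is that far fewer than 1000 elements arrive and that the final one is the newest value from the source.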

I had a hard time reproducing this :smiley: My main problem was the implementation of the delay stage… That attribute magic may be what caused your problems too.

Full working test:

import akka.actor.ActorSystem
import akka.stream.Attributes.InputBuffer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes, DelayOverflowStrategy, OverflowStrategy}
import org.scalatest.{Matchers, WordSpecLike}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}

class TestSpec extends WordSpecLike with Matchers {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val executionContext: ExecutionContext = system.dispatcher

  "test" in {
    val delayBuffer = InputBuffer(0,1)
    val fastSource = Source.repeat(1).delay(1.seconds, DelayOverflowStrategy.backpressure).take(10)
    val slowSink = Flow[Int].delay(5.seconds, DelayOverflowStrategy.backpressure).addAttributes(Attributes(delayBuffer :: Nil)).toMat(Sink.seq)(Keep.right)
    val result = fastSource.buffer(1, OverflowStrategy.dropBuffer).runWith(slowSink)
    Await.result(result, 45.seconds).size should be < 4 // the size will be 2 or 3
  }
}

Yes, I’m trying to get a slow sink to only receive the most recent input from a fast source.

Will this code work for what I’m trying to do?

I’m still unclear as to what I’m doing wrong here.

No idea either. The test is green, and it’s doing exactly what you want to achieve. If you use delay-like stages, those may have an internal InputBuffer larger than 1. Otherwise I have no idea. Try merging the two pieces of code (your non-working one and my green test) step by step, and see whether that solves your issue. I would start with the addAttributes part.

If you have .async anywhere, that can buffer elements too.
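For the .async case, one option is to shrink the input buffer of the asynchronous section to a single element with Attributes.inputBuffer. The sketch below assumes a blocking map as a stand-in for the slow stage; the object name, the element range and the 50 ms sleep are made up for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical name; a sketch under the assumptions stated above.
object AsyncBoundarySketch {
  // A slow stage behind an async boundary whose input buffer is limited to one element.
  val slowStage: Flow[Int, Int, NotUsed] =
    Flow[Int]
      .map { x => Thread.sleep(50); x } // stand-in for slow, blocking work (assumption)
      .async
      .addAttributes(Attributes.inputBuffer(initial = 1, max = 1))

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("async-boundary-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val received = Source(1 to 100)
      // Keep only the newest element while the slow section is busy.
      .conflate[Int]((_, newest) => newest)
      .via(slowStage)
      .runWith(Sink.seq)

    try Await.result(received, 30.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val received = run()
    println(s"received ${received.size} of 100 elements, last = ${received.last}")
  }
}
```

Without the addAttributes call, the section would use the materializer’s default input buffer, so it could prefetch several elements before the slow work gets to them.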

After copying a lot of what you wrote, I’ve got it working. I guess I needed to add a delay to the source to get it working right. Unfortunately, I still get one extra element buffered, but that’s better than what I had before.

Ehh, adding a delay sounds bad; I only added them to make the test behave like “real life”.

"test" in {
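    // Endless fast source; the counter makes every element larger than the previous one.
    val fastSource = Source.repeat(1).statefulMapConcat { () =>
      var i = 0
      x => { i += 1; List(x + i) }
    }
    // Slow sink: requests nothing for the first 2 seconds, then takes a single element.
    val slowSink = Flow[Int].initialDelay(2.seconds).take(1).toMat(Sink.head)(Keep.right)
    val result = fastSource.buffer(1, OverflowStrategy.dropBuffer).runWith(slowSink)
    // A value above 10 means the sink got a recent element, not one of the first ones.
    Await.result(result, 4.seconds) should be > 10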
  }

I could reproduce this without the delay stage. (initialDelay has no built-in buffer.)