Akka Streams: get the last item received in an interval


(Jiri) #1

I am receiving frequent updates and I want to forward only the latest update once per second (unused updates are thrown away). I wrote a test, but it only half works: instead of the latest update, it sends an update that is 1 second old.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.testkit.{ImplicitSender, TestKit}
import org.joda.time.DateTime
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import scala.concurrent.duration._

class ThrottlingTest
  extends TestKit(ActorSystem("ThrottlingTest"))
    with ImplicitSender
    with FunSuiteLike
    with BeforeAndAfterAll {

  private case class Wrapper(timestamp: DateTime, payload: Int)

  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
  }

  private var id = 1
  private def create() = {
    val res = Wrapper(DateTime.now(), id)
    id += 1
    res
  }

  test("throttling test") {
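    // Buffer of size 1 with dropHead: a new offer replaces the older buffered element.
    val queue = Source
      .queue[Wrapper](1, OverflowStrategy.dropHead)
      // Let at most one element through per second.
      .throttle(1, 1.second)
      .toMat(
        Sink
          .foreach { o =>
            // Age of the update at the moment it reaches the sink.
            val now = DateTime.now()
            val diff = now.getMillis - o.timestamp.getMillis
            println(s"[$now] $o diff: $diff ms")
          })(Keep.left)
      .run()

    println(s"start ${DateTime.now()}")
    // Offer a new update roughly every 100 ms.
    (1 until 1000).foreach { _ =>
      queue.offer(create())
      Thread.sleep(100)
    }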

  }
}

The output looks like this:
[2019-03-13T12:26:39.988+01:00] Wrapper(2019-03-13T12:26:38.895+01:00,19) diff: 1093 ms
[2019-03-13T12:26:40.989+01:00] Wrapper(2019-03-13T12:26:39.932+01:00,29) diff: 1057 ms
[2019-03-13T12:26:41.977+01:00] Wrapper(2019-03-13T12:26:40.968+01:00,39) diff: 1009 ms
[2019-03-13T12:26:42.988+01:00] Wrapper(2019-03-13T12:26:41.897+01:00,48) diff: 1091 ms
[2019-03-13T12:26:43.988+01:00] Wrapper(2019-03-13T12:26:42.937+01:00,58) diff: 1051 ms

The problem is that my code does not forward the latest update. What I need is to see a diff of around 100 ms. Any ideas? Thank you.


(Johan Andrén) #2

Check out .conflate, it allows for summarizing/aggregating values (for example picking the last one) when downstream is slow: https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/conflate.html
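
For illustration, here is a minimal sketch of conflate keeping only the newest value while the downstream is slow. The object name ConflateSketch, its run helper, and the use of plain Int updates are assumptions made for this example and are not taken from the thread.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object ConflateSketch {
  // Hypothetical helper: runs a fast producer against a slow consumer
  // and returns the elements the consumer actually received.
  def run(updates: Int): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("ConflateSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to updates)
        .throttle(1, 100.millis)                 // fast producer: one update per 100 ms
        .conflate((_: Int, newer: Int) => newer) // while downstream is busy, keep only the newest
        .throttle(1, 1.second)                   // slow consumer: one element per second
        .runWith(Sink.seq)
      Await.result(result, 1.minute)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run(30).foreach(println)
}
```

One caveat with this sketch: throttle requests its next element as soon as it has emitted one and then holds that element until the next token is available, so the value it finally emits can already be close to a second old.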


(Jiri) #3

Thanks for the reply, but it does not seem to be what I need. If I add conflate, nothing changes, because I am already dropping the first object. The problem is in the throttling, which takes the first object, processes it, and waits 1 second. What I need is to wait 1 second and then take an object.
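
The "wait 1 second, then take an object" behaviour can be approximated with core operators alone. The following is a rough sketch, assuming it is acceptable to buffer every element that arrives within the interval: groupedWithin collects the elements of each time window and map(_.last) keeps only the newest one. The names LastPerIntervalSketch, lastPerInterval, and run are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object LastPerIntervalSketch {
  // Hypothetical helper: emits the last element seen in each time window.
  // groupedWithin never emits an empty group, so calling .last is safe.
  def lastPerInterval[T](interval: FiniteDuration): Flow[T, T, NotUsed] =
    Flow[T]
      .groupedWithin(Int.MaxValue, interval) // collect everything that arrives in the window
      .map(_.last)                           // forward only the newest element

  // Hypothetical demo: one update per 100 ms, sampled once per second.
  def run(updates: Int): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("LastPerIntervalSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to updates)
        .throttle(1, 100.millis)
        .via(lastPerInterval[Int](1.second))
        .runWith(Sink.seq)
      Await.result(result, 1.minute)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = run(30).foreach(println)
}
```

In the test above, this would mean replacing .throttle(1, 1.second) with .via(lastPerInterval(1.second)). Unlike conflate, this keeps the whole window in memory until the timer fires, which should be fine for around ten updates per second but may not suit very high rates.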


(Patrik Nordwall) #4

I’m not sure, but you could check out TimeWindow in akka-stream-contrib.


(Jiri) #5

It is exactly what I want. Thank you, Patrik.