How to consume Server Sent Events in play using akka without losing data

I want to consume SSE events without losing any data when the rate of production is > rate of consumption. Since SSE supports backpressure Akka should be able to do it. I tried a few different ways but the extra messages are being dropped.

This code works but with the following problems:

  1. Due to .take(5) when rate of consumption < rate of production, I am dropping events.
  2. Also I want to process each message as it comes, and don’t want to wait until 5 messages have reached. How can I do that ?
  3. I have to write the consumer in a while loop. This does not seem event based, rather looks like polling (very similar to calling GET with pagination and limit of 5)
  4. I am not sure about throttling, tried reading the docs but its very confusing. If I don’t want to lose any events, is throttling the right approach? I am expecting a rate of 5000 req / sec in peak hours and 10 req/sec otherwise. When the production rate is high I would I ideally want to apply backpressure. Is throttling the correct approach for that ? According to docs it seems correct as it says Backpressures when downstream backpressures or the incoming rate is higher than the speed limit

@Singleton
class SseConsumer @Inject()()(implicit ec: ExecutionContext) {

  implicit val system = ActorSystem()

  val send: HttpRequest => Future[HttpResponse] = foo

  def foo(x: HttpRequest) = {
    try {
      val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
      val newHeaders = x.withHeaders(authHeader)
      Http().singleRequest(newHeaders)
    } catch {
      case e: Exception => {
        println("Exceptio12n", e.printStackTrace())
        throw e
      }
    }
  }


  val eventSource2: Source[ServerSentEvent, NotUsed] =
    EventSource(
      uri = Uri("https://xyz/a/events/user"),
      send,
      initialLastEventId = Some("2"),
      retryDelay = 1.second
    )


  def orderStatusEventStable() = {
    val events: Future[immutable.Seq[ServerSentEvent]] =
      eventSource2
        .throttle(elements = 1, per = 3000.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
        .take(5)
        .runWith(Sink.seq)
    events.map(_.foreach(x => {
      // TODO: push to sqs
      println("456")
      println(x.data)
    }))
  }


  Future {
    blocking {
      while (true) {
        try {
          Await.result(orderStatusEventStable() recover {
            case e: Exception => {
              println("exception", e)
              throw e
            }
          }, Duration.Inf)
        } catch {
          case e: Exception => {
            println("Exception", e.printStackTrace())
          }
        }
      }
    }
  }
}