Collapsing a stream of updates/appends when the consumer is slow


(Oscar Broman) #1

I have a stream of updates going over a WebSocket, with updates similar to this:

Update("key1", "foo")
Update("key2", "bar")
Update("key1", "foo1")
Append("key3", "AA")
Append("key3", "BB")

If those messages are sitting in a buffer because the consumer is slow, they can be reduced before being sent, like so:

Update("key2", "bar")
Update("key1", "foo1")
Append("key3", "AABB")

How can I go about collapsing the stream when the messages are buffered?
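The intended reduction can be sketched as a plain fold over the buffered messages. This is only a model of the semantics, with hypothetical `Update`/`Append` case classes, independent of any streaming library (how `Append` should interact with a buffered `Update` for the same key is application-specific; here it concatenates):

```scala
// Hypothetical message types modeling the wire format above.
sealed trait Msg { def key: String }
case class Update(key: String, value: String) extends Msg
case class Append(key: String, value: String) extends Msg

// Collapse a buffered batch: a newer Update replaces the pending message for
// its key, while Appends to the same key are concatenated.
def collapse(buffered: Seq[Msg]): Seq[Msg] =
  buffered.foldLeft(Vector.empty[Msg]) { (acc, msg) =>
    acc.indexWhere(_.key == msg.key) match {
      case -1 => acc :+ msg
      case i =>
        (acc(i), msg) match {
          case (Append(k, a), Append(_, b)) => acc.updated(i, Append(k, a + b))
          case (Update(k, a), Append(_, b)) => acc.updated(i, Update(k, a + b))
          case _                            => acc.updated(i, msg) // newer Update overwrites
        }
    }
  }

val reduced = collapse(Vector(
  Update("key1", "foo"), Update("key2", "bar"),
  Update("key1", "foo1"), Append("key3", "AA"), Append("key3", "BB")))
// reduced == Vector(Update("key1", "foo1"), Update("key2", "bar"), Append("key3", "AABB"))
```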


(Martynas Mickevičius) #2

Hi Oscar,

take a look at the conflate operator.
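For context: `conflate` (and the more general `conflateWithSeed`) folds incoming elements into a summary while downstream is backpressuring, and emits the summary on the next pull. A sketch of how the updates above could be collapsed into a `Map` — the seed and aggregate functions are plain Scala; the Akka wiring is shown in a comment:

```scala
// Seed: start the summary from the first element received while backpressured.
def seed(kv: (String, String)): Map[String, String] = Map(kv)

// Aggregate: a later update to the same key overwrites the buffered value.
def aggregate(acc: Map[String, String], kv: (String, String)): Map[String, String] =
  acc + kv

// In Akka Streams these would plug in as:
//   Flow[(String, String)].conflateWithSeed(seed)(aggregate)
// which emits one Map per downstream pull, covering however many elements
// arrived in the meantime.

// The same folding applied to a plain sequence, for illustration:
val input = Seq("key1" -> "foo", "key2" -> "bar", "key1" -> "foo1")
val collapsed = input.tail.foldLeft(seed(input.head))(aggregate)
// collapsed == Map("key1" -> "foo1", "key2" -> "bar")
```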


(Oscar Broman) #3

Hi,

Thanks! Using conflate, I was able to avoid overflowing the buffer.

I use a Map[String, String] for the updates, but that gives me another problem: I have no control over the size of that map.

Ideally, I want to send each element of the map while still being allowed to change the elements that have not been sent.

For example, if my consumer is reading over the network, then sending one huge map does not really fix the problem. I would rather send small updates, and if a new update is produced while waiting for the consumer, update the buffered element in place.

In summary:
Conflate allows me to chunk everything up, but I don’t want to send it chunked.

I realize that flattening after conflate may do what I just described. I’ll do some testing!
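The conflate-then-flatten idea can be sketched as below. One caveat worth testing for: once `mapConcat` has taken a conflated `Map`, its entries sit in that stage's internal buffer and can no longer be overwritten by newer updates, so this may not fully achieve the "update elements that have not been sent" behavior. The stream wiring is shown as a comment; the executable part models the same pipeline on a plain collection:

```scala
// Hypothetical Akka Streams wiring:
//   Flow[(String, String)]
//     .conflateWithSeed(kv => Map(kv))(_ + _) // collapse while downstream is slow
//     .mapConcat(_.toList)                    // then emit one entry at a time
//
// Caveat: entries already handed to mapConcat are fixed and cannot be
// overwritten by updates that arrive afterwards.

// The same pipeline modeled on a plain collection:
val buffered  = Seq("key1" -> "foo", "key2" -> "bar", "key1" -> "foo1")
val conflated = buffered.foldLeft(Map.empty[String, String])(_ + _)
val flattened = conflated.toList // what mapConcat would emit, element by element
```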


(Oscar Broman) #4

I couldn’t find any existing functions to do this, so I ended up creating my own GraphStage:

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.collection.mutable

// Collapses upstream (key, value) pairs into Maps of at most `max` entries,
// overwriting a key's value while it is still buffered.
case class ConflateBatchBuffer[A, B](max: Int) extends GraphStage[FlowShape[(A, B), Map[A, B]]] {
  type In = (A, B)
  type Out = Map[A, B]

  val in = Inlet[In]("ConflateBatchBuffer.in")
  val out = Outlet[Out]("ConflateBatchBuffer.out")

  override val shape: FlowShape[In, Out] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
    private val buf = mutable.Queue.empty[In]

    private def flush(): Unit = {
      // Drain at most `max` buffered entries into one Map. Only push when
      // there is something to send; otherwise a slow upstream would cause
      // the stage to flood downstream with empty maps on every pull.
      if (buf.nonEmpty) {
        val batch = (1 to max).flatMap(_ => if (buf.nonEmpty) Some(buf.dequeue()) else None).toMap
        push(out, batch)
      }
    }

    override def preStart(): Unit = {
      pull(in)
    }

    override def onPull(): Unit = {
      flush()
      // If upstream has already finished and the buffer is drained, we are done.
      if (isClosed(in) && buf.isEmpty) completeStage()
    }

    override def onPush(): Unit = {
      val el = grab(in)
      val existingIdx = buf.indexWhere(_._1 == el._1)

      pull(in)

      if (existingIdx == -1) {
        buf.enqueue(el)
      } else {
        // The element's key is still buffered: overwrite the value in place.
        buf(existingIdx) = el
      }

      if (isAvailable(out)) {
        flush()
      }
    }

    override def onUpstreamFinish(): Unit = {
      // Don't drop buffered entries when upstream completes: emit what we can
      // now, and let subsequent pulls drain the remainder before completing.
      if (buf.isEmpty) completeStage()
      else if (isAvailable(out)) {
        flush()
        if (buf.isEmpty) completeStage()
      }
    }

    setHandlers(in, out, this)
  }
}
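A quick sanity check of the buffer semantics the stage implements, modeled on a plain `mutable.Queue` outside Akka (in a stream, the stage itself would be wired in with something like `Flow.fromGraph(ConflateBatchBuffer[String, String](max = 2))`):

```scala
import scala.collection.mutable

// The stage's buffering rule: a newer element overwrites the buffered entry
// with the same key; a flush drains at most `max` entries into one Map.
val max = 2
val buf = mutable.Queue.empty[(String, String)]

def offer(el: (String, String)): Unit = {
  val i = buf.indexWhere(_._1 == el._1)
  if (i == -1) buf.enqueue(el) else buf(i) = el
}

def flush(): Map[String, String] =
  (1 to max).flatMap(_ => if (buf.nonEmpty) Some(buf.dequeue()) else None).toMap

Seq("key1" -> "foo", "key2" -> "bar", "key1" -> "foo1", "key3" -> "x").foreach(offer)
val first  = flush() // key1 was overwritten before it could be flushed
val second = flush() // the rest of the buffer
// first  == Map("key1" -> "foo1", "key2" -> "bar")
// second == Map("key3" -> "x")
```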