Stream data into a PersistentEntity

My Lagom service API includes a streamed request. I want the PersistentEntity to persist the stream content as a single event. I’d rather not consume the stream up front and build one big command that has to be serialized between the persistent entity shard nodes. Instead, I’d like to send a command carrying an Akka Streams StreamRef. The result of processing the stream is a Future[Seq[_]]. How can I get that future sequence into an event for the thenPersist payload?

case class CmdWithSrcRef(srcRef: SourceRef[DataItem])

.onCommand[CmdWithSrcRef, SrcProcessedEvent] {
  case (CmdWithSrcRef(srcRef), ctx, state) =>

    val futureEvent = srcRef.source.runWith(Sink.seq).map(seq => SrcProcessedEvent(items = seq.toList))

    ctx.thenPersist( ??? )(ctx.reply)
}

Hi @PkPwc,

this sounds like your command is rather large and you are just trying to avoid an in-memory copy on the node where the ServiceImpl is running.
I think you will still have the same problem on the node where the Persistent Entity is running.

My next question is: what will you do with the DataItem (assuming that’s the big object) once the command has been processed in the Persistent Entity? Will you produce a huge event? I’d strongly discourage that. If, OTOH, you’ll call a 3rd party to store the huge blob in side-band storage, I’d discourage that too, since Persistent Entities should not interact with 3rd-party services.

I think the meaning of processing the stream is important for the solution you are trying to implement.

Cheers,

There can be a large number of data items on the stream. When we used strict messaging, we saw some message-too-large exceptions on the HTTP request. We changed to a streamed API and built one large command from the streamed items, and then we got message-too-large exceptions when sending the command to the sharded persistent entity. We now batch items off the streamed request and create multiple commands, and that works well enough. Since we’re presented with a stream, though, and there’s no benefit to processing the stream before it hits the entity, I’d rather just pass the stream ref along.
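For reference, the batching workaround described above can be sketched roughly as follows. This is a minimal illustration rather than our actual service code: AddItemsCmd, the DataItem field, and the batch size of 3 are hypothetical, and it assumes Akka 2.6+, where an implicit ActorSystem supplies the stream materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-ins for the types in this thread; the real DataItem fields are not shown here.
final case class DataItem(id: Int)
final case class AddItemsCmd(items: List[DataItem])

object BatchingSketch {
  // Assumed batch size; in practice it should keep each serialized command
  // below the configured maximum message size.
  val BatchSize = 3

  // Turn a stream of items into a stream of bounded-size commands.
  def toCommands(items: Source[DataItem, NotUsed]): Source[AddItemsCmd, NotUsed] =
    items.grouped(BatchSize).map(batch => AddItemsCmd(batch.toList))

  def main(args: Array[String]): Unit = {
    // Assumption: Akka 2.6+, so the implicit ActorSystem acts as the materializer.
    implicit val system: ActorSystem = ActorSystem("batching-sketch")
    try {
      val commands = Await.result(
        toCommands(Source((1 to 7).map(DataItem(_)))).runWith(Sink.seq),
        5.seconds
      )
      // Seven items with a batch size of three yield three commands.
      commands.foreach(println)
    } finally {
      system.terminate()
    }
  }
}
```

In the service implementation, each command would then be sent to the entity one at a time, for example with mapAsync(1) over the entity ref's ask, so the entity sees the batches in order.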

In the entity we don’t really ‘process’ the items. All we do is create and persist one large event from the stream of items. That event is never used outside the persistent entity. In the service impl it’s translated into an API event that includes only a count of the items rather than the list itself. The service API only provides for paged retrieval, so once the items are persisted we don’t really have to worry about message sizes. We also have an API with a streamed response if a service consumer wants to process the items as a stream.
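To make the translation concrete, here is a minimal sketch of mapping the internal event to a count-only API event. ItemsStored and the DataItem field are hypothetical names used only for illustration; SrcProcessedEvent is the event from the original snippet.

```scala
// Hypothetical item type; the real DataItem fields are not shown in this thread.
final case class DataItem(id: Int)

// Internal event persisted by the entity (name taken from the question).
final case class SrcProcessedEvent(items: List[DataItem])

// Hypothetical public API event that exposes only the number of items.
final case class ItemsStored(count: Int)

object EventTranslation {
  // Drop the payload and keep only its size, so the public event stays small
  // no matter how many items were persisted.
  def toApiEvent(event: SrcProcessedEvent): ItemsStored =
    ItemsStored(event.items.size)
}
```

The large list stays inside the entity's journal, and consumers of the API event only ever see the small count.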