Websocket client add element to source dynamically

Hi all,
i would like to keep multiple websocket client connections, and for each connection i will later send message, what would be the right way of implementing it?
i have actors sharded, and in each of them, i have a websocket connection, like bellow

val request = WebSocketRequest()
val websocketFlow = Http()(classicSystem).webSocketClientFlow(request)
val (((queue, upgradeResponse)), closed) =
      Source.queue[TextMessage](bufferSize = 10, OverflowStrategy.backpressure)
        .viaMat(websocketFlow)(Keep.both)
        .toMat(incoming)(Keep.both)
        .run()(materializer)

And i am using later

queue.offer()

to send new message
It works when there is one connection, however whenever i start a new connection (on another actor sharded), the first actor’s queue just closes

Hi @brabo-hi,

not clear why that would happen. Do you share anything between those actors? Maybe the incoming stream part?

Johannes

Hello @jrudolph

They don’t share anything. They are represented as a persistent actor.
I have a lagom service. For each request, I send a message to a persistent actor shared.

def entityRef(entityId: String): EntityRef[PeerEntity.Command] = clusterSharding.entityRefFor(PeerEntity.typeKey, entityId)

Inside each actor I start a websocket connection to an external websocket server
The issue I am having is :
The websocket is alive only in 1 actor.
When I receive another request and forward it to a new actor (that will start it own) websocket connection, the first actor drops it connection.
Which means at any give time I only have one actor with one websoket connection