Akka Stream running forever - memory leak

Hi All,

I have a akka stream , which basically listen to Rabbitmq and process the message. So i made the stream is running for-ever. But when i watched closely, its leading to memory leak. Below is the snippet

// Stream Run

  • val amqpMessageGraph = amqpSourceContext.via(messageProcessFlow).via(amqpSinkAsFlow).asSource.via(dbDocumentFlow).via(insertAndAckFlow).to(Sink.ignore)*

  • val withCustomSupervision = *
    amqpMessageGraph.withAttributes(ActorAttributes.supervisionStrategy(decider))

  • withCustomSupervision.run()*

  1. Is it really necessary to stop the stream when there is no flow of data for x amount of time?
  2. By improve anything in the stream flow, i can avoid the memory leak ?

Thanks,
Iyappan S

What are the type objects you are seeing leaking over time?

The stream will occupy space in memory as long as it is running, also when no elements pass through it, but it should not occupy a growing amount of the heap.