MergeHub is not being cleaned up from the heap

We are seeing something odd in our Akka Streams implementation. When there are many incoming messages (after creating a set of actors simultaneously), we run into full GC issues. We analyzed the backend with MAT and VisualVM. MAT's top dominators show a large number of immutable Scala lists attached to BroadcastHub remaining in the heap and taking up a lot of space. Even after the actors are killed, MergeHub, BroadcastHub and AbstractNodeQueue are not cleaned up from the heap; they keep growing as we instantiate more actors. The Flow, however, does get cleared in this situation.

Play version: 2.7.3
Akka version: 2.6.18

Code for creating hub sink.

        Pair<Sink<JsonNode, NotUsed>, Source<JsonNode, NotUsed>> sinkSourcePair =
        MergeHub.of(JsonNode.class, 16).toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()).run(getContext().getSystem());

        this.hubSink = sinkSourcePair.first();
        Source<JsonNode, NotUsed> hubSource = sinkSourcePair.second();

        adminResponseQueue = hubSink.runWith(Source.queue(1, OverflowStrategy.dropHead()), getContext().getSystem());

        Sink<JsonNode, CompletionStage<Done>> jsonSink = Sink.foreach(this::messageReceivedFromClient); // this method accepts json messages

        this.clientFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource)
            .watchTermination((n, stage) -> {
                // When the flow shuts down, make sure this actor also stops.
                stage.whenComplete((__, error) -> {
                    if(error != null) {
                        logger.error("Sink completed with exception", error);
                    }
                    context().stop(self());
                });
                return NotUsed.getInstance();
            }
        );

Any help would be appreciated.
Can you suggest a possible solution, @jrudolph?


I'm not sure this explains your leak, but it might: you must not call context.stop from inside a future callback. The context is actor-internal state, so it should not be touched from arbitrary threads. Instead, pipe the future completion back to the actor as a message using pipeTo, and have the actor stop itself when it receives that message.
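To illustrate the idea, here is a minimal plain-JDK sketch, not Akka code. `MiniActor`, `Msg.STREAM_TERMINATED` and `StopMessageSketch` are hypothetical names made up for this example. The future callback runs on whatever thread completes the future, so it only enqueues a message; the state change happens on the thread that owns the state. In an Akka classic actor the corresponding step would presumably be sending a message to the actor's own `ActorRef` (or using `Patterns.pipe(...).to(getSelf())`) and calling `getContext().stop(getSelf())` in the receive handler.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

final class StopMessageSketch {
    // Hypothetical message type telling the "actor" that the stream has ended.
    enum Msg { STREAM_TERMINATED }

    // Stand-in for an actor: one thread drains the mailbox, and only that
    // thread ever reads or writes the internal state below.
    static final class MiniActor implements Runnable {
        private final BlockingQueue<Msg> mailbox = new LinkedBlockingQueue<>();
        private boolean stopped = false;   // internal state, owned by the actor thread
        private String stoppedOnThread;    // records which thread performed the stop

        // Safe to call from any thread: it only enqueues a message.
        void tell(Msg msg) {
            mailbox.add(msg);
        }

        @Override
        public void run() {
            try {
                while (!stopped) {
                    Msg msg = mailbox.take();
                    if (msg == Msg.STREAM_TERMINATED) {
                        stopped = true;
                        stoppedOnThread = Thread.currentThread().getName();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Runs the scenario and returns the name of the thread that did the stop.
    static String runDemo() {
        MiniActor actor = new MiniActor();
        Thread actorThread = new Thread(actor, "actor-thread");
        actorThread.start();

        CompletableFuture<Void> streamDone = new CompletableFuture<>();
        // The callback does NOT touch actor state; it only sends a message.
        streamDone.whenComplete((ignored, error) -> actor.tell(Msg.STREAM_TERMINATED));

        // Complete the future from an unrelated thread, as a stream would.
        Thread streamThread = new Thread(() -> streamDone.complete(null), "stream-thread");
        streamThread.start();
        try {
            streamThread.join();
            actorThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return actor.stoppedOnThread;
    }

    public static void main(String[] args) {
        System.out.println("stopped on: " + runDemo());
    }
}
```

The callback itself executes on `stream-thread`, but the stop is carried out on `actor-thread`, which is the property the message-based approach is meant to give you.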


@johanandren The context stop call is also there in the Akka example repos. Please see UserActor.java on the 2.7.x branch of playframework/play-java-websocket-example on GitHub.

We use whenComplete() instead of thenAccept(). That is the only change.

We have also noticed that a similar issue has been raised here: "[akka-actor_2.12 2.5.21] AbstractNodeQueue is not getting garbage collected even after multiple full GC" (Issue #27255, akka/akka on GitHub). The scenario is a bit different, but they too end up with a memory leak in the heap.

Because of this issue, we have to restart the server from time to time to clear the heap, which is painful. Our service runs a lot of actors in real time, around 500 per service instance.

Is there any other way to create a Sink and a Source dynamically without using MergeHub or BroadcastHub? I could not find one in the docs.

That is a bug in that sample. I'd recommend that you start by changing it to send a stop message, and see if the leak is still there after that.

That kind of dynamic fan-in/out is exactly what the merge and broadcast hubs are for. The alternative I can think of would be a lower-level solution using Source.actorRef and Sink.actorRef, doing the dynamic routing with regular actor messaging.


What do you mean by a stop message? Could you please elaborate, @johanandren?

I’ve added a PR to fix this in the 2.7.x branch.


@wsargent @johanandren We tried a different approach. We attached a kill switch to the hub sink, and when the WebSocket closes we shut the hub down through that kill switch…

        Pair<Pair<Sink<JsonNode, NotUsed>, UniqueKillSwitch>, Source<JsonNode, NotUsed>> sinkSourcePair =
                MergeHub.of(JsonNode.class, 16)
                        .viaMat(KillSwitches.single(), Keep.both())
                        .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both())
                        .run(getContext().getSystem());

        this.hubSink = sinkSourcePair.first().first();
        this.hubKillSwitch = sinkSourcePair.first().second();
        Source<JsonNode, NotUsed> hubSource = sinkSourcePair.second();
        Sink<JsonNode, CompletionStage<Done>> jsonSink = Sink.foreach(this::messageReceivedFromClient);
        this.clientFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource)
            .watchTermination((n, stage) -> {
                stage.whenComplete((__, error) -> {
                    if(error != null) {
                        logger.error("Sink completed with exception", error);
                    }
                    cleanUpHub(error);
                    context().stop(self());
                });
                return NotUsed.getInstance();
            }
        );
    }

    // cleanup method for hub
    protected void cleanUpHub(Throwable t) {
        try {
            if(t == null)
                this.hubKillSwitch.shutdown();
            else
                this.hubKillSwitch.abort(t);
        } catch(Exception ex) {
            logger.error("Error cleaning up", ex);
        }
    }

When we checked this locally using VisualVM heap dumps, we saw that the heap no longer fills up the way it did before.

This has also been deployed and is currently being monitored. So far we have not had any full GC issues. We also checked production heap dumps using MAT, and the heap size has dropped drastically. Previously it was more than 2GB (the service runs with Xmx = 2GB, so the heap grew up to that limit), but now the maximum heap size we have seen is around 600MB. These heap dumps also no longer contain the LocalActorRef / BroadcastHub / immutable Scala list objects that were causing the memory leak…

I will post an update after one week of monitoring…
