OutOfMemoryError on a universal:packageBin application

Hi,

We used universal:packageBin to create deployables of our microservices. In one of our microservice that we deployed, we fed it with 1G xmx but we’re still getting OOME.

This microservice is pretty simple, it listens to 3 other microservices, then it does some filtering and aggregation, then generate outgoing events out of it. Our code does not keep things in memory like in a static field or a private field of a singleton service.

However, when I took at look at our heap dump via Eclipse’s Memory Analyzer Tool. It seems like the incoming DTO classes from the microservices it was listening too was getting accumulated really fast and not getting let go.

Any body has an idea as to what’s going on here?

And what is a safe memory allocation for a simple lagom microservice?

Thanks,
Franz

Since it looks like most of that memory is retained by object arrays, my first guess is that these objects are being accumulated into a list somewhere. It’s hard to say without seeing the code or knowing exactly what it’s doing. Can you use MAT to trace what’s retaining those arrays?

Thanks @TimMoore, here’s some more details

Basically, the BigDecimal is being held by the client’s incoming DTO class (the topic this microservice is subscribed in). And those are in turn held by the command class. Beyond that, I am not sure how to interpret.

Is some internal akka or lagom class holding on to my command classes? Or are these just a method’s local variable (which means they’ll be garbage collected after the thread executes those methods). But if they are just a method’s local variable, then why do I see to have so many commands (480 from the previous screenshot) firing all at the same time?

Thanks in advance!

Those are Akka internally queueing up messages to be processed. This could happen if you are sending commands to an entity much faster than they can be processed.

Thanks @TimMoore that confirms our suspicion. So how do we speed things up?

Btw, I did some more analysis of the heap dumps, and I wanted to see the number of instances of certain key objects . But to give an idea on the flow of our code, this is what’s happening (which i believe is a very typical lagom pattern).

We subscribe to a client, and are listening for a topic of class InputEvent. Then for each InputEvent, we create an instance of CommandA. Then the command handler for that just creates an OutputEvent.

When I took a look at the heap, at the time of OOME, there were only 10 instances of InputEvent. And there were 1073 instances of CommandA. And only 303 instances of OutputEvent.

That supports your theory that commands are not being processed as fast (or faster) than the rate at which InputEvents were coming in (because there were only 10 instances of InputEvent which suggests that they come in, then a CommandA gets created for them and queued, then the InputEvent gets garbage collected immediately. And then from the 1073 queued up CommandAs, only 303 where being processed (or only 303 were retained))

With that said, how can I tell Akka to process things faster?

I’d suggest using a profiler to find where the bottleneck is. My suspicion would be a slow database, but I’m just guessing. It’s also possible that your stream is written in a way that floods the entity with commands rather than back-pressuring the topic consumer. Can you post the code for your subscriber?

@TimMoore

The subscriber looks like this

  private void subscribeToClientService() {
    clientService.eventTopic()
        .subscribe().atLeastOnce(Flow.fromFunction((InputEvent inputEvent) -> {
            if (inputEvent instanceof InputEvent.Updated) {
                InputEvent.Updated updated = (InputEvent.Updated) inputEvent;
                Payload payload = updated.getPayload();
                PersistentEntityRef<MyCommand> ref = persistentEntityRegistry.refFor(MyEntity.class, payload.getName());
                ref.ask(new MyCommand.Update(payload));
            }

            return Done.getInstance();
    }));
  }

Note: the code is exactly like that except class and variable names have been renamed

Additional Info:
We tried some more testing, and looks like we are not hitting OOME anymore. One possible reason that we can think of is that we deployed our client microservice first and it has been up for a few days (and has been publishing events to kafka since then), before we deployed the 2nd microservice (the one that’s encountering OOME). So our guess is that when the 2nd microservice went up, there was a huge backpressure of events that came in all at the same time and choked that 2nd microservice.

Is that a possibility?

@franz from your code there will be no back pressure on the subsribe processing because you are not returning result of ask but always return Done.
Taking you do not have back pressure and there were many messages in kafka to process, that could be a main reason for OOM.

Also I suggest instancing PersistentEntityRef once and not per message.

@franz do be able to return ask to get a back pressure you will need to use mapAsync.

Thanks @aklikic

there will be no back pressure on the subsribe processing because you are not returning result of ask but always return Done.

Can you expound on this? ^ The code snippet that I posted is for the microservice that’s experiencing the OOME (i.e. let’s call this Microservice B). Why I think there is a back pressure because the microservice that’s producing the incoming messages (i.e. Microservice A) have been up for at least a day before Microservice B was deployed and started.

Microservice A - produces EventA. This was deployed and started up on Day 1
Microservice B - consumes EventA. This is the service that’s experiencing the OOME. This was deployed and started up on Day 2.

Which means, Microservice A has been producing EventA for about a day, and when Microsevice B started up, it received a day’s worth of events all at the same time.

Taking you do not have back pressure and there were many messages in kafka to process, that could be a main reason for OOM.

That’s unfortunate, because this isn’t the full load and yet we are already experiencing this. The messages are coming in one at a time. And the way we process the incoming messages is pretty straight forward -> incoming event -> to impl command -> impl event. There are no additional IO (i.e. database calls, rest calls, etc) nor any substantial processing (just setters and getters).

However, even with that, at the time of OOME, we can see only 10 incoming messages in the heap space (which seems to indicate that incoming processes are being handled pretty fast), 1000+ impl commands (which seems to indicate that they’re not being handled in a timely manner), and only about 300+ impl events (which seems to indicate that at most, only 300+ commands are handled at a time. most likely, only 1 command is handled at at time, which means only 1 impl event is created at a time, and the other impl events seen in the heap have already been used but at are just awaiting garbage collection. but since there was an OOME, that means the jvm couldnt collect these yet because something else was holding on to them)

In an ideal situation, for each each impl command created, it would be processed immediately before the next incoming message arrives. That way, we wont end up with 1000+ impl commands in the queue. So IMHO, either, (a.) we are not processing the commands fast enough, or (b.) there was a backpressure of incoming messages and our microservice choked up on 1000+ incoming messages arriving all at the same time. Im currently leaning towards (b.) but I am curious why you think there is no back pressure.

do be able to return ask to get a back pressure you will need to use mapAsync.

Im not sure I understand. Can you expound on this? ^

@franz examples in the doc are referring to synchronous processing of the events where returning Done is enough to get a back pressure but in your case you are doing an asynchronous processing (ref.ask) which CompletionStage needs to be returned.

private void subscribeToClientService() {
    clientService.eventTopic()
        .subscribe().atLeastOnce(Flow.<InputEvent>create().mapAsync(1,evt -> {
            if (inputEvent instanceof InputEvent.Updated) {
                InputEvent.Updated updated = (InputEvent.Updated) inputEvent;
                Payload payload = updated.getPayload();
                PersistentEntityRef<MyCommand> ref = persistentEntityRegistry.refFor(MyEntity.class, payload.getName());
                return ref.ask(new MyCommand.Update(payload))
                          .exceptionally(e->{
                              //I also suggest checking which exceptions require re-processing of the event (then throw exception) and which exceptions do not require re-processing of the event (then returning Done.getInstance()) 
                              return Done.getInstance();
                          });
            }else{
                //events that do not need to be processed just return Done
                return CompletableFuture.completedFuture(Done.getInstance());
            }
    }));
  }

Hope this helps.
Br,
Alan

@franz I might be wrong, but I think you’re using a different definition of “back pressure” than the rest of us.

You seem to be using it to mean your service is overloaded. We mean back pressure as the mechanism that does essentially what you desire - it provides “back” “pressure” to signal that the service can’t handle more data and the upstream needs to wait until signaled again for more data.

Hopefully this clarifies the advice you’ve received so far :slightly_smiling_face:

@jibbers42 Oh i see :) thanks :slight_smile:

@aklikic btw, why instantiate only one PersistentEntityRef? We create one entity depending on the message coming in or we update an existing one if it already exists. How do you recommend we address that? Thanks

@franz ignore my comment. I thought you were creating PersistantEntityRegistry every time but this is not true.
P.S. Did this solve your problem?

@aklikic Kinda yes and no.

That microservice that I posted originally (Microservice B) seems to be doing fine for now. But the next microservice that’s listening to that (Microservice C) now has an OOME as well.

I am not sure if this is just because Microservice C just got deployed and Microservice B has been producing a bunch of events ever since, and now Microservice C is now choking on all the pending events of Microservice B.

Thoughts?

@franz the code that I shared handles event by event from kafka topic.
So the flow should be like this (assuming you are persisting events on this command):
Kafka topic event -> command (ref.ask) -> entity event persist -> next kafka topic event -> ...
Here the back pressure is assured based on event persist confirmation (ref.ask will finish when persist is done). So the next kafka topic event will not be collected and processed until process of the first event is not finsihed.

In you code, by just returning Done, automatically the next event is being collected and processed. So you end up with multiple, parallel, ref.ask-s. There is nothing to ensure back pressure, and that should be a cause of OOM.

Can you confirm that, when you have updated you code with suggested one, events are handled one by one?
If so, and you are still experiencing OOM, then the problem is somewhere else. But first be sure you got this part right.

Thanks @aklikic. Let me digest your reply and try the sample code. I’ll update this thread once im done. Thanks!

@aklikic @TimMoore

Just an update: The recommended fix by @aklikic works :slight_smile:

However, I still do get an OOME for a different reason. when I see a lot of stacktraces in my logs with the error of NoHostAvailableException due to cassandra OperationTimedOutException, then i start getting another OOME. If I get that NoHostAvailableException every now and then, the microservice doesnt crash with OOME. But if I keep getting it one after the other, it dies.

We’re currently playing around with the parameters

lagom.broker.kafka {
  batching-size = 1
  batching-interval = 1 seconds
}

So that it wont choke up. But that would be for a different forum post :slight_smile:

Thanks everyone !:slight_smile: