Akka actor Receivers stopped processing of sent messages

In our Java Spring application, we are using Akka messaging library to send and receive the messages.
In Message we are passing the object to be process.

This sending of objects to receivers happens frequently at every 5 minutes. We are having a cron job which fetches the list of objects to be process.
Then we are sending those objects to the receiver actors. After sometime, we are seeing that receivers are stopped processing the sending messages.

When we restart the application then only it pick the last sent messages and started processing.

Could you let us know why it stopped in between?

akka-version - 2.3.9
akka-actor_2.11-2.3.9.jar
akka-testkit_2.11-2.3.9.jar

we are using tell() method while sending the message
this.actorRef.tell(new Work(message), null);

we are using ask() method to get the response
this.ref.tell(new Work(message), null);

Hii @arunkumar1416,

It’s very hard to tell what is happening without seeing the code.

Also, you are using a very very old version of Akka which makes me think that this is an old application that used to work and now started to malfunction.

I suggest two things to start with:

  • try to track any recent changes that could have introduced a regression in the application.
  • add log statements in the receiving actors in an attempt to get insight on the cause

After you solve this issue, consider updating your application to the most recent Akka version. Akka 2.3.9 is a quite old version that is not maintained anymore.

Hi @octonato thanks for suggestion, will work on it.

Below is the code snippet.

public class WorkGroupClient <T>
{
    private final static Log log = LogFactory.getLog(WorkGroupClient.class);
    private final ActorRef ref;
    
    public WorkGroupClient(ActorRef ref)
    {
        this.ref = ref;
    }

    public void send(T message)
    {
        if (message!=null) {
            log.info("STEP: Send Akka message for class : "+message.getClass());
        }
        this.ref.tell(new Work(message), null);
    }
    
    public void sendOnCommit(final T message)
    {
        if (message!=null) {
            log.info("STEP: Send Akka message : "+message.getClass());
        }
        TxUtil.runOnCommit(new Runnable()
        {
            @Override
            public void run()
            {
                send(message);
            }
        });
    }
    
    public Object ask(T message, long timeout, TimeUnit unit)
    {
        if (message!=null) {
            log.info("STEP: ask message for class : "+message.getClass() ");
        }
        return AkkaUtil.ask(this.ref, new Work(message), timeout, unit);
    }
}


@Bean
    public ActorSystem actorSystem()
    {
        ConfigBuilder builder = new ConfigBuilder();
        Map<String,String> external = Maps.newHashMap();
        Iterator<String> keys = config.getKeys();
        builder.withValues(external);
        if( config.getBoolean(DEBUG_ALL, false) )
        {
            builder.withDebugAll();
        }
        Config config = builder.build();
        ActorSystem system = ActorSystem.create("myActorSystem", config);
        return system;
    }
    

public abstract class WorkGroupWorker <T> extends UntypedActor
{
    private final static Log log = LogFactory.getLog(WorkGroupWorker.class);

    private final FailureCounter COUNTER = new FailureCounter();
    private volatile T work;
    private final Class<T> type;
    private final String metric;
    
    private volatile boolean complete = false;
    
      @Override
    public final void onReceive(Object msg) throws Exception
    {
        log.info("STEP: onReceive method BEGIN");
        Scope scope = Profiler.start(this.metric);
        try
        {
            if(  msg instanceof Work )
            {
                log.info("STEP: msg is instance of Work");
                Work work = (Work) msg;
                this.work = this.type.cast(work.getTarget());
                log.info("STEP: work = "+work);
                workAssigned();
            } 
            else if ( work != null )
            {
                log.info("STEP: Executing onReceive elseIf for msg class = "+msg.getClass()+", msg = "+msg);
                onMessage(msg); 
            }
            else
            {
                log.info("STEP: Message is null");
                unhandled(msg);
            }
        }
        catch (Exception e)
        {
            log.error("error in processing " + this.metric + " : "  +msg, e);
            COUNTER.count(this.metric, e);
            throw e;
        }
        // treat as non-fatal, normally akka will shutdown with any thread error
        catch ( Error e)
        {
            log.error("critical error in processing " + this.metric + " : "  +msg, e);
            throw new CriticalWorkerFailure("critical error in processing " + this.metric + " : "  +msg, e);
        }
        finally
        {
           Profiler.stop(scope); 
        }
        log.info("STEP: onReceive method    End");
    }
    
    protected void onMessage(Object msg) 
    {
        log.info("STEP: onMessage method BEGIN");
        unhandled(msg);
        log.info("STEP: onMessage method END");
    }

    protected abstract void workAssigned() throws Exception;
    
    protected void workComplete()
    {
        log.info("STEP: workComplete method BEGIN");
        this.work = null;
        getContext().stop(getSelf());
        this.complete = true;
        log.info("STEP: workComplete method END");
    }
    
    public boolean isComplete()
    {
        return complete;
    }
    
    protected T getWork()
    {
        return work;
    }
}

Found below logs, I am seeing dead letters for some other jobs which uses same messaging system.

When I inspect logs, found some other scheduled jobs throwing dead letters.

INFO [com.test.akka.messaging.impl.SingletonWorkItem] [marketport-akka.actor.default-dispatcher-207] STEP: sender = Actor[akka://mysystem/deadLetters], work = com.test.report.service.messaging.PostCustomizationMessage@bd980dc

INFO [com.test.akka.messaging.impl.SingletonWorkItem] [marketport-akka.actor.default-dispatcher-207] STEP: sender = Actor[akka://mysystem/deadLetters], work = com.test.report.service.messaging.FileIndexChangeMessage@bd980dc

Will this stops from sending or receiving the messages?

I suspect that the dead-letter sender there is because you are using null as the sender in your tell. The STEP log is from your actor (AFAICT), so that indicates that no message is going to dead-letters (if you’re sending a reply to the sender, then you would be sending it to dead letters, but that’s a “you asked for it, you got it”).

I am seeing deadLetters for different message type. For my message type I am seeing the logs for sent. But I am not seeing the logs for actor receiving the messages.

After restarting the services I am seeing the messages are sent again and getting received to the receiver actors.

Will this happen when we sent more messages? I saw around 160 messages are sent. Will it handle this much messages?

I saw around 160 messages are sent. Will it handle this much messages?

160 messages are nothing for Akka. Akka is designed for extreme scale and you’ll see many examples of Akka handling millions and millions of messages or more.

I don’t really have an answer to why you are losing messages. Frankly you’re using such an ancient version of Akka that I’m not sure anyone can help you. For example, it sounds like you might be using durable mailboxes. But durable mailboxes haven’t even existed for a very, very long time.

But I, in principle, agree with Renato and Levi. With an application this old, you are going to have to focus on what has changed in your application/config. Trying to debug a 10+ year old version of Akka is going to be very difficult. [e.g. I was originally going to add a few details about how messages get “received” by Actors (i.e. that, when local, they don’t really get “received”, they are just directly inserted into the mailbox by the sender). But when I started suspecting that you might be using durable mailboxes I realized any advice I had would very likely be moot for such an old version of Akka.]