Merging message before processing


(Yarichard) #1

Hi eveyone,
I’m trying to create an akka actor based architecture, and I need some advices.
The messages I want to manage contain some data, related to a connected object state.

As processing the messages can lead to long time process,I want all the messages concerning the same object to be routed to the same actor/routee (easy), but I’d like merge the content with unprocessed messages, in order to keep only one message in mailbox queue.
Ex: Msg2(ObjId: 1, state:{temperature:10°, speed: 12 mph})

Currentlly unprocessed message in queue:
Msg1(ObjId: 1, state:{speed: 19 mph})

I’d like to merge and create only one message
Msg2(ObjId: 1, state:{temperature:10°, speed: 19 mph})
that will be processed by actor.

How can this be done ? By customizing a mailbox ? Is it an anti pattern for Akka framework ?
Thanks in advance,
Yann


(Muthu Jayakumar) #2

Some thought Yann…

  1. If Msg1 and Msg2 come in out-of-order fashion, then you’ll need something to correlate Msg1 and Msg2. Right?
  2. If the number of messages of kind Msg1 & Msg2 can reside in Java Heap, perhaps you can use FSM to correlate them.
  3. If not, then you’ll have to externalize those messages and correlate them during the arrival of Msg1 and/or Msg2. — This can be done elagently Akka streams

Thanks,
Muthu


(Johan Andrén) #3

I’d strongly recommend against going down the path of custom mailboxes.

If you want to stay in actors making an aggregating actor and then having the “actual” actor pull from it using messages (HeyGiveMeAnotherOne, OkHereYouGo(potentiallyAggregated)).

It could be easier to do using a stream rather than raw actors, given that you are willing to pay the price to learn streams. Source.actorRef gives you an ActorRef for sending messages to, which then end up inside of the stream, you can then use for example Flow.conflateWithSeed or Flow.batch to do the aggregating and a Sink.actorRefWithAck or Sink.queue to pull the results into an actor again.


(Yarichard) #4

Hi Johanandren,
thanks for your answer. I followed the Akka stream way, and I didn’t manage to get something work as I would expect.
Below is a test I wrote:

final ActorSystem system = ActorSystem.create("pulpo-connector");

    Materializer mat = ActorMaterializer.create(
            ActorMaterializerSettings.create(system)
                    .withInputBuffer(1024, 1024), system);

    int bufferSize = 1;
    int elementsToProcess = 1;

    Flow<Integer, String, NotUsed> flow = Flow.of(Integer.class)
            .map(x -> "Coucou " + x);

    SourceQueueWithComplete<Integer> sourceQueue =
            Source.<Integer>queue(bufferSize, OverflowStrategy.backpressure())
                    .conflate((currentMsg, newMsg) -> {
                      System.out.println("Conflate Src => " + currentMsg + " " +  newMsg);
                      return currentMsg;
                    })
                    //.throttle(elementsToProcess, Duration.ofSeconds(1))
                    //.map(x -> x)
                    .via(flow)
                    .to(Sink.foreach(x -> {
                      System.out.println("ACTOR PROCESS: " + x);
                      Thread.sleep(5000);
                    }))
                    .run(mat);

    int index = 0;
    for(int i=0; i< 1000; i++){
      sourceQueue.offer(i);
      System.out.println("GEN: " + i);
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

In this exemple, the source queue simulates the messages that are coming from a server.
The long time processing code would be somewhere in the Sink (or maybe somewhere else if I didn’t understand ?)
I tried to use the conflate method to merge my messages, in order to get only one available for consumption.
The problem I have is that conflate is called only if I’m restricting the source flow by using the throttle method (which I don’t want to do).
Is there any other way to use the conflate method I didn’t get ?
Thanks in advance,
Yann


(Johan Andrén) #5

Never put a Thread.sleep in a stream (or an Actor), the stream stages are fused into a single actor so you will effectively stop the stream from doing anything during that sleep.

In the end you’d put an Sink.actorRefWithAck and have your actor that should process the results send back ack messages when it is done with processing an element to get the next element from the stream. This turns the actor being busy into back pressure which will trigger conflate to start aggregating.