Akka Streams takeWhile processing elements even after condition fails

EDIT: I think this may be a bug, so i have reported this as a bug as well.

I have a very simple actor that just prints the number :-


public class PrintLineActor extends AbstractLoggingActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Integer.class, i -> {
          System.out.println("Processing: " + i);
          sender().tell(i, self());
        }).build();
  }
}

Now, I have a stream to print the even numbers until I encounter an odd element:-

@Test
public void streamsTest() throws Exception {

ActorSystem system = ActorSystem.create("testSystem");
ActorRef printActor = system.actorOf(Props.create(PrintLineActor.class));

Integer[] intArray = new Integer[]{2,4,6,8,9,10,12};
CompletionStage<List<Integer>> result = Source.from(Arrays.asList(intArray))
    .ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS))
    .takeWhile(i -> i != 9)
    .runWith(Sink.seq(), ActorMaterializer.create(system));

List<Integer> result1 = result.toCompletableFuture().get();
System.out.println("Result :- ");
result1.forEach(System.out::println);

}

I do NOT expect any element after 9 being processed aka being sent to actor. However, I see the number “10” also being processed by actor (but not 12) as seen in below output


Processing: 2
Processing: 4
Processing: 6
Processing: 8
Processing: 9
Processing: 10 //WHY IS THIS BEING PROCESSED BY ACTOR??

Result :- 
2
4
6
8
Why is 10 being processed by actor? How to stop this?

I have tried adding delays and debugging by recording timestamp of events, just to see if 10 is being processed before 9 actually completes, but no, 10 is taken after 9 is processed fully. here are the logs :-

Before Ask: 2 in 1596035906509
Processing inside Actor: 2 at 1596035906509
Inside TakeWhile 2 at  in 1596035906509

Before Ask: 4 in 1596035906609
Processing inside Actor: 4 at 1596035906610
Inside TakeWhile 4 at  in 1596035906610

Before Ask: 6 in 1596035906712
Processing inside Actor: 6 at 1596035906712
Inside TakeWhile 6 at  in 1596035906712

Before Ask: 8 in 1596035906814
Processing inside Actor: 8 at 1596035906814
Inside TakeWhile 8 at  in 1596035906815

Before Ask: 9 in 1596035906915
Processing inside Actor: 9 at 1596035906915
Inside TakeWhile 9 at  in 1596035906916

Before Ask: 10 in 1596035907017 //so 10 is taken much after the 9 is processed fully
Processing inside Actor: 10 at 1596035907017

Result :- 
2
4
6
8

Also, if i replace the .ask with a direct .map(print…), then 10 does not get printed. So why this happens when actor.ask is involved is very strange to me.

.ask in a stream will pull from upstream and send the next element to the actor asynchronously as soon as the previous element went downstream, this is as designed (it uses mapAsync internally which behaves like this).

map on the other hand does not do anything asynchronously and will only ask for a new element from upstream once it has seen demand from downstream.

The only thing you can know for certain with a takeWhile is that it will not emit any elements downstream and stop requesting elements from upstream once the predicate returns false, not what the other operators upstream of it may do.

Thanks for your response.
Is there a way to stop this element from flowing to the actor? I need to stop further elements from being processed by actor if the actor returns an unfavorable result for previous elements

You’d need both the act of sending to actor and the deciding if the response be completely sequential and share a toggle state.

I can’t think of an obvious way to do that with existing stream operators, you could write a custom graph stage that does it or put an actor inbetween the stream and the actual actor that does something along these lines: forward only one request at a time, evaluate the response and never sends another request to the actor once the predicate has returned false, it’d need to encode that it stopped evaluating elements in the responses so that the stream can complete.

link to bug report?

@SethTisue https://github.com/akka/akka/issues/29441 (closed because of what is explained in this thread)

1 Like