About NoMoreElementsNeeded exception

Hi, after recent akka version update, I start seeing this error akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$ .

Looking at Akka 2.9.0 - akka.stream.Attributes.CancellationStrategy, it seems to be a new behavior for 2.6.x. The default is PropagateFailure , which is not configurable, has this documentation:

Strategy that treats cancelStage in different ways depending on the cause that was given to the cancellation.
If the cause was a regular, active cancellation (SubscriptionWithCancelException.NoMoreElementsNeeded), the stage receiving this cancellation is completed regularly.
If another cause was given, this is treated as an error and the behavior is the same as with failStage.

So it seems it shouldn’t raise this error to user level. What am I missing? I have tried with 2.6.10, 2.6.15 etc after seeing this akka/akka#30071 but it doesn’t help in my case.

My code look like this:

      S3.download(bucket = bucket, key = key)
      .collect {
        case Some((byteStringSource, _)) => byteStringSource
      }
      .flatMapConcat(identity)
      .via(flow)
      .grouped(10000)
      .mapAsyncUnordered(parallelism = 2) { items =>
        doSomething(items)
      }
      .runWith(Sink.ignore)

It always runs fine on my local but fails with the above exception on production (in AWS + k8s). With the same input, it fails at different time. Sometime it fails at 90% stream completion, sometimes 99%.

Am I missing something? How should I handle this error?

1 Like

That stream itself shouldn’t lead to a cancellation, Sink.ignore consumes until it sees the end of the stream, and none of the other used operators shown cancels upstream. This should mean it is either something in your flow or an issue in the S3.download internals.

Where/how are you observing the cancellation exception, logged by internals or by your own code (a custom graph stage for example)?

Hello,
I have same issue. File I’m downloading is new line delimited JSON of about 100Mb size.

  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
  implicit val typedSystem: ActorSystem[_] = ActorSystem(Behaviors.empty[Any], "Service")

  def upsertSleep(suggestions: Seq[ByteString]): Future[Done] = {
    Future {
      Thread.sleep(1000)
    }.map(_ => Done)
      .andThen {
        case Success(res) => logger.info(s"Inserted batch $res")
        case Failure(exception) => logger.error("Could not insert batch", exception)
      }
  }

  S3.download("bucketName", "fileName")
    .collect {
      case Some((stream, metadata)) =>
        logger.info(s"Downloading file of size ${metadata.contentLength}")
        stream.log("collect")
    }
    .flatMapConcat(identity)
    .via(Framing.delimiter(ByteString("\n"), 100000, allowTruncation = true))
    .grouped(1000)
    .mapAsync[Done](1)(upsertSleep)
    .runWith(Sink.ignore)

Logs are:

2021-09-21 12:24:20.326 [Service-akka.actor.default-dispatcher-5] INFO  c.a.c.m.a.Runner$ - Downloading file of size 113906841
...
...
...
2021-09-21 12:26:59.404 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:00.407 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:01.409 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:02.414 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:03.417 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:04.131 [Service-akka.actor.default-dispatcher-16] ERROR a.s.Materializer - [collect] Upstream failed.
akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null

Lib versions:

val alpakkaS3Lib = "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "3.0.3"
val akkaHttpLib = "com.typesafe.akka" %% "akka-http" % "10.2.6"
val akkaActorLib = "com.typesafe.akka" %% "akka-actor" % "2.6.16"
val akkaActorTypedLib = "com.typesafe.akka" %% "akka-actor-typed" % "2.6.16"

We have noticed that this bug is is much more noticable on fast connections (~1Gbps). On slower it almost never happens.

@qaemma @thebiftek did you find a workaround?

@joroKr21 Hi, yes i did. Now i download large file split to small chunks (1mb) and download those one by one. This small substream is now materialized, which “fixed” the error entirely. I tried not materializing them and retry downloading, but it failed too much. So materializing it was best option.

I found another solution. As suggested in akka/akka-http#3201 increasing akka.http.client.stream-cancellation-delay to 1 second worked for me.

I tried that, but it didnt help at all no matter what delay i put in.

Getting this error and it only occurs in github actions which is likely due to it having a much faster network connection. As you said increasing it to 1 second help.

Does it make sense to increase the default akka.http.client.stream-cancellation-delay to 1 second since that appears to be a lot more reasonable?