About NoMoreElementsNeeded exception

Hi, after recent akka version update, I start seeing this error akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$ .

Looking at Akka 2.6.15 - akka.stream.Attributes.CancellationStrategy, it seems to be a new behavior for 2.6.x. The default is PropagateFailure , which is not configurable, has this documentation:

Strategy that treats cancelStage in different ways depending on the cause that was given to the cancellation.
If the cause was a regular, active cancellation (SubscriptionWithCancelException.NoMoreElementsNeeded), the stage receiving this cancellation is completed regularly.
If another cause was given, this is treated as an error and the behavior is the same as with failStage.

So it seems it shouldn’t raise this error to user level. What am I missing? I have tried with 2.6.10, 2.6.15 etc after seeing this akka/akka#30071 but it doesn’t help in my case.

My code look like this:

      S3.download(bucket = bucket, key = key)
      .collect {
        case Some((byteStringSource, _)) => byteStringSource
      }
      .flatMapConcat(identity)
      .via(flow)
      .grouped(10000)
      .mapAsyncUnordered(parallelism = 2) { items =>
        doSomething(items)
      }
      .runWith(Sink.ignore)

It always runs fine on my local but fails with the above exception on production (in AWS + k8s). With the same input, it fails at different time. Sometime it fails at 90% stream completion, sometimes 99%.

Am I missing something? How should I handle this error?

That stream itself shouldn’t lead to a cancellation, Sink.ignore consumes until it sees the end of the stream, and none of the other used operators shown cancels upstream. This should mean it is either something in your flow or an issue in the S3.download internals.

Where/how are you observing the cancellation exception, logged by internals or by your own code (a custom graph stage for example)?

Hello,
I have same issue. File I’m downloading is new line delimited JSON of about 100Mb size.

  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
  implicit val typedSystem: ActorSystem[_] = ActorSystem(Behaviors.empty[Any], "Service")

  def upsertSleep(suggestions: Seq[ByteString]): Future[Done] = {
    Future {
      Thread.sleep(1000)
    }.map(_ => Done)
      .andThen {
        case Success(res) => logger.info(s"Inserted batch $res")
        case Failure(exception) => logger.error("Could not insert batch", exception)
      }
  }

  S3.download("bucketName", "fileName")
    .collect {
      case Some((stream, metadata)) =>
        logger.info(s"Downloading file of size ${metadata.contentLength}")
        stream.log("collect")
    }
    .flatMapConcat(identity)
    .via(Framing.delimiter(ByteString("\n"), 100000, allowTruncation = true))
    .grouped(1000)
    .mapAsync[Done](1)(upsertSleep)
    .runWith(Sink.ignore)

Logs are:

2021-09-21 12:24:20.326 [Service-akka.actor.default-dispatcher-5] INFO  c.a.c.m.a.Runner$ - Downloading file of size 113906841
...
...
...
2021-09-21 12:26:59.404 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:00.407 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:01.409 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:02.414 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:03.417 [pool-1-thread-1] INFO  c.a.c.m.a.Runner$ - Inserted batch Done
2021-09-21 12:27:04.131 [Service-akka.actor.default-dispatcher-16] ERROR a.s.Materializer - [collect] Upstream failed.
akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$: null

Lib versions:

val alpakkaS3Lib = "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "3.0.3"
val akkaHttpLib = "com.typesafe.akka" %% "akka-http" % "10.2.6"
val akkaActorLib = "com.typesafe.akka" %% "akka-actor" % "2.6.16"
val akkaActorTypedLib = "com.typesafe.akka" %% "akka-actor-typed" % "2.6.16"