Substream Source has not been materialized in ... with splitWhen

streams

(Daniel Spasojevic) #1

Hi,

I’ve been seeing the error akka.stream.impl.SubscriptionTimeoutException: Substream Source has not been materialized in ... which (I think) comes from using splitWhen on a fairly heavily loaded machine. I haven’t been able to reproduce it in a test, which makes me think that we aren’t leaking substreams; they are just not being materialized in time.

The splitWhen implementation honours only the subscription timeout, not the StreamSubscriptionTimeoutTerminationMode setting, so we can’t simply turn the error into a warning.
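
For reference, these are the two settings I mean. This is only a sketch of the config, assuming the standard key names from Akka's reference.conf; the 30s value is an arbitrary illustration, not something we have validated:

  akka.stream.materializer {
    subscription-timeout {
      # cancel | warn | noop (as described above, splitWhen appears to ignore this)
      mode = warn
      # how long a substream may stay unmaterialized before it is considered stale
      timeout = 30s
    }
  }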

To try to reproduce it, I’ve placed sleeps in the stages of the substream that I control (e.g. the via and map steps), but this hasn’t triggered the error.

The relevant code looks like this:

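  // Assumes imports of akka.NotUsed, akka.stream.scaladsl.Flow, java.io.FileInputStream,
  // java.nio.file.Path and scala.util.control.NonFatal.
  private def fileProcessing(): Flow[Path, LoaderOutcome, NotUsed] = Flow[Path]
    .via(startUnitOfWork)
    .flatMapConcat { file =>
      ArchiveStreamSource(() => new ArchiveInputStream(new FileInputStream(file.toFile)))
        // keep only the XML entries of the archive
        .filter(data => data.entry.name.lastOption.exists(name => name.endsWith(".xml")))
        // start a new substream at each new archive entry
        .splitWhen(_.isNewEntry)
        .map(_.data).alsoTo(ping(100))
        .via(documentsSplitter)
        .via(documentProcessing).alsoTo(pingAndLog())
        // flatten the substreams back into a single stream by concatenation
        .concatSubstreams
        .alsoTo(archiveHandler(file))
        // map any non-fatal failure to the Errored outcome
        .recover { case NonFatal(_) => Errored }
    }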

Does the timeout fire when the first element is not pulled through the split stage within the timeout period? I’m hoping that there is an issue with this stream that we can fix, rather than having to raise the timeout to some arbitrarily safe value.

Thanks,
-Dan