Does dev mode have a download limit?

configuration

(Mark Hammons) #1

I have some file download functionality in my Play application, and the files in question can be very large. Recently, I decided to test it with a 1TB dataset, downloading it and piping the output to /dev/null. However, at around the 250GB mark, the server resets the connection without any error or logging. Here’s what I got from a client using sttp:

[ERROR] [10/01/2018 13:08:30.770] [webservice-client-akka.stream.default-blocking-io-dispatcher-25] [akka://webservice-client/system/StreamSupervisor-0/flow-29-1-fileSink] Tearing down FileSink(/dev/null) due to upstream error (akka.stream.StreamTcpException: The connection closed with error: Connection reset by peer)

I’m stuck here. I cannot figure out why the server is resetting the connection and there’s no error serverside to indicate a problem. My only guess is that this is a limitation of the testing configuration, but I cannot be sure. Any ideas?


(Marcos Pereira) #2

Hi @markehammons,

No, there is no hard limit on file downloads. But since you are seeing Connection reset by peer, it appears that the client itself is closing the connection. How are you testing the download? Using curl? Can you also share the code for the download action?

Best.


(Mark Hammons) #3

The client is sttp, using the akka-http backend. I’ll copy the code in tomorrow when I’m back at work, but I’m using a custom stream element I made called ZipFlow, which takes a series of paths to files and zips those files together for the downloading client. I tested another dataset, and it has now been running for hours without failing, so at this point I suspect there may be a flaw in the dataset I was downloading that is triggering some error serverside. However, that still leaves me puzzled: I’m fairly sure my custom stage fails properly and should be logging errors, so I’m still at a loss for what’s happening.

Here’s the code of the ZipFlow:

// Standard imports for this snippet. IRODSPath, IRODSServiceActor and PTryT
// are project-specific types and are not shown here.
import java.io.ByteArrayOutputStream
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration._

import akka.actor.Scheduler
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging}
import akka.util.{ByteString, Timeout}

class ZipFlow(rootPath: String, chunkSize: Int)(
    implicit service: IRODSServiceActor,
    scheduler: Scheduler,
    ec: ExecutionContext)
    extends GraphStage[FlowShape[IRODSPath, ByteString]] {

  val in = Inlet[IRODSPath]("ZipFlow.in")
  val out = Outlet[ByteString]("ZipFlow.out")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {

      implicit val timeout: Timeout = new Timeout(3.minutes)
      val byteOutputStream = new ByteArrayOutputStream()
      val zipOutputStream = new ZipOutputStream(byteOutputStream)
      zipOutputStream.setLevel(0)
      //todo: add a bytecount to make certain the file is downloaded entirely
      private var fileIterable: Iterator[ByteString] = Iterator.empty
      private var upstreamDone = false

      val buffer = Array.ofDim[Byte](chunkSize)

      setHandler(
        out,
        new OutHandler {
          override def onPull(): Unit = {
            transmit()
          }
        }
      )

      def transmit(): Unit = {
        if (fileIterable.hasNext) {
          val bs = fileIterable.next()
          bs.copyToArray(buffer, 0, bs.size)
          zipOutputStream.write(buffer, 0, bs.size)
          push(out, ByteString.fromArray(byteOutputStream.toByteArray))
          byteOutputStream.reset()
        } else {
          log.info("done, closing...")
          zipOutputStream.closeEntry()
          if (!upstreamDone) {
            log.info("pulling new info")
            pull(in)
          } else {
            log.info("upstream done")
            zipOutputStream.close()
            emit(out,
                 ByteString.fromArray(byteOutputStream.toByteArray),
                 () => completeStage())
          }
        }
      }

      setHandler(
        in,
        new InHandler {
          override def onPush(): Unit = {
            val file = grab(in)
            log.info(s"got file $file")

            fileIterable = Await
              .result(PTryT(file.createReader(chunkSize, 2.minutes)).map { r =>
                r.iterable
              }.value, timeout.duration)
              .throwFault

            log.info(s"creating new zip entry $rootPath/${file.getName}")
            val zipEntry = new ZipEntry(s"$rootPath/${file.getName}")
            zipOutputStream.putNextEntry(zipEntry)
            log.info(s"zip entry ${zipEntry.getName} loaded")
            transmit()
          }

          override def onUpstreamFinish(): Unit = {
            upstreamDone = true
          }
        }
      )

    }
  override def shape: FlowShape[IRODSPath, ByteString] = FlowShape.of(in, out)
}
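For context, the stage above relies on `ZipOutputStream` writing into a `ByteArrayOutputStream` that is drained (`toByteArray` then `reset()`) after each push. A self-contained sketch of that pattern on the plain JVM, with no Akka involved (file name and payload here are made up for illustration):

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{ZipEntry, ZipOutputStream}

object ZipChunks extends App {
  val bytes = new ByteArrayOutputStream()
  val zip = new ZipOutputStream(bytes)
  zip.setLevel(0) // no compression, as in ZipFlow

  // putNextEntry writes the local file header to the underlying stream
  // immediately; deflated data may still be buffered internally for a while.
  zip.putNextEntry(new ZipEntry("a.txt"))
  zip.write("hello".getBytes("UTF-8"))
  val firstChunk = bytes.toByteArray // whatever has reached the stream so far
  bytes.reset() // drain, as ZipFlow does after each push

  zip.closeEntry()
  zip.close()
  val lastChunk = bytes.toByteArray // trailing data + central directory

  assert(firstChunk.nonEmpty)
  assert(lastChunk.nonEmpty)
  println(s"${firstChunk.length} / ${lastChunk.length}")
}
```

Note that because the deflater can buffer, an individual drain may contain few or no data bytes; the stage pushes whatever is available on each pull.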

(Mark Hammons) #4

I looked through my code and determined that I was at fault. In a piece of code deeper in my implementation, there’s a point where I allow errors to be swallowed, which produces exactly this sudden, unexplained end of the download.
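For readers hitting the same symptom: recovering a failed read to an "empty" value converts the failure into a normal-looking early end of data, so nothing is logged serverside and the client only sees the connection close. A minimal self-contained sketch of that bug pattern (not the original code; `readChunk` and the sizes are invented for illustration):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SwallowedError extends App {
  // Pretend backend: the first three chunks succeed, then a read fails.
  def readChunk(i: Int): Future[Iterator[Byte]] =
    if (i < 3) Future.successful(Iterator.fill(4)(0: Byte))
    else Future.failed(new RuntimeException("backend read failed"))

  // Bug pattern: recovering to an empty iterator hides the failure --
  // downstream just sees "no more data" and the stream ends cleanly.
  def readChunkSwallowing(i: Int): Future[Iterator[Byte]] =
    readChunk(i).recover { case _ => Iterator.empty }

  val total = (0 until 5)
    .map(i => Await.result(readChunkSwallowing(i), 1.second).size)
    .sum
  println(total) // 12 bytes instead of 20, and no error reported anywhere
}
```

The fix is to let the failure propagate (or log it) instead of mapping it to an empty result, so the stage fails the stream and the client sees an error rather than a truncated download.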