MultiPart Form Data causing 'Substream Source cannot be materialized more than once' errors


#1

Hi there,

I am new to Akka and Akka HTTP and have run into a problem: when handling a multipart form request, I get the error ‘Substream Source cannot be materialized more than once’ when I call my ‘upload’ route multiple times (every call after the first triggers it).

After some searching online, it seems this is caused by extracting the form field “metadata” (which consumes the stream) and then trying to read the same multipart form data again in storeUploadedFile.

Does anyone know of a workaround or a better way to handle a multipart form request that includes a file upload?

import java.io.File

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.directives.FileInfo
import akka.stream.ActorMaterializer
import com.google.common.io.Files.{getFileExtension, getNameWithoutExtension}
import com.google.inject.{AbstractModule, Guice}
import com.typesafe.config.{Config, ConfigFactory}
import javax.inject.{Inject, Singleton}
import net.codingwell.scalaguice.InjectorExtensions._
import net.codingwell.scalaguice.ScalaModule

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
import scala.util.{Failure, Success}

object FileStorerMain extends App {
  val actorSystem = ActorSystem()
  val injector = Guice.createInjector(new Module(actorSystem))
  val httpRouter = injector.instance[HttpServer]
  httpRouter.start()

  private def shutdown() = {
    httpRouter.stop()
    actorSystem.terminate()
  }

  sys.addShutdownHook(shutdown())
}

@Singleton
class HttpServer @Inject()(config: Config, private implicit val actorSystem: ActorSystem) {
  private implicit val materializer: ActorMaterializer = ActorMaterializer()
  private implicit val executionContext: ExecutionContext = actorSystem.dispatcher

  private val routes = {
    path("upload") {
      formField("metadata") { metadata =>
        storeUploadedFile("file", createTempFile) {
          case (_, file) => complete(OK)
        }
      }
    }
  }

  private val host = "localhost"
  private val port = 9008
  private val bindingFuture = Http().bindAndHandle(routes, host, port)
  bindingFuture.onComplete {
    case Success(serverBinding) => println(s"Server bound to ${serverBinding.localAddress}")
    case Failure(_) => println(s"Failed to bind to $host:$port")
  }

  private def createTempFile(fileInfo: FileInfo): File = {
    val filename = getNameWithoutExtension(fileInfo.fileName)
    val extension = getFileExtension(fileInfo.fileName)
    val file = File.createTempFile(s"$filename-", s".$extension")
    file
  }

  def start(): Unit = {
    Await.result(bindingFuture, Duration.Inf)
  }

  def stop(): Unit = {
    Await.result(bindingFuture.flatMap(_.unbind()), Duration.Inf)
  }
}

class Module(actorSystem: ActorSystem) extends AbstractModule with ScalaModule {
  override def configure(): Unit = {
    bind[ActorSystem].toInstance(actorSystem)
    bind[Config].toInstance(ConfigFactory.load().resolve())
  }
}

(Johan Andrén) #2

This is an unfortunate side effect of the HTTP request body being a stream. The way to solve it is to convert the body to a “strict” entity (collected entirely in memory) by wrapping all the routes that consume the body in the toStrictEntity directive. Once the entity is held in memory rather than being read directly from the network, it can be consumed any number of times without problems.
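
As a rough sketch of what that could look like for the route in the question (the object name StrictUploadRoutes, the 3 second timeout, and the simplified createTempFile are assumptions for illustration, not taken from the original code):

```scala
import java.io.File

import scala.concurrent.duration._

import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.FileInfo

// Hypothetical holder object, only here so the snippet compiles on its own.
object StrictUploadRoutes {

  // Simplified stand-in for the createTempFile helper in the question.
  private def createTempFile(fileInfo: FileInfo): File =
    File.createTempFile("upload-", ".tmp")

  val routes: Route =
    path("upload") {
      // Buffer the whole request entity in memory first. The timeout is an
      // assumed value; the request fails if the body has not fully arrived
      // within it.
      toStrictEntity(3.seconds) {
        // Both directives below now read from the in-memory entity,
        // so it can be unmarshalled more than once.
        formField("metadata") { metadata =>
          storeUploadedFile("file", createTempFile) {
            case (_, file) => complete(OK)
          }
        }
      }
    }
}
```

The trade-off is that every upload is held fully in memory before it is written to disk, so this is probably only a good fit when uploads are reasonably small or the request size is limited elsewhere.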