Hi all,
We have a data upload service that writes user-uploaded CSV files to HDFS. We are using the Alpakka library (version 1.1.1) to write to HDFS.
Sample code:
def uploadfile(): (Future[RotationMessage], Future[Int]) = {
  // Materialize the first rotation message and the total byte count
  val hdfsSink: Sink[RotationMessage, Future[RotationMessage]] = Sink.head[RotationMessage]
  val sizeSink: Sink[Array[Byte], Future[Int]] = Sink.fold(0)(_ + _.length)

  val g = RunnableGraph.fromGraph(GraphDSL.create(hdfsSink, sizeSink)((_, _)) { implicit builder =>
    (hdfsSink, sizeSink) =>
      import GraphDSL.Implicits._

      val bcast = builder.add(Broadcast[ByteString](2))

      byteSource ~> transformer.transformFlow(columnSep, decimalDelim) ~> bcast.in
      bcast.out(0) ~> hdfsWriter.hdfsWriterFlow ~> hdfsSink
      bcast.out(1) ~> hdfsWriter.fileSizeFlow ~> sizeSink
      ClosedShape
  })

  val streamResult: (Future[RotationMessage], Future[Int]) = g.run()
  streamResult
}
val hdfsFlow: Flow[HdfsWriteMessage[ByteString, NotUsed], RotationMessage, NotUsed] =
  HdfsFlow.data(
    fileSystem,
    SyncStrategy.none,
    RotationStrategy.size(hdfsConfiguration.fileSize, FileUnit.MB),
    settings())

val hdfsWriterFlow: Flow[ByteString, RotationMessage, NotUsed] =
  Flow[ByteString].map(byteStr => HdfsWriteMessage(byteStr)).via(hdfsFlow)
How can I run hdfsFlow in parallel to speed up the writing part? I tried using mapAsync instead of map in hdfsWriterFlow, but the bottleneck still appears to be in hdfsFlow itself.
Thanks.
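For reference, the mapAsync variant I tried looks roughly like this (a sketch, assuming hdfsFlow from above is in scope and an implicit ExecutionContext is available; the parallelism value 4 is arbitrary). Note that this only makes the HdfsWriteMessage construction asynchronous; the hdfsFlow stage downstream is still a single sequential stage, which may be why it remains the bottleneck:

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import akka.stream.alpakka.hdfs.HdfsWriteMessage
import akka.stream.alpakka.hdfs.RotationMessage
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch of the mapAsync attempt: wrapping message
// construction in a Future parallelizes only this map step, not the
// single HdfsFlow.data stage that follows it.
def hdfsWriterFlowAsync(
    hdfsFlow: Flow[HdfsWriteMessage[ByteString, NotUsed], RotationMessage, NotUsed]
)(implicit ec: ExecutionContext): Flow[ByteString, RotationMessage, NotUsed] =
  Flow[ByteString]
    .mapAsync(parallelism = 4)(byteStr => Future(HdfsWriteMessage(byteStr)))
    .via(hdfsFlow)
```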