Transform a CSV file into multiple CSV files using Akka Stream


(Syed Ali) #1

Hi,

I am trying to figure out a solution using Akka Streams, but after spending a couple of days I couldn’t figure out how to solve this particular problem.

Following is my problem:

Given a Source[String, NotUsed], here is an example of what the source emits:

abc,...,....
xyz,...,..
lmn,...,...
xyz,...,...
abc,...,...

Now we need to transform the given source into a Source[Map[String, Seq[String]], NotUsed], where the key of the map is the first value of each line of the CSV file. Once we have this, we need to save the values of each key-value pair in a separate file (actually it needs to be saved in S3), where the name of the file needs to be in the following format:

<key>_<length_of_seq>_<random_generated_uuid>.csv

For example, the name of one of the files would be:

abc_2_d339f8f8-92b8-46f2-bdd0-31406e9c44da.csv

and its content would be:

abc,...,....
abc,...,....

Any suggestion how to approach this?

Regards,

Syed


(Gergő Törcsvári) #2

Hi!

This is not as trivial as it should be in an ideal world :frowning:

So you want:

  • parse the CSV into some data representation
  • group the lines by the first field
  • write the groups to separate CSV files
  • name the files with a naming convention

The problems are:

  • you can’t write to a file inside a stream with a name given by the data (or at least it’s not easy)
  • if you want to name the file with the number of lines, then you either need to buffer all the lines in memory (which kills the whole streaming idea), or you need to name the file after you have finished writing to it.
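To make the buffering trade-off concrete: once a whole group is in memory, the final file name is known before the file is even opened. A minimal sketch (the helper name, directory, and data are illustrative, not a fixed API):

```scala
import java.nio.file.{Files, Path}
import java.util.UUID

// Buffer a group fully so <key>_<count>_<uuid>.csv is known up front,
// then write it in one go. Simple, but holds the whole group in memory.
def writeGroup(dir: Path, key: String, rows: Seq[String]): Path = {
  val target = dir.resolve(s"${key}_${rows.length}_${UUID.randomUUID()}.csv")
  Files.write(target, rows.mkString("", "\n", "\n").getBytes("UTF-8"))
}
```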

I would start with a hacky solution:

  • first stream (data processing): parse the CSV, group the lines by the first field, and write each group to a file in an output directory
  • after the first stream finishes, I would start a new stream
  • second stream (file naming):
    • get the filenames from the directory you used as the output dir of the previous stream, as a Source[String]
    • open each file
    • get the first line’s first field and count the lines (statefulMapConcat could do this)
    • rename the files with the gathered information
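The renaming step of that second pass can be sketched with plain java.nio, outside Akka; the naming convention follows the thread, while the helper name and file layout are illustrative assumptions:

```scala
import java.nio.file.{Files, Path}
import java.util.UUID

// Second pass: read the key from the first line's first field, count
// the lines, and rename the file to <key>_<count>_<uuid>.csv.
def renameByConvention(file: Path): Path = {
  val lines = Files.readAllLines(file) // java.util.List[String]
  val key = lines.get(0).takeWhile(_ != ',')
  val target =
    file.resolveSibling(s"${key}_${lines.size}_${UUID.randomUUID()}.csv")
  Files.move(file, target)
}
```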

The problem with this method is that when you receive the “stream ended” signal in the first stream, the file descriptors have not necessarily released the files yet, so the next phase does not necessarily see the whole files… You need to wait a bit between the end of the first stream and the start of the second. (This is why I said it’s the hacky solution.)

If you have this solution working, you could try a better one. The main problem in this scenario is that we don’t know when the file sinks have finished. You could duct-tape a solution with a custom file sink which is not a “sink” but a flow that emits its materialized value downstream when the upstream finishes. If you can build this stage, then you can use the prebuilt second stream with mergeSubstreams and a map, and easily call uploadToS3 at the end. This is “duct tape” because you need to place this class in the right package hierarchy to reach internal functions, but it is totally doable. Maybe it is worth a PR to the main library or to the Alpakka file connector.


(Johan Andrén) #3

For the file naming, one trick you could potentially do is to combine groupBy with a lazy sink. It’s a bit messy and relies on a deprecated factory method with no replacement, but I think it should work:

import java.nio.file.Paths
import scala.concurrent.Future
import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink}
import akka.util.ByteString

case class Entry(group: String, field1: String, field2: String)

// One lazily created file sink per group: the file is only opened
// once the first element of that substream arrives.
val sink: Sink[Entry, NotUsed] = Flow[Entry]
  .groupBy(200, entry => entry.group)
  .to(Sink.lazyInit(first =>
    Future.successful(
      Flow[Entry].map(entry =>
        ByteString(entry.toString) // make bytes out of it
      ).toMat(FileIO.toPath(Paths.get("/somewhere", first.group)))(Keep.right)
      // Keep.right so the materialized value is the Future[IOResult],
      // matching the fallback below
    ),
    () => Future.failed[IOResult](new RuntimeException("")) // won't be used so doesn't really matter
  ))

(Syed Ali) #4

Hi guys,

Thanks for your replies, I’ll try it out.

Regards,

Syed Farhan Ali