Processing a csv file with Akka Stream

(Chaitanya Ekre) #1

I am writing a sample program using akka streams where I am reading contents of csv file.
The csv file contains 5 lines of data. Now I want to process ( in this case display on console) 2 lines at time so I grouped the stream of lines by 2. It gave me 2 groups and skipped the last line ideally which should have been in 3rd group.
So how do i access elements in the last group is the last group has a size less that which was specified (in my case specified size is 2) ???

Sample code:

public class MyClass{
public static void main(String[] args)
final ActorSystem system = ActorSystem.create(“SplitAndPrint”);
final Materializer materializer = ActorMaterializer.create(system);

	Sink<ByteString, CompletionStage<Done>> splitAndPrintSink = Sink.<ByteString>foreach(chunk -> splitLine(chunk) );
	Sink<ByteString, CompletionStage<Done>> lineSink = Sink.<ByteString>foreach(s-> convert(s));
	Path filePath = Paths.get("C:\\Users\\admin\\oxy\\AkkaStreaming\\src\\main\\resources\\csvdata.csv");
	CompletionStage<Done> done= FileIO.fromPath(filePath)
			.via(Framing.delimiter(ByteString.fromString(System.lineSeparator()), Integer.MAX_VALUE,FramingTruncation.DISALLOW))
				lines.forEach(l -> convert(l));
			}, materializer)


public static void convert(ByteString xString)
System.out.println(" ----- “);
String converted = xString.utf8String();
System.out.println(converted.length()+” | "+converted);


Region,Country,Item Type,Sales Channel,Order Priority,Order Date,Order ID,Ship Date,Units Sold,Unit Price,Unit Cost,Total Revenue,Total Cost,Total Profit
Sub-Saharan Africa,South Africa,Fruits,Offline,M,7/27/2012,443368995,7/28/2012,1593,9.33,6.92,14862.69,11023.56,3839.13
Middle East and North Africa,Morocco,Clothes,Online,M,9/14/2013,667593514,10/19/2013,4611,109.28,35.84,503890.08,165258.24,338631.84
Australia and Oceania,Papua New Guinea,Meat,Offline,M,5/15/2015,940995585,6/4/2015,360,421.89,364.69,151880.40,131288.40,20592.00
Sub-Saharan Africa,Djibouti,Clothes,Offline,H,5/17/2017,880811536,7/2/2017,562,109.28,35.84,61415.36,20142.08,41273.28

(Martynas Mickevičius) #2

As the documentation indicates, the grouped operator emits not only when it collects the required number of elements but also when the upstream completes. If that is not the case, then it could be a bug. Could you open a ticket for it in the Akka issue tracker with a small reproducer?

(Chaitanya Ekre) #3

Hi Martynas,
with given input file this program prints 4 lines whereas input file has 5 lines. This program will work and prints 5 lines (2+2+1) as the group size is 2)only when the input csv file has “\n” at the end of last line.
But there is no guaranty that input file always has “\n” character at the very end. The problem is with grouped() it does not consider last line if there is “\n”.

(Martynas Mickevičius) #4

Aha! Now I see. It is not the grouped operator behaviour that is unexpected, but Framing.delimeter truncation parameter.

From the docs:

if set to DISALLOW, then when the last frame being decoded contains no valid delimiter this Flow fails the stream instead of returning a truncated frame.

So you should either allow truncation, or, better yet, switch to Alpakka CSV connector which is designed to be Akka Streams API for CSV parsing and rendering.

(Chaitanya Ekre) #5

Yes your are right I am going to allow truncation as we may have other characters as delimiters but will definitely check Alpakka CSV connector.

Thanks a lot :pray::pray:

(Konrad `ktoso` Malawski) #6

@chaitanya I have edited away a “f… you” emote ( : fu :) which you seem to have used. I think it was likely accidental. Please watch out for those typos, it could have been perceived as intentional/offensive.

Happy hakking!

(Chaitanya Ekre) #7

Oh ! Thanks a lot !!!