Kafka to Textfiles

Hello all,

I’m trying to work with LogRotatorSink.
(In the end, I would like to modify part of its code so that it fits my process.)

You can find the implementation here: https://github.com/akka/alpakka/tree/master/file/src/main/scala/akka/stream/alpakka/file/scaladsl
And you can find an example here: https://developer.lightbend.com/docs/alpakka/current/file.html

I’m developing in Scala and I use the IntelliJ IDE.

I copied and pasted the implementation, and the following message appears: “symbol MapAsync is inaccessible from this place”
It is reported on the line: “import akka.stream.impl.fusing.MapAsync.{Holder, NotYetThere}”

When I change my package name to: package akka.stream.alpakka.file.scaladsl
the message disappears ^^’

I’m a newbie in Scala and Akka,
so the problem may come from my lack of experience with Scala.

If you don’t understand my question, I would be happy to rephrase :)

Hi Sharox,

Thank you for reaching out with your issue, we’re happy to have you here!

So if I understand you correctly, you are trying to copy the implementation of LogRotatorSink into your own code to change some details.
The implementation in Alpakka uses a bit of Akka-internal API that is not accessible from user code (as indicated by the akka.stream.impl package name). MapAsync is declared as private[akka], which is why the error goes away when your code is placed in a package under akka.
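
To illustrate the visibility rule, here is a minimal sketch in plain Scala. The names `lib`, `impl`, `api`, and `Holder` are made up for this example, and objects stand in for packages so the snippet can run as a single script; it is not Akka's actual code.

```scala
// Hypothetical names; objects stand in for packages so this runs as one script.
object lib {
  object impl {
    // Visible anywhere inside `lib`, hidden from code outside of it,
    // similar in spirit to `private[akka]` on MapAsync.
    private[lib] final class Holder(val value: Int)
  }

  object api {
    // Allowed: `api` lives inside `lib`, so it may use the restricted class.
    def doubled(v: Int): Int = new impl.Holder(v).value * 2
  }
}

// Code outside `lib` can only go through the public API.
val result = lib.api.doubled(21)

// Uncommenting the next line fails to compile, because Holder is private[lib]:
// val h = new lib.impl.Holder(1)
```

Moving your own file into the akka package makes the compiler accept the import for the same reason, but it ties your code to internals that may change without notice.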

What is it you are trying to change in the LogRotatorSink?

Cheers,
Enno.

Hi ennru, and thanks for your quick reply!

OK, I understand better now why this error message appears ^^

I don’t know what I need to change -_-
But I know the process.

My source is my Kafka message queue,
and my path is included in the first line of my source file.

(That’s why I need to change this method.)

Regards,

This connector was built with some kind of rotation in mind. For example, you want to write 100 MB to a file and then switch to another file. Or you write logs to files and each file contains one hour of logs.

If you have a message queue where every message goes to a separate file, or where some messages go to the same file and others go to different files, I would not use this connector.
Instead I would build some kind of batching and writing stage. Something like this: http://blog.colinbreck.com/akka-streams-a-motivating-example/ with a group by, and of course some kind of file sink at the end of it (which opens a file and releases it too).
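
As a rough illustration of the "group, then open, write, and release" idea, here is a sketch using plain Scala collections and java.nio rather than Akka Streams. `Msg` and `writeBatch` are hypothetical names, and the sketch assumes file names are trusted and each message body is stored as one line.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardOpenOption}

// Hypothetical message shape: the target file name plus the text to store.
final case class Msg(fileName: String, body: String)

// Group a batch by target file, then append to each file once.
// Files.write opens the file, writes the bytes, and closes it again.
// Returns how many messages were written per file.
def writeBatch(dir: Path, batch: Seq[Msg]): Map[String, Int] =
  batch.groupBy(_.fileName).map { case (name, msgs) =>
    val text = msgs.map(_.body + "\n").mkString
    Files.write(
      dir.resolve(name),
      text.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND
    )
    name -> msgs.size
  }

// Usage with a temporary directory and made-up data.
val dir = Files.createTempDirectory("batch-demo")
val counts = writeBatch(
  dir,
  Seq(Msg("a.csv", "1;2"), Msg("b.csv", "3;4"), Msg("a.csv", "5;6"))
)
```

In a stream, a function like this could be called on each batch produced by a batching stage, so that every file is opened only once per batch.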

You might want to look at one of the examples found on the Alpakka site:
https://developer.lightbend.com/docs/alpakka/current/examples/jms-samples.html#example-read-text-messages-from-jms-queue-and-create-one-file-per-message
It creates one file per incoming JMS message.

Hi tg44 and ennru,

Thanks for your replies.

tg44, I think that’s what I want, given the process I described.
I just have to adapt it to write to a file instead of a database ^^

ennru, your solution is beautiful :)
but I need to modify my files, because one file contains many messages.
That’s why I wanted to use LogRotatorSink.

So, I will try to take some tips from both of your solutions.

Regards,

I’m sorry, but I don’t understand what I’m doing.

To start, I create my consumer:

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("BootstrapAdress")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withGroupId("IdGroup")

Then, I create the source variable (Kafka topic):

  val source: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] =
    Consumer.committableSource(consumerSettings, Subscriptions.topics("MyTopic"))

After that, I don’t know what to do!
I need to read the first line of my source.
I can’t find anything about this on the Internet :/

What is it you want to achieve?
You started asking about LogRotatorSink and are asking about the Kafka Connector now.
I assume you might want to read from Kafka and write to files?

Please take your time and play around with some of the examples I pointed at earlier:
https://developer.lightbend.com/docs/alpakka/current/examples/index.html

Hi ennru,

Yes, I started with LogRotatorSink,
but thanks to your help, I will use other methods.
(I renamed the channel.)

Yes, I need to read my message queue.
My message queue is split into messages.
The first line of each message defines the name of the CSV file (or text file),
and I would like to save the message in that CSV file.
Several messages can be part of the same CSV file.

I looked for solutions, and I found this:

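// The Kafka payload is in record.value; the CommittableMessage itself is not a String
val FirstlinesStream = source.map(msg => ByteString(msg.record.value)).via(Framing.delimiter(
    ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true))
    .map(_.utf8String)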

If I understand correctly, this gives me the first line of my message.
After this, I just need to split my line and write to my CSV file.
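
Since each Kafka record arrives as one stream element, the first line can also be separated per message with plain string handling. The following is a minimal sketch under that assumption; `splitFirstLine` is a hypothetical helper, not part of Akka or Alpakka.

```scala
// Split one message into (first line, remaining text).
// Handles both "\r\n" and "\n" line endings; a message without a line break
// is treated as a first line with an empty body.
def splitFirstLine(message: String): (String, String) = {
  val idx = message.indexOf('\n')
  if (idx < 0) (message.trim, "")
  else (message.substring(0, idx).stripSuffix("\r").trim, message.substring(idx + 1))
}

// Usage with a made-up message: the first line names the target file.
val (fileName, body) = splitFirstLine("out.csv\r\na;b\r\nc;d")
```

Inside the stream, this could be applied with something like source.map(msg => splitFirstLine(msg.record.value)), which yields the file name and the content to append for every message.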