I’m trying to work with this method: LogRotatorSink
(In the end, I would like to modify some part of code to make the method work according to my process )
I’m copy/paste implementation method and the following message appears: “symbol Mapasync is inaccessible from this place”
In the line: “import akka.stream.impl.fusing.MapAsync.{Holder, NotYetThere}”
when I change my package name by : package akka.stream.alpakka.file.scaladsl
the message disappears ^^’
I’m newbie in scala and akka
So the problem may come from my incompetence in scala
If you don’t understand my question, I would be happy to rephrase :)
Thank you for reaching out with your issue, we’re happy to have you here!
So if I understand you correctly, you try to copy the implementation of LogRotatorSink to your code to change some details.
The implementation in Alpakka uses a bit of Akka-internal API that is not accessible by user code (as indicated by the akka.stream.impl package name. MapAsync is declared as private[akka].
What is it you are trying to change in the LogRotatorSink?
This connector was build with some kind of rotation in mind. So for example you want to write 100Mb to a file, and then change to an another file. Or write logs to files and each file contains one hour.
If you have a msg queue where some messages come, and every msg will go to separated file or some msges will go the same file but other goes to other I would not use this connector.
Instead I would build some kind of batching and writing thing. Something like this: http://blog.colinbreck.com/akka-streams-a-motivating-example/ with a group by, and ofcs some kind of file sink in the end of it (which opens a file and releases it too).
tg44, I think that’s what I want with the process that I gave you
I just have to transform it to write in a file and not in a BDD ^^
ennru, your solution is beatifull
but I need to modify my files, because 1 file contains many messages
that’s why I wanted to use LogRotatorSink method
So, I will try to take some tips from your 2 solutions
val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
.withBootstrapServers("BootstrapAdress")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withGroupId("IdGroup")
Then, I create the source variable (kafka topic):
val source: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] = Consumer.committableSource(consumerSettings, Subscriptions.topics("MyTopic"))
And after, I don’t know how to do !
I need to read the first line of my source.
I don’t find on the Internet
What is it you want to achieve?
You started asking about LogRotatorSink and are asking about the Kafka Connector now.
I assume you might want to read from Kafka and write to files?
Yes I started with LogRotatorSink,
But thanks to your help, I will use on other methods
(i rename the channel)
Yes, i need to read my message queue,
So, my message queue is cut into the message
The first line of my message defines the name of my csv files (or text files)
And I would like to save the message in csv files.
several messages can be part of a csv file