Streaming multiple files periodically from a shared directory

Hi,
We are designing a flow using Akka Streams which does the following things:

  1. Periodically look for files in a directory (an NFS mount) - using a tick source
  2. List the files
  3. Read lines from each file - using flatMapConcat
  4. Delete the file upon reading - using mapMaterializedValue
  5. Store each line to Kafka (wrapping it inside some object)

Sample code looks like this:

  1. Ticker source to list and submit files periodically
    val ticker = Source
      .tick(1.second, 300.millis, ())
      .buffer(1, OverflowStrategy.backpressure)
      .mapConcat { _ =>
        // listFiles() returns an Array; mapConcat needs an immutable collection
        Paths.get("C:\\Users\\Test_Dir").toFile.listFiles().toList
      }

  2. Logic for processing files one by one and deleting each one once done
    def processFile(): Flow[File, String, NotUsed] = {
      Flow[File]
        .flatMapConcat { file =>
          FileIO
            .fromPath(Paths.get(file.getAbsolutePath))
            .mapMaterializedValue { f =>
              // needs an implicit ExecutionContext in scope for onComplete
              f.onComplete {
                case Success(r) =>
                  if (r.count > 0 && r.status.isSuccess) {
                    Files.delete(file.toPath)
                  }
                case Failure(e) => println(s"Something went wrong when reading: $e")
              }
              NotUsed
            }
            .recover {
              case ex: Exception => ByteString("")
            }
            .via(Framing.delimiter(ByteString("\n"), Int.MaxValue, allowTruncation = true))
            .map(_.utf8String)
            .mapAsync(1)(FileSerdes.deserialize(file.getAbsolutePath)) // custom implementation for each line
        }
    }

  3. Graph builder
    val workerCount = 2 // configurable; there can be more workers
    val graphBuilder = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val balance = builder.add(Balance[File](workerCount))
      val merge = builder.add(Merge[String](workerCount))
      (1 to workerCount).foreach { _ =>
        balance ~> processFile() ~> merge
      }
      FlowShape(balance.in, merge.out)
    }

  4. Sink definition
    val storeReadings = Sink
      .foreach[String](println) // placeholder for now; it will be Kafka eventually

  5. Materializing the graph
    val result2 = ticker
      .via(graphBuilder)
      .runWith(storeReadings)

Now, following are the challenges I'm running into:

  • The application has to be deployed on multiple nodes, each watching the same NFS mount for files.
  • Once files arrive, all the nodes list the same files, which results in duplicates at the sink (this is avoided to a certain extent because we delete each file as soon as we read it, but not completely).
  • Assume we have a large number of files to process (say 3k). The ticker source re-lists the files that are still left in the directory (within the same node and across nodes), which again results in duplicates.
  • Exception handling: assume a file is processed and gets deleted. The other nodes will also try to process the same file and hit a NoSuchFileException. I'm handling these scenarios with a recover block. Will that result in message loss?

Your help will be much appreciated.

First question: Can you move the files?

I would modify your flow to something like:

  • tick
  • read dir
  • pick a random file
  • move to another dir/rename
  • read the moved file
  • delete the moved file
  • start over

Pro of this method: the OS will move the file (so NFS will move the file), meaning only one instance's move will succeed. Con: you need to handle the case where the file was not moved by this instance.
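Roughly, the claim-by-move step could look like this (just a sketch; incomingDir, workDir and tryClaim are names I made up, and you would still plug in your own reading/deleting logic):

    import java.nio.file.{Files, Path, Paths}
    import scala.concurrent.duration._
    import scala.util.Try

    import akka.actor.Cancellable
    import akka.stream.scaladsl.Source

    val incomingDir: Path = Paths.get("/mnt/nfs/incoming")    // assumption: the shared NFS dir
    val workDir: Path     = Paths.get("/mnt/nfs/work-node-1") // assumption: this instance's own dir

    // A move (rename) within the same filesystem succeeds for only one instance;
    // the losers get an exception, which Try turns into None.
    def tryClaim(file: Path): Option[Path] =
      Try(Files.move(file, workDir.resolve(file.getFileName))).toOption

    val claimed: Source[Path, Cancellable] = Source
      .tick(1.second, 300.millis, ())
      .mapConcat(_ => Option(incomingDir.toFile.listFiles()).toList.flatten.map(_.toPath))
      .mapConcat(file => tryClaim(file).toList) // drop files some other node claimed first
    // ...then read each claimed file, write to Kafka, and delete it from workDir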

Another method could be to use an additional source of information, or another helper:

  • One instance only moves the files to separate dirs, and the worker instances each watch only their own dir. (I would not go with this: it is a single point of failure, and if one process dies the others can't take over its work.)
  • Some fast DB (like Redis) acts as the single source of truth for who works on what; the dir-reading step adds the new files to Redis, and the file choosing happens from Redis instead of randomly. The choice is an atomic update to a Redis key, so other updates to the same key will fail. (Or some similar logic; see the sketch after this list.)
  • One instance scans the dir and adds the files to a Kafka topic, and the workers read from that topic (this has the same problem as the first point).
  • Akka Cluster and a Distributed Pub/Sub broadcast flow.
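For the Redis variant, the claim could be as simple as a SETNX per file (a rough sketch assuming the Jedis client; the key naming and nodeId are made up):

    import redis.clients.jedis.Jedis

    val jedis  = new Jedis("localhost", 6379) // assumption: a reachable Redis instance
    val nodeId = "node-1"                     // assumption: unique per worker instance

    // SETNX is atomic: only the first instance to claim a file gets 1 back;
    // everyone else gets 0 and should skip that file.
    def tryClaim(fileName: String): Boolean =
      jedis.setnx(s"file-claim:$fileName", nodeId) == 1L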

Hi @sebatin,

I agree with @tg44's suggestions; renaming will help you avoid reading the same file from multiple nodes.

I would like to bring Alpakka's DirectoryChangesSource for watching the source directory to your attention.
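A minimal usage sketch (the directory path and poll settings are just examples):

    import java.nio.file.FileSystems
    import scala.concurrent.duration._

    import akka.stream.alpakka.file.DirectoryChange
    import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource

    val sourceDir = FileSystems.getDefault.getPath("/mnt/nfs/incoming") // example path

    // Emits (path, change) pairs; keep only newly created files.
    val newFiles = DirectoryChangesSource(sourceDir, pollInterval = 1.second, maxBufferSize = 1000)
      .collect { case (path, DirectoryChange.Creation) => path }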

Cheers,
Enno.

Hi @tg44,

Thanks for the reply. I like the idea of renaming the file, which I can try right away. I will definitely give it a try and keep you posted. Coming to my follow-up question on exception handling, it may be a very basic one, but I'd still like to ask … does recover pose any threat of losing messages? Also, for now, our files are structured in a way where multiple messages are embedded in a single line.

Hi @ennru,
Thanks for the suggestion. We tried DirectoryChangesSource initially, but we soon hit performance issues (we were not able to process even 500 files at once). That's because, underneath, it uses the java.nio.file.WatchService implementation. So when a burst of files comes in (say 400-500 files, which is what we tried), it throws an OVERFLOW event. As per the docs, consuming systems need to be prepared to handle this scenario, and it is very difficult for us to hook retries in at the source level. Also, it is not performance friendly.

I'm not sure; I only use recover if the stream itself can fail (like a TCP connection breaking). I use Try and Future.recover. I think of it as either the “data” or the “pipeline” being broken. You can handle the data problems with basic FP tools like Try with collect, or flatMap, or filter, or things like that. And you can handle the pipeline problems with recover. Usually when I need to use the stream recover I just reread the docs :smiley:

In your case I would simply wrap the File.rename in a Try and call collect on it (see the sketch below). If you expect that the flatMapConcat could throw an exception, that is a sad case, and I would have to read the docs too to move forward :frowning:
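Something like this is what I mean (a sketch; workDir is made up, and since File.renameTo returns false instead of throwing, the Try plus a filter covers both failure modes):

    import java.io.File
    import scala.util.{Success, Try}

    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    val workDir = new File("/mnt/nfs/work-node-1") // assumption: this instance's own dir

    // Failed renames (file already claimed or deleted by another node) are simply
    // dropped by collect instead of failing the stream, so no recover is needed
    // for the "file disappeared" case.
    val claimFlow: Flow[File, File, NotUsed] =
      Flow[File]
        .map { file =>
          val target = new File(workDir, file.getName)
          Try(file.renameTo(target)).filter(identity).map(_ => target)
        }
        .collect { case Success(target) => target }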

I just read through your whole code. Are you sure that you need the graph in the middle? If you do need it (I mean, if it actually processes the files faster with it), try adding an .async in the foreach, or simply give the mapAsync a bigger parallelism. In general, though, I think scaling the mapAsync should be enough, and the balance+merge is possibly just an unnecessary dance. The real question here is whether reading one file from start to end is faster than reading multiple files in parallel, and my hunch is that it doesn't really matter much, which makes reading one file at a time much easier and safer.
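Concretely, I would expect something as simple as this to be enough (reusing your own ticker, processFile and storeReadings; the parallelism hint is an assumption):

    // No GraphDSL, no balance/merge: just source -> flow -> sink. If one file at a
    // time turns out to be too slow, raise the parallelism of the mapAsync inside
    // processFile (e.g. .mapAsync(4)(...)) before reaching for balance+merge.
    val simplified = ticker   // your Source[File, Cancellable]
      .via(processFile())     // your Flow[File, String, NotUsed]
      .runWith(storeReadings) // Sink.foreach[String] for now, Kafka later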

@tg44,

Apologies for the late reply; I was completely tied up with a production incident. The main reason for me to use recover is to handle java.nio.file.NoSuchFileException, which was terminating the stream (it happens when multiple instances try to operate on the same file).

You are right, I really don't need to have a graph here; I will simplify it. Also, the good news is that I tried the file renaming approach you suggested, and it solved the concurrent file access issue across multiple instances.