Akka source restart with previous state as parameter


(Liel Shraga) #1

Hi,
Is there a way to store a value returned by a flow and pass it as a parameter to the source the next time it restarts? In other words, how do I keep state across restarts in Akka Streams?
I’m reading a large file from a remote server with a single Akka HTTP GET request, and my goals are:

  1. Restart the source whenever the remote server crashes.
  2. Save state in the flow (the last byte that the flow consumed successfully) and pass it to the source when it restarts.

This is my code for restarting the source.
I would like to call sendGetRequest with the state that I saved in parseAndIngestLargeFile(x):
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import scala.concurrent.duration._

RestartSource.withBackoff(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
  // send a single Akka HTTP GET request to the remote server
  Source.single(sendGetRequest())
    .map(x => parseAndIngestLargeFile(x))
    .map(connection => println("Failed to connect to go server, going to retry get request…" + connection))
}
  .runWith(Sink.ignore)

What is the way to do this?


(Johannes Rudolph) #2

Hi @lielShraga,

I would probably try to implement the restarting logic outside of the streaming infrastructure, e.g. using something like this:

def tryDownload(uri, startFrom, …): Future[DownloadResult] =
  Http().singleRequest(HttpRequest(uri = uri, …)) // e.g. set headers to use startFrom
    .flatMap(…) // handle response and response entity here, make sure to catch exceptions when the server crashes

The result could be either a finished download or a half-finished download with added info about what you have received so far. In the latter case you would restart at the correct position.
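
To make the restart loop concrete, here is a minimal sketch that uses only the Scala standard library. It is not code from this thread: the `DownloadResult` cases, `downloadWithResume`, the attempt limit, and the simulated `flakyServer` are all hypothetical names chosen for illustration, and `tryDownload` is passed in as a function so the HTTP part stays out of the picture. Backoff between attempts is left out.

```scala
import scala.concurrent.{ExecutionContext, Future}

object ResumeSketch {
  // Hypothetical result type: either the whole file arrived, or the
  // connection broke after `lastOffset` bytes had been consumed.
  sealed trait DownloadResult
  final case class Finished(totalBytes: Long) extends DownloadResult
  final case class Partial(lastOffset: Long) extends DownloadResult

  // Calls `tryDownload` with the last known offset until the download
  // finishes or the attempts run out. No delay between attempts here.
  def downloadWithResume(
      tryDownload: Long => Future[DownloadResult],
      startFrom: Long = 0L,
      attemptsLeft: Int = 5
  )(implicit ec: ExecutionContext): Future[DownloadResult] =
    tryDownload(startFrom).flatMap {
      case done: Finished => Future.successful(done)
      case Partial(offset) if attemptsLeft > 1 =>
        // Restart from the position reached by the previous attempt.
        downloadWithResume(tryDownload, offset, attemptsLeft - 1)
      case partial: Partial => Future.successful(partial)
    }

  // Simulated server: a 100-byte file, every connection drops after 40 bytes.
  val flakyServer: Long => Future[DownloadResult] = { from =>
    val reached = math.min(from + 40L, 100L)
    Future.successful(if (reached == 100L) Finished(reached) else Partial(reached))
  }
}
```

With the simulated server, the loop needs three attempts (offsets 0, 40, 80) to finish. The state lives in the `startFrom` argument of the recursive call, so nothing has to be stored inside the stream itself.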

You might need a custom stream component that handles receiving the response entity stream and keeps track of the current offset, and that fails gracefully and returns the last offset seen. This might be a bit of a challenge to get right.
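
One simpler alternative to a full custom stage is to count bytes in a side-effecting sink and turn a stream failure into a "partial" result. The sketch below assumes Akka 2.6 or later (where an implicit `ActorSystem` is enough to run a stream). The names `consumeTrackingOffset`, `ingest`, `Finished`, and `Partial` are hypothetical. In an Akka HTTP client, the `bytes` argument would typically be `response.entity.dataBytes`.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Future
import scala.util.control.NonFatal

object OffsetTrackingSketch {
  // Hypothetical result type, mirroring the DownloadResult idea above.
  sealed trait DownloadResult
  final case class Finished(totalBytes: Long) extends DownloadResult
  final case class Partial(lastOffset: Long) extends DownloadResult

  // Runs the byte stream, passing each chunk to `ingest` and advancing the
  // offset only after `ingest` returned normally. A stream failure (for
  // example a dropped connection) becomes Partial(last offset seen).
  def consumeTrackingOffset(
      bytes: Source[ByteString, Any],
      startFrom: Long
  )(ingest: ByteString => Unit)(implicit system: ActorSystem): Future[DownloadResult] = {
    import system.dispatcher
    val offset = new AtomicLong(startFrom)
    bytes
      .runWith(Sink.foreach[ByteString] { chunk =>
        ingest(chunk)
        offset.addAndGet(chunk.length.toLong)
      })
      .map(_ => Finished(offset.get): DownloadResult)
      .recover { case NonFatal(_) => Partial(offset.get) }
  }
}
```

Because the counter is advanced after `ingest`, a chunk that fails during ingestion is not counted and would be requested again on the next attempt.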

Johannes