Why is the SFTP connector so much slower than the basic SFTP client?

I’m using the sample class below to test out streaming over SFTP. What I ultimately want to do is use a sink that is another SFTP server and stream from the source (SFTP server A) to the sink (SFTP server B) with my app in the middle. For now, though, I’m testing throughput with Sink.ignore, and the performance is much worse than a plain SFTP client writing the file locally. Can someone tell me whether this is a configuration problem or whether I’m just “doing it wrong”? :slight_smile:

The timings below are for a 10 MB file, pulling data from a VM in AWS to my laptop.

import akka.actor.ActorSystem
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.alpakka.ftp.{FtpCredentials, SftpSettings}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import net.schmizz.sshj.transport.verification.PromiscuousVerifier
import net.schmizz.sshj.userauth.method.AuthPassword
import net.schmizz.sshj.userauth.password.{PasswordFinder, Resource}
import net.schmizz.sshj.{DefaultConfig, SSHClient}

import java.net.InetAddress
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt

class StreamingSftpTransport {
 implicit val system: ActorSystem = ActorSystem("my-service")

 private val PORT = 22
 private val USER = "conor.griffin"
 private val CREDENTIALS = FtpCredentials.create(USER, "password!")
 private val BASE_PATH = s"/Users/$USER"
 private val FILE_NAME = "10mfile"
 private val CHUNK_SIZE = 131072

 // Set up the source system connection
 private val SOURCE_HOSTNAME = "suv1"

 private val sourceSettings = SftpSettings(host = InetAddress.getByName(SOURCE_HOSTNAME))
   .withCredentials(FtpCredentials.create("testsftp", "t3st123"))
   .withPort(PORT)
   .withStrictHostKeyChecking(false)

 private val sourceClient = new SSHClient(new DefaultConfig)
 private val configuredSourceClient = Sftp(sourceClient)


 // Set up the destination system connection

 private val DEST_HOSTNAME = "localhost"
 private val destSettings = SftpSettings(host = InetAddress.getByName(DEST_HOSTNAME))
   .withCredentials(CREDENTIALS)
   .withPort(PORT)
   .withStrictHostKeyChecking(false)

 private val destClient = new SSHClient(new DefaultConfig)
 private val configuredDestClient = Sftp(destClient)

 // Log the error and resume the stream instead of failing it
 private val decider: Supervision.Decider = {
   case a =>
     println(a.getMessage)
     Supervision.resume
 }

 implicit val materializer = ActorMaterializer(
   ActorMaterializerSettings(system).withSupervisionStrategy(decider))


 // Stream the remote file through the Alpakka SFTP source and discard the
 // bytes with Sink.ignore, timing the transfer.
 def doTransfer(): Unit = {
   println("Streaming")
   val source = configuredSourceClient.fromPath(s"/home/testsftp/$FILE_NAME", sourceSettings, CHUNK_SIZE)
   // val sink = configuredDestClient.toPath(s"$BASE_PATH/$FILE_NAME.out", destSettings)
   val runnable = source
     .runWith(Sink.ignore)

   println("Streaming: Starting")
   val start = System.currentTimeMillis()
   Await.result(runnable, 180.seconds)
   val end = System.currentTimeMillis()
   println(s"Streaming: ${end - start}")

 }

 // Baseline: download the same file with the plain (blocking) sshj SFTPClient.
 def doSftpTransfer(): Unit = {
   println("SFTP")
   val ssh = new SSHClient(new DefaultConfig)
   ssh.addHostKeyVerifier(new PromiscuousVerifier)
   ssh.connect(SOURCE_HOSTNAME, 22)
   val passwordAuth: AuthPassword = new AuthPassword(new PasswordFinder() {
     def reqPassword(resource: Resource[_]): Array[Char] = "t3st123".toCharArray
     def shouldRetry(resource: Resource[_]) = false
   })
   ssh.auth("testsftp", passwordAuth)

   println("SFTP: Starting")
   val start = System.currentTimeMillis()
   ssh.newSFTPClient().get("/home/testsftp/10mfile", "/Users/conor.griffin/Downloads/10mfile.sftp")
   val end = System.currentTimeMillis()
   println(s"SFTP: ${end - start}")

 }

}
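For reference, the timings below come from running both methods back to back from a throwaway main, roughly like this (the object name is just for illustration):

object TransferTest {
  def main(args: Array[String]): Unit = {
    val transport = new StreamingSftpTransport
    transport.doTransfer()     // Alpakka stream to Sink.ignore
    transport.doSftpTransfer() // plain sshj SFTPClient download
    transport.system.terminate()
  }
}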

Output

[18-Dec-2020 19:41:00.994 UTC] INFO <Slf4jLogger> Slf4jLogger started 
[18-Dec-2020 19:41:02.672 UTC] INFO <BouncyCastleRandom> Generating random seed from SecureRandom. 
[18-Dec-2020 19:41:02.732 UTC] INFO <BouncyCastleRandom> Generating random seed from SecureRandom. 
Streaming
Streaming: Starting
[18-Dec-2020 19:41:03.060 UTC] INFO <TransportImpl> Client identity string: SSH-2.0-SSHJ_0.27.0 
[18-Dec-2020 19:41:03.434 UTC] INFO <TransportImpl> Server identity string: SSH-2.0-OpenSSH_7.4 
[18-Dec-2020 19:41:05.117 UTC] INFO <SessionChannel> Will request `sftp` subsystem 
[18-Dec-2020 19:41:57.819 UTC] INFO <TransportImpl> Disconnected - BY_APPLICATION 
Streaming: 55019
SFTP
[18-Dec-2020 19:41:57.826 UTC] INFO <BouncyCastleRandom> Generating random seed from SecureRandom. 
[18-Dec-2020 19:41:58.032 UTC] INFO <TransportImpl> Client identity string: SSH-2.0-SSHJ_0.27.0 
[18-Dec-2020 19:41:58.247 UTC] INFO <TransportImpl> Server identity string: SSH-2.0-OpenSSH_7.4 
SFTP: Starting
[18-Dec-2020 19:41:59.869 UTC] INFO <SessionChannel> Will request `sftp` subsystem 
SFTP: 12987

A bit of a late reply, but I’d guess the SFTPClient does some buffering to read as fast as possible. You could try introducing a .buffer operator in the stream with backpressure as the overflow strategy, to see if that gets you closer to what the blocking client invocation does.
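For example (just a sketch against the sample above; the buffer size of 32 chunks is an arbitrary starting point to experiment with):

import akka.stream.OverflowStrategy

// Buffer up to 32 chunks between the SFTP source and the sink, using
// backpressure so a full buffer slows the upstream down instead of dropping data.
val runnable = configuredSourceClient
  .fromPath(s"/home/testsftp/$FILE_NAME", sourceSettings, CHUNK_SIZE)
  .buffer(32, OverflowStrategy.backpressure)
  .runWith(Sink.ignore)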

Thanks, I’m pretty sure I tried that, but I’ll check again and report back.

Taking a peek at the implementation of the sync client, it looks as though it keeps up to 16 chunk reads at different offsets running in parallel against the SFTP server at any given time, while the connector reads the chunks sequentially. That may make a big difference even with a buffer allowing the sequential read to happen as fast as possible.

Not sure how hard it would be to mimic that in the FTP source impl.
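In the meantime, one way to get that read-ahead behaviour while keeping the stream might be to bypass the connector on the read side and wrap sshj’s read-ahead input stream yourself. An untested sketch (the helper name and the clean-up wiring are mine, and it assumes the SSHClient is already connected and authenticated):

import akka.stream.IOResult
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString
import net.schmizz.sshj.SSHClient

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

// Read via sshj's ReadAheadRemoteFileInputStream (up to 16 outstanding read
// requests, like the blocking download path) and expose it as an Akka Streams
// source of ByteString.
def readAheadSource(ssh: SSHClient, path: String, chunkSize: Int): Source[ByteString, Future[IOResult]] = {
  val sftp = ssh.newSFTPClient()
  val remoteFile = sftp.open(path)
  StreamConverters
    .fromInputStream(() => new remoteFile.ReadAheadRemoteFileInputStream(16), chunkSize)
    .watchTermination() { (ioResult, done) =>
      // Close the remote file and the SFTP session once the stream terminates
      done.onComplete { _ =>
        remoteFile.close()
        sftp.close()
      }
      ioResult
    }
}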


Yeah, I suspected this could be part of the reason. SFTP clients use the combination of chunk size and the number of outstanding requests permitted to overcome the throughput limitations of high-latency connections.

Looking at the man page for BSD sftp, these two parameters are controlled as follows:

-B buffer_size
        Specify the size of the buffer that sftp uses when transferring files.  Larger buffers require fewer round
        trips at the cost of higher memory consumption.  The default is 32768 bytes.
-R num_requests
        Specify how many requests may be outstanding at any one time.  Increasing this may slightly improve file
        transfer speed but will increase memory usage.  The default is 64 outstanding requests.
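To put rough numbers on that: with a 50 ms round trip, a single outstanding 32768-byte request caps throughput at about 32 KB / 0.05 s ≈ 640 KB/s, while 64 outstanding requests keep roughly 2 MB in flight per round trip, for a ceiling of around 40 MB/s (assuming the link and the server can keep up). That lines up with why a strictly sequential chunk-by-chunk read is so much slower over a high-latency connection.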

The reason I started looking at Alpakka was to work around a limitation in another SSH/SFTP library (not OSS), which does not work well beyond 8 parallel 64K chunks. If the standard sftp client defaults to 64 outstanding requests of 32K each, then presumably settings in that range are usable in a wide variety of cases.

I want to use Alpakka because the volumes of data we transfer are very large at times, and a streaming implementation would be a better fit for this use case. Because of policies around data at rest, today we download a set of files, encrypt them as we store them to disk, and then send them to another internal service. Streaming the files directly from one server to another through our service would avoid both the overhead of encrypting data on disk and the capacity concerns that come with writing that volume of data to disk.
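To make that concrete, the end state is essentially the commented-out sink in the sample above, connecting the two servers directly (same fromPath/toPath calls as the sample; nothing is written locally):

// Sketch of the eventual server-to-server transfer: read from SFTP server A
// and write straight to SFTP server B without staging the file on disk.
val transfer = configuredSourceClient
  .fromPath(s"/home/testsftp/$FILE_NAME", sourceSettings, CHUNK_SIZE)
  .runWith(configuredDestClient.toPath(s"$BASE_PATH/$FILE_NAME.out", destSettings))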

If you are up for giving it a try and adding the same kind of parallelising logic to the FTP connector, a PR is definitely welcome.

I think I’ll have a go; I need to get my head around how it all works internally first :scream: