Timeout handling in Alpakka S3


#1

Hi,

I have a question about timeout handling in a stream that lists the contents of an S3 bucket. Basically, I have an actor that periodically starts an Akka Stream which lists the contents of an S3 bucket and sends the contained objects as messages to the actor itself:

  /**
   * Execute the polling of the source
   */
  @Override
  protected UniqueKillSwitch executePolling() {
    ActorRef self = getSelf();

    Timeout timeout = Timeout.apply(5, TimeUnit.SECONDS);

    self.tell(SourceS3ActorAPI.PollingStarted.create(), getSelf());

    SourceS3ConfigData config = (SourceS3ConfigData) getConfig();

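    // Materialize the kill switch (to abort the stream) together with
    // the completion stage (to observe how the stream ended).
    Pair<UniqueKillSwitch, CompletionStage<Done>> pair =
        S3.listBucket(config.getBucket(), Option.apply(config.getPrefix()))
            .withAttributes(s3Attributes)
            // Send each listed object to this actor; at most 5 asks are in
            // flight, and each ask fails if no reply arrives within `timeout`.
            .mapAsync(5, e -> FutureConverters.toJava(
                    Patterns.ask(
                            self,
                            S3Object.create(e.getKey(), e.getSize(), e.getStorageClass(), e.getLastModified()),
                            timeout)).toCompletableFuture())
            .viaMat(KillSwitches.single(), Keep.right())
            .toMat(Sink.ignore(), Keep.both())
            .run(config.getMaterializer());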

    pair.second().toCompletableFuture().whenComplete(
            (r, t) -> {
              if (r != null) {
                self.tell(SourceS3ActorAPI.PollingFinished.create(), getSelf());
              } else {
                self.tell(SourceS3ActorAPI.PollingFailed.create(t), getSelf());
              }
            });

    return pair.first();
  }

The method returns a UniqueKillSwitch, which I want to use to abort the stream if necessary.
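
The completion callback in the method above chooses between PollingFinished and PollingFailed by checking whether the result is non-null. As a minimal sketch of the same branching, here is a JDK-only version that keys on the throwable instead. It does not use Akka or Alpakka, and `CompletionOutcome` and `classify` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class CompletionOutcome {

    // Hypothetical helper: maps the outcome of a stage to a label.
    // A null error means the stage completed normally; otherwise the
    // stage completed exceptionally and the error describes why.
    static CompletionStage<String> classify(CompletionStage<?> done) {
        return done.handle((result, error) ->
                error == null ? "finished" : "failed: " + error.getMessage());
    }

    public static void main(String[] args) {
        // A stage that completed normally.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("done");

        // A stage that completed exceptionally, standing in for a failed stream.
        CompletableFuture<String> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("connection attempt failed"));

        System.out.println(classify(ok).toCompletableFuture().join());
        System.out.println(classify(broken).toCompletableFuture().join());
    }
}
```

Checking the throwable makes the failure branch explicit, which matters because that branch is the only place a failed stream gets reported back to the actor.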

Today my S3 provider seems to be having problems; even the provider's web console is unresponsive. However, I cannot terminate the Akka Stream above using the KillSwitch. It seems as if the dispatcher thread is blocked inside the listBucket source, so the KillSwitch is never evaluated.

From my logs I am getting the following output:

10:41:07.207 INFO  Layline.Workflow.S3ToLocalFile       - [LAY-02510] workflow 'S3ToLocalFile' is running
10:41:07.215 INFO  Layline.Workflow.S3ToLocalFile.P1    - [LAY-02533] processor 1 for workflow 'S3ToLocalFile' started
10:41:07.216 INFO  Layline.Source.SourceS3              - [LAY-01113] actor 'Actor[akka://layline/user/controller/workflows/S3ToLocalFile/Processor-1#1067835194]' subscribed to source 'SourceS3'
[WARN] [01/25/2019 10:41:10.468] [layline-akka.actor.default-blocking-io-dispatcher-38] [akka.tcp://layline@127.0.0.1:1111/system/IO-DNS/inet-address/$a] No caching TTL defined. Using default value Ttl(30 seconds).
[WARN] [01/25/2019 10:41:10.468] [layline-akka.actor.default-blocking-io-dispatcher-41] [akka.tcp://layline@127.0.0.1:1111/system/IO-DNS/inet-address/$d] No caching TTL defined. Using default value Ttl(30 seconds).
[WARN] [01/25/2019 10:41:10.468] [layline-akka.actor.default-blocking-io-dispatcher-40] [akka.tcp://layline@127.0.0.1:1111/system/IO-DNS/inet-address/$c] No caching TTL defined. Using default value Ttl(30 seconds).
[WARN] [01/25/2019 10:41:10.468] [layline-akka.actor.default-blocking-io-dispatcher-39] [akka.tcp://layline@127.0.0.1:1111/system/IO-DNS/inet-address/$b] No caching TTL defined. Using default value Ttl(30 seconds).
[WARN] [01/25/2019 10:41:11.931] [layline-akka.actor.default-dispatcher-29] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 100 milliseconds.
[WARN] [01/25/2019 10:41:13.388] [layline-akka.actor.default-dispatcher-30] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 200 milliseconds.
[WARN] [01/25/2019 10:41:14.918] [layline-akka.actor.default-dispatcher-34] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 400 milliseconds.
[WARN] [01/25/2019 10:41:16.669] [layline-akka.actor.default-dispatcher-34] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 800 milliseconds.
[WARN] [01/25/2019 10:41:18.952] [layline-akka.actor.default-dispatcher-35] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 1600 milliseconds.
[WARN] [01/25/2019 10:41:22.034] [layline-akka.actor.default-dispatcher-17] [layline/Pool(shared->https://s3.wasabisys.com:443)] Connection attempt failed. Backing off new connection attempts for at least 3200 milliseconds.
10:42:07.238 ERROR Layline.Source.SourceS3              - [LAY-03123] missing heartbeat from polling operation
10:42:07.238 ERROR Layline.Source.SourceS3              - [LAY-03124] killing polling operation
10:43:07.236 ERROR Layline.Source.SourceS3              - [LAY-03123] missing heartbeat from polling operation
10:44:07.249 ERROR Layline.Source.SourceS3              - [LAY-03123] missing heartbeat from polling operation
10:45:07.237 ERROR Layline.Source.SourceS3              - [LAY-03123] missing heartbeat from polling operation

Are there any timeout settings that I have to apply to the S3 settings?

Regards,
Lay


#2

Hi,

This thread can be deleted. The problem was in my own code: I did not handle the PollingFailed message correctly. The KillSwitch was triggering correctly.
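
The thread does not show the fix itself. For anyone who lands here with the same symptom, the following is a purely illustrative JDK-only sketch of treating a failed poll as a terminal event, just like a normal finish, so that the polling state gets cleared either way. `PollingTracker` and its methods are hypothetical and are not the actual actor code.

```java
final class PollingTracker {

    private boolean running;
    private String lastFailure; // null when the last poll ended normally

    // Corresponds to receiving a PollingStarted message.
    void onPollingStarted() {
        running = true;
        lastFailure = null;
    }

    // Corresponds to receiving a PollingFinished message.
    void onPollingFinished() {
        running = false;
    }

    // Corresponds to receiving a PollingFailed message. The failure is a
    // terminal event too, so the running flag must be cleared here as well.
    void onPollingFailed(Throwable cause) {
        running = false;
        lastFailure = String.valueOf(cause.getMessage());
    }

    boolean isRunning() {
        return running;
    }

    String lastFailure() {
        return lastFailure;
    }

    public static void main(String[] args) {
        PollingTracker tracker = new PollingTracker();
        tracker.onPollingStarted();
        tracker.onPollingFailed(new RuntimeException("connection attempt failed"));
        System.out.println("running=" + tracker.isRunning()
                + ", lastFailure=" + tracker.lastFailure());
    }
}
```

If the failure handler forgot to clear the flag, the poll would still look active after the stream had already ended, which is one way a supervising component could keep reporting missing heartbeats.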

Regards,
Lay


(Martynas Mickevičius) #3

Good to know that you found the root cause!