Reading messages from AWS SQS in Java

java
(karthik vasisht) #1

I am trying to run the code below, but it fails to read from SQS. What I am trying to do is put a message in SQS that contains the names of the S3 bucket files to be downloaded. I have the download part working on its own using Alpakka, but the part that reads from SQS is getting a bit messy. I am completely new to the reactive programming world, so please forgive me if this sounds trivial, but I am looking for some help here. Thanks a lot in advance.

P.S.: I tried the samples from this GitHub repo: git@github.com:akka/alpakka-samples.git

public class SQSEventHandler {

    final static ObjectMapper mapper = new ObjectMapper();
    final static ObjectReader fromSqsReader = mapper.readerFor(MessageFromSqs.class);
    final static ObjectWriter enrichedMessageWriter = mapper.writerFor(EnrichedMessage.class);
    final S3OperationsImpl s3Operations = new S3OperationsImpl();

    final ActorSystem system;
    final Materializer materializer;
    final ActorRef enrichingActor;
    final LoggingAdapter log;


    public static void main(String[] args) throws Exception {
        SQSEventHandler me = new SQSEventHandler();
        me.run();
    }

    public SQSEventHandler() {
        system = ActorSystem.create();
        log = Logging.getLogger(system, this);
        materializer = ActorMaterializer.create(system);
        enrichingActor = system.actorOf(Props.create(EnrichActor.class, EnrichActor::new));
    }

    void run() throws Exception {
        // create SQS client
        AWSCredentialsProvider credentialsProvider =
                new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY));
        AmazonSQSAsync sqsClient =
                AmazonSQSAsyncClientBuilder.standard()
                        .withCredentials(credentialsProvider)
                        .withEndpointConfiguration(
                                new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, SIGNING_REGION))
                        .build();
        system.registerOnTermination(() -> sqsClient.shutdown());

        // configure SQS
        SqsSourceSettings settings = SqsSourceSettings.create().withCloseOnEmptyReceive(false);
        SqsAckSettings ackSettings = SqsAckSettings.create();

        // create running stream
        CompletionStage<Done> streamCompletion = SqsSource.create(sourceQueueUrl, settings, sqsClient)
                .log("read from SQS", log)
                .mapAsync(8, (Message msg) -> {
                    MessageFromSqs messageFromSqs = fromSqsReader.readValue(msg.getBody());
                    //now to download the file from S3 by using the information necessary in the message from SQS
                    Stream.of(s3Operations.fileDownloader(messageFromSqs.bucketName, messageFromSqs.bucketKey, materializer, system))
                    //NOT sure what to do here for me to download the file and yet delete the message from the SQS
                    //need help here!!!
                })
                .map(msg -> MessageAction.delete(msg))
                .runWith(
                        SqsAckSink.create(sourceQueueUrl, ackSettings, sqsClient),
                        materializer
                );
        // terminate the actor system when the stream completes (see withCloseOnEmptyReceive)
        streamCompletion.thenAccept(done -> system.terminate());
    }

    //just for reference pasting the src code for fileDownloader
    public String fileDownloader(String bucketName, String bucketKey, Materializer materializer, ActorSystem system) throws InterruptedException, TimeoutException, ExecutionException {
        final Source<Optional<Pair<Source<ByteString, NotUsed>, ObjectMetadata>>, NotUsed>
                sourceAndMeta = S3.download(bucketName, bucketKey);

        final Pair<Source<ByteString, NotUsed>, ObjectMetadata>  dataAndMetadata = sourceAndMeta
                .runWith(Sink.head(), materializer)
                .toCompletableFuture()
                .get(15, TimeUnit.SECONDS)
                .get();

        final Source<ByteString, NotUsed> data = dataAndMetadata.first();

        final CompletionStage<String> resultCompletionStage =
                data.map(ByteString::utf8String).runWith(Sink.head(), materializer);
        return resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
    }
}
(Enno) #2

Hi @kartzoft,

Have you been able to get on with your SQS triggered S3 download?

You should be able to run your S3 download stream inside the mapAsync stage that receives the SQS message. The lambda should return the CompletionStage from that download stream.
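
To illustrate the shape of such a lambda, here is a minimal sketch in plain Java with no Akka or AWS dependencies. Everything in it is a hypothetical stand-in: `FileRef` plays the role of the parsed SQS message, `downloadAsync` plays the role of a non-blocking S3 download, and `downloadThenPassOn` is the kind of function you would hand to mapAsync. It is not the Alpakka API, only the CompletionStage pattern.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Plain-Java sketch; all names here are hypothetical stand-ins.
class MapAsyncSketch {

    // Stand-in for the parsed SQS message body (bucket name and key).
    static final class FileRef {
        final String bucketName;
        final String bucketKey;

        FileRef(String bucketName, String bucketKey) {
            this.bucketName = bucketName;
            this.bucketKey = bucketKey;
        }
    }

    // Stand-in for a non-blocking download: it returns a CompletionStage
    // right away instead of calling get(...) and blocking the caller.
    static CompletionStage<String> downloadAsync(FileRef ref) {
        return CompletableFuture.supplyAsync(
                () -> "contents of " + ref.bucketName + "/" + ref.bucketKey);
    }

    // Shape of the function handed to mapAsync: start the download, then
    // complete with the original message so a later stage can delete it.
    static CompletionStage<FileRef> downloadThenPassOn(FileRef ref) {
        return downloadAsync(ref).thenApply(contents -> {
            // Process or store `contents` here.
            return ref;
        });
    }
}
```

In your stream this would mean that fileDownloader returns its CompletionStage rather than blocking on get(...), and that the mapAsync lambda maps the result back to the original SQS message, so the following map stage can still wrap it in MessageAction.delete.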

Cheers,
Enno.