Reading messages from AWS SQS in Java

java

(karthik vasisht) #1

I am trying to run the code below, but it fails to read from SQS. What I am trying to do is have a message in SQS that contains the names of the S3 bucket files to be downloaded. I have the downloading part working on its own using Alpakka, but the part where I read from SQS is getting a bit messy. I am completely new to the reactive programming world, so please forgive me if this sounds trivial, but I am looking for some help here! Thanks a lot in advance.

P.S.: I tried the samples from this GitHub repo: git@github.com:akka/alpakka-samples.git

public class SQSEventHandler {

    final static ObjectMapper mapper = new ObjectMapper();
    final static ObjectReader fromSqsReader = mapper.readerFor(MessageFromSqs.class);
    final static ObjectWriter enrichedMessageWriter = mapper.writerFor(EnrichedMessage.class);
    final S3OperationsImpl s3Operations = new S3OperationsImpl();

    final ActorSystem system;
    final Materializer materializer;
    final ActorRef enrichingActor;
    final LoggingAdapter log;


    public static void main(String[] args) throws Exception {
        SQSEventHandler me = new SQSEventHandler();
        me.run();
    }

    public SQSEventHandler() {
        system = ActorSystem.create();
        log = Logging.getLogger(system, this);
        materializer = ActorMaterializer.create(system);
        enrichingActor = system.actorOf(Props.create(EnrichActor.class, EnrichActor::new));
    }

    void run() throws Exception {
        // create SQS client
        AWSCredentialsProvider credentialsProvider =
                new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY));
        AmazonSQSAsync sqsClient =
                AmazonSQSAsyncClientBuilder.standard()
                        .withCredentials(credentialsProvider)
                        .withEndpointConfiguration(
                                new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, SIGNING_REGION))
                        .build();
        system.registerOnTermination(() -> sqsClient.shutdown());

        // configure SQS
        SqsSourceSettings settings = SqsSourceSettings.create().withCloseOnEmptyReceive(false);
        SqsAckSettings ackSettings = SqsAckSettings.create();

        // create running stream
        CompletionStage<Done> streamCompletion = SqsSource.create(sourceQueueUrl, settings, sqsClient)
                .log("read from SQS", log)
                .mapAsync(8, (Message msg) -> {
                    MessageFromSqs messageFromSqs = fromSqsReader.readValue(msg.getBody());
                    //now to download the file from S3 by using the information necessary in the message from SQS
                    Stream.of(s3Operations.fileDownloader(messageFromSqs.bucketName, messageFromSqs.bucketKey, materializer, system))
                    //NOT sure what to do here for me to download the file and yet delete the message from the SQS
                    //need help here!!!
                })
                .map(msg -> MessageAction.delete(msg)
                .runWith(
                        SqsAckSink.create(sourceQueueUrl, ackSettings, sqsClient),
                        materializer
                );
        // terminate the actor system when the stream completes (see withCloseOnEmptyReceive)
        streamCompletion.thenAccept(done -> system.terminate());
    }

    //just for reference pasting the src code for fileDownloader
    public String fileDownloader(String bucketName, String bucketKey, Materializer materializer, ActorSystem system) throws InterruptedException, TimeoutException, ExecutionException {
        final Source<Optional<Pair<Source<ByteString, NotUsed>, ObjectMetadata>>, NotUsed>
                sourceAndMeta = S3.download(bucketName, bucketKey);

        final Pair<Source<ByteString, NotUsed>, ObjectMetadata>  dataAndMetadata = sourceAndMeta
                .runWith(Sink.head(), materializer)
                .toCompletableFuture()
                .get(15, TimeUnit.SECONDS)
                .get();

        final Source<ByteString, NotUsed> data = dataAndMetadata.first();

        final CompletionStage<String> resultCompletionStage =
                data.map(ByteString::utf8String).runWith(Sink.head(), materializer);
        return resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
    }
}