How to exit a stream after n elements are received?

Hello, I’m brand new to Akka and I’m just trying to get the hang of it.

As an experiment, I want to read from a Kinesis stream and collect n messages and stop.

The only sink I found that would stop reading records was Sink.head(), but that returns just one record and I'd like to get more than that.

I can’t quite figure out how to stop reading from the stream after receiving the n messages though.

Here’s the code I have tried so far:

  @Test
  public void testReadingFromKinesisNRecords() throws ExecutionException, InterruptedException {
    final ActorSystem system = ActorSystem.create("foo");
    final Materializer materializer = ActorMaterializer.create(system);

    ProfileCredentialsProvider profileCredentialsProvider = ProfileCredentialsProvider.create();

    final KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
        .credentialsProvider(profileCredentialsProvider)
        .region(Region.US_WEST_2)
            .httpClient(AkkaHttpClient.builder()
                .withActorSystem(system).build())
            .build();

    system.registerOnTermination(kinesisClient::close);

    String streamName = "akka-test-stream";
    String shardId = "shardId-000000000000";

    int numberOfRecordsToRead = 3;

    final ShardSettings settings = ShardSettings.create(streamName, shardId)
            .withRefreshInterval(Duration.ofSeconds(1))
            .withLimit(numberOfRecordsToRead) // return a maximum of n records (and quit?!)
            .withShardIterator(ShardIterators.latest());

    final Source<Record, NotUsed> sourceKinesisBasic = KinesisSource.basic(settings, kinesisClient);

    Flow<Record, String, NotUsed> flowMapRecordToString = Flow.of(Record.class).map(record -> extractDataFromRecord(record));
    Flow<String, String, NotUsed> flowPrinter = Flow.of(String.class).map(s -> debugPrint(s));
//    Flow<String, List<String>, NotUsed> flowGroupedWithinMinute =
//        Flow.of(String.class).groupedWithin(
//            numberOfRecordsToRead, // group size
//            Duration.ofSeconds(60) // group time
//        );

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter);
//        .via(flowGroupedWithinMinute); // nope

    // sink to list of strings
//    Sink<String, CompletionStage<List<String>>> sinkToList = Sink.seq();
    Sink<String, CompletionStage<List<String>>> sink10 = Sink.takeLast(10);
//    Sink<String, CompletionStage<String>> sinkHead = Sink.head(); // only gives you one message

    CompletionStage<List<String>> streamCompletion = sourceStringsFromKinesisRecords
        .runWith(sink10, materializer);
    CompletableFuture<List<String>> completableFuture = streamCompletion.toCompletableFuture();
    completableFuture.join(); // never stops running...
    List<String> result = completableFuture.get();
    int foo = 1;
  }

  private String extractDataFromRecord(Record record) {
    String encType = record.encryptionTypeAsString();
    Instant arrivalTimestamp = record.approximateArrivalTimestamp();
    String data = record.data().asString(StandardCharsets.UTF_8);
    return data;
  }

  private String debugPrint(String s) {
    System.out.println(s);
    return s;
  }

Thank you for any clues

Hi Julie,

Welcome to Akka Streams!

It’s the take operator you are looking for.

I know it may be overwhelming, but skimming through the operators listing at https://doc.akka.io/docs/akka/current/stream/operators/ may help you to find other potentially helpful operators.

The Streams Cookbook is another good place to learn more.

Cheers,
Enno.

To add to @ennru’s response, note that take is something you apply at the Flow (or Source) level, not at the Sink level like Sink.head. After take(n), you can then do something with each element using another Sink, such as Sink.foreach.
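
For illustration, here is a minimal sketch of that idea. It does not touch Kinesis: an endless iterator-backed source stands in for the Kinesis source, and the class name TakeExample and method firstN are made up for this example. It reuses ActorMaterializer.create(system) from the question; on Akka 2.6 and later I believe you can pass the ActorSystem to runWith directly instead. The point is that take(n) completes the stream after n elements, so a collecting sink such as Sink.seq (or Sink.takeLast) finally gets the completion signal it is waiting for.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class TakeExample {

  // Runs an endless source through take(n) and returns the collected elements.
  static List<Integer> firstN(int n) throws Exception {
    final ActorSystem system = ActorSystem.create("take-example");
    try {
      final Materializer materializer = ActorMaterializer.create(system);

      // Stand-in for the Kinesis source (assumption): emits 1, 2, 3, ... and
      // never completes on its own.
      final Source<Integer, NotUsed> endless =
          Source.fromIterator(() -> Stream.iterate(1, i -> i + 1).iterator());

      // Sink.seq only yields its list once the upstream completes.
      final Sink<Integer, CompletionStage<List<Integer>>> sinkToList = Sink.seq();

      // take(n) passes n elements downstream, then completes the stream
      // and cancels the upstream source.
      final CompletionStage<List<Integer>> done =
          endless.take(n).runWith(sinkToList, materializer);

      // Bounded wait, so a mistake shows up as a timeout instead of a hang.
      return done.toCompletableFuture().get(10, TimeUnit.SECONDS);
    } finally {
      system.terminate();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(firstN(3));
  }
}
```

Without the take(n) step, the same pipeline would wait forever, which matches the join() that never returned in the original test.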

Thanks, it works now!

...
    Flow<String, String, NotUsed> flowTakeN = Flow.of(String.class).take(numberOfRecordsToRead);

    Source<String, NotUsed> sourceStringsFromKinesisRecords = sourceKinesisBasic
        .via(flowMapRecordToString)
        .via(flowPrinter)
        .via(flowTakeN);
...