Basic example of consuming event from akka EventStream and publishing the event to Kafka


(Dima Gutzeit) #1

Dear board members,

With the release of new kafka integration, I decided to try a simple example that would do the following:

  1. Consume event from Akka EventStream.
  2. Publish the event to Kafka
  3. Accomplish the above through streams and alpakka-kafka, with Java API

I am using the test framework supplied with the module, but I seems not be able to crack it, since whatever I am publishing to the stream is not being consumed from EventStream to be published to Kafka. What am I missing in the code below:

@Test
  public void test() {
    var topicName = createTopicName(1);
    Source<String, ActorRef> sourceActor = Source.actorRef(1000, OverflowStrategy.fail());

   sourceActor.mapMaterializedValue(actor -> {
      sys.getEventStream()
          .subscribe(actor, String.class);
      return "ok";
    }).runWith(Sink.ignore(), mat);

    Sink<ProducerRecord<String, String>, CompletionStage<Done>> producerRecordCompletionStageSink = Producer
        .plainSink(producerDefaults());

    sourceActor.map(value -> {
      System.out.println("Consumed record  from event bus. Publishing to kafka");
      return new ProducerRecord<String, String>(topicName, value);
    }).
        runWith(producerRecordCompletionStageSink, mat);

    sys.getEventStream().publish("test event");

    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
  }

Any pointers would be highly appreciated. I know it sounds like a very basic question and i feel ashamed in advance for asking it :)


(Dima Gutzeit) #2

Found the reason, since I materialize twice, a new ActorRef is being created.

Source<String, ActorRef> sourceActor = Source.actorRef(1000, OverflowStrategy.fail());
    Sink<ProducerRecord<String, String>, CompletionStage<Done>> producerRecordCompletionStageSink = Producer
        .plainSink(producerDefaults());

    CompletionStage<Done> doneCompletionStage = sourceActor.mapMaterializedValue(actor -> {
      sys.getEventStream()
          .subscribe(actor, String.class);
      return "ok";
    }).map(value -> {
      System.out.println("Consumed record  from event bus. Publishing to kafka");
      return new ProducerRecord<String, String>(topicName, value);
    }).runWith(producerRecordCompletionStageSink, mat);//runWith(Sink.ignore(), mat);

works just fine.