The right way to handle incoming message as stream

Hi all

I have the following scenario:

enter image description here

As you can see on the image above, Actor1 sends messages to Actor2 .
In Actor1 , a stream is produced and will be send to Actor2 as the following:

import akka.NotUsed
import akka.actor.{ActorRef}
import akka.stream.scaladsl.Sink

object WsConnector {

  case object Ack

  case object Initialized

  case object Completed

  final case class Msg(value: String)

  final case class Failed(ex: Throwable)

  val createActorRefWithAck: ActorRef => Sink[Msg, NotUsed] = receiver =>
    Sink.actorRefWithAck(
      receiver,
      onInitMessage = Initialized,
      ackMessage = Ack,
      onCompleteMessage = Completed,
      onFailureMessage = Failed
    )

}

object ConsumerActor {
  def props(wsSink: Sink[Msg, NotUsed]): Props = Props(new ConsumerActor(wsSink))
}

final class ConsumerActor(wsSink: Sink[Msg, NotUsed]) extends Actor with ActorLogging {

  log.info("Consumer actor started.")

  private implicit val materializer = ActorMaterializer()
  private val config = context.system.settings.config.getConfig("akka.kafka.consumer")
  private val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("SAP-SENDER-GROUP")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

  val (consumerControl, streamComplete) = Consumer
    .plainSource(
      consumerSettings,
      Subscriptions.topics("SAP-EVENT-BUS"))
    .map(cr => Msg(cr.value()))
    .toMat(wsSink)(Keep.both)
    .run()

  override def receive: Receive = {
    case _ => log.info("Not identified message.")
  }
}

val wsSink = WsConnector.createActorRefWithAck(wsActor)
context.actorOf(ConsumerActor.props(wsSink), "consumer-actor")

On Actor2 , I would like to pack the incoming messages into stream, for example Source.single(msg) but I do not know, if it is the right way to do it.

At the end, the Actor2 should send the ACK message to Actor1 , to confirm, that the messages has been received from Actor2 and ready to consome further messages.

What I’ve done so far:

  override def receive: Receive = {
    case Initialized =>
      log.info("Initialization to receive messages via stream.")
      sender() ! Ack
    case Completed =>
      log.info("Streams completed.")
      sender() ! Ack
    case Msg(value) =>
      log.info(value)
      sender() ! Ack
    case Failed(ex) =>
      log.info(s"Stream failed with ${ex.getMessage}.")
  }

This part:

    case Msg(value) =>
      log.info(value)
      sender() ! Ack

I would like to wrap the value into a stream in accordance with Ack message confirmation.

Thanks