The right way to handle incoming messages as a stream

Hi all

I have the following scenario:

[Image: Actor1 sending messages to Actor2]

As you can see in the image above, Actor1 sends messages to Actor2.
In Actor1, a stream is produced and sent to Actor2 as follows:

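import akka.NotUsed
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer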

object WsConnector {

  case object Ack

  case object Initialized

  case object Completed

  final case class Msg(value: String)

  final case class Failed(ex: Throwable)

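  // Builds a sink that forwards each element to `receiver` and waits for an Ack
  // before sending the next one.
  val createActorRefWithAck: ActorRef => Sink[Msg, NotUsed] = receiver =>
    Sink.actorRefWithAck(
      receiver,
      onInitMessage = Initialized,
      ackMessage = Ack,
      onCompleteMessage = Completed,
      onFailureMessage = Failed
    )
}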

import WsConnector.Msg

object ConsumerActor {
  def props(wsSink: Sink[Msg, NotUsed]): Props = Props(new ConsumerActor(wsSink))
}

final class ConsumerActor(wsSink: Sink[Msg, NotUsed]) extends Actor with ActorLogging {

  log.info("Consumer actor started.")

  private implicit val materializer = ActorMaterializer()
  private val config = context.system.settings.config.getConfig("akka.kafka.consumer")
  private val consumerSettings =
    ConsumerSettings(config, new StringDeserializer, new StringDeserializer)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

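  // The topic name below is a placeholder; substitute the real one.
  val (consumerControl, streamComplete) = Consumer
    .plainSource(consumerSettings, Subscriptions.topics("topic"))
    .map(cr => Msg(cr.value()))
    .toMat(wsSink)(Keep.both)
    .run()

  override def receive: Receive = {
    case _ => log.info("Not identified message.")
  }
}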

val wsSink = WsConnector.createActorRefWithAck(wsActor)
context.actorOf(ConsumerActor.props(wsSink), "consumer-actor")

In Actor2, I would like to pack the incoming messages into a stream, for example with Source.single(msg), but I do not know whether that is the right way to do it.

Finally, Actor2 should send the Ack message back to Actor1 to confirm that the message has been received and that Actor2 is ready to consume further messages.
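To make the handshake concrete, here is a minimal, self-contained sketch of the Ack protocol as I understand it. It assumes the Akka 2.5 classic API (Sink.actorRefWithAck and ActorMaterializer); AckDemo, Receiver and run are hypothetical names used only for illustration, and a plain in-memory Source stands in for the Kafka consumer.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object AckDemo {
  case object Ack
  case object Initialized
  case object Completed
  final case class Msg(value: String)
  final case class Failed(ex: Throwable)

  // Hypothetical stand-in for Actor2: acknowledges every element and
  // reports everything it has seen once the stream completes.
  final class Receiver(done: Promise[Vector[String]]) extends Actor {
    private var seen = Vector.empty[String]

    override def receive: Receive = {
      case Initialized =>
        sender() ! Ack // first Ack: ready for the first element
      case Msg(value) =>
        seen = seen :+ value
        sender() ! Ack // each further Ack requests the next element
      case Completed =>
        done.trySuccess(seen)
      case Failed(ex) =>
        done.tryFailure(ex)
    }
  }

  // Streams `values` to a Receiver and returns what the Receiver collected.
  def run(values: List[String]): Vector[String] = {
    implicit val system: ActorSystem = ActorSystem("ack-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val done = Promise[Vector[String]]()
    val receiver = system.actorOf(Props(new Receiver(done)), "receiver")

    Source(values)
      .map(Msg(_))
      .runWith(
        Sink.actorRefWithAck[Msg](receiver, Initialized, Ack, Completed, ex => Failed(ex))
      )

    try Await.result(done.future, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run(List("a", "b", "c")))
}
```

The sink sends the next Msg only after the previous Ack has arrived, so the receiver never has more than one element in flight.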

What I’ve done so far:

  override def receive: Receive = {
    case Initialized =>
      log.info("Initialization to receive messages via stream.")
      sender() ! Ack
    case Completed =>
      log.info("Streams completed.")
      sender() ! Ack
    case Msg(value) =>
      sender() ! Ack
    case Failed(ex) =>
      log.error(s"Stream failed with ${ex.getMessage}.")
  }

This part:

    case Msg(value) =>
      sender() ! Ack

Here I would like to wrap the value into a stream while still honoring the Ack confirmation, so that the Ack is sent only once the message has been handed over.
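For illustration, one possible shape for this is sketched below. Note that it swaps Source.single per message for a single Source.queue that is materialized once inside Actor2, so this is an alternative technique and not what the code above does. It assumes the Akka 2.5 classic API; QueueBridgeDemo, Bridge, downstream and run are hypothetical names, and mapping every offer result to Ack is a simplification (a dropped or failed offer is not handled separately).

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.pipe
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object QueueBridgeDemo {
  case object Ack
  case object Initialized
  case object Completed
  final case class Msg(value: String)
  final case class Failed(ex: Throwable)

  // Hypothetical Actor2: owns one long-lived inner stream and acks the
  // sender only after the inner stream has accepted the element.
  final class Bridge(downstream: Sink[String, Any]) extends Actor {
    import context.dispatcher
    private implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Materialized once; `offer` completes when the element is accepted.
    private val queue =
      Source
        .queue[String](bufferSize = 1, OverflowStrategy.backpressure)
        .to(downstream)
        .run()

    override def receive: Receive = {
      case Initialized => sender() ! Ack
      case Msg(value)  => queue.offer(value).map(_ => Ack).pipeTo(sender())
      case Completed   => queue.complete()
      case Failed(ex)  => queue.fail(ex)
    }
  }

  // Streams `values` through a Bridge and returns what reached `downstream`.
  def run(values: List[String]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("queue-bridge-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val result = Promise[Seq[String]]()
    // Expose the inner sink's materialized Future through the promise.
    val downstream = Sink.seq[String].mapMaterializedValue(f => result.completeWith(f))
    val bridge = system.actorOf(Props(new Bridge(downstream)), "bridge")

    Source(values)
      .map(Msg(_))
      .runWith(
        Sink.actorRefWithAck[Msg](bridge, Initialized, Ack, Completed, ex => Failed(ex))
      )

    try Await.result(result.future, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run(List("a", "b", "c")))
}
```

Because the Ack is piped back only after offer completes, backpressure from the inner stream propagates to Actor1 through the existing Ack protocol.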