Alpakka TCP Streaming: Sending data

Hi everyone,

I’m still new to Akka Streams and Alpakka.

Referring to the documentation Working with Streams, I have a question:
How can I implement a TCP service that only sends data whenever there is a new connection?

The snippet:

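import akka.stream.scaladsl.{Flow, Framing, Source, Tcp}
import akka.stream.scaladsl.Tcp.{IncomingConnection, ServerBinding}
import akka.util.ByteString
import scala.concurrent.Future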

val connections: Source[IncomingConnection, Future[ServerBinding]] =
  Tcp().bind(host, port)
connections.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")

  val echo = Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
    .map(_.utf8String)
    .map(_ + "!!!\n")
    .map(ByteString(_))

  connection.handleWith(echo)
}

This shows how to react when data is received.

In my case I just want to send data continuously from another Source when a client connects.
But from my understanding a Flow[ByteString] requires receiving something first.
I’m probably wrong, but I can’t figure out how to just send data.

Tcp.IncomingConnection doesn’t have a write method or something similar.

Any advice is highly appreciated :slight_smile:

Hi @jannvck and welcome to Akka Streams!

A TCP connection is indeed handled with a Flow. A Flow is basically just something that receives and produces data. In most cases, like in the example, it transforms incoming data and produces outgoing data from that. But that’s not the only way a Flow can be built. You can also create a Flow where receiving incoming data and producing outgoing data are completely independent. You can use Flow.fromSinkAndSource to achieve that. If you are not interested in receiving any data at all, you can use Sink.ignore for the receiving side and provide whatever you like for the Source side.

val outgoingDataSource = ...
val sendFlow = Flow.fromSinkAndSource(Sink.ignore, outgoingDataSource)
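
To make that a bit more concrete, here is a minimal sketch of a send-only server. It assumes Akka 2.6 or later (where the implicit ActorSystem also provides the materializer). The object name, the host and port, and the Source.tick data source are placeholders chosen purely for illustration; they stand in for whatever outgoingDataSource is in your application.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source, Tcp}
import akka.util.ByteString
import scala.concurrent.duration._

object SendOnlyServer {
  // Assumption: Akka 2.6+, so no explicit materializer is needed.
  implicit val system: ActorSystem = ActorSystem("send-only-server")

  // Hypothetical outgoing data: the same line once per second.
  val outgoingDataSource = Source.tick(1.second, 1.second, ByteString("tick\n"))

  // Inbound bytes are discarded; outbound bytes come from the source,
  // independently of anything the client sends.
  val sendFlow = Flow.fromSinkAndSource(Sink.ignore, outgoingDataSource)

  def main(args: Array[String]): Unit = {
    // Hypothetical host and port.
    Tcp().bind("127.0.0.1", 8888).runForeach { connection =>
      println(s"New connection from: ${connection.remoteAddress}")
      // sendFlow is a blueprint, so every connection gets its own ticker.
      connection.handleWith(sendFlow)
    }
  }
}
```

With this running, a plain TCP client (for example `nc 127.0.0.1 8888`) should start receiving lines without sending anything first.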

Would that work for you?

Johannes

Hi @jrudolph

Thank you for your answer!

I think you pointed me in the right direction. The problem is that I want to use a MergeHub as the source.
So I basically have:

  val source = MergeHub.source[ByteString](perProducerBufferSize = 16)
  connections.runForeach { connection: IncomingConnection =>
    println(s"New connection from: ${connection.remoteAddress}")

    val sendFlow = Flow.fromSinkAndSource(Sink.ignore, source)
    connection.handleWith(sendFlow)
  }

In this case, how could I send a ByteString using the MergeHub source?

I tried:

Source.single(ByteString("test")).runWith(source.to(Sink.ignore).run())

But it doesn’t have the desired effect.

Jan
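
A note on why the attempt above has no effect: MergeHub.source is only a blueprint, and each materialization creates a new, separate hub. Every connection.handleWith(sendFlow) call therefore creates its own hub, and source.to(Sink.ignore).run() creates yet another one whose elements go straight to Sink.ignore. One possible way around this is to materialize the MergeHub exactly once and fan its output out to the connections through a BroadcastHub. The following is only a sketch under that assumption, again for Akka 2.6 or later; the object name, host, port, buffer size of 256, and the tick producer are illustrative placeholders.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub, Sink, Source, Tcp}
import akka.util.ByteString
import scala.concurrent.duration._

object HubServer {
  // Assumption: Akka 2.6+, so the ActorSystem provides the materializer.
  implicit val system: ActorSystem = ActorSystem("hub-server")

  // Materialize both hubs exactly once: producers attach to hubSink,
  // TCP connections attach to hubSource.
  val (hubSink, hubSource) =
    MergeHub.source[ByteString](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink[ByteString](bufferSize = 256))(Keep.both)
      .run()

  // A BroadcastHub backpressures its producers while nobody is subscribed.
  // This extra consumer discards elements so producers are never blocked.
  hubSource.runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    // Hypothetical host and port.
    Tcp().bind("127.0.0.1", 8888).runForeach { connection =>
      println(s"New connection from: ${connection.remoteAddress}")
      connection.handleWith(Flow.fromSinkAndSource(Sink.ignore, hubSource))
    }

    // Hypothetical producer: any number of streams can be run into hubSink.
    Source.tick(1.second, 1.second, ByteString("test\n")).runWith(hubSink)
  }
}
```

Note that with the draining consumer in place, anything sent while no client is connected is simply dropped, which may or may not be what you want.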

Hi,

I solved it without using a MergeHub.
Basically, I used an actorRef for sending data:

  val actorRefSource = Source.actorRef[ByteString](bufferSize = 100, OverflowStrategy.dropTail)
  val (actorRef, source) = actorRefSource.preMaterialize()
...
  connectionsData.runForeach { connection: IncomingConnection =>
    println(s"New data connection from: ${connection.remoteAddress}")
    connection.handleWith(Flow.fromSinkAndSource(Sink.ignore, source))
  }

Then, to send actual data I used:

actorRef ! ByteString(data)

It’s working now, so thank you for your help :slight_smile:

Jan