I have simple TCP server that echoes back data received from the client connection.
I want to do the following
- When a client connect happens I want to respond back immediately with some data. ByteBuffer for simplicity. I have done this using the keepAliveInject flow but I dont think its the right way to do this.
- I want to allow n number of connections. Currently I am doing it through a connection count member variable and throw an exception to terminate the individual connection. Though that’s a little too late, where the connection succeeds and then terminated. Does having the connection count as a member variable fall into the incorrect closing over variable pattern?
- I want the ability to send messages from the server at any time on specific connections. So, if I have 3 connections, I want to choose a connection I want to send a message to. Currently snippet is for trying to allow one connection only.
- Ability to shut down the server after closing the connections.
- I have tried to use watchTermination at various places, my expectation was if I disconnect a client connection, it will trigger the watchTermination but it’s never invoked,
Any pointers / snippets of codes would be greatly appreciated. Primarily a java dev but can wade through scala tips.
Code snippet that I am working with
package com.example;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
import akka.util.ByteString;
public class SimpleStream {
private static final int maxConnectionCount = 1;
private static int connectionCount;
private static int cc;
// public static int connectionCount=0;
public static void main(String[] args) throws InterruptedException {
ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
final ByteString keepAliveMessage = ByteString.fromString("KEEP ALIVE");
Flow<ByteString, ByteString, NotUsed> keepAliveInject = Flow.of(ByteString.class)
.keepAlive(Duration.ofSeconds(1), () -> keepAliveMessage);
final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
System.out.println("Client connected from: " + conn.remoteAddress());
conn.handleWith(keepAliveInject.via(Flow.<ByteString>create()), actorSystem);
// Is not handling a good away to avoid the incoming connection?
// When I use netcat to connect to this server I see it saying connected and
// then the connection goes away when a second connection is attempted.
});
Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
8888).watchTermination((prevMatValue, completionStage) -> {
completionStage.whenComplete(
(done, exc) -> {
cc--;
if (done != null)
System.out.println("The stream materialized " + prevMatValue.toString());
else
System.out.println(exc.getMessage());
});
return prevMatValue;
});;
Flow<IncomingConnection, IncomingConnection, NotUsed> flow1 = Flow.of(IncomingConnection.class).map(conn -> {
cc++;
if (cc > maxConnectionCount) {
throw new Exception("Max connections reached.");
}
System.out.println(conn);
return conn;
});
CompletionStage<ServerBinding> bindingFuture = source.via(flow1).to(handler).run(actorSystem);
bindingFuture.handle((binding, throwable) -> {
if (binding != null) {
System.out.println("Server started, listening on: " + binding.localAddress());
} else {
System.err.println("Server could not bind to : " + throwable.getMessage());
actorSystem.terminate();
}
return NotUsed.getInstance();
});
}
}