Handling of connect/disconnect of connections to a TCP Server

I have simple TCP server that echoes back data received from the client connection.

I want to do the following

  1. When a client connect happens I want to respond back immediately with some data. ByteBuffer for simplicity. I have done this using the keepAliveInject flow but I dont think its the right way to do this.
  2. I want to allow n number of connections. Currently I am doing it through a connection count member variable and throw an exception to terminate the individual connection. Though that’s a little too late, where the connection succeeds and then terminated. Does having the connection count as a member variable fall into the incorrect closing over variable pattern?
  3. I want the ability to send messages from the server at any time on specific connections. So, if I have 3 connections, I want to choose a connection I want to send a message to. Currently snippet is for trying to allow one connection only.
  4. Ability to shut down the server after closing the connections.
  5. I have tried to use watchTermination at various places, my expectation was if I disconnect a client connection, it will trigger the watchTermination but it’s never invoked,

Any pointers / snippets of codes would be greatly appreciated. Primarily a java dev but can wade through scala tips.

Code snippet that I am working with

package com.example;

import java.time.Duration;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.IncomingConnection;
import akka.stream.javadsl.Tcp.ServerBinding;
import akka.util.ByteString;

public class SimpleStream {
    private static final int maxConnectionCount = 1;
    private static int connectionCount;
    private static int cc;

    // public static int connectionCount=0;
    public static void main(String[] args) throws InterruptedException {

        ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "actorSystem");
        final ByteString keepAliveMessage = ByteString.fromString("KEEP ALIVE");
        Flow<ByteString, ByteString, NotUsed> keepAliveInject = Flow.of(ByteString.class)
                .keepAlive(Duration.ofSeconds(1), () -> keepAliveMessage);

        final Sink<IncomingConnection, CompletionStage<Done>> handler = Sink.foreach(conn -> {
            System.out.println("Client connected from: " + conn.remoteAddress());
            conn.handleWith(keepAliveInject.via(Flow.<ByteString>create()), actorSystem);
            // Is not handling a good away to avoid the incoming connection?
            // When I use netcat to connect to this server I see it saying connected and
            // then the connection goes away when a second connection is attempted.

        });

        Source<IncomingConnection, CompletionStage<ServerBinding>> source = Tcp.get(actorSystem).bind("127.0.0.1",
                8888).watchTermination((prevMatValue, completionStage) -> {

                    completionStage.whenComplete(

                            (done, exc) -> {
                                cc--;
                                if (done != null)
                                    System.out.println("The stream materialized " + prevMatValue.toString());
                                else
                                    System.out.println(exc.getMessage());
                            });
                    return prevMatValue;
                });;

        Flow<IncomingConnection, IncomingConnection, NotUsed> flow1 = Flow.of(IncomingConnection.class).map(conn -> {
            cc++;
            if (cc > maxConnectionCount) {
                throw new Exception("Max connections reached.");
            }
            System.out.println(conn);
            
            return conn;
        });
       
        CompletionStage<ServerBinding> bindingFuture = source.via(flow1).to(handler).run(actorSystem);

        bindingFuture.handle((binding, throwable) -> {
            if (binding != null) {
                System.out.println("Server started, listening on: " + binding.localAddress());

            } else {
                System.err.println("Server could not bind to  : " + throwable.getMessage());
                actorSystem.terminate();
            }
            return NotUsed.getInstance();
        });
    }

}

I have managed to work through this and solution can be found on SO.

How to prevent Akka TCP Stream of incoming connections from connecting after a configured max number of connections have connected?