TLS SSL TCP Actor

Hello there

I am trying to implement ssl encryption with tcp server socket. I could achieve it using streams but I am not sure how to achieve it using Actors. For now I am using sslengine.unwrap on the Received.class object I received. Unfortunately I am unable to figure out how to read the decrypted text. Any help or advice will be appreciated.

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManagerFactory;

import org.apache.logging.log4j.*;
import akka.actor.*;
import akka.io.Tcp.*;
import akka.stream.TLSRole;
import akka.io.TcpMessage;
import akka.util.ByteString;
import scala.concurrent.duration.Duration;


public class TcpConnectionHandlerActor extends AbstractActor {

	private final static Logger log = LogManager.getLogger(TcpConnectionHandlerActor.class); 
	private final String clientIP;
	private ActorRef sender;
	private final SSLEngine engine;
	
	public TcpConnectionHandlerActor(String clientIP) {
		this.clientIP = clientIP;
		this.engine = createSSLEngine(getSSLContext(), TLSRole.server());
	}

	public static Props props(String clientIP) {
		return Props.create(TcpConnectionHandlerActor.class, clientIP);
	}
	
	@Override
	public void preStart() throws Exception {
	    log.trace("=============================START---OF---LOG================================");
		log.trace(getSelf().path().name()+" starting tcp-handler");
	}

	@Override
	public Receive createReceive() {
		return receiveBuilder()
				.match(Received.class, msg->{
					sender = getSender();
					
					ByteBuffer buff = msg.data().asByteBuffer();
                    ByteBuffer dest = ByteBuffer.allocate(2024);
                    try{
                    	engine.beginHandshake();
                    	ByteBuffer inb  = msg.data().toByteBuffer();
                    	ByteBuffer outb = ByteBuffer.allocate(
                    	            engine.getSession().getPacketBufferSize());
                    	inb.put(msg.data().toArray());
                    	//inb.flip();
                    	SSLEngineResult result = engine.unwrap(inb, outb);
                    	log.trace("Wrapped " + outb.position() + " octets ("
                    	                + result + ").");
                    	outb.flip();
                    	log.trace("Cyphertext: " + outb);
                    }catch(SSLException e) {
                    	log.error(e.getMessage());
                    	log.error(e.getCause());
                    	getContext().stop(getSelf());
                    }
                    String sslmessageX = ByteString.fromByteBuffer(dest).utf8String();
					log.info(getSelf().path().name()+" received-tcp: "+ sslmessageX);

				})
				
				.match(String.class, s->{
					if(s.contains("Error")){
						log.info(getSelf().path().name()+" sending out NACK to "+clientIP.toString()+" : "+s);  //if not ack then kill the ips actor
					
					}
					sender.tell(TcpMessage.write(ByteString.fromString(s)), getSelf());
				})
				.match(ReceiveTimeout.class, r -> {
					
					getContext().setReceiveTimeout(Duration.Undefined());
				})
				.match(ConnectionClosed.class, closed->{
					log.debug(getSelf().path().name()+" Server: Connection Closure "+closed);
					getContext().stop(getSelf());
				})
				.match(CommandFailed.class, conn->{
					log.fatal(getSelf().path().name()+" Server: Connection Failed "+conn);
					getContext().stop(getSelf());
				})
				.match(Terminated.class,s->{
				})
				.build();
	}

	@Override
	public void postStop() throws Exception {
		log.trace(getSelf().path().name()+" stopping tcp-handler");
		log.trace("=============================END---OF---LOG================================");
	}
	
	 private static SSLContext getSSLContext(){
		  SSLContext context;
	    try {
	      // Don't hardcode your password in actual code
	      char[] password = "abcd1234".toCharArray();

	      KeyStore keyStore = KeyStore.getInstance("PKCS12");
	      keyStore.load(new FileInputStream("config/ssl/mykeystore.p12"), password);

	      TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
	      trustManagerFactory.init(keyStore);

	      KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
	      keyManagerFactory.init(keyStore, password);

	      context = SSLContext.getInstance("TLSv1.2");
	      context.init(
	          keyManagerFactory.getKeyManagers(),
	          trustManagerFactory.getTrustManagers(),
	          new SecureRandom());

	    } catch (KeyStoreException
	        | IOException
	        | NoSuchAlgorithmException
	        | CertificateException
	        | UnrecoverableKeyException
	        | KeyManagementException e) {
	      throw new RuntimeException(e);
	    }
	    return context;
	  }

	  private static SSLEngine createSSLEngine(SSLContext context,TLSRole role) {
	    SSLEngine engine = context.createSSLEngine();
	    engine.setUseClientMode(role.equals(akka.stream.TLSRole.client()));
	    engine.setEnabledCipherSuites(new String[] {"TLS_DHE_RSA_WITH_AES_256_GCM_SHA384"});
	    engine.setEnabledProtocols(new String[] {"TLSv1.2"});
	    return engine;
	  }
	  
	  
}

Using SSLEngine directly is very complex and not recommended (and not really related to Akka itself). I’d recommend to use stream components and interface them with actors (if that’s really what is needed).

More information about stream-actor interop: Actors interop • Akka Documentation

Thanks for the quick response. Is this the right approach to achieve the same?

package com.pranavkapoorr;

import java.io.*;
import java.net.InetSocketAddress;
import java.security.*;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import javax.net.ssl.*;
import org.apache.logging.log4j.*;
import akka.Done;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.stream.javadsl.Tcp.*;
import akka.util.*;

public class TcpServer {
	private static Logger log = LogManager.getLogger(TcpServer.class);
	public static void main(String[] args) {
		
		server(ActorSystem.create(), new InetSocketAddress("0.0.0.0", 40001));
	}
	private static SSLContext getSSLContext(){
		  SSLContext context;
	    try {
	      char[] password = "abc123".toCharArray();

	      KeyStore keyStore = KeyStore.getInstance("PKCS12");
	      keyStore.load(new FileInputStream("config/ssl/mystore.p12"), password);

	      TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
	      trustManagerFactory.init(keyStore);

	      KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
	      keyManagerFactory.init(keyStore, password);

	      context = SSLContext.getInstance("TLSv1.2");
	      context.init(
	          keyManagerFactory.getKeyManagers(),
	          trustManagerFactory.getTrustManagers(),
	          new SecureRandom());

	    } catch (KeyStoreException
	        | IOException
	        | NoSuchAlgorithmException
	        | CertificateException
	        | UnrecoverableKeyException
	        | KeyManagementException e) {
	      throw new RuntimeException(e);
	    }
	    return context;
	  }

	  public static SSLEngine createSSLEngine(SSLContext context,TLSRole role) {
	    SSLEngine engine = context.createSSLEngine();
	    engine.setUseClientMode(role.equals(akka.stream.TLSRole.client()));
	    engine.setEnabledCipherSuites(new String[] {"TLS_DHE_RSA_WITH_AES_256_GCM_SHA384"});
	    engine.setEnabledProtocols(new String[] {"TLSv1.2"});
	    return engine;
	  }
	  
	public static void server(ActorSystem system, InetSocketAddress serverAddress) {
	    final ActorMaterializer materializer = ActorMaterializer.create(system);
	 	SSLContext context = getSSLContext();
	    SSLEngine engine = createSSLEngine(context, TLSRole.server());
	 	final Source <IncomingConnection,CompletionStage<ServerBinding>> connections =
	 		      Tcp.get(system)
	 		      .bindTls(serverAddress.getHostString(),
	 		    		  serverAddress.getPort(),
	 		    		  context,
	 		    		  TLSProtocol.negotiateNewSession()
	 		    		  .withProtocols(engine.getSupportedProtocols())
	 		    		  .withCipherSuites(engine.getEnabledCipherSuites())
	 		    		  .withParameters(engine.getSSLParameters())
	 		    		  .withClientAuth(TLSClientAuth.need())
	 		       );
	 	
	 	connections
	 		.log("listening at : "+serverAddress.toString(),system.log())
	 		.runForeach((IncomingConnection con) ->{
	 			system.log().info("connection -> "+ con.remoteAddress());
	 			ActorRef tcpHandler = system.actorOf(TcpHandler.props(),"handler:");
	 			final Flow<ByteString, ByteString, CompletionStage<Done>> out =
	 					Flow.of(ByteString.class)
	 					.ask(tcpHandler, ByteString.class, Timeout.create(Duration.ofSeconds(10)))
	 					.watchTermination(Keep.right());
	 			
	 			
		      con.handleWith(out, materializer).whenComplete((Success,Exception)->{
		    	  system.stop(tcpHandler);
		      });
	 	}, materializer);
	    
	}
	
	public static class TcpHandler extends AbstractActor{
		private ActorRef sender;
		public static Props props() {
			return Props.create(TcpHandler.class);
		}
		@Override
		public Receive createReceive() {
			return receiveBuilder()
					.match(ByteString.class, in -> {
						sender = getSender();
						log.info("in handler: "+in.utf8String());
						sender.tell(ByteString.fromString(in.utf8String()+" ack"), sender);
					})
					.match(String.class, s -> log.info("in handler :"+s))
					.build();
		}
		@Override
		public void preStart() throws Exception {
			log.info("Starting handler");
			super.preStart();
		}
		@Override
		public void postStop() throws Exception {
			log.info("stopping handler");
			super.postStop();
		}
	}
}

I just realised that .ask only returns one message to tcp from actor. Is there a way to publish more than one message to stream from actor in response to one request message from stream? Many thanks in advance

@jrudolph any suggestions please?

The new stream operator for .ask allows you to add parallelism. For example, you’re asking entities in an Akka Cluster. The old way was to wrap the ask in a .mapAsync, which I still prefer since you may want to do some transformation of the response before returning it to the stream’s flow.

Thanks everyone. For now the following flow works for me to receive messages in actor and replying back to stream through dest actor in the receiving actor. Any advices will be appreciated for betterment.

private Flow<ByteString, ByteString, NotUsed> createTcpSocketFlow() {
		ActorRef actor = getContext().actorOf(SSLTCPConnectionHandlerWorker.props());

		Source<ByteString, NotUsed> source = Source.<ByteString>actorRef(Integer.MAX_VALUE, OverflowStrategy.fail())
				.map((outgoing) -> outgoing)
				.<NotUsed>mapMaterializedValue(destinationRef -> {
					actor.tell(new OutgoingDestination(destinationRef), ActorRef.noSender());
					return NotUsed.getInstance();
				});

		Sink<ByteString, NotUsed> sink = Flow.<ByteString>create()
				.map((incoming) -> incoming)
				.to(Sink.actorRef(actor, PoisonPill.getInstance()));
		return Flow.fromSinkAndSource(sink, source);
	}