Streaming data over multiple Services not working

Hello all

One of my services is consume a security service which relays some calls to other services depending on the operation requested.

I will try to include all the code involved in the path because I am not yet experienced enough to rule anything out as “this is definitely not it” :slight_smile:. Also keep in mind, this is mostly POC work to get used to the framework, so maybe some things are unnecessarily complicated.

One of the operations this security service is offering is like this:
ServiceCall<Source<ByteString, ?>, SignatureResponse> sign(String keyName);

The implementation is very simple, it simply relays it to a software signature service:

    @Override
    public ServiceCall<Source<ByteString, ?>, SignatureResponse> sign(String keyName) {
        return req -> softwareSignatureService.sign(keyName).invoke(req);
    }

The software signature service in return is forwarding it to a class in its module:

    @Override
    public ServiceCall<Source<ByteString, ?>, SignatureResponse> sign(String keyName) {
        return req -> signatureService.sign(keyName, req);
    }

The implementation of the signature operation:

    public CompletionStage<SignatureResponse> sign(String keyName, Source<ByteString, ?> inputSource) {
        logger.info("Got SIGN request for key {}", keyName);
        Signature signature;
        try {
            PrivateKey key = keyRepository.getKeyByName(keyName);
            signature = Signature.getInstance("SHA256withDSA");
            signature.initSign(key, RANDOM);
        } catch (GeneralSecurityException ex) {
            return completedFuture(new SignatureResponse(false, ex.getLocalizedMessage(), null));
        }

        return inputSource
                .map(ByteString::asByteBuffer)
                .map(ByteBuffer::array)
                .runForeach(signature::update, materializer)
                .thenApply(done -> {
                    byte[] digest;
                    try {
                        digest = signature.sign();
                    } catch (GeneralSecurityException ex) {
                        return new SignatureResponse(false, ex.getLocalizedMessage(), null);
                    }

                    logger.debug("Finished signature: {} bytes", digest == null ? "null" : digest.length);
                    return new SignatureResponse(true, null, Base64.getEncoder().encodeToString(digest));
                });
    }

The initial call to the security service with the sign operation is built from an Alpakka initiated stream:

    private CompletionStage<Pair<FileCopyItemState, SignatureResponse>> signFile(FileCopyItemState item) {
        logger.info("Calling signature service for file {}", item.getOutputPath());
        return securityService.sign("mykey").invoke(FileIO.fromPath(item.getOutputPath()))
                .thenApply(response -> new Pair<>(item, response));
    }

For a simple file that is resulting in an issue:

2018-10-31T14:33:53.643Z [error] security-service [] - Exception in PathCallId{pathPattern='/sign/:keyName'}
io.netty.handler.codec.CorruptedFrameException: control frame with payload length > 125 octets

At first I assumed it has something to do with the size of the files, however the ‘control frame’ seems to indicate, that this is not related to the data size but an internal protocol issue in the websockets.

What could be the cause for this?

BR
Yanick

When changing my code to not transfer ByteString but String I get a different kind of Exception.

Code is now:

        return securityService.sign("mykey").invoke(
                FileIO.fromPath(item.getOutputPath()).map(bs -> bs.decodeString(StandardCharsets.ISO_8859_1))
        ).thenApply(response -> new Pair<>(item, response));

and the consuming part:

     return inputSource
                .map(s -> s.getBytes(StandardCharsets.ISO_8859_1))
                .runForeach(signature::update, materializer)
                .thenApply(done -> {
                    byte[] digest;
                    try {
                        digest = signature.sign();
                    } catch (GeneralSecurityException ex) {
                        return new SignatureResponse(false, ex.getLocalizedMessage(), null);
                    }

                    logger.debug("Finished signature: {} bytes", digest == null ? "null" : digest.length);
                    return new SignatureResponse(true, null, Base64.getEncoder().encodeToString(digest));
                });

Now the exception I can find is:

java.util.concurrent.CompletionException: com.lightbend.lagom.javadsl.api.deser.DeserializationException: No content to map due to end-of-input
 at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 1, column: 0] (TransportErrorCode{http=400, webSocket=1003, description='Unsupported Data/Bad Request'})
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at scala.concurrent.java8.FuturesConvertersImpl$CF.apply(FutureConvertersImpl.scala:21)
        at scala.concurrent.java8.FuturesConvertersImpl$CF.apply(FutureConvertersImpl.scala:18)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at scala.concurrent.BatchingExecutor$Batch.processBatch$1(BatchingExecutor.scala:63)
        at scala.concurrent.BatchingExecutor$Batch.$anonfun$run$1(BatchingExecutor.scala:78)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
        at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:55)
        at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
        at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:106)
        at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
        at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at scala.concurrent.Promise.tryFailure(Promise.scala:108)
        at scala.concurrent.Promise.tryFailure$(Promise.scala:108)
        at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:183)
        at akka.pattern.CircuitBreaker$State.$anonfun$callThrough$4(CircuitBreaker.scala:760)
        at akka.pattern.CircuitBreaker$State.$anonfun$callThrough$4$adapted(CircuitBreaker.scala:755)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:77)
        at akka.dispatch.BatchingExecutor.execute(BatchingExecutor.scala:120)
        at akka.dispatch.BatchingExecutor.execute$(BatchingExecutor.scala:114)
        at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:76)
        at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
        at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
        at scala.concurrent.Promise.complete(Promise.scala:49)
        at scala.concurrent.Promise.complete$(Promise.scala:48)
        at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:183)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
        at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
        at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
        at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.lightbend.lagom.javadsl.api.deser.DeserializationException: No content to map due to end-of-input
 at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 1, column: 0]
        at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:135)
        at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:118)
        at com.lightbend.lagom.internal.javadsl.client.JavadslServiceApiBridge.negotiatedDeserializerDeserialize(JavadslServiceApiBridge.scala:99)
        at com.lightbend.lagom.internal.javadsl.client.JavadslServiceApiBridge.negotiatedDeserializerDeserialize$(JavadslServiceApiBridge.scala:99)
        at com.lightbend.lagom.internal.javadsl.client.JavadslClientServiceCallInvoker.negotiatedDeserializerDeserialize(JavadslServiceClientImplementor.scala:134)
        at com.lightbend.lagom.internal.javadsl.client.JavadslClientServiceCallInvoker.negotiatedDeserializerDeserialize(JavadslServiceClientImplementor.scala:134)
        at com.lightbend.lagom.internal.client.ClientServiceCallInvoker.$anonfun$makeStreamedRequestCall$3(ClientServiceCallInvoker.scala:135)
        at scala.util.Success.$anonfun$map$1(Try.scala:251)
        at scala.util.Success.map(Try.scala:209)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:288)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
        ... 13 common frames omitted
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
 at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 1, column: 0]
        at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
        at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1355)
        at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:358)
        at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1596)
        at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1188)
        at com.lightbend.lagom.javadsl.jackson.JacksonSerializerFactory$JacksonMessageSerializer$JacksonDeserializer.deserialize(JacksonSerializerFactory.java:133)
        ... 23 common frames omitted

Ok, some more progress (it is not really related to the title anymore to be honest).

Lets quickly outline my service landscape:

Service A, Service B, Service C

A creates a source of bytes (Source<String, ?>) which it sends to B. B directly forwards them to C for this operation. C performs some operations until the source is depleted and then returns a single response which should be passed through B to A.

The call from A to B (in A):

        return B.sign("mykey")
                .invoke(FileIO.fromPath(item.getOutputPath()).map(bs -> bs.decodeString(StandardCharsets.ISO_8859_1)))
                .thenApply(response -> new Pair<>(item, response));

The call from B to C (in B):

    @Override
    public ServiceCall<Source<String, ?>, SignatureResponse> sign(String keyName) {
        return req -> C.sign(keyName).invoke(req);
    }

The handler in C:

    @Override
    public ServiceCall<Source<String, ?>, SignatureResponse> sign(String keyName) {
        return req -> completedFuture(new SignatureResponse(false, "ABCD", null));
    }

This results in the exception (same as in previous post):

java.util.concurrent.CompletionException: com.lightbend.lagom.javadsl.api.deser.DeserializationException: No content to map due to end-of-input
 at [Source: (akka.util.ByteIterator$ByteArrayIterator$$anon$1); line: 1, column: 0] (TransportErrorCode{http=400, webSocket=1003, description='Unsupported Data/Bad Request'})

Now if we remove the relay from B to C and return the value directly in B things work.

So changing B to look like this (in B):

    @Override
    public ServiceCall<Source<String, ?>, SignatureResponse> sign(String keyName) {
        return req -> completedFuture(new SignatureResponse(false, "ABCD", null));
    }

Works correctly. It seems to me that passing Sources through multiple services seems to cause problems. Maybe I will create an issue for that.