Akka Persistence Typed 2.6.4 - Event Sourcing - Couchbase

Hi,

I am using the latest version of Akka i.e. 2.6.4, also using couchbase in my application and the same want to use for persisting event while using Akka persistence - event sourcing.
For attaching Akka Persistence Couchbase plugin, it seems only compatible with Akka-2.5.x version as mentioned here…
https://doc.akka.io/docs/akka-persistence-couchbase/current/overview.html#project-info

Kindly let us knew how we can use this plugin with Akka - 2.6.4.

Earlier was in delima whether I can use couchbase with Akka - 2.6.4 but as mentioned by @patriknw…I can
Please suggest the documentation that states the use of couchbase as Journal plugin with Event Sourcing

As mentioned in the akka docs:

This plugin depends on Akka 2.5.x and note that it is important that all akka-* dependencies are in the same version, so it is recommended to depend on them explicitly to avoid problems with transient dependencies causing an unlucky mix of versions.

Can anyone guide, how I can use in Akka - 2.6.4

Define the Akka dependency explicitly in your build, such as:

<properties>
  <akka.version>2.6.4</akka.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-persistence-couchbase_2.12</artifactId>
  <version>1.0</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-persistence_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-persistence-query_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>

Thanks Patrik

Just want to update that I am using typed version so added as

com.typesafe.akka
akka-persistence-typed_2.12
${akka.version}

Also when added dependency as

com.lightbend.akka
akka-persistence-couchbase_2.12
1.0

it internally have akka-persistence-query_2.12:2.5.21

Does it must require to add explicitly akka-persistence-query dependency with akka as 2.6.4

Kindly share your valuable views on the same

Thanks

1 Like

akka-persistence-typed_2.12 is all good.
akka-persistence-query_2.12 is probably also needed, otherwise it will end up with a mix of 2.5.21 and 2.6.4 versions and that is not supported. There is a runtime check for that all Akka versions are the same (2.6.4 in this case).

1 Like

Thanks Patrik for your valuable feedback.

One more query I have

Inside the dependency akka-persistence-couchbase_2.12:1.0, I can see couchbase java client as 2.7.2 and the latest client is 3.0.x, can it be compatible with that as it seems quite fast in comparison to 2.x versions…?
I tried making it but throwing class not found exception for com.couchbase.client.java.query.N1qlQuery as this class not present in 3.0.3(latest) couchbase client version.

Please suggest that how I can use the 3.0.* versions with couchbase pluginns

Thanks a lot in advance

I can see couchbase java client as 2.7.2 and the latest client is 3.0.x, can it be compatible with that as it seems quite fast in comparison to 2.x versions…?

That’s not possible at the moment since it’s not compatible.

1 Like

Thanks a lot @patriknw for clarifying the doubts

Hi @patriknw,

I had integrated couchbase journal plugin with my application and able to persist and recover the persistent entities in failover-failback scenarios of the seed-nodes, but was facing some issue as listed below:

  1. Whenever application will boot up, and I am trying to insert some data so that respective event should also persist, but getting Circuit Breaker Timed Out Exception at the first time on each nodes, after that all the other request for insertion are successfully processing.The exception which I am getting is mentioned below:

[2020-04-29 12:21:59,825] [ERROR] [akka://AkkaSystem@10.201.8.22:2553] [com.rsys.cpaas.sharding.AccountSvcES] [AkkaSystem-akka.actor.default-dispatcher-11] [akka://AkkaSystem/system/sharding/id/51/AC%3A1] - Supervisor StopSupervisor saw failure: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Circuit Breaker Timed out.
akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Circuit Breaker Timed out.

  •    at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:221)*
    
  •    at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:153)*
    

System is trying for recovery but nothing is present as its coming on the initial request.

  1. Whenever failback on the seednode happened, and re-balancing of the shards will happen, again seeing the same Circuit Breaker Timed Out Exception as mentioned above for any random records but after some time able to fetch everything.

These issue are happening after integrating with Couchbase, earlier was using Cassandra Journal Plugin and application was working seamlessly.

Kindly have a look at the same and share your valuable feedback and if possible, suggest the workaround for the same.

Any other logging that shows the root cause of the timeout?

1 Like

Below are the detailed error/exception logs while inserting data for the first time :

[2020-04-29 16:18:46,252] [INFO] [] [com.couchbase.client.core.node.Node] [cb-io-1-1] [] - Connected to Node 172.27.6.70/172.27.6.70
[2020-04-29 16:18:47,551] [ERROR] [akka://AkkaSystem@10.201.8.22:2553] [com.rsys.cpaas.sharding.AccountSvcES] [AkkaSystem-akka.actor.default-dispatcher-3] [akka://AkkaSystem/system/sharding/id/49/AC%3A1] - Supervisor StopSupervisor saw failure: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Circuit Breaker Timed out.
akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Circuit Breaker Timed out.
at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:221)
at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:153)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:90)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:65)
at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:83)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
at akka.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:157)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:123)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:123)
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:125)
at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:105)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [id|AC:1], due to: Circuit Breaker Timed out.
at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:221)
at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:143)
… 35 common frames omitted
Caused by: akka.pattern.CircuitBreaker$$anon$13: Circuit Breaker Timed out.
[2020-04-29 16:18:54,338] [INFO] [] [com.couchbase.client.core.config.ConfigurationProvider] [cb-computations-4] [] - Selected network configuration: default
[2020-04-29 16:18:54,351] [INFO] [] [com.couchbase.client.core.config.ConfigurationProvider] [cb-computations-4] [] - Opened bucket akka
[2020-04-29 16:18:56,881] [INFO] [akka://AkkaSystem@10.201.8.22:2553] [akka.actor.ActorSystemImpl] [AkkaSystem-akka.actor.default-dispatcher-3] [akka.actor.ActorSystemImpl(AkkaSystem)] - Request timeout encountered for request [POST /accounts Strict(182 bytes)

After this if again I re-initiate the request for same entity, it will be success

When I re-initiate the POST request for the same entity than get the below logs and entity persisted successfully.

*[2020-04-29 16:22:42,413] [WARN] [] [com.couchbase.client.core.env.CoreEnvironment] [AkkaSystem-akka.actor.default-dispatcher-23] [] - More than 1 Couchbase Environments found (2), this can have severe impact on performance and stability. Reuse environments!
[2020-04-29 16:22:42,417] [INFO] [] [com.couchbase.client.core.CouchbaseCore] [AkkaSystem-akka.actor.default-dispatcher-23] [] - CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile=‘null’, sslTruststoreFile=‘null’, sslKeystorePassword=false, sslTruststorePassword=false, sslKeystore=null, sslTruststore=null, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=4, computationPoolSize=4, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=12, queryServiceEndpoints=12, searchServiceEndpoints=12, configPollInterval=2500, configPollFloorInterval=50, networkResolution=NetworkResolution{name=‘auto’}, ioPool=NioEventLoopGroup, kvIoPool=null, viewIoPool=null, searchIoPool=null, queryIoPool=null, analyticsIoPool=null, coreScheduler=CoreScheduler, memcachedHashingStrategy=DefaultMemcachedHashingStrategy, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.7.2 (git: 2.7.2, core: 1.7.2), retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS, powers of 2; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=10, upper=100000}, keepAliveInterval=30000, continuousKeepAliveEnabled=true, keepAliveErrorThreshold=4, keepAliveTimeout=2500, autoreleaseAfter=2000, bufferPoolingEnabled=true, tcpNodelayEnabled=true, mutationTokensEnabled=false, socketConnectTimeout=1000, callbacksOnIoPool=false, disconnectTimeout=25000, requestBufferWaitStrategy=com.couchbase.client.core.env.DefaultCoreEnvironment$4@3fb6a363, certAuthEnabled=false, coreSendHook=null, forceSaslPlain=false, compressionMinRatio=0.83, compressionMinSize=32, compressionEnabled=true, operationTracingEnabled=true, operationTracingServerDurationEnabled=true, tracer=ThresholdLogTracer, orphanResponseReportingEnabled=true, orphanResponseReporter=DefaultOrphanResponseReporter, keyValueServiceConfig=KeyValueServiceConfig{minEndpoints=1, maxEndpoints=1, pipelined=true, idleTime=0}, queryServiceConfig=QueryServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, searchServiceConfig=SearchServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, viewServiceConfig=ViewServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, analyticsServiceConfig=AnalyticsServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, queryTimeout=75000, viewTimeout=75000, searchTimeout=75000, analyticsTimeout=75000, kvTimeout=2500, connectTimeout=60000, dnsSrvEnabled=false, propagateParentSpan=true}
[2020-04-29 16:22:50,478] [INFO] [] [com.couchbase.client.core.node.Node] [cb-io-4-1] [] - Connected to Node 172.27.6.70/172.27.6.70
[2020-04-29 16:22:58,502] [INFO] [] [com.couchbase.client.core.config.ConfigurationProvider] [cb-computations-8] [] - Selected network configuration: default
[2020-04-29 16:22:58,504] [INFO] [] [com.couchbase.client.core.config.ConfigurationProvider] [cb-computations-8] [] - Opened bucket vgoel

I don’t know. Maybe the first initialization just takes longer than the default circuit breaker timeout?

1 Like

I was expecting the same the timeout configuration issue
Can you please suggest where we can configure this time out i mean how we can configure Circuit Breaker Configurations( may be inside application.conf file), some sample configs which I can refer to than it will be of great help

I am able to run without the above mentioned error by finding a way to increase the circuit breaker timeout by putting below mentioned configs inside application.conf file.

*akka.persistence.journal-plugin-fallback {

  class = ""
  plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
  replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
  max-message-batch-size = 200
  recovery-event-timeout = 30s
  circuit-breaker {
    max-failures = 10
    call-timeout = 60s
    reset-timeout = 60s
  }
  replay-filter {
    mode = repair-by-discard-old
    window-size = 100
    max-old-writers = 10
    debug = off
  }
}*

The RCA for the cause is
We require to communicate couchbase for two different buckets – one for persisting entities and one for saving data respectively, so its taking more time to create a connection at the first time, after creating connection system will use the so will take minimal time.

Thanks @patriknw for you valuable advice of considering the circuit-breaker time out effectively.