JVM crash because of native LMDB error

Hi

I'm using Akka Cluster Sharding (v2.5.19) to distribute two types of entities across the cluster nodes.
I've enabled the remember-entities property.
Each shard region is started with a role corresponding to the entity type it hosts.
When a cluster node is started with all roles (which means that the node hosts all entity types) and there are
many shard-region state updates (for example because many entities are started), the node crashes with a native LMDB library error.
The error is not always the same, but it always indicates LMDB corruption. The most frequent one is:

mdb.c:2433: Assertion 'mp->mp_pgno != pgno' failed in mdb_page_touch()

I do not see this behaviour when I configure one role per node.
I guess this is due to concurrent access to LMDB.
In my project I have patched the LmdbDurableStore to remove the EnvFlags.MDB_NOLOCK flag when the LMDB environment is created (see the sketch below).
This seems to solve my problem, but I don't understand why, since the LmdbDurableStore is an actor and there should not
be any concurrent access.
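
For reference, the patch is tiny. The sketch below shows roughly what the environment creation looks like before and after the change. It assumes lmdbjava's builder API and is not the actual Akka source; the object name, directory and map size are just illustrative values taken from my configuration:

import java.io.File
import org.lmdbjava.{Env, EnvFlags}

object PatchedEnvSketch {
  // Illustrative values; the real LmdbDurableStore reads them from its settings.
  val dir = new File("target/ddata-lmdb-test-node-1")
  if (!dir.exists()) dir.mkdirs()
  val mapSize: Long = 10L * 1024 * 1024

  // Stock LmdbDurableStore: the environment is opened with EnvFlags.MDB_NOLOCK.
  // val env = Env.create().setMapSize(mapSize).setMaxDbs(1).open(dir, EnvFlags.MDB_NOLOCK)

  // My patch: drop MDB_NOLOCK so that LMDB uses its own locking.
  val env: Env[java.nio.ByteBuffer] = Env.create().setMapSize(mapSize).setMaxDbs(1).open(dir)
}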

I use the following classes for my tests:

The entity actor:

package net.awl.scc.contact.data.monitoring.bench.lmdb

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.cluster.sharding.ShardRegion.Passivate
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import net.awl.scc.contact.data.monitoring.bench.lmdb.TestActor.{Init, Stop}

class TestActor extends Actor with ActorLogging {

  override def preStart(): Unit = log.info("Starting...")

  override def postStop(): Unit = log.info("Stopped.")

  override def receive: Receive = {
    case Init(id) => log.info("Received Init({}) message.", id)
    case m @ Stop(id) =>
      log.info("Received Stop({}) message.", id)
      context.become(stoppingBehaviour)
      // Ask the parent Shard to passivate this entity; the Shard sends the wrapped
      // Stop message back to us, and stoppingBehaviour then stops the actor.
      context.parent ! Passivate(m)
  }

  private def stoppingBehaviour: Receive = {
    case Stop(_) => context.stop(self)
  }

}

object TestActor {

  case class Init(id: String)

  case class Stop(id: String)

  def props: Props = Props(new TestActor)

  def createShardRegion(typeName: String)(implicit system: ActorSystem): ActorRef = {
    val nbShards = 10

    val extractEntityId: ShardRegion.ExtractEntityId = {
      case msg@Init(id) => (id, msg)
      case msg@Stop(id) => (id, msg)
    }

    val shardIdFromEntityId = (msg: Any) => (Math.abs(extractEntityId(msg)._1.hashCode) % nbShards).toString

    val extractShardId: ShardRegion.ExtractShardId = {
      case msg: Init => shardIdFromEntityId(msg)
      case msg: Stop => shardIdFromEntityId(msg)
      // StartEntity is sent by the shards when remember-entities is enabled and must
      // resolve to the same shard id as the entity id itself.
      case ShardRegion.StartEntity(id) => (Math.abs(id.hashCode) % nbShards).toString
    }

    ClusterSharding(system).start(
      typeName = typeName,
      entityProps = props,
      settings = ClusterShardingSettings(system).withRole(typeName),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    )
  }
}

The main object:

package net.awl.scc.contact.data.monitoring.bench.lmdb

import java.util.UUID
import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object LmdbTest extends App {

  val config = ConfigFactory.load()
  implicit val system: ActorSystem = ActorSystem("LMDB-test", config)

  val shardRegionA = TestActor.createShardRegion("TypeA")

  val shardRegionB = TestActor.createShardRegion("TypeB")

  args.find(arg => arg == "initActors").foreach(_ => {
    TimeUnit.SECONDS.sleep(10)

    println("Initialize actors ...")
    for (_ <- 0 until 1000) {
      shardRegionA ! TestActor.Init(UUID.randomUUID().toString)
      shardRegionB ! TestActor.Init(UUID.randomUUID().toString)
    }

    TimeUnit.SECONDS.sleep(5)

    println("Stopping actors ...")
    for (_ <- 0 until 1000) {
      shardRegionA ! TestActor.Stop(UUID.randomUUID().toString)
      shardRegionB ! TestActor.Stop(UUID.randomUUID().toString)
    }
  })
}

And finally, the configuration file:


akka {

  actor {
    provider = "cluster"

    allow-java-serialization = on
  }

  remote {
    log-remote-lifecycle-events = info

    artery {
      enabled = on
      canonical.hostname = "127.0.0.1"
      canonical.port = 62500
    }
  }

  cluster {
    seed-nodes = ["akka://LMDB-test@127.0.0.1:62500"]

    # The roles of this member. List of strings, e.g. roles = ["A", "B"].
    # The roles are part of the membership information and can be used by
    # routers or other services to distribute work to certain member types,
    # e.g. front-end and back-end nodes.
    # Roles are not allowed to start with "dc-" as that is reserved for the
    # special role assigned from the data-center a node belongs to (see the
    # multi-data-center section below)
    roles = ["TypeA", "TypeB"]

    # Enable or disable JMX MBeans for management of the cluster
    jmx.enabled = on

    # How often the current internal stats should be published.
    # A value of 0s can be used to always publish the stats, when it happens.
    # Disable with "off".
    publish-stats-interval = off

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s

    # Settings for the DistributedPubSub extension
    pub-sub {
      # The routing logic to use for 'Send'
      # Possible values: random, round-robin, broadcast
      routing-logic = broadcast
    }

    distributed-data {
      durable {
        lmdb {
          # Directory of LMDB file. There are two options:
          # 1. A relative or absolute path to a directory that ends with 'ddata'
          #    the full name of the directory will contain name of the ActorSystem
          #    and its remote port.
          # 2. Otherwise the path is used as is, as a relative or absolute path to
          #    a directory.
          #
          # When running in production you may want to configure this to a specific
          # path (alt 2), since the default directory contains the remote port of the
          # actor system to make the name unique. If using a dynamically assigned
          # port (0) it will be different each time and the previously stored data
          # will not be loaded.
          dir = "target/ddata-lmdb-test-node-1"

          # Size in bytes of the memory mapped file.
          map-size = 10 MiB

          # Accumulating changes before storing improves performance, at the
          # risk of losing the last writes if the JVM crashes.
          # The interval is by default set to 'off' to write each update immediately.
          # Enabling write behind by specifying a duration, e.g. 200ms, is especially
          # efficient when performing many writes to the same key, because it is only
          # the last value for each key that will be serialized and stored.
          # write-behind-interval = 200 ms
          write-behind-interval = off
        }
      }
    }

    sharding {

      # Setting for the default shard allocation strategy
      least-shard-allocation-strategy {
        # Threshold of how large the difference between most and least number of
        # allocated shards must be to begin the rebalancing.
        rebalance-threshold = 10

        # The number of ongoing rebalancing processes is limited to this number.
        max-simultaneous-rebalance = 3
      }

      # When this is set to 'on' the active entity actors will automatically be restarted
      # upon Shard restart, i.e. if the Shard is started on a different ShardRegion
      # due to rebalance or crash.
      remember-entities = on

      # Settings for the Distributed Data replicator.
      # Same layout as akka.cluster.distributed-data.
      # The "role" of the distributed-data configuration is not used. The distributed-data
      # role will be the same as "akka.cluster.sharding.role".
      # Note that there is one Replicator per role and it's not possible
      # to have different distributed-data settings for different sharding entity types.
      # Only used when state-store-mode=ddata
      distributed-data = ${akka.cluster.distributed-data}
      distributed-data {
        # minCap parameter to MajorityWrite and MajorityRead consistency level.
        majority-min-cap = 5
        durable.keys = ["shard-*"]

        # When using many entities with "remember entities" the Gossip message
        # can become too large if too many are included in the same message. Limit to
        # the same number as the number of ORSets per shard.
        max-delta-elements = 5

      }
    }

    #
    # Dedicated dispatcher for cluster-internal actors, to avoid disturbance from other actors.
    #
    cluster-dispatcher {
      type = "Dispatcher"
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 2
        parallelism-max = 4
      }
    }
    use-dispatcher = akka.cluster.cluster-dispatcher

  }

  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
}

For the other nodes, I only change the cluster port and the distributed-data.durable.lmdb.dir value, as shown below.
I have run my tests on Windows 7 and Linux CentOS 6 hosts.
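
For example, for a second node the only settings that change are roughly the following (the port 62501 and the node-2 directory are just example values):

akka.remote.artery.canonical.port = 62501
akka.cluster.distributed-data.durable.lmdb.dir = "target/ddata-lmdb-test-node-2"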

Thank you in advance for any explanation you can give me about this behaviour, which I do not understand.

Regards