R2DBC offset store cleaning up

Hi, I’m starting using R2DBC plugin with Postgres DB and after running some traffic I see that akka_projection_timestamp_offset_store table is getting bigger without cleaning up of the saved offsets. The running projection looks like this:

        settings = None,
          .eventsBySlices[TransactionCore.Event](system, readJournalPluginId = R2dbcReadJournal.Identifier, TransactionCore.EntityType, slice.min, slice.max),
      .withRestartBackoff(1.second, 3.seconds, 0.2)
      .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)

It already has 129,880 rows. Is it a proper behavior or something is missing in the code?

So, after investigating the problem I have some details about it. It happens when there are small amount of persistence ids. In my case there is only one entity in slice 512-639 and it persisted a lot of events (about 2714522). In terms of the code I think it is related to the function of calculating size of the state in R2dbcOffsetStore:

def size: Int = byPid.size

byPid is Map[Pid, Record] and size of it is and amount of different persistence ids exist in the DB which is always 1 if you have only one persistence id.

As a workaround for now I set akka.projection.r2dbc.offset-store.keep-number-of-entries = 0 then entries from akka_projection_timestamp_offset_store are properly deleted.

Thanks for reporting and investigating. We’ll improve that.

1 Like