With akka-stream-alpakka-elasticsearch version 1.1.2 there was an option to implement RetryLogic
which allowed us to override boolean shouldRetry(final int retries, final Seq<String> errors)
method.
That gave us an option to dynamically decide based on an error message whether or not retry should be made for a particular document. For example if the error type is version_conflict_engine_exception we don’t want to retry while we want to retry for other errors.
Can something similar be achieved with 2.0.2 release? I was thinking about wrapping the flow with RetryFlow
for which can define a decider function but not sure if that’s recommended for our use case.
Any advice is highly appreciated!
This is how the flow looks like:
Flow<Record<T, C>, Record<T, C>, NotUsed> sinkFlow = Flow.create();
Flow<Record<T, C>, Record<T, C>, NotUsed> sink = sinkFlow
.map(record -> {
ElasticsearchDocument doc = recordToDocument.apply(record);
WriteMessage<Object, Record<T, C>> message = recordToWriteMessage
.apply(record, doc)
.withPassThrough(record);
if (doc.indexName != null) {
message = message.withIndexName(doc.indexName);
} else {
message = message.withIndexName(configuration.defaultIndexName);
}
return message;
})
.via(ElasticsearchFlow.createWithPassThrough(
configuration.defaultIndexName,
configuration.indexType,
sinkSettings,
client,
JsonUtil.JSON_MAPPER))
.map(result -> {
if (result.getError().isPresent()) {
LOG.warn(
"Error while publishing message to Elasticsearch: {}", result.getErrorReason());
}
return result.message().passThrough();
});