Dynamically decide whether to retry Elasticsearch writes based on error messages

With akka-stream-alpakka-elasticsearch version 1.1.2, there was an option to implement RetryLogic, which allowed us to override the boolean shouldRetry(final int retries, final Seq<String> errors) method.

That let us decide dynamically, based on the error message, whether a particular document should be retried. For example, if the error type is version_conflict_engine_exception we don’t want to retry, but for all other errors we do.
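For reference, the decision the old hook made can be written as a plain predicate. This is a minimal sketch, not Alpakka code: the class name VersionConflictAwareRetry and the retry budget of 5 are hypothetical, and it assumes each error string contains the Elasticsearch error type.

```java
import java.util.List;

class VersionConflictAwareRetry {
    // Hypothetical retry budget; the old RetryLogic implementations took a similar limit.
    static final int MAX_RETRIES = 5;

    // Same shape as the old shouldRetry(retries, errors) hook:
    // retry only while under budget and only if none of the errors is a version conflict.
    static boolean shouldRetry(int retries, List<String> errors) {
        if (retries >= MAX_RETRIES) {
            return false;
        }
        // A version conflict will not resolve itself on retry, so give up immediately.
        return errors.stream()
            .noneMatch(e -> e.contains("version_conflict_engine_exception"));
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(0, List.of("es_rejected_execution_exception")));   // true
        System.out.println(shouldRetry(0, List.of("version_conflict_engine_exception"))); // false
        System.out.println(shouldRetry(5, List.of("es_rejected_execution_exception")));   // false
    }
}
```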

Can something similar be achieved with the 2.0.2 release? I was thinking about wrapping the flow with RetryFlow, for which we can define a decider function, but I’m not sure whether that’s recommended for our use case.
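As far as I know, Akka 2.6’s Java RetryFlow.withBackoff(minBackoff, maxBackoff, randomFactor, maxRetries, flow, decideRetry) takes a decideRetry function that receives the element sent into the wrapped flow and the element it produced, and returns an Optional holding the element to resend, or an empty Optional to emit the result as-is. The plain-Java sketch below shows a decider of that shape without depending on Akka. Doc, Outcome and RetryDecider are hypothetical stand-ins for WriteMessage and WriteResult, and the sketch assumes the Elasticsearch error type has already been extracted from the result.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Hypothetical stand-in for the element sent to Elasticsearch (a WriteMessage in the real flow).
class Doc {
    final String id;
    Doc(String id) { this.id = id; }
}

// Hypothetical stand-in for the result (a WriteResult in the real flow).
class Outcome {
    // Elasticsearch error type, e.g. "version_conflict_engine_exception"; empty on success.
    final Optional<String> errorType;
    Outcome(Optional<String> errorType) { this.errorType = errorType; }
}

class RetryDecider {
    // Given what was sent and what came back, return the element to resend,
    // or empty to pass the result downstream unchanged.
    static final BiFunction<Doc, Outcome, Optional<Doc>> DECIDE_RETRY = (doc, outcome) ->
        outcome.errorType
            // Success (no error) and version conflicts both end up empty: no retry.
            .filter(type -> !type.equals("version_conflict_engine_exception"))
            // Any other error: resend the same document.
            .map(type -> doc);

    public static void main(String[] args) {
        Doc doc = new Doc("42");
        System.out.println(DECIDE_RETRY.apply(doc, new Outcome(Optional.empty())).isPresent()); // false
        System.out.println(DECIDE_RETRY.apply(doc,
            new Outcome(Optional.of("version_conflict_engine_exception"))).isPresent());        // false
        System.out.println(DECIDE_RETRY.apply(doc,
            new Outcome(Optional.of("es_rejected_execution_exception"))).isPresent());          // true
    }
}
```

Since the decider never counts attempts, in a RetryFlow setup the upper bound would come from the maxRetries argument rather than from the decider itself.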

Any advice is highly appreciated!

This is what the flow looks like:

```java
Flow<Record<T, C>, Record<T, C>, NotUsed> sinkFlow = Flow.create();

Flow<Record<T, C>, Record<T, C>, NotUsed> sink = sinkFlow
    .map(record -> {
        ElasticsearchDocument doc = recordToDocument.apply(record);

        WriteMessage<Object, Record<T, C>> message = recordToWriteMessage
            .apply(record, doc)
            .withPassThrough(record);

        if (doc.indexName != null) {
            message = message.withIndexName(doc.indexName);
        } else {
            message = message.withIndexName(configuration.defaultIndexName);
        }

        return message;
    })
    .via(ElasticsearchFlow.createWithPassThrough(
        configuration.defaultIndexName,
        configuration.indexType,
        sinkSettings,
        client,
        JsonUtil.JSON_MAPPER))
    .map(result -> {
        if (result.getError().isPresent()) {
            LOG.warn(
                "Error while publishing message to Elasticsearch: {}", result.getErrorReason());
        }

        return result.message().passThrough();
    });
```

Hi @pateem,

We reimplemented retrying in Alpakka Elasticsearch because the old implementation could reorder the replies. The new implementation uses RetryFlow internally.
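The reordering problem arises because, when only the failed items of a bulk request are resent, their results arrive later than those of the items that succeeded on the first try. Below is a minimal plain-Java sketch of one way to avoid that, not Alpakka’s actual implementation: it assumes a hypothetical writer function that returns, per item, an optional error type, retries only retryable failures up to a limit, and reads the results back in the original batch order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;

class OrderedBatchRetry {
    // Returns one result per batch item (empty = success, otherwise the last error type),
    // in the original batch order. `writer` is a hypothetical bulk call.
    static List<Optional<String>> writeWithRetries(
            List<String> batch,
            Function<List<String>, List<Optional<String>>> writer,
            Predicate<String> isRetryable,
            int maxRetries) {
        // Results keyed by original position; TreeMap iterates in ascending key order.
        Map<Integer, Optional<String>> results = new TreeMap<>();
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) pending.add(i);

        for (int attempt = 0; attempt <= maxRetries && !pending.isEmpty(); attempt++) {
            List<String> toSend = new ArrayList<>();
            for (int idx : pending) toSend.add(batch.get(idx));
            List<Optional<String>> outcomes = writer.apply(toSend);

            List<Integer> stillFailing = new ArrayList<>();
            for (int k = 0; k < pending.size(); k++) {
                int idx = pending.get(k);
                Optional<String> error = outcomes.get(k);
                results.put(idx, error); // the latest outcome for this item wins
                if (error.isPresent() && isRetryable.test(error.get())) {
                    stillFailing.add(idx); // resend only this item in the next round
                }
            }
            pending = stillFailing;
        }
        return new ArrayList<>(results.values());
    }

    public static void main(String[] args) {
        Map<String, Integer> attempts = new HashMap<>();
        // Fake writer: "b" fails transiently once, "c" always hits a version conflict.
        Function<List<String>, List<Optional<String>>> fakeWriter = docs -> {
            List<Optional<String>> out = new ArrayList<>();
            for (String d : docs) {
                int n = attempts.merge(d, 1, Integer::sum);
                if (d.equals("c")) out.add(Optional.of("version_conflict_engine_exception"));
                else if (d.equals("b") && n == 1) out.add(Optional.of("es_rejected_execution_exception"));
                else out.add(Optional.empty());
            }
            return out;
        };
        List<Optional<String>> r = writeWithRetries(List.of("a", "b", "c"), fakeWriter,
            e -> !e.equals("version_conflict_engine_exception"), 3);
        // prints [Optional.empty, Optional.empty, Optional[version_conflict_engine_exception]]
        System.out.println(r);
    }
}
```

Keying the results by index, rather than appending them as they arrive, is what keeps the output in the original order no matter how many rounds each item needed.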

Using it is a bit intricate, as it retries parts of a batch and then sorts the results before emitting them. If you want to inspect more than just whether a write succeeded, you can re-implement this part:

Hope this helps.

Cheers,
Enno.

Many thanks @ennru, I will give it a try.

So the public API doesn’t have an option to retry based on the failure type? In a bulk request, some documents may fail due to a version conflict (as @pateem mentioned), and depending on the program logic those should not be retried, while other kinds of failures (connection loss, etc.) should be.

@mbesida I agree that would be useful. That’s something we could add as an option to ElasticsearchWriteSettings. I added a GitHub issue to track it. Is it something you would be interested in working on?