ProducerRecord has a constructor that accepts Kafka headers:
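import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.Header
import org.apache.kafka.common.header.internals.RecordHeader
import scala.jdk.CollectionConverters._ // Scala 2.13+; on 2.12 use scala.collection.JavaConverters._
import akka.kafka.ProducerMessage // assuming Alpakka Kafka

// Convert each (key, value) pair into a Kafka Header with a UTF-8 encoded value
val kafkaHeaders: java.lang.Iterable[Header] = message.headers.map {
  case (headerKey, headerValue) =>
    new RecordHeader(headerKey, headerValue.getBytes(StandardCharsets.UTF_8)): Header
}.asJava

ProducerMessage.single(
  new ProducerRecord[KeyType, ValueType](
    topicName,
    message.partition, // a java.lang.Integer; pass `null` to let the partitioner choose
    message.key,       // pass `null` if the record has no key
    message.payload,
    kafkaHeaders
  )
)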
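If you would rather not pass `null` for the partition, one alternative is to build the record with the shorter three-argument constructor and add the headers afterwards through `record.headers()`. The following is only a sketch under assumptions: `HeaderExample` and `recordWithHeaders` are hypothetical names, and `String` keys and values stand in for `KeyType` and `ValueType`.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical helper object, not part of the original answer.
object HeaderExample {
  def recordWithHeaders(
      topic: String,
      key: String,
      value: String,
      headers: Map[String, String]
  ): ProducerRecord[String, String] = {
    // Three-argument constructor: no partition is set on the record,
    // so the producer's partitioner chooses one when the record is sent.
    val record = new ProducerRecord[String, String](topic, key, value)
    headers.foreach { case (headerKey, headerValue) =>
      // Headers.add(String, Array[Byte]) appends to the record's header list.
      record.headers().add(headerKey, headerValue.getBytes(StandardCharsets.UTF_8))
    }
    record
  }
}
```

The resulting record can be wrapped the same way as before, for example `ProducerMessage.single(HeaderExample.recordWithHeaders(topicName, key, payload, headers))`.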