From cf8c5a86928da3a109e700a5221b5aa26cfe4aa7 Mon Sep 17 00:00:00 2001 From: Jakub Lidke Date: Thu, 27 Jul 2023 11:49:31 +0200 Subject: [PATCH] fix: wait for kafka to accept message and return success than --- .../processor/output/KafkaMtbFileSender.kt | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index f83a2ab..18faaf9 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -27,18 +27,27 @@ import org.springframework.stereotype.Component @Component class KafkaMtbFileSender( - private val kafkaTemplate: KafkaTemplate, - private val objectMapper: ObjectMapper + private val kafkaTemplate: KafkaTemplate, + private val objectMapper: ObjectMapper ) : MtbFileSender { private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java) override fun send(mtbFile: MtbFile): MtbFileSender.Response { return try { - kafkaTemplate.sendDefault(String.format("{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, - mtbFile.episode.id), objectMapper.writeValueAsString(mtbFile)) - logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + var result = kafkaTemplate.sendDefault( + String.format( + "{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, + mtbFile.episode.id + ), objectMapper.writeValueAsString(mtbFile) + ) + if (result.get() != null) { + logger.debug("Sent file via KafkaMtbFileSender"); + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS); + } else { + MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + } + } catch (e: Exception) { logger.error("An error occured sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)