diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index f83a2ab..18faaf9 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -27,18 +27,27 @@ import org.springframework.stereotype.Component @Component class KafkaMtbFileSender( - private val kafkaTemplate: KafkaTemplate, - private val objectMapper: ObjectMapper + private val kafkaTemplate: KafkaTemplate, + private val objectMapper: ObjectMapper ) : MtbFileSender { private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java) override fun send(mtbFile: MtbFile): MtbFileSender.Response { return try { - kafkaTemplate.sendDefault(String.format("{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, - mtbFile.episode.id), objectMapper.writeValueAsString(mtbFile)) - logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + var result = kafkaTemplate.sendDefault( + String.format( + "{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, + mtbFile.episode.id + ), objectMapper.writeValueAsString(mtbFile) + ) + if (result.get() != null) { + logger.debug("Sent file via KafkaMtbFileSender"); + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS); + } else { + MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + } + } catch (e: Exception) { logger.error("An error occured sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)