1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-04-19 17:26:51 +00:00

fix: wait for kafka to accept message and return success than

This commit is contained in:
Jakub Lidke 2023-07-27 11:49:31 +02:00
parent e9e7139ca4
commit cf8c5a8692

View File

@ -27,18 +27,27 @@ import org.springframework.stereotype.Component
@Component
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
private val objectMapper: ObjectMapper
private val kafkaTemplate: KafkaTemplate<String, String>,
private val objectMapper: ObjectMapper
) : MtbFileSender {
private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java)
override fun send(mtbFile: MtbFile): MtbFileSender.Response {
return try {
kafkaTemplate.sendDefault(String.format("{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id,
mtbFile.episode.id), objectMapper.writeValueAsString(mtbFile))
logger.debug("Sent file via KafkaMtbFileSender")
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
var result = kafkaTemplate.sendDefault(
String.format(
"{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id,
mtbFile.episode.id
), objectMapper.writeValueAsString(mtbFile)
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender");
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS);
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
}
} catch (e: Exception) {
logger.error("An error occured sending to kafka", e)
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)