diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index f81d3fb..7adcb02 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -43,9 +43,10 @@ class AppKafkaConfiguration { @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, + kafkaTargetProperties: KafkaTargetProperties, objectMapper: ObjectMapper ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) + return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) } @Bean @@ -54,7 +55,7 @@ class AppKafkaConfiguration { kafkaTargetProperties: KafkaTargetProperties, kafkaResponseProcessor: KafkaResponseProcessor ): KafkaMessageListenerContainer { - val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic); + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic) containerProperties.messageListener = kafkaResponseProcessor return KafkaMessageListenerContainer(consumerFactory, containerProperties) } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index da25576..d903745 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -22,11 +22,13 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.config.KafkaTargetProperties import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, + private val kafkaTargetProperties: KafkaTargetProperties, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -34,13 +36,14 @@ class KafkaMtbFileSender( override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { return try { - val result = kafkaTemplate.sendDefault( - header(request), + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), objectMapper.writeValueAsString(request.mtbFile) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } @@ -61,14 +64,15 @@ class KafkaMtbFileSender( .build() return try { - val result = kafkaTemplate.sendDefault( - header(request), + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), objectMapper.writeValueAsString(dummyMtbFile) ) if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } @@ -78,13 +82,13 @@ class KafkaMtbFileSender( } } - private fun header(request: MtbFileSender.MtbFileRequest): String { + private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + "\"eid\": \"${request.mtbFile.episode.id}\", " + "\"requestId\": \"${request.requestId}\"}" } - private fun header(request: MtbFileSender.DeleteRequest): String { + private fun key(request: MtbFileSender.DeleteRequest): String { return "{\"pid\": \"${request.patientId}\", " + "\"requestId\": \"${request.requestId}\"}" } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 547833c..fd047d0 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -42,6 +42,9 @@ class KafkaResponseProcessor( val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + + println("${responseBody.statusCode}") + when (responseBody.statusCode) { 200 -> { it.status = RequestStatus.SUCCESS @@ -69,6 +72,16 @@ class KafkaResponseProcessor( requestRepository.save(it) } + in 900..999 -> { + it.status = RequestStatus.ERROR + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", + objectMapper.writeValueAsString(responseBody.statusBody) + ) + requestRepository.save(it) + } + else -> { logger.error("Cannot process Kafka response: Unknown response code!") } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 72edde6..5cd47c0 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,8 +1,6 @@ spring: kafka: bootstrap-servers: ${app.kafka.servers} - template: - default-topic: ${app.kafka.topic} consumer: group-id: ${app.kafka.group-id} flyway: