diff --git a/README.md b/README.md index db6ae44..1d5a7f3 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,11 @@ Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an das Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein Kafka-Topic übermittelt wird: -* `APP_KAFKA_TOPIC`: Zu verwendendes Topic +* `APP_KAFKA_TOPIC`: Zu verwendendes Topic zum Versenden von Anfragen +* `APP_KAFKA_RESPONSE_TOPIC`: Topic mit Antworten über den Erfolg des Versendens. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_response". +* `APP_KAFKA_GROUP_ID`: Kafka GroupID des Consumers. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_group". * `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste +Wird keine Rückantwort über Apache Kafka empfangen und gibt es keine weitere Möglichkeit den Status festzustellen, verbleibt der Status auf `UNKNOWN`. + Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden. \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt index 0c4ab68..5d28c97 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/EtlProcessorApplication.kt @@ -28,4 +28,3 @@ class EtlProcessorApplication fun main(args: Array) { runApplication(*args) } - diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 64be70d..6502a1b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -48,7 +48,7 @@ data class GPasConfigProperties( val password: String?, val sslCaLocation: String?, -) { + ) { companion object { const val NAME = "app.pseudonymize.gpas" } @@ -66,6 +66,8 @@ data class RestTargetProperties( @ConfigurationProperties(KafkaTargetProperties.NAME) data class KafkaTargetProperties( val topic: String = "etl-processor", + val responseTopic: String = "${topic}_response", + val groupId: String = "${topic}_group", val servers: String = "" ) { companion object { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index c677f2b..cbba1f1 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -21,7 +21,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.monitoring.ReportService -import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator @@ -32,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.kafka.core.KafkaTemplate import reactor.core.publisher.Sinks @Configuration @@ -60,7 +58,10 @@ class AppConfiguration { } @Bean - fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService { + fun pseudonymizeService( + generator: Generator, + pseudonymizeConfigProperties: PseudonymizeConfigProperties + ): PseudonymizeService { return PseudonymizeService(generator, pseudonymizeConfigProperties) } @@ -70,15 +71,6 @@ class AppConfiguration { return RestMtbFileSender(restTargetProperties) } - @ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) - @Bean - fun kafkaMtbFileSender( - kafkaTemplate: KafkaTemplate, - objectMapper: ObjectMapper - ): MtbFileSender { - return KafkaMtbFileSender(kafkaTemplate, objectMapper) - } - @Bean fun reportService(objectMapper: ObjectMapper): ReportService { return ReportService(objectMapper) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt new file mode 100644 index 0000000..f81d3fb --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -0,0 +1,70 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.config + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.output.KafkaMtbFileSender +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.listener.ContainerProperties +import org.springframework.kafka.listener.KafkaMessageListenerContainer + +@Configuration +@EnableConfigurationProperties( + value = [KafkaTargetProperties::class] +) +@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +class AppKafkaConfiguration { + + @Bean + fun kafkaMtbFileSender( + kafkaTemplate: KafkaTemplate, + objectMapper: ObjectMapper + ): MtbFileSender { + return KafkaMtbFileSender(kafkaTemplate, objectMapper) + } + + @Bean + fun kafkaListenerContainer( + consumerFactory: ConsumerFactory, + kafkaTargetProperties: KafkaTargetProperties, + kafkaResponseProcessor: KafkaResponseProcessor + ): KafkaMessageListenerContainer { + val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic); + containerProperties.messageListener = kafkaResponseProcessor + return KafkaMessageListenerContainer(consumerFactory, containerProperties) + } + + @Bean + fun kafkaResponseProcessor( + requestRepository: RequestRepository, + objectMapper: ObjectMapper + ): KafkaResponseProcessor { + return KafkaResponseProcessor(requestRepository, objectMapper) + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt index ecd8219..c1d4d43 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt @@ -36,9 +36,9 @@ data class Request( val patientId: String, val pid: String, val fingerprint: String, - val status: RequestStatus, val type: RequestType, - val processedAt: Instant = Instant.now(), + var status: RequestStatus, + var processedAt: Instant = Instant.now(), @Embedded.Nullable var report: Report? = null ) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt new file mode 100644 index 0000000..f0c91cb --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -0,0 +1,87 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.slf4j.LoggerFactory +import org.springframework.kafka.listener.MessageListener +import java.time.Instant + +class KafkaResponseProcessor( + private val requestRepository: RequestRepository, + private val objectMapper: ObjectMapper +) : MessageListener { + + private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java) + + override fun onMessage(data: ConsumerRecord) { + try { + val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) + requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { + val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + when (responseBody.statusCode) { + 200 -> { + it.status = RequestStatus.SUCCESS + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + requestRepository.save(it) + } + + 201 -> { + it.status = RequestStatus.WARNING + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Warnungen über mangelhafte Daten", + responseBody.statusBody + ) + requestRepository.save(it) + } + + 400, 422 -> { + it.status = RequestStatus.ERROR + it.processedAt = Instant.ofEpochMilli(data.timestamp()) + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + responseBody.statusBody + ) + requestRepository.save(it) + } + + else -> { + logger.error("Cannot process Kafka response: Unknown response code!") + } + } + } + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + } + } + + data class ResponseKey(val requestId: String) + + data class ResponseBody( + @JsonProperty("status code") val statusCode: Int, + @JsonProperty("status_body") val statusBody: String + ) +} \ No newline at end of file diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 99e4bbf..551f3f8 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -5,10 +5,11 @@ spring: app: rest: - uri: http://localhost:9000/bwhc/etl/api/MTBFile - #kafka: - # topic: test - # servers: kafka:9092 + uri: http://localhost:9000/bwhc/etl/api + kafka: + topic: test + response-topic: test-response + servers: kafka:9092 server: port: 8000