1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-07-01 14:12:55 +00:00

Add processor to handle responses from Kafka topic

This commit is contained in:
2023-08-02 11:52:11 +02:00
parent db631fbd8e
commit 51cf7a7917
8 changed files with 176 additions and 21 deletions

View File

@ -40,7 +40,11 @@ Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an das
Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein Kafka-Topic übermittelt wird:
* `APP_KAFKA_TOPIC`: Zu verwendendes Topic
* `APP_KAFKA_TOPIC`: Zu verwendendes Topic zum Versenden von Anfragen
* `APP_KAFKA_RESPONSE_TOPIC`: Topic mit Antworten über den Erfolg des Versendens. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_response".
* `APP_KAFKA_GROUP_ID`: Kafka GroupID des Consumers. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_group".
* `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste
Wird keine Rückantwort über Apache Kafka empfangen und gibt es keine weitere Möglichkeit den Status festzustellen, verbleibt der Status auf `UNKNOWN`.
Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden.

View File

@ -28,4 +28,3 @@ class EtlProcessorApplication
fun main(args: Array<String>) {
runApplication<EtlProcessorApplication>(*args)
}

View File

@ -48,7 +48,7 @@ data class GPasConfigProperties(
val password: String?,
val sslCaLocation: String?,
) {
) {
companion object {
const val NAME = "app.pseudonymize.gpas"
}
@ -66,6 +66,8 @@ data class RestTargetProperties(
@ConfigurationProperties(KafkaTargetProperties.NAME)
data class KafkaTargetProperties(
val topic: String = "etl-processor",
val responseTopic: String = "${topic}_response",
val groupId: String = "${topic}_group",
val servers: String = ""
) {
companion object {

View File

@ -21,7 +21,6 @@ package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.ReportService
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
@ -32,7 +31,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
import reactor.core.publisher.Sinks
@Configuration
@ -60,7 +58,10 @@ class AppConfiguration {
}
@Bean
fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService {
fun pseudonymizeService(
generator: Generator,
pseudonymizeConfigProperties: PseudonymizeConfigProperties
): PseudonymizeService {
return PseudonymizeService(generator, pseudonymizeConfigProperties)
}
@ -70,15 +71,6 @@ class AppConfiguration {
return RestMtbFileSender(restTargetProperties)
}
@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
objectMapper: ObjectMapper
): MtbFileSender {
return KafkaMtbFileSender(kafkaTemplate, objectMapper)
}
@Bean
fun reportService(objectMapper: ObjectMapper): ReportService {
return ReportService(objectMapper)

View File

@ -0,0 +1,70 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
@Configuration
@EnableConfigurationProperties(
value = [KafkaTargetProperties::class]
)
@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
class AppKafkaConfiguration {
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
objectMapper: ObjectMapper
): MtbFileSender {
return KafkaMtbFileSender(kafkaTemplate, objectMapper)
}
@Bean
fun kafkaListenerContainer(
consumerFactory: ConsumerFactory<String, String>,
kafkaTargetProperties: KafkaTargetProperties,
kafkaResponseProcessor: KafkaResponseProcessor
): KafkaMessageListenerContainer<String, String> {
val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic);
containerProperties.messageListener = kafkaResponseProcessor
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}
@Bean
fun kafkaResponseProcessor(
requestRepository: RequestRepository,
objectMapper: ObjectMapper
): KafkaResponseProcessor {
return KafkaResponseProcessor(requestRepository, objectMapper)
}
}

View File

@ -36,9 +36,9 @@ data class Request(
val patientId: String,
val pid: String,
val fingerprint: String,
val status: RequestStatus,
val type: RequestType,
val processedAt: Instant = Instant.now(),
var status: RequestStatus,
var processedAt: Instant = Instant.now(),
@Embedded.Nullable var report: Report? = null
)

View File

@ -0,0 +1,87 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services.kafka
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.Report
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.MessageListener
import java.time.Instant
class KafkaResponseProcessor(
private val requestRepository: RequestRepository,
private val objectMapper: ObjectMapper
) : MessageListener<String, String> {
private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java)
override fun onMessage(data: ConsumerRecord<String, String>) {
try {
val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java)
requestRepository.findByUuidEquals(responseKey.requestId).ifPresent {
val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
when (responseBody.statusCode) {
200 -> {
it.status = RequestStatus.SUCCESS
it.processedAt = Instant.ofEpochMilli(data.timestamp())
requestRepository.save(it)
}
201 -> {
it.status = RequestStatus.WARNING
it.processedAt = Instant.ofEpochMilli(data.timestamp())
it.report = Report(
"Warnungen über mangelhafte Daten",
responseBody.statusBody
)
requestRepository.save(it)
}
400, 422 -> {
it.status = RequestStatus.ERROR
it.processedAt = Instant.ofEpochMilli(data.timestamp())
it.report = Report(
"Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
responseBody.statusBody
)
requestRepository.save(it)
}
else -> {
logger.error("Cannot process Kafka response: Unknown response code!")
}
}
}
} catch (e: Exception) {
logger.error("Cannot process Kafka response", e)
}
}
data class ResponseKey(val requestId: String)
data class ResponseBody(
@JsonProperty("status code") val statusCode: Int,
@JsonProperty("status_body") val statusBody: String
)
}

View File

@ -5,10 +5,11 @@ spring:
app:
rest:
uri: http://localhost:9000/bwhc/etl/api/MTBFile
#kafka:
# topic: test
# servers: kafka:9092
uri: http://localhost:9000/bwhc/etl/api
kafka:
topic: test
response-topic: test-response
servers: kafka:9092
server:
port: 8000