1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-07-03 23:12:54 +00:00

Decouple request and response processing

This commit is contained in:
2023-08-09 18:15:20 +02:00
parent 7f048e2483
commit 1a640ff9df
10 changed files with 579 additions and 194 deletions

View File

@ -20,7 +20,6 @@
package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.annotation.Order
@ -70,10 +70,10 @@ class AppKafkaConfiguration {
@Bean
fun kafkaResponseProcessor(
requestRepository: RequestRepository,
applicationEventPublisher: ApplicationEventPublisher,
objectMapper: ObjectMapper
): KafkaResponseProcessor {
return KafkaResponseProcessor(requestRepository, objectMapper)
return KafkaResponseProcessor(applicationEventPublisher, objectMapper)
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.config.KafkaTargetProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
@ -43,13 +44,13 @@ class KafkaMtbFileSender(
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
MtbFileSender.Response(RequestStatus.UNKNOWN)
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
MtbFileSender.Response(RequestStatus.ERROR)
}
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
MtbFileSender.Response(RequestStatus.UNKNOWN)
}
}
@ -72,13 +73,13 @@ class KafkaMtbFileSender(
if (result.get() != null) {
logger.debug("Sent deletion request via KafkaMtbFileSender")
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
MtbFileSender.Response(RequestStatus.UNKNOWN)
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
MtbFileSender.Response(RequestStatus.ERROR)
}
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
MtbFileSender.Response(RequestStatus.UNKNOWN)
}
}

View File

@ -20,22 +20,31 @@
package dev.dnpm.etl.processor.output
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.springframework.http.HttpStatusCode
interface MtbFileSender {
fun send(request: MtbFileRequest): Response
fun send(request: DeleteRequest): Response
data class Response(val status: ResponseStatus, val reason: String = "")
data class Response(val status: RequestStatus, val body: String = "")
data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile)
data class DeleteRequest(val requestId: String, val patientId: String)
enum class ResponseStatus {
SUCCESS,
WARNING,
ERROR,
UNKNOWN
}
fun Int.asRequestStatus(): RequestStatus {
return when (this) {
200 -> RequestStatus.SUCCESS
201 -> RequestStatus.WARNING
in 400 .. 999 -> RequestStatus.ERROR
else -> RequestStatus.UNKNOWN
}
}
fun HttpStatusCode.asRequestStatus(): RequestStatus {
return this.value().asRequestStatus()
}

View File

@ -20,13 +20,13 @@
package dev.dnpm.etl.processor.output
import dev.dnpm.etl.processor.config.RestTargetProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.web.client.RestClientException
import org.springframework.web.client.RestTemplate
import org.springframework.web.util.UriComponentsBuilder
class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender {
@ -46,21 +46,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
)
if (!response.statusCode.is2xxSuccessful) {
logger.warn("Error sending to remote system: {}", response.body)
return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}")
return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
}
logger.debug("Sent file via RestMtbFileSender")
return if (response.body?.contains("warning") == true) {
MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}")
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
}
return MtbFileSender.Response(response.statusCode.asRequestStatus())
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
@ -74,14 +70,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
String::class.java
)
logger.debug("Sent file via RestMtbFileSender")
return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
return MtbFileSender.Response(RequestStatus.SUCCESS)
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
}

View File

@ -31,8 +31,9 @@ import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith
import org.apache.commons.codec.binary.Base32
import org.apache.commons.codec.digest.DigestUtils
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
import reactor.core.publisher.Sinks
import java.time.Instant
import java.util.*
@Service
@ -41,73 +42,54 @@ class RequestProcessor(
private val sender: MtbFileSender,
private val requestService: RequestService,
private val objectMapper: ObjectMapper,
private val statisticsUpdateProducer: Sinks.Many<Any>
private val applicationEventPublisher: ApplicationEventPublisher
) {
private val logger = LoggerFactory.getLogger(RequestProcessor::class.java)
fun processMtbFile(mtbFile: MtbFile) {
val requestId = UUID.randomUUID().toString()
val pid = mtbFile.patient.id
mtbFile pseudonymizeWith pseudonymizeService
if (isDuplication(mtbFile)) {
requestService.save(
Request(
patientId = mtbFile.patient.id,
pid = pid,
fingerprint = fingerprint(mtbFile),
status = RequestStatus.DUPLICATION,
type = RequestType.MTB_FILE,
report = Report("Duplikat erkannt - keine Daten weitergeleitet")
)
)
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
return
}
val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile)
val responseStatus = sender.send(request)
if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) {
logger.info(
"Sent file for Patient '{}' using '{}'",
mtbFile.patient.id,
sender.javaClass.simpleName
)
} else {
logger.error(
"Error sending file for Patient '{}' using '{}'",
mtbFile.patient.id,
sender.javaClass.simpleName
)
}
val requestStatus = when (responseStatus.status) {
MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR
MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING
MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS
else -> RequestStatus.UNKNOWN
}
val request = MtbFileSender.MtbFileRequest(requestId, mtbFile)
requestService.save(
Request(
uuid = request.requestId,
uuid = requestId,
patientId = request.mtbFile.patient.id,
pid = pid,
fingerprint = fingerprint(request.mtbFile),
status = requestStatus,
type = RequestType.MTB_FILE,
report = when (requestStatus) {
RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason)
RequestStatus.UNKNOWN -> Report("Keine Informationen")
else -> null
}
status = RequestStatus.UNKNOWN,
type = RequestType.MTB_FILE
)
)
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
if (isDuplication(mtbFile)) {
applicationEventPublisher.publishEvent(
ResponseEvent(
requestId,
Instant.now(),
RequestStatus.DUPLICATION
)
)
return
}
val responseStatus = sender.send(request)
applicationEventPublisher.publishEvent(
ResponseEvent(
requestId,
Instant.now(),
responseStatus.status,
when (responseStatus.status) {
RequestStatus.WARNING -> Optional.of(responseStatus.body)
else -> Optional.empty()
}
)
)
}
private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean {
@ -126,55 +108,31 @@ class RequestProcessor(
try {
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)
val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
when (responseStatus.status) {
MtbFileSender.ResponseStatus.SUCCESS -> {
logger.info(
"Sent delete for Patient '{}' using '{}'",
patientPseudonym,
sender.javaClass.simpleName
)
}
MtbFileSender.ResponseStatus.ERROR -> {
logger.error(
"Error deleting data for Patient '{}' using '{}'",
patientPseudonym,
sender.javaClass.simpleName
)
}
else -> {
logger.error(
"Unknown result on deleting data for Patient '{}' using '{}'",
patientPseudonym,
sender.javaClass.simpleName
)
}
}
val requestStatus = when (responseStatus.status) {
MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR
MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING
MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS
else -> RequestStatus.UNKNOWN
}
requestService.save(
Request(
uuid = requestId,
patientId = patientPseudonym,
pid = patientId,
fingerprint = fingerprint(patientPseudonym),
status = requestStatus,
type = RequestType.DELETE,
report = when (requestStatus) {
RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
RequestStatus.UNKNOWN -> Report("Keine Informationen")
else -> null
status = RequestStatus.UNKNOWN,
type = RequestType.DELETE
)
)
val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
applicationEventPublisher.publishEvent(
ResponseEvent(
requestId,
Instant.now(),
responseStatus.status,
when (responseStatus.status) {
RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body)
else -> Optional.empty()
}
)
)
} catch (e: Exception) {
requestService.save(
Request(
@ -188,7 +146,6 @@ class RequestProcessor(
)
)
}
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
}
private fun fingerprint(mtbFile: MtbFile): String {

View File

@ -0,0 +1,96 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.Report
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
import reactor.core.publisher.Sinks
import java.time.Instant
import java.util.*
@Service
class ResponseProcessor(
private val requestRepository: RequestRepository,
private val statisticsUpdateProducer: Sinks.Many<Any>,
private val objectMapper: ObjectMapper
) {
private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java)
@EventListener(classes = [ResponseEvent::class])
fun handleResponseEvent(event: ResponseEvent) {
requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({
it.processedAt = event.timestamp
it.status = event.status
when (event.status) {
RequestStatus.SUCCESS -> {
it.report = Report(
"Keine Probleme erkannt",
)
}
RequestStatus.WARNING -> {
it.report = Report(
"Warnungen über mangelhafte Daten",
objectMapper.writeValueAsString(event.body)
)
}
RequestStatus.ERROR -> {
it.report = Report(
"Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
objectMapper.writeValueAsString(event.body)
)
}
RequestStatus.DUPLICATION -> {
it.report = Report(
"Duplikat erkannt"
)
}
else -> {
logger.error("Cannot process response: Unknown response code!")
return@ifPresentOrElse
}
}
requestRepository.save(it)
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
}, {
logger.error("Response for unknown request '${event.requestUuid}'!")
})
}
}
data class ResponseEvent(
val requestUuid: String,
val timestamp: Instant,
val status: RequestStatus,
val body: Optional<String> = Optional.empty()
)

View File

@ -22,16 +22,18 @@ package dev.dnpm.etl.processor.services.kafka
import com.fasterxml.jackson.annotation.JsonAlias
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.Report
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.output.asRequestStatus
import dev.dnpm.etl.processor.services.ResponseEvent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
import org.springframework.kafka.listener.MessageListener
import java.time.Instant
import java.util.*
class KafkaResponseProcessor(
private val requestRepository: RequestRepository,
private val eventPublisher: ApplicationEventPublisher,
private val objectMapper: ObjectMapper
) : MessageListener<String, String> {
@ -39,55 +41,44 @@ class KafkaResponseProcessor(
override fun onMessage(data: ConsumerRecord<String, String>) {
try {
val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java)
requestRepository.findByUuidEquals(responseKey.requestId).ifPresent {
val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
when (responseBody.statusCode) {
200 -> {
it.status = RequestStatus.SUCCESS
it.processedAt = Instant.ofEpochMilli(data.timestamp())
requestRepository.save(it)
}
201 -> {
it.status = RequestStatus.WARNING
it.processedAt = Instant.ofEpochMilli(data.timestamp())
it.report = Report(
"Warnungen über mangelhafte Daten",
objectMapper.writeValueAsString(responseBody.statusBody)
)
requestRepository.save(it)
}
400, 422 -> {
it.status = RequestStatus.ERROR
it.processedAt = Instant.ofEpochMilli(data.timestamp())
it.report = Report(
"Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
objectMapper.writeValueAsString(responseBody.statusBody)
)
requestRepository.save(it)
}
in 900..999 -> {
it.status = RequestStatus.ERROR
it.processedAt = Instant.ofEpochMilli(data.timestamp())
it.report = Report(
"Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich",
objectMapper.writeValueAsString(responseBody.statusBody)
)
requestRepository.save(it)
}
else -> {
logger.error("Cannot process Kafka response: Unknown response code!")
}
}
}
Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java))
} catch (e: Exception) {
logger.error("Cannot process Kafka response", e)
}
Optional.empty()
}.ifPresentOrElse({ responseKey ->
val event = try {
val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
ResponseEvent(
responseKey.requestId,
Instant.ofEpochMilli(data.timestamp()),
responseBody.statusCode.asRequestStatus(),
when (responseBody.statusCode.asRequestStatus()) {
RequestStatus.SUCCESS -> {
Optional.empty()
}
RequestStatus.WARNING, RequestStatus.ERROR -> {
Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
}
else -> {
logger.error("Kafka response: Unknown response code!")
Optional.empty()
}
}
)
} catch (e: Exception) {
logger.error("Cannot process Kafka response", e)
ResponseEvent(
responseKey.requestId,
Instant.ofEpochMilli(data.timestamp()),
RequestStatus.ERROR,
Optional.of("Cannot process Kafka response")
)
}
eventPublisher.publishEvent(event)
}, {
logger.error("No response key in Kafka response")
})
}
data class ResponseKey(val requestId: String)