diff --git a/README.md b/README.md index d49ef02..6575192 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Für REST-Requests als auch (parallel) zur Nutzung von Kafka-Topics können Endp Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an das bwHC-Backend gesendet wird: -* `APP_REST_URI`: URI der zu benutzenden bwHC-Backend-Instanz +* `APP_REST_URI`: URI der zu benutzenden API der bwHC-Backend-Instanz. z.B.: `http://localhost:9000/bwhc/etl/api` ### Kafka-Topics @@ -31,4 +31,4 @@ Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein * `APP_KAFKA_TOPIC`: Zu verwendendes Topic * `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste -Weitere Konfigrationen können über die Parameter \ No newline at end of file +Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden. \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 3947b92..04c73ef 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -40,7 +40,7 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) headers.contentType = MediaType.APPLICATION_JSON val entityReq = HttpEntity(request.mtbFile, headers) val response = restTemplate.postForEntity( - restTargetProperties.uri!!, + "${restTargetProperties.uri}/MTBFile", entityReq, String::class.java ) @@ -69,7 +69,7 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) headers.contentType = MediaType.APPLICATION_JSON val entityReq = HttpEntity(null, headers) restTemplate.delete( - "${restTargetProperties.uri}/${request.patientId}", + "${restTargetProperties.uri}/Patient/${request.patientId}", entityReq, String::class.java ) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt new file mode 100644 index 0000000..cd3a525 --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -0,0 +1,210 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.monitoring.* +import dev.dnpm.etl.processor.output.MtbFileSender +import dev.dnpm.etl.processor.pseudonym.PseudonymizeService +import org.apache.commons.codec.binary.Base32 +import org.apache.commons.codec.digest.DigestUtils +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Service +import reactor.core.publisher.Sinks +import java.util.* + +@Service +class RequestProcessor( + private val pseudonymizeService: PseudonymizeService, + private val senders: List, + private val requestRepository: RequestRepository, + private val objectMapper: ObjectMapper, + private val statisticsUpdateProducer: Sinks.Many +) { + + private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) + + fun processMtbFile(mtbFile: MtbFile): RequestStatus { + val pid = mtbFile.patient.id + val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) + + val lastRequestForPatient = + requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) + .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } + + if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { + requestRepository.save( + Request( + patientId = pseudonymized.patient.id, + pid = pid, + fingerprint = fingerprint(mtbFile), + status = RequestStatus.DUPLICATION, + type = RequestType.MTB_FILE, + report = Report("Duplikat erkannt - keine Daten weitergeleitet") + ) + ) + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + return RequestStatus.DUPLICATION + } + + val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) + + val responses = senders.map { + val responseStatus = it.send(request) + if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { + logger.info( + "Sent file for Patient '{}' using '{}'", + pseudonymized.patient.id, + it.javaClass.simpleName + ) + } else { + logger.error( + "Error sending file for Patient '{}' using '{}'", + pseudonymized.patient.id, + it.javaClass.simpleName + ) + } + responseStatus + } + + val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { + RequestStatus.ERROR + } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) { + RequestStatus.WARNING + } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { + RequestStatus.SUCCESS + } else { + RequestStatus.UNKNOWN + } + + requestRepository.save( + Request( + uuid = request.requestId, + patientId = request.mtbFile.patient.id, + pid = pid, + fingerprint = fingerprint(request.mtbFile), + status = requestStatus, + type = RequestType.MTB_FILE, + report = when (requestStatus) { + RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") + RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", + responses.joinToString("\n") { it.reason }) + + RequestStatus.UNKNOWN -> Report("Keine Informationen") + else -> null + } + ) + ) + + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + + return requestStatus + } + + fun processDeletion(patientId: String): RequestStatus { + val requestId = UUID.randomUUID().toString() + + try { + val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) + + val responses = senders.map { + val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + when (responseStatus.status) { + MtbFileSender.ResponseStatus.SUCCESS -> { + logger.info( + "Sent delete for Patient '{}' using '{}'", + patientPseudonym, + it.javaClass.simpleName + ) + } + + MtbFileSender.ResponseStatus.ERROR -> { + logger.error( + "Error deleting data for Patient '{}' using '{}'", + patientPseudonym, + it.javaClass.simpleName + ) + } + + else -> { + logger.error( + "Unknown result on deleting data for Patient '{}' using '{}'", + patientPseudonym, + it.javaClass.simpleName + ) + } + } + responseStatus + } + + val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { + RequestStatus.ERROR + } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { + RequestStatus.SUCCESS + } else { + RequestStatus.UNKNOWN + } + + requestRepository.save( + Request( + uuid = requestId, + patientId = patientPseudonym, + pid = patientId, + fingerprint = fingerprint(patientPseudonym), + status = overallRequestStatus, + type = RequestType.DELETE, + report = when (overallRequestStatus) { + RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") + RequestStatus.UNKNOWN -> Report("Keine Informationen") + else -> null + } + ) + ) + + return overallRequestStatus + } catch (e: Exception) { + requestRepository.save( + Request( + uuid = requestId, + patientId = "???", + pid = patientId, + fingerprint = "", + status = RequestStatus.ERROR, + type = RequestType.DELETE, + report = Report("Fehler bei der Pseudonymisierung") + ) + ) + + return RequestStatus.ERROR + } + } + + private fun fingerprint(mtbFile: MtbFile): String { + return fingerprint(objectMapper.writeValueAsString(mtbFile)) + } + + private fun fingerprint(s: String): String { + return Base32().encodeAsString(DigestUtils.sha256(s)) + .replace("=", "") + .lowercase() + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt index 146e8c8..a2cc953 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt @@ -19,104 +19,23 @@ package dev.dnpm.etl.processor.web -import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.monitoring.* -import dev.dnpm.etl.processor.output.MtbFileSender -import dev.dnpm.etl.processor.pseudonym.PseudonymizeService -import org.apache.commons.codec.binary.Base32 -import org.apache.commons.codec.digest.DigestUtils +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.services.RequestProcessor import org.slf4j.LoggerFactory import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.* -import reactor.core.publisher.Sinks -import java.util.* @RestController class MtbFileController( - private val pseudonymizeService: PseudonymizeService, - private val senders: List, - private val requestRepository: RequestRepository, - private val objectMapper: ObjectMapper, - private val statisticsUpdateProducer: Sinks.Many + private val requestProcessor: RequestProcessor, ) { private val logger = LoggerFactory.getLogger(MtbFileController::class.java) @PostMapping(path = ["/mtbfile"]) fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity { - val pid = mtbFile.patient.id - val pseudonymized = pseudonymizeService.pseudonymize(mtbFile) - - val lastRequestForPatient = - requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id) - .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING } - - if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) { - requestRepository.save( - Request( - patientId = pseudonymized.patient.id, - pid = pid, - fingerprint = fingerprint(mtbFile), - status = RequestStatus.DUPLICATION, - type = RequestType.MTB_FILE, - report = Report("Duplikat erkannt - keine Daten weitergeleitet") - ) - ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - return ResponseEntity.noContent().build() - } - - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized) - - val responses = senders.map { - val responseStatus = it.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - pseudonymized.patient.id, - it.javaClass.simpleName - ) - } - responseStatus - } - - val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) { - RequestStatus.WARNING - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN - } - - requestRepository.save( - Request( - uuid = request.requestId, - patientId = request.mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(request.mtbFile), - status = requestStatus, - type = RequestType.MTB_FILE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", - responses.joinToString("\n") { it.reason }) - - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null - } - ) - ) - - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + val requestStatus = requestProcessor.processMtbFile(mtbFile) return if (requestStatus == RequestStatus.ERROR) { ResponseEntity.unprocessableEntity().build() @@ -127,89 +46,13 @@ class MtbFileController( @DeleteMapping(path = ["/mtbfile/{patientId}"]) fun deleteData(@PathVariable patientId: String): ResponseEntity { - val requestId = UUID.randomUUID().toString() + val requestStatus = requestProcessor.processDeletion(patientId) - try { - val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - - val responses = senders.map { - val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - it.javaClass.simpleName - ) - } - } - responseStatus - } - - val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) { - RequestStatus.ERROR - } else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) { - RequestStatus.SUCCESS - } else { - RequestStatus.UNKNOWN - } - - requestRepository.save( - Request( - uuid = requestId, - patientId = patientPseudonym, - pid = patientId, - fingerprint = fingerprint(patientPseudonym), - status = overallRequestStatus, - type = RequestType.DELETE, - report = when (overallRequestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null - } - ) - ) - - return ResponseEntity.unprocessableEntity().build() - } catch (e: Exception) { - requestRepository.save( - Request( - uuid = requestId, - patientId = "???", - pid = patientId, - fingerprint = "", - status = RequestStatus.ERROR, - type = RequestType.DELETE, - report = Report("Fehler bei der Pseudonymisierung") - ) - ) - - return ResponseEntity.noContent().build() + return if (requestStatus == RequestStatus.ERROR) { + ResponseEntity.unprocessableEntity().build() + } else { + ResponseEntity.noContent().build() } } - private fun fingerprint(mtbFile: MtbFile): String { - return fingerprint(objectMapper.writeValueAsString(mtbFile)) - } - - private fun fingerprint(s: String): String { - return Base32().encodeAsString(DigestUtils.sha256(s)) - .replace("=", "") - .lowercase() - } - } \ No newline at end of file