From 1a640ff9dff1cc182c4ffc1d00dff370e42a25de Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 9 Aug 2023 18:15:20 +0200 Subject: [PATCH] Decouple request and response processing --- .../processor/config/AppKafkaConfiguration.kt | 6 +- .../processor/output/KafkaMtbFileSender.kt | 13 +- .../etl/processor/output/MtbFileSender.kt | 21 ++- .../etl/processor/output/RestMtbFileSender.kt | 16 +- .../processor/services/RequestProcessor.kt | 139 ++++++----------- .../processor/services/ResponseProcessor.kt | 96 ++++++++++++ .../services/kafka/KafkaResponseProcessor.kt | 93 ++++++------ .../services/RequestProcessorTest.kt | 128 ++++++++++++---- .../services/ResponseProcessorTest.kt | 142 ++++++++++++++++++ .../kafka/KafkaResponseProcessorTest.kt | 119 +++++++++++++++ 10 files changed, 579 insertions(+), 194 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt create mode 100644 src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 6d0254e..309ff2d 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -20,7 +20,6 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor @@ -28,6 +27,7 @@ import org.slf4j.LoggerFactory import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.ApplicationEventPublisher import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order @@ -70,10 +70,10 @@ class AppKafkaConfiguration { @Bean fun kafkaResponseProcessor( - requestRepository: RequestRepository, + applicationEventPublisher: ApplicationEventPublisher, objectMapper: ObjectMapper ): KafkaResponseProcessor { - return KafkaResponseProcessor(requestRepository, objectMapper) + return KafkaResponseProcessor(applicationEventPublisher, objectMapper) } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index d903745..9448e29 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -43,13 +44,13 @@ class KafkaMtbFileSender( ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } @@ -72,13 +73,13 @@ class KafkaMtbFileSender( if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(RequestStatus.ERROR) } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + MtbFileSender.Response(RequestStatus.UNKNOWN) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt index 6914ba5..de0efaa 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt @@ -20,22 +20,31 @@ package dev.dnpm.etl.processor.output import de.ukw.ccc.bwhc.dto.MtbFile +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.springframework.http.HttpStatusCode interface MtbFileSender { fun send(request: MtbFileRequest): Response fun send(request: DeleteRequest): Response - data class Response(val status: ResponseStatus, val reason: String = "") + data class Response(val status: RequestStatus, val body: String = "") data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile) data class DeleteRequest(val requestId: String, val patientId: String) - enum class ResponseStatus { - SUCCESS, - WARNING, - ERROR, - UNKNOWN +} + +fun Int.asRequestStatus(): RequestStatus { + return when (this) { + 200 -> RequestStatus.SUCCESS + 201 -> RequestStatus.WARNING + in 400 .. 999 -> RequestStatus.ERROR + else -> RequestStatus.UNKNOWN } +} + +fun HttpStatusCode.asRequestStatus(): RequestStatus { + return this.value().asRequestStatus() } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 04c73ef..24cdc49 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -20,13 +20,13 @@ package dev.dnpm.etl.processor.output import dev.dnpm.etl.processor.config.RestTargetProperties +import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.http.HttpEntity import org.springframework.http.HttpHeaders import org.springframework.http.MediaType import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate -import org.springframework.web.util.UriComponentsBuilder class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender { @@ -46,21 +46,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) ) if (!response.statusCode.is2xxSuccessful) { logger.warn("Error sending to remote system: {}", response.body) - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}") + return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") } logger.debug("Sent file via RestMtbFileSender") - return if (response.body?.contains("warning") == true) { - MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}") - } else { - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) - } + return MtbFileSender.Response(response.statusCode.asRequestStatus()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { @@ -74,14 +70,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) String::class.java ) logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + return MtbFileSender.Response(RequestStatus.SUCCESS) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { logger.info(restTargetProperties.uri!!.toString()) logger.error("Cannot send data to remote system", e) } - return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung") + return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung") } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index 6465e82..d2f8619 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -31,8 +31,9 @@ import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith import org.apache.commons.codec.binary.Base32 import org.apache.commons.codec.digest.DigestUtils import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service -import reactor.core.publisher.Sinks +import java.time.Instant import java.util.* @Service @@ -41,73 +42,54 @@ class RequestProcessor( private val sender: MtbFileSender, private val requestService: RequestService, private val objectMapper: ObjectMapper, - private val statisticsUpdateProducer: Sinks.Many + private val applicationEventPublisher: ApplicationEventPublisher ) { private val logger = LoggerFactory.getLogger(RequestProcessor::class.java) fun processMtbFile(mtbFile: MtbFile) { + val requestId = UUID.randomUUID().toString() val pid = mtbFile.patient.id mtbFile pseudonymizeWith pseudonymizeService - if (isDuplication(mtbFile)) { - requestService.save( - Request( - patientId = mtbFile.patient.id, - pid = pid, - fingerprint = fingerprint(mtbFile), - status = RequestStatus.DUPLICATION, - type = RequestType.MTB_FILE, - report = Report("Duplikat erkannt - keine Daten weitergeleitet") - ) - ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) - return - } - - val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), mtbFile) - - val responseStatus = sender.send(request) - if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) { - logger.info( - "Sent file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } else { - logger.error( - "Error sending file for Patient '{}' using '{}'", - mtbFile.patient.id, - sender.javaClass.simpleName - ) - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } + val request = MtbFileSender.MtbFileRequest(requestId, mtbFile) requestService.save( Request( - uuid = request.requestId, + uuid = requestId, patientId = request.mtbFile.patient.id, pid = pid, fingerprint = fingerprint(request.mtbFile), - status = requestStatus, - type = RequestType.MTB_FILE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten", responseStatus.reason) - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null - } + status = RequestStatus.UNKNOWN, + type = RequestType.MTB_FILE ) ) - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + if (isDuplication(mtbFile)) { + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + RequestStatus.DUPLICATION + ) + ) + return + } + + val responseStatus = sender.send(request) + + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING -> Optional.of(responseStatus.body) + else -> Optional.empty() + } + ) + ) } private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean { @@ -126,55 +108,31 @@ class RequestProcessor( try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) - val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) - when (responseStatus.status) { - MtbFileSender.ResponseStatus.SUCCESS -> { - logger.info( - "Sent delete for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - MtbFileSender.ResponseStatus.ERROR -> { - logger.error( - "Error deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - - else -> { - logger.error( - "Unknown result on deleting data for Patient '{}' using '{}'", - patientPseudonym, - sender.javaClass.simpleName - ) - } - } - - val requestStatus = when (responseStatus.status) { - MtbFileSender.ResponseStatus.ERROR -> RequestStatus.ERROR - MtbFileSender.ResponseStatus.WARNING -> RequestStatus.WARNING - MtbFileSender.ResponseStatus.SUCCESS -> RequestStatus.SUCCESS - else -> RequestStatus.UNKNOWN - } - requestService.save( Request( uuid = requestId, patientId = patientPseudonym, pid = patientId, fingerprint = fingerprint(patientPseudonym), - status = requestStatus, - type = RequestType.DELETE, - report = when (requestStatus) { - RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar") - RequestStatus.UNKNOWN -> Report("Keine Informationen") - else -> null + status = RequestStatus.UNKNOWN, + type = RequestType.DELETE + ) + ) + + val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym)) + + applicationEventPublisher.publishEvent( + ResponseEvent( + requestId, + Instant.now(), + responseStatus.status, + when (responseStatus.status) { + RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body) + else -> Optional.empty() } ) ) + } catch (e: Exception) { requestService.save( Request( @@ -188,7 +146,6 @@ class RequestProcessor( ) ) } - statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) } private fun fingerprint(mtbFile: MtbFile): String { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt new file mode 100644 index 0000000..d7ad86f --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -0,0 +1,96 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.monitoring.Report +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import org.slf4j.LoggerFactory +import org.springframework.context.event.EventListener +import org.springframework.stereotype.Service +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@Service +class ResponseProcessor( + private val requestRepository: RequestRepository, + private val statisticsUpdateProducer: Sinks.Many, + private val objectMapper: ObjectMapper +) { + + private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java) + + @EventListener(classes = [ResponseEvent::class]) + fun handleResponseEvent(event: ResponseEvent) { + requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({ + it.processedAt = event.timestamp + it.status = event.status + + when (event.status) { + RequestStatus.SUCCESS -> { + it.report = Report( + "Keine Probleme erkannt", + ) + } + + RequestStatus.WARNING -> { + it.report = Report( + "Warnungen über mangelhafte Daten", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.ERROR -> { + it.report = Report( + "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", + objectMapper.writeValueAsString(event.body) + ) + } + + RequestStatus.DUPLICATION -> { + it.report = Report( + "Duplikat erkannt" + ) + } + + else -> { + logger.error("Cannot process response: Unknown response code!") + return@ifPresentOrElse + } + } + + requestRepository.save(it) + + statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST) + }, { + logger.error("Response for unknown request '${event.requestUuid}'!") + }) + } + +} + +data class ResponseEvent( + val requestUuid: String, + val timestamp: Instant, + val status: RequestStatus, + val body: Optional = Optional.empty() +) \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 1e9263d..ef880f4 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -22,16 +22,18 @@ package dev.dnpm.etl.processor.services.kafka import com.fasterxml.jackson.annotation.JsonAlias import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.Report -import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.output.asRequestStatus +import dev.dnpm.etl.processor.services.ResponseEvent import org.apache.kafka.clients.consumer.ConsumerRecord import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher import org.springframework.kafka.listener.MessageListener import java.time.Instant +import java.util.* class KafkaResponseProcessor( - private val requestRepository: RequestRepository, + private val eventPublisher: ApplicationEventPublisher, private val objectMapper: ObjectMapper ) : MessageListener { @@ -39,55 +41,44 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord) { try { - val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java) - requestRepository.findByUuidEquals(responseKey.requestId).ifPresent { - val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - - when (responseBody.statusCode) { - 200 -> { - it.status = RequestStatus.SUCCESS - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - requestRepository.save(it) - } - - 201 -> { - it.status = RequestStatus.WARNING - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Warnungen über mangelhafte Daten", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - 400, 422 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - in 900..999 -> { - it.status = RequestStatus.ERROR - it.processedAt = Instant.ofEpochMilli(data.timestamp()) - it.report = Report( - "Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich", - objectMapper.writeValueAsString(responseBody.statusBody) - ) - requestRepository.save(it) - } - - else -> { - logger.error("Cannot process Kafka response: Unknown response code!") - } - } - } + Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - } + Optional.empty() + }.ifPresentOrElse({ responseKey -> + val event = try { + val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() + } + + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } + + else -> { + logger.error("Kafka response: Unknown response code!") + Optional.empty() + } + } + ) + } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) + ResponseEvent( + responseKey.requestId, + Instant.ofEpochMilli(data.timestamp()), + RequestStatus.ERROR, + Optional.of("Cannot process Kafka response") + ) + } + eventPublisher.publishEvent(event) + }, { + logger.error("No response key in Kafka response") + }) } data class ResponseKey(val requestId: String) diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt index 8552bbb..f9d8182 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/RequestProcessorTest.kt @@ -37,7 +37,7 @@ import org.mockito.Mockito.* import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.any import org.mockito.kotlin.argumentCaptor -import reactor.core.publisher.Sinks +import org.springframework.context.ApplicationEventPublisher import java.time.Instant import java.util.* @@ -48,7 +48,7 @@ class RequestProcessorTest { private lateinit var pseudonymizeService: PseudonymizeService private lateinit var sender: MtbFileSender private lateinit var requestService: RequestService - private lateinit var statisticsUpdateProducer: Sinks.Many + private lateinit var applicationEventPublisher: ApplicationEventPublisher private lateinit var requestProcessor: RequestProcessor @@ -57,11 +57,12 @@ class RequestProcessorTest { @Mock pseudonymizeService: PseudonymizeService, @Mock sender: RestMtbFileSender, @Mock requestService: RequestService, + @Mock applicationEventPublisher: ApplicationEventPublisher ) { this.pseudonymizeService = pseudonymizeService this.sender = sender this.requestService = requestService - this.statisticsUpdateProducer = Sinks.many().multicast().directBestEffort() + this.applicationEventPublisher = applicationEventPublisher val objectMapper = ObjectMapper() @@ -70,12 +71,12 @@ class RequestProcessorTest { sender, requestService, objectMapper, - statisticsUpdateProducer + applicationEventPublisher ) } @Test - fun testShouldDetectMtbFileDuplicationAndSaveRequestStatus() { + fun testShouldSendMtbFileDuplicationAndSaveUnknownRequestStatusAtFirst() { doAnswer { Request( id = 1L, @@ -126,11 +127,66 @@ class RequestProcessorTest { val requestCaptor = argumentCaptor() verify(requestService, times(1)).save(requestCaptor.capture()) assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN) } @Test - fun testShouldSendMtbFileAndSaveSuccessRequestStatus() { + fun testShouldDetectMtbFileDuplicationAndSendDuplicationEvent() { + doAnswer { + Request( + id = 1L, + uuid = UUID.randomUUID().toString(), + patientId = "TEST_12345678901", + pid = "P1", + fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a", + type = RequestType.MTB_FILE, + status = RequestStatus.SUCCESS, + processedAt = Instant.parse("2023-08-08T02:00:00Z") + ) + }.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString()) + + doAnswer { + false + }.`when`(requestService).isLastRequestDeletion(anyString()) + + doAnswer { + it.arguments[0] as String + }.`when`(pseudonymizeService).patientPseudonym(any()) + + val mtbFile = MtbFile.builder() + .withPatient( + Patient.builder() + .withId("1") + .withBirthDate("2000-08-08") + .withGender(Patient.Gender.MALE) + .build() + ) + .withConsent( + Consent.builder() + .withId("1") + .withStatus(Consent.Status.ACTIVE) + .withPatient("123") + .build() + ) + .withEpisode( + Episode.builder() + .withId("1") + .withPatient("1") + .withPeriod(PeriodStart("2023-08-08")) + .build() + ) + .build() + + this.requestProcessor.processMtbFile(mtbFile) + + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION) + } + + @Test + fun testShouldSendMtbFileAndSendSuccessEvent() { doAnswer { Request( id = 1L, @@ -149,7 +205,7 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(status = RequestStatus.SUCCESS) }.`when`(sender).send(any()) doAnswer { @@ -182,14 +238,14 @@ class RequestProcessorTest { this.requestProcessor.processMtbFile(mtbFile) - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) } @Test - fun testShouldSendMtbFileAndSaveErrorRequestStatus() { + fun testShouldSendMtbFileAndSendErrorEvent() { doAnswer { Request( id = 1L, @@ -208,7 +264,7 @@ class RequestProcessorTest { }.`when`(requestService).isLastRequestDeletion(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(status = RequestStatus.ERROR) }.`when`(sender).send(any()) doAnswer { @@ -241,20 +297,20 @@ class RequestProcessorTest { this.requestProcessor.processMtbFile(mtbFile) - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) } @Test - fun testShouldSendDeleteRequestAndSaveSuccessRequestStatus() { + fun testShouldSendDeleteRequestAndSaveUnknownRequestStatusAtFirst() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.SUCCESS) + MtbFileSender.Response(status = RequestStatus.UNKNOWN) }.`when`(sender).send(any()) this.requestProcessor.processDeletion("TEST_12345678901") @@ -262,25 +318,43 @@ class RequestProcessorTest { val requestCaptor = argumentCaptor() verify(requestService, times(1)).save(requestCaptor.capture()) assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN) } @Test - fun testShouldSendDeleteRequestAndSaveErrorRequestStatus() { + fun testShouldSendDeleteRequestAndSendSuccessEvent() { doAnswer { "PSEUDONYM" }.`when`(pseudonymizeService).patientPseudonym(anyString()) doAnswer { - MtbFileSender.Response(status = MtbFileSender.ResponseStatus.ERROR) + MtbFileSender.Response(status = RequestStatus.SUCCESS) }.`when`(sender).send(any()) this.requestProcessor.processDeletion("TEST_12345678901") - val requestCaptor = argumentCaptor() - verify(requestService, times(1)).save(requestCaptor.capture()) - assertThat(requestCaptor.firstValue).isNotNull - assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS) + } + + @Test + fun testShouldSendDeleteRequestAndSendErrorEvent() { + doAnswer { + "PSEUDONYM" + }.`when`(pseudonymizeService).patientPseudonym(anyString()) + + doAnswer { + MtbFileSender.Response(status = RequestStatus.ERROR) + }.`when`(sender).send(any()) + + this.requestProcessor.processDeletion("TEST_12345678901") + + val eventCaptor = argumentCaptor() + verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture()) + assertThat(eventCaptor.firstValue).isNotNull + assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR) } @Test diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt new file mode 100644 index 0000000..cfb1111 --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/ResponseProcessorTest.kt @@ -0,0 +1,142 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.monitoring.Request +import dev.dnpm.etl.processor.monitoring.RequestRepository +import dev.dnpm.etl.processor.monitoring.RequestStatus +import dev.dnpm.etl.processor.monitoring.RequestType +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.ArgumentMatchers.anyString +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.* +import reactor.core.publisher.Sinks +import java.time.Instant +import java.util.* + +@ExtendWith(MockitoExtension::class) +class ResponseProcessorTest { + + private lateinit var requestRepository: RequestRepository + private lateinit var statisticsUpdateProducer: Sinks.Many + + private lateinit var responseProcessor: ResponseProcessor + + private val testRequest = Request( + 1L, + "TestID1234", + "PSEUDONYM-A", + "1", + "dummyfingerprint", + RequestType.MTB_FILE, + RequestStatus.UNKNOWN + ) + + @BeforeEach + fun setup( + @Mock requestRepository: RequestRepository, + @Mock statisticsUpdateProducer: Sinks.Many + ) { + val objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) + + this.requestRepository = requestRepository + this.statisticsUpdateProducer = statisticsUpdateProducer + + this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer, objectMapper) + } + + @Test + fun shouldNotSaveStatusForUnknownRequest() { + doAnswer { + Optional.empty() + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + RequestStatus.SUCCESS + ) + + this.responseProcessor.handleResponseEvent(event) + + verify(requestRepository, never()).save(any()) + } + + @Test + fun shouldNotSaveStatusWithUnknownState() { + doAnswer { + Optional.of(testRequest) + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + RequestStatus.UNKNOWN + ) + + this.responseProcessor.handleResponseEvent(event) + + verify(requestRepository, never()).save(any()) + } + + @ParameterizedTest + @MethodSource("requestStatusSource") + fun shouldSaveStatusForKnownRequest(requestStatus: RequestStatus) { + doAnswer { + Optional.of(testRequest) + }.whenever(requestRepository).findByUuidEquals(anyString()) + + val event = ResponseEvent( + "TestID1234", + Instant.parse("2023-09-09T00:00:00Z"), + requestStatus + ) + + this.responseProcessor.handleResponseEvent(event) + + val captor = argumentCaptor() + verify(requestRepository, times(1)).save(captor.capture()) + assertThat(captor.firstValue).isNotNull + assertThat(captor.firstValue.status).isEqualTo(requestStatus) + } + + companion object { + + @JvmStatic + fun requestStatusSource(): Set { + return setOf( + RequestStatus.SUCCESS, + RequestStatus.WARNING, + RequestStatus.ERROR, + RequestStatus.DUPLICATION + ) + } + + } + +} \ No newline at end of file diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt new file mode 100644 index 0000000..0f524ca --- /dev/null +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -0,0 +1,119 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.services.kafka + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.KotlinModule +import dev.dnpm.etl.processor.services.ResponseEvent +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.kotlin.any +import org.mockito.kotlin.never +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import org.springframework.context.ApplicationEventPublisher +import org.springframework.http.HttpStatus + +@ExtendWith(MockitoExtension::class) +class KafkaResponseProcessorTest { + + private lateinit var eventPublisher: ApplicationEventPublisher + private lateinit var objectMapper: ObjectMapper + + private lateinit var kafkaResponseProcessor: KafkaResponseProcessor + + private fun createkafkaRecord( + requestId: String? = null, + statusCode: Int = 200, + statusBody: Map? = mapOf() + ): ConsumerRecord { + return ConsumerRecord( + "test-topic", + 0, + 0, + if (requestId == null) { + null + } else { + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId)) + }, + if (statusBody == null) { + "" + } else { + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody)) + } + ) + } + + @BeforeEach + fun setup( + @Mock eventPublisher: ApplicationEventPublisher + ) { + this.eventPublisher = eventPublisher + this.objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build()) + + this.kafkaResponseProcessor = KafkaResponseProcessor(eventPublisher, objectMapper) + } + + @Test + fun shouldNotProcessRecordsWithoutValidKey() { + this.kafkaResponseProcessor.onMessage(createkafkaRecord(null, 200)) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @Test + fun shouldNotProcessRecordsWithoutValidBody() { + this.kafkaResponseProcessor.onMessage(createkafkaRecord(requestId = "TestID1234", statusBody = null)) + + verify(eventPublisher, never()).publishEvent(any()) + } + + @ParameterizedTest + @MethodSource("statusCodeSource") + fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) { + this.kafkaResponseProcessor.onMessage(createkafkaRecord("TestID1234", statusCode)) + verify(eventPublisher, times(1)).publishEvent(any()) + } + + companion object { + + @JvmStatic + fun statusCodeSource(): Set { + return setOf( + HttpStatus.OK, + HttpStatus.CREATED, + HttpStatus.BAD_REQUEST, + HttpStatus.NOT_FOUND, + HttpStatus.UNPROCESSABLE_ENTITY, + HttpStatus.INTERNAL_SERVER_ERROR + ) + .map { it.value() } + .toSet() + } + + } + +} \ No newline at end of file