diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt index 63bf60a..de901ce 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt @@ -35,12 +35,27 @@ class KafkaInputListener( override fun onMessage(data: ConsumerRecord) { val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java) + val firstRequestIdHeader = data.headers().headers("requestId")?.firstOrNull() + val requestId = if (null != firstRequestIdHeader) { + String(firstRequestIdHeader.value()) + } else { + "" + } + if (mtbFile.consent.status == Consent.Status.ACTIVE) { logger.debug("Accepted MTB File for processing") - requestProcessor.processMtbFile(mtbFile) + if (requestId.isBlank()) { + requestProcessor.processMtbFile(mtbFile) + } else { + requestProcessor.processMtbFile(mtbFile, requestId) + } } else { logger.debug("Accepted MTB File and process deletion") - requestProcessor.processDeletion(mtbFile.patient.id) + if (requestId.isBlank()) { + requestProcessor.processDeletion(mtbFile.patient.id) + } else { + requestProcessor.processDeletion(mtbFile.patient.id, requestId) + } } } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt index d0b6341..66ff291 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/RequestProcessor.kt @@ -48,7 +48,10 @@ class RequestProcessor( ) { fun processMtbFile(mtbFile: MtbFile) { - val requestId = UUID.randomUUID().toString() + processMtbFile(mtbFile, UUID.randomUUID().toString()) + } + + fun processMtbFile(mtbFile: MtbFile, requestId: String) { val pid = mtbFile.patient.id mtbFile pseudonymizeWith pseudonymizeService @@ -103,8 +106,10 @@ class RequestProcessor( } fun processDeletion(patientId: String) { - val requestId = UUID.randomUUID().toString() + processDeletion(patientId, UUID.randomUUID().toString()) + } + fun processDeletion(patientId: String, requestId: String) { try { val patientPseudonym = pseudonymizeService.patientPseudonym(patientId) diff --git a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt index cf5ba39..1157644 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/input/KafkaInputListenerTest.kt @@ -25,6 +25,9 @@ import de.ukw.ccc.bwhc.dto.MtbFile import de.ukw.ccc.bwhc.dto.Patient import dev.dnpm.etl.processor.services.RequestProcessor import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeader +import org.apache.kafka.common.header.internals.RecordHeaders +import org.apache.kafka.common.record.TimestampType import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith @@ -34,6 +37,7 @@ import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.any import org.mockito.kotlin.times import org.mockito.kotlin.verify +import java.util.* @ExtendWith(MockitoExtension::class) class KafkaInputListenerTest { @@ -76,4 +80,33 @@ class KafkaInputListenerTest { verify(requestProcessor, times(1)).processDeletion(anyString()) } + @Test + fun shouldProcessMtbFileRequestWithExistingRequestId() { + val mtbFile = MtbFile.builder() + .withPatient(Patient.builder().withId("DUMMY_12345678").build()) + .withConsent(Consent.builder().withStatus(Consent.Status.ACTIVE).build()) + .build() + + val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty()) + ) + + verify(requestProcessor, times(1)).processMtbFile(any(), anyString()) + } + + @Test + fun shouldProcessDeleteRequestWithExistingRequestId() { + val mtbFile = MtbFile.builder() + .withPatient(Patient.builder().withId("DUMMY_12345678").build()) + .withConsent(Consent.builder().withStatus(Consent.Status.REJECTED).build()) + .build() + + val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray()))) + kafkaInputListener.onMessage( + ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty()) + ) + verify(requestProcessor, times(1)).processDeletion(anyString(), anyString()) + } + } \ No newline at end of file