diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitor.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitor.kt index f692f5b..aeb28a5 100644 --- a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitor.kt +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitor.kt @@ -9,6 +9,7 @@ import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics +import org.slf4j.LoggerFactory import org.springframework.beans.factory.annotation.Qualifier import org.springframework.kafka.annotation.KafkaListener import org.springframework.kafka.support.KafkaHeaders @@ -35,6 +36,9 @@ class ObdsXmlTopicMonitor( statisticsEventProducer: StatisticsSink, ) : AbstractTopicMonitor(statisticsEventProducer) { + private val logger = LoggerFactory.getLogger(javaClass) + private val xPath = XPathFactory.newDefaultInstance().newXPath() + @KafkaListener(topicPattern = "onkostar.MELDUNG_EXPORT.*") override fun handleTopicRecord( @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String, @@ -44,38 +48,72 @@ class ObdsXmlTopicMonitor( ) { try { val p = objectMapper.readValue(payload, Record::class.java) - val xPath = XPathFactory.newDefaultInstance().newXPath() - - // Use local-name() due to XML namespace - val patientId = xPath.evaluate( - "//*[local-name()='Patienten_Stammdaten']/@Patient_ID", - InputSource(StringReader(p.payload.data)) - ) - val tumorId = xPath.evaluate( - "//*[local-name()='Tumorzuordnung']/@Tumor_ID", - InputSource(StringReader(p.payload.data)) - ) - val icd10 = xPath.evaluate( - "//*[local-name()='Primaertumor_ICD_Code']/text()", - InputSource(StringReader(p.payload.data)) - ) - - val updated = conditionRepository.saveIfNewerVersion( - Condition( - ConditionId.fromPatientIdAndTumorId(patientId, tumorId), - icd10, - p.payload.version - ) - ) - - if (updated) { - sendUpdatedStatistics(fetchStatistics("obdsxml", conditionRepository)) + when (xPath.evaluate("name(/*)", InputSource(StringReader(p.payload.data)))) { + "ADT_GEKID" -> handleObds2Payload(p.payload) + "oBDS" -> handleObds3Payload(p.payload) + else -> logger.warn("Cannot handle a non oBDS 2.x/3.x record!") } } catch (e: Exception) { // Ignore } } + private fun handleObds2Payload(payload: RecordPayload) { + // Use local-name() due to XML namespace + val patientId = xPath.evaluate( + "//*[local-name()='Patienten_Stammdaten']/@Patient_ID", + InputSource(StringReader(payload.data)) + ) + val tumorId = xPath.evaluate( + "//*[local-name()='Tumorzuordnung']/@Tumor_ID", + InputSource(StringReader(payload.data)) + ) + val icd10 = xPath.evaluate( + "//*[local-name()='Primaertumor_ICD_Code']/text()", + InputSource(StringReader(payload.data)) + ) + + val updated = conditionRepository.saveIfNewerVersion( + Condition( + ConditionId.fromPatientIdAndTumorId(patientId, tumorId), + icd10, + payload.version + ) + ) + + if (updated) { + sendUpdatedStatistics(fetchStatistics("obdsxml", conditionRepository)) + } + } + + private fun handleObds3Payload(payload: RecordPayload) { + // Use local-name() due to XML namespace + val patientId = xPath.evaluate( + "//*[local-name()='Patient']/@Patient_ID", + InputSource(StringReader(payload.data)) + ) + val tumorId = xPath.evaluate( + "//*[local-name()='Tumorzuordnung']/@Tumor_ID", + InputSource(StringReader(payload.data)) + ) + val icd10 = xPath.evaluate( + "//*[local-name()='Primaertumor_ICD']/*[local-name()='Code']/text()", + InputSource(StringReader(payload.data)) + ) + + val updated = conditionRepository.saveIfNewerVersion( + Condition( + ConditionId.fromPatientIdAndTumorId(patientId, tumorId), + icd10, + payload.version + ) + ) + + if (updated) { + sendUpdatedStatistics(fetchStatistics("obdsxml", conditionRepository)) + } + } + @JsonIgnoreProperties(ignoreUnknown = true) class Record @JsonCreator constructor(val payload: RecordPayload) diff --git a/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitorTest.kt b/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitorTest.kt index da4d3d3..8313b7b 100644 --- a/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitorTest.kt +++ b/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitorTest.kt @@ -4,14 +4,16 @@ import com.fasterxml.jackson.databind.MapperFeature import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.module.paramnames.ParameterNamesModule import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink +import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition +import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.mockito.Mock import org.mockito.junit.jupiter.MockitoExtension -import org.mockito.kotlin.any -import org.mockito.kotlin.whenever +import org.mockito.kotlin.* import org.springframework.core.io.ClassPathResource import reactor.core.publisher.Sinks import reactor.test.StepVerifier @@ -22,6 +24,8 @@ import kotlin.time.toJavaDuration @ExtendWith(MockitoExtension::class) class ObdsXmlTopicMonitorTest { + private lateinit var conditionRepository: ConditionRepository + private lateinit var topicMonitor: ObdsXmlTopicMonitor private lateinit var statisticsEventProducer: StatisticsSink @@ -31,14 +35,19 @@ class ObdsXmlTopicMonitorTest { .enable(MapperFeature.AUTO_DETECT_CREATORS) .build() - private fun payload(): String { + private fun obds2Payload(): String { return ClassPathResource("testobds.json").getContentAsString(Charsets.UTF_8) } + private fun obds3Payload(): String { + return ClassPathResource("testobds3.json").getContentAsString(Charsets.UTF_8) + } + @BeforeEach fun setup( @Mock conditionRepository: ConditionRepository, ) { + this.conditionRepository = conditionRepository this.statisticsEventProducer = Sinks.many().multicast().directBestEffort() this.topicMonitor = ObdsXmlTopicMonitor(conditionRepository, objectMapper, statisticsEventProducer) @@ -46,16 +55,59 @@ class ObdsXmlTopicMonitorTest { } @Test - fun shouldHandleKafkaRecordAndEmitEvent() { + fun shouldHandleObds2KafkaRecordAndEmitEvent() { val stepVerifier = StepVerifier .create(statisticsEventProducer.asFlux()) .expectNextCount(1) .expectTimeout(3.seconds.toJavaDuration()) .verifyLater() - this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", payload()) + this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", obds2Payload()) stepVerifier.verify() } + @Test + fun shouldHandleObds3KafkaRecordAndEmitEvent() { + val stepVerifier = StepVerifier + .create(statisticsEventProducer.asFlux()) + .expectNextCount(1) + .expectTimeout(3.seconds.toJavaDuration()) + .verifyLater() + + this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", obds3Payload()) + + stepVerifier.verify() + } + + @Test + fun shouldSaveConditionOnObds2Record() { + this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", obds2Payload()) + + val captor = argumentCaptor() + verify(conditionRepository, times(1)).saveIfNewerVersion(captor.capture()) + assertThat(captor.firstValue).isEqualTo( + Condition( + ConditionId.fromPatientIdAndTumorId("00001234", "1"), + "C17.1", + 1 + ) + ) + } + + @Test + fun shouldSaveConditionOnObds3Record() { + this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", obds3Payload()) + + val captor = argumentCaptor() + verify(conditionRepository, times(1)).saveIfNewerVersion(captor.capture()) + assertThat(captor.firstValue).isEqualTo( + Condition( + ConditionId.fromPatientIdAndTumorId("00001234", "1"), + "C17.1", + 1 + ) + ) + } + } \ No newline at end of file diff --git a/src/test/resources/testobds3.json b/src/test/resources/testobds3.json new file mode 100644 index 0000000..1574e13 --- /dev/null +++ b/src/test/resources/testobds3.json @@ -0,0 +1 @@ +{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"YEAR"},{"type":"int32","optional":true,"field":"VERSIONSNUMMER"},{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"XML_DATEN"}],"optional":false},"payload":{"YEAR":2024,"VERSIONSNUMMER":1,"ID":123456,"XML_DATEN":"TESTMusterstraße 1, 012345 Musterhausen2024-06-11103456789E123456789TesterPatrickM1980-01-01Testweg1DE01234MusterhausenIJC17.110 2015 GM2024-06-1010103456789"}} \ No newline at end of file