mirror of
https://github.com/pcvolkmer/onco-analytics-monitor.git
synced 2025-04-19 19:16:52 +00:00
feat: handle oBDS Version 3.x records
This commit is contained in:
parent
b0dc82ed27
commit
2262783c9a
@ -9,6 +9,7 @@ import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition
|
|||||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId
|
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId
|
||||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
|
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
|
||||||
import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics
|
import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
import org.springframework.beans.factory.annotation.Qualifier
|
import org.springframework.beans.factory.annotation.Qualifier
|
||||||
import org.springframework.kafka.annotation.KafkaListener
|
import org.springframework.kafka.annotation.KafkaListener
|
||||||
import org.springframework.kafka.support.KafkaHeaders
|
import org.springframework.kafka.support.KafkaHeaders
|
||||||
@ -35,6 +36,9 @@ class ObdsXmlTopicMonitor(
|
|||||||
statisticsEventProducer: StatisticsSink,
|
statisticsEventProducer: StatisticsSink,
|
||||||
) : AbstractTopicMonitor(statisticsEventProducer) {
|
) : AbstractTopicMonitor(statisticsEventProducer) {
|
||||||
|
|
||||||
|
private val logger = LoggerFactory.getLogger(javaClass)
|
||||||
|
private val xPath = XPathFactory.newDefaultInstance().newXPath()
|
||||||
|
|
||||||
@KafkaListener(topicPattern = "onkostar.MELDUNG_EXPORT.*")
|
@KafkaListener(topicPattern = "onkostar.MELDUNG_EXPORT.*")
|
||||||
override fun handleTopicRecord(
|
override fun handleTopicRecord(
|
||||||
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
|
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
|
||||||
@ -44,35 +48,69 @@ class ObdsXmlTopicMonitor(
|
|||||||
) {
|
) {
|
||||||
try {
|
try {
|
||||||
val p = objectMapper.readValue(payload, Record::class.java)
|
val p = objectMapper.readValue(payload, Record::class.java)
|
||||||
val xPath = XPathFactory.newDefaultInstance().newXPath()
|
when (xPath.evaluate("name(/*)", InputSource(StringReader(p.payload.data)))) {
|
||||||
|
"ADT_GEKID" -> handleObds2Payload(p.payload)
|
||||||
|
"oBDS" -> handleObds3Payload(p.payload)
|
||||||
|
else -> logger.warn("Cannot handle a non oBDS 2.x/3.x record!")
|
||||||
|
}
|
||||||
|
} catch (e: Exception) {
|
||||||
|
// Ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun handleObds2Payload(payload: RecordPayload) {
|
||||||
// Use local-name() due to XML namespace
|
// Use local-name() due to XML namespace
|
||||||
val patientId = xPath.evaluate(
|
val patientId = xPath.evaluate(
|
||||||
"//*[local-name()='Patienten_Stammdaten']/@Patient_ID",
|
"//*[local-name()='Patienten_Stammdaten']/@Patient_ID",
|
||||||
InputSource(StringReader(p.payload.data))
|
InputSource(StringReader(payload.data))
|
||||||
)
|
)
|
||||||
val tumorId = xPath.evaluate(
|
val tumorId = xPath.evaluate(
|
||||||
"//*[local-name()='Tumorzuordnung']/@Tumor_ID",
|
"//*[local-name()='Tumorzuordnung']/@Tumor_ID",
|
||||||
InputSource(StringReader(p.payload.data))
|
InputSource(StringReader(payload.data))
|
||||||
)
|
)
|
||||||
val icd10 = xPath.evaluate(
|
val icd10 = xPath.evaluate(
|
||||||
"//*[local-name()='Primaertumor_ICD_Code']/text()",
|
"//*[local-name()='Primaertumor_ICD_Code']/text()",
|
||||||
InputSource(StringReader(p.payload.data))
|
InputSource(StringReader(payload.data))
|
||||||
)
|
)
|
||||||
|
|
||||||
val updated = conditionRepository.saveIfNewerVersion(
|
val updated = conditionRepository.saveIfNewerVersion(
|
||||||
Condition(
|
Condition(
|
||||||
ConditionId.fromPatientIdAndTumorId(patientId, tumorId),
|
ConditionId.fromPatientIdAndTumorId(patientId, tumorId),
|
||||||
icd10,
|
icd10,
|
||||||
p.payload.version
|
payload.version
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if (updated) {
|
if (updated) {
|
||||||
sendUpdatedStatistics(fetchStatistics("obdsxml", conditionRepository))
|
sendUpdatedStatistics(fetchStatistics("obdsxml", conditionRepository))
|
||||||
}
|
}
|
||||||
} catch (e: Exception) {
|
}
|
||||||
// Ignore
|
|
||||||
|
private fun handleObds3Payload(payload: RecordPayload) {
|
||||||
|
// Use local-name() due to XML namespace
|
||||||
|
val patientId = xPath.evaluate(
|
||||||
|
"//*[local-name()='Patient']/@Patient_ID",
|
||||||
|
InputSource(StringReader(payload.data))
|
||||||
|
)
|
||||||
|
val tumorId = xPath.evaluate(
|
||||||
|
"//*[local-name()='Tumorzuordnung']/@Tumor_ID",
|
||||||
|
InputSource(StringReader(payload.data))
|
||||||
|
)
|
||||||
|
val icd10 = xPath.evaluate(
|
||||||
|
"//*[local-name()='Primaertumor_ICD']/*[local-name()='Code']/text()",
|
||||||
|
InputSource(StringReader(payload.data))
|
||||||
|
)
|
||||||
|
|
||||||
|
val updated = conditionRepository.saveIfNewerVersion(
|
||||||
|
Condition(
|
||||||
|
ConditionId.fromPatientIdAndTumorId(patientId, tumorId),
|
||||||
|
icd10,
|
||||||
|
payload.version
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if (updated) {
|
||||||
|
sendUpdatedStatistics(fetchStatistics("obdsxml", conditionRepository))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,14 +4,16 @@ import com.fasterxml.jackson.databind.MapperFeature
|
|||||||
import com.fasterxml.jackson.databind.json.JsonMapper
|
import com.fasterxml.jackson.databind.json.JsonMapper
|
||||||
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
|
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
|
||||||
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
|
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
|
||||||
|
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition
|
||||||
|
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId
|
||||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
|
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
|
||||||
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.jupiter.api.BeforeEach
|
import org.junit.jupiter.api.BeforeEach
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import org.junit.jupiter.api.extension.ExtendWith
|
import org.junit.jupiter.api.extension.ExtendWith
|
||||||
import org.mockito.Mock
|
import org.mockito.Mock
|
||||||
import org.mockito.junit.jupiter.MockitoExtension
|
import org.mockito.junit.jupiter.MockitoExtension
|
||||||
import org.mockito.kotlin.any
|
import org.mockito.kotlin.*
|
||||||
import org.mockito.kotlin.whenever
|
|
||||||
import org.springframework.core.io.ClassPathResource
|
import org.springframework.core.io.ClassPathResource
|
||||||
import reactor.core.publisher.Sinks
|
import reactor.core.publisher.Sinks
|
||||||
import reactor.test.StepVerifier
|
import reactor.test.StepVerifier
|
||||||
@ -22,6 +24,8 @@ import kotlin.time.toJavaDuration
|
|||||||
@ExtendWith(MockitoExtension::class)
|
@ExtendWith(MockitoExtension::class)
|
||||||
class ObdsXmlTopicMonitorTest {
|
class ObdsXmlTopicMonitorTest {
|
||||||
|
|
||||||
|
private lateinit var conditionRepository: ConditionRepository
|
||||||
|
|
||||||
private lateinit var topicMonitor: ObdsXmlTopicMonitor
|
private lateinit var topicMonitor: ObdsXmlTopicMonitor
|
||||||
|
|
||||||
private lateinit var statisticsEventProducer: StatisticsSink
|
private lateinit var statisticsEventProducer: StatisticsSink
|
||||||
@ -31,14 +35,19 @@ class ObdsXmlTopicMonitorTest {
|
|||||||
.enable(MapperFeature.AUTO_DETECT_CREATORS)
|
.enable(MapperFeature.AUTO_DETECT_CREATORS)
|
||||||
.build()
|
.build()
|
||||||
|
|
||||||
private fun payload(): String {
|
private fun obds2Payload(): String {
|
||||||
return ClassPathResource("testobds.json").getContentAsString(Charsets.UTF_8)
|
return ClassPathResource("testobds.json").getContentAsString(Charsets.UTF_8)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun obds3Payload(): String {
|
||||||
|
return ClassPathResource("testobds3.json").getContentAsString(Charsets.UTF_8)
|
||||||
|
}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
fun setup(
|
fun setup(
|
||||||
@Mock conditionRepository: ConditionRepository,
|
@Mock conditionRepository: ConditionRepository,
|
||||||
) {
|
) {
|
||||||
|
this.conditionRepository = conditionRepository
|
||||||
this.statisticsEventProducer = Sinks.many().multicast().directBestEffort()
|
this.statisticsEventProducer = Sinks.many().multicast().directBestEffort()
|
||||||
this.topicMonitor = ObdsXmlTopicMonitor(conditionRepository, objectMapper, statisticsEventProducer)
|
this.topicMonitor = ObdsXmlTopicMonitor(conditionRepository, objectMapper, statisticsEventProducer)
|
||||||
|
|
||||||
@ -46,16 +55,59 @@ class ObdsXmlTopicMonitorTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun shouldHandleKafkaRecordAndEmitEvent() {
|
fun shouldHandleObds2KafkaRecordAndEmitEvent() {
|
||||||
val stepVerifier = StepVerifier
|
val stepVerifier = StepVerifier
|
||||||
.create(statisticsEventProducer.asFlux())
|
.create(statisticsEventProducer.asFlux())
|
||||||
.expectNextCount(1)
|
.expectNextCount(1)
|
||||||
.expectTimeout(3.seconds.toJavaDuration())
|
.expectTimeout(3.seconds.toJavaDuration())
|
||||||
.verifyLater()
|
.verifyLater()
|
||||||
|
|
||||||
this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", payload())
|
this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", obds2Payload())
|
||||||
|
|
||||||
stepVerifier.verify()
|
stepVerifier.verify()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun shouldHandleObds3KafkaRecordAndEmitEvent() {
|
||||||
|
val stepVerifier = StepVerifier
|
||||||
|
.create(statisticsEventProducer.asFlux())
|
||||||
|
.expectNextCount(1)
|
||||||
|
.expectTimeout(3.seconds.toJavaDuration())
|
||||||
|
.verifyLater()
|
||||||
|
|
||||||
|
this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", obds3Payload())
|
||||||
|
|
||||||
|
stepVerifier.verify()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun shouldSaveConditionOnObds2Record() {
|
||||||
|
this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", obds2Payload())
|
||||||
|
|
||||||
|
val captor = argumentCaptor<Condition>()
|
||||||
|
verify(conditionRepository, times(1)).saveIfNewerVersion(captor.capture())
|
||||||
|
assertThat(captor.firstValue).isEqualTo(
|
||||||
|
Condition(
|
||||||
|
ConditionId.fromPatientIdAndTumorId("00001234", "1"),
|
||||||
|
"C17.1",
|
||||||
|
1
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun shouldSaveConditionOnObds3Record() {
|
||||||
|
this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", obds3Payload())
|
||||||
|
|
||||||
|
val captor = argumentCaptor<Condition>()
|
||||||
|
verify(conditionRepository, times(1)).saveIfNewerVersion(captor.capture())
|
||||||
|
assertThat(captor.firstValue).isEqualTo(
|
||||||
|
Condition(
|
||||||
|
ConditionId.fromPatientIdAndTumorId("00001234", "1"),
|
||||||
|
"C17.1",
|
||||||
|
1
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
1
src/test/resources/testobds3.json
Normal file
1
src/test/resources/testobds3.json
Normal file
@ -0,0 +1 @@
|
|||||||
|
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"YEAR"},{"type":"int32","optional":true,"field":"VERSIONSNUMMER"},{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"XML_DATEN"}],"optional":false},"payload":{"YEAR":2024,"VERSIONSNUMMER":1,"ID":123456,"XML_DATEN":"<?xml version=\"1.0\" encoding=\"utf-8\" ?><oBDS xmlns=\"http://www.basisdatensatz.de/oBDS/XML\" Schema_Version=\"3.0.2\"><Absender Absender_ID=\"TEST\" Software_ID=\"ONKOSTAR\" Software_Version=\"2.13.1\"><Bezeichnung>TEST</Bezeichnung><Anschrift>Musterstraße 1, 012345 Musterhausen</Anschrift></Absender><Meldedatum>2024-06-11</Meldedatum><Menge_Patient><Patient Patient_ID=\"00001234\"><Patienten_Stammdaten><Versichertendaten_GKV><IKNR>103456789</IKNR><GKV_Versichertennummer>E123456789</GKV_Versichertennummer></Versichertendaten_GKV><Nachname>Tester</Nachname><Vornamen>Patrick</Vornamen><Geschlecht>M</Geschlecht><Geburtsdatum Datumsgenauigkeit=\"T\">1980-01-01</Geburtsdatum><Adresse><Strasse>Testweg</Strasse><Hausnummer>1</Hausnummer><Land>DE</Land><PLZ>01234</PLZ><Ort>Musterhausen</Ort></Adresse></Patienten_Stammdaten><Menge_Meldung><Meldung Meldung_ID=\"TEST1727528\" Melder_ID=\"TEST\"><Meldebegruendung>I</Meldebegruendung><Eigene_Leistung>J</Eigene_Leistung><Tumorzuordnung Tumor_ID=\"1\"><Primaertumor_ICD><Code>C17.1</Code><Version>10 2015 GM</Version></Primaertumor_ICD><Diagnosedatum Datumsgenauigkeit=\"T\">2024-06-10</Diagnosedatum></Tumorzuordnung><Diagnose><Diagnosesicherung>1</Diagnosesicherung><Allgemeiner_Leistungszustand>0</Allgemeiner_Leistungszustand></Diagnose></Meldung></Menge_Meldung></Patient></Menge_Patient><Menge_Melder><Melder ID=\"TEST\"><Ident_Nummern><IKNR>103456789</IKNR></Ident_Nummern></Melder></Menge_Melder></oBDS>"}}
|
Loading…
x
Reference in New Issue
Block a user