mirror of
https://github.com/pcvolkmer/onco-analytics-monitor.git
synced 2025-04-19 19:16:52 +00:00
test: check handling of incoming kafka records
This commit is contained in:
parent
afa9be27b9
commit
3979415c18
@ -0,0 +1,58 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
|
||||
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.extension.ExtendWith
|
||||
import org.mockito.Mock
|
||||
import org.mockito.junit.jupiter.MockitoExtension
|
||||
import org.mockito.kotlin.any
|
||||
import org.mockito.kotlin.whenever
|
||||
import org.springframework.core.io.ClassPathResource
|
||||
import reactor.core.publisher.Sinks
|
||||
import reactor.test.StepVerifier
|
||||
import java.time.Instant
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.toJavaDuration
|
||||
|
||||
@ExtendWith(MockitoExtension::class)
|
||||
class FhirObdsTopicMonitorTest {
|
||||
|
||||
private lateinit var topicMonitor: FhirObdsTopicMonitor
|
||||
|
||||
private lateinit var statisticsEventProducer: StatisticsSink
|
||||
|
||||
private fun payload(): String {
|
||||
return ClassPathResource("testfhir.json").getContentAsString(Charsets.UTF_8)
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
fun setup(
|
||||
@Mock conditionRepository: ConditionRepository,
|
||||
) {
|
||||
this.statisticsEventProducer = Sinks.many().multicast().directBestEffort()
|
||||
this.topicMonitor = FhirObdsTopicMonitor(conditionRepository, statisticsEventProducer)
|
||||
|
||||
whenever(conditionRepository.save(any())).thenReturn(true)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun shouldHandleKafkaRecordAndEmitEvent() {
|
||||
val stepVerifier = StepVerifier
|
||||
.create(statisticsEventProducer.asFlux())
|
||||
.expectNextCount(1)
|
||||
.expectTimeout(3.seconds.toJavaDuration())
|
||||
.verifyLater()
|
||||
|
||||
this.topicMonitor.handleTopicRecord(
|
||||
"test",
|
||||
Instant.now().toEpochMilli(),
|
||||
"Struct{REFERENZ_NUMMER=00001234,TUMOR_ID=1}",
|
||||
payload()
|
||||
)
|
||||
|
||||
stepVerifier.verify()
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
|
||||
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.extension.ExtendWith
|
||||
import org.mockito.Mock
|
||||
import org.mockito.junit.jupiter.MockitoExtension
|
||||
import org.mockito.kotlin.any
|
||||
import org.mockito.kotlin.whenever
|
||||
import org.springframework.core.io.ClassPathResource
|
||||
import reactor.core.publisher.Sinks
|
||||
import reactor.test.StepVerifier
|
||||
import java.time.Instant
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.toJavaDuration
|
||||
|
||||
@ExtendWith(MockitoExtension::class)
|
||||
class FhirPseudonymizedTopicMonitorTest {
|
||||
|
||||
private lateinit var topicMonitor: FhirPseudonymizedTopicMonitor
|
||||
|
||||
private lateinit var statisticsEventProducer: StatisticsSink
|
||||
|
||||
private fun payload(): String {
|
||||
return ClassPathResource("testfhir.json").getContentAsString(Charsets.UTF_8)
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
fun setup(
|
||||
@Mock conditionRepository: ConditionRepository,
|
||||
) {
|
||||
this.statisticsEventProducer = Sinks.many().multicast().directBestEffort()
|
||||
this.topicMonitor = FhirPseudonymizedTopicMonitor(conditionRepository, statisticsEventProducer)
|
||||
|
||||
whenever(conditionRepository.save(any())).thenReturn(true)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun shouldHandleKafkaRecordAndEmitEvent() {
|
||||
val stepVerifier = StepVerifier
|
||||
.create(statisticsEventProducer.asFlux())
|
||||
.expectNextCount(1)
|
||||
.expectTimeout(3.seconds.toJavaDuration())
|
||||
.verifyLater()
|
||||
|
||||
this.topicMonitor.handleTopicRecord(
|
||||
"test",
|
||||
Instant.now().toEpochMilli(),
|
||||
"Struct{REFERENZ_NUMMER=00001234,TUMOR_ID=1}",
|
||||
payload()
|
||||
)
|
||||
|
||||
stepVerifier.verify()
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
|
||||
|
||||
import com.fasterxml.jackson.databind.MapperFeature
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper
|
||||
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.extension.ExtendWith
|
||||
import org.mockito.Mock
|
||||
import org.mockito.junit.jupiter.MockitoExtension
|
||||
import org.mockito.kotlin.any
|
||||
import org.mockito.kotlin.whenever
|
||||
import org.springframework.core.io.ClassPathResource
|
||||
import reactor.core.publisher.Sinks
|
||||
import reactor.test.StepVerifier
|
||||
import java.time.Instant
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.toJavaDuration
|
||||
|
||||
@ExtendWith(MockitoExtension::class)
|
||||
class ObdsXmlTopicMonitorTest {
|
||||
|
||||
private lateinit var topicMonitor: ObdsXmlTopicMonitor
|
||||
|
||||
private lateinit var statisticsEventProducer: StatisticsSink
|
||||
|
||||
private val objectMapper = JsonMapper.builder()
|
||||
.addModules(ParameterNamesModule())
|
||||
.enable(MapperFeature.AUTO_DETECT_CREATORS)
|
||||
.build()
|
||||
|
||||
private fun payload(): String {
|
||||
return ClassPathResource("testobds.json").getContentAsString(Charsets.UTF_8)
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
fun setup(
|
||||
@Mock conditionRepository: ConditionRepository,
|
||||
) {
|
||||
this.statisticsEventProducer = Sinks.many().multicast().directBestEffort()
|
||||
this.topicMonitor = ObdsXmlTopicMonitor(conditionRepository, objectMapper, statisticsEventProducer)
|
||||
|
||||
whenever(conditionRepository.saveIfNewerVersion(any())).thenReturn(true)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun shouldHandleKafkaRecordAndEmitEvent() {
|
||||
val stepVerifier = StepVerifier
|
||||
.create(statisticsEventProducer.asFlux())
|
||||
.expectNextCount(1)
|
||||
.expectTimeout(3.seconds.toJavaDuration())
|
||||
.verifyLater()
|
||||
|
||||
this.topicMonitor.handleTopicRecord("test", Instant.now().toEpochMilli(), "{\"ID\": 1}", payload())
|
||||
|
||||
stepVerifier.verify()
|
||||
}
|
||||
|
||||
}
|
48
src/test/resources/testfhir.json
Normal file
48
src/test/resources/testfhir.json
Normal file
@ -0,0 +1,48 @@
|
||||
{
|
||||
"resourceType": "Bundle",
|
||||
"type": "transaction",
|
||||
"entry": [
|
||||
{
|
||||
"fullUrl": "Condition/test123",
|
||||
"resource": {
|
||||
"resourceType": "Condition",
|
||||
"id": "test123",
|
||||
"meta": {
|
||||
"source": "TEST.ONKOSTAR:obds-to-fhir:0.0.0",
|
||||
"profile": [
|
||||
"http://dktk.dkfz.de/fhir/StructureDefinition/onco-core-Condition-Primaerdiagnose"
|
||||
]
|
||||
},
|
||||
"code": {
|
||||
"coding": [
|
||||
{
|
||||
"system": "http://fhir.de/CodeSystem/bfarm/icd-10-gm",
|
||||
"version": "2015",
|
||||
"code": "C17.1"
|
||||
}
|
||||
]
|
||||
},
|
||||
"subject": {
|
||||
"reference": "Patient/test123",
|
||||
"identifier": {
|
||||
"type": {
|
||||
"coding": [
|
||||
{
|
||||
"system": "http://terminology.hl7.org/CodeSystem/v2-0203",
|
||||
"code": "MR"
|
||||
}
|
||||
]
|
||||
},
|
||||
"system": "https://fhir.diz.uk-erlangen.de/identifiers/patient-id",
|
||||
"value": "00001234"
|
||||
}
|
||||
},
|
||||
"onsetDateTime": "2024-01-01"
|
||||
},
|
||||
"request": {
|
||||
"method": "PUT",
|
||||
"url": "Condition/test123"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
1
src/test/resources/testobds.json
Normal file
1
src/test/resources/testobds.json
Normal file
@ -0,0 +1 @@
|
||||
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"YEAR"},{"type":"int32","optional":true,"field":"VERSIONSNUMMER"},{"type":"int32","optional":false,"field":"ID"},{"type":"string","optional":true,"field":"XML_DATEN"}],"optional":false},"payload":{"YEAR":2024,"VERSIONSNUMMER":1,"ID":123456,"XML_DATEN":"<?xml version=\"1.0\" encoding=\"UTF-8\"?><ADT_GEKID xmlns=\"http://www.gekid.de/namespace\" Schema_Version=\"2.2.3\"><Absender Absender_ID=\"TEST\" Software_ID=\"ONKOSTAR\" Installations_ID=\"2011\"><Absender_Bezeichnung>TEST</Absender_Bezeichnung><Absender_Anschrift>Musterstraße 1, 012345 Musterhausen</Absender_Anschrift></Absender><Menge_Patient><Patient><Patienten_Stammdaten Patient_ID=\"00001234\"><KrankenversichertenNr>E123456789</KrankenversichertenNr><KrankenkassenNr>123456789</KrankenkassenNr><Patienten_Nachname>Tester</Patienten_Nachname><Patienten_Titel></Patienten_Titel><Patienten_Vornamen>Patrick</Patienten_Vornamen><Patienten_Geburtsname>Tester</Patienten_Geburtsname><Patienten_Geschlecht>M</Patienten_Geschlecht><Patienten_Geburtsdatum>01.01.1980</Patienten_Geburtsdatum><Menge_Adresse><Adresse><Patienten_Strasse>Testweg</Patienten_Strasse><Patienten_Hausnummer>1</Patienten_Hausnummer><Patienten_Land>DE</Patienten_Land><Patienten_PLZ>01234</Patienten_PLZ><Patienten_Ort>Musterhausen</Patienten_Ort></Adresse></Menge_Adresse></Patienten_Stammdaten><Menge_Meldung><Meldung Meldung_ID=\"TEST123456\" Melder_ID=\"TEST\"><Meldedatum>11.06.2024</Meldedatum><Meldebegruendung>I</Meldebegruendung><Meldeanlass>statusaenderung</Meldeanlass><Tumorzuordnung Tumor_ID=\"1\"><Primaertumor_ICD_Code>C17.1</Primaertumor_ICD_Code><Primaertumor_ICD_Version>10 2015 GM</Primaertumor_ICD_Version><Diagnosedatum>10.06.2024</Diagnosedatum><Seitenlokalisation>T</Seitenlokalisation></Tumorzuordnung><Menge_Tumorkonferenz><Tumorkonferenz Tumorkonferenz_ID=\"1234567\"><Tumorkonferenz_Datum>11.06.2024</Tumorkonferenz_Datum><Tumorkonferenz_Typ>praeth</Tumorkonferenz_Typ></Tumorkonferenz></Menge_Tumorkonferenz></Meldung></Menge_Meldung></Patient></Menge_Patient></ADT_GEKID>"}}
|
Loading…
x
Reference in New Issue
Block a user