diff --git a/build.gradle.kts b/build.gradle.kts index 5978aa4..dbc9f41 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor") implementation("org.springframework.kafka:spring-kafka") implementation("commons-codec:commons-codec") + implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:7.2.2") developmentOnly("org.springframework.boot:spring-boot-devtools") developmentOnly("org.springframework.boot:spring-boot-docker-compose") annotationProcessor("org.springframework.boot:spring-boot-configuration-processor") diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/conditions/ConditionInMemoryRepository.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/conditions/ConditionInMemoryRepository.kt index c274fa5..07349f4 100644 --- a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/conditions/ConditionInMemoryRepository.kt +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/conditions/ConditionInMemoryRepository.kt @@ -25,6 +25,14 @@ class ConditionInMemoryRepository { return false } + fun saveIfNotOlderNewerVersion(condition: Condition): Boolean { + if ((this.conditions[condition.id]?.version ?: 0) <= condition.version) { + this.conditions[condition.id] = condition + return true + } + return false + } + fun findAll(): List { return conditions.values.toList() } diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsFhirTopicMonitor.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsFhirTopicMonitor.kt new file mode 100644 index 0000000..4b921fc --- /dev/null +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsFhirTopicMonitor.kt @@ -0,0 +1,56 @@ +package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners + +import ca.uhn.fhir.context.FhirContext +import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink +import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition +import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId +import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionInMemoryRepository +import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics +import org.hl7.fhir.r4.model.Bundle +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.support.KafkaHeaders +import org.springframework.messaging.handler.annotation.Header +import org.springframework.messaging.handler.annotation.Payload +import org.springframework.stereotype.Component + +@Component +class ObdsFhirTopicMonitor( + @Qualifier("obdsFhirConditionRepository") + private val conditionRepository: ConditionInMemoryRepository, + statisticsEventProducer: StatisticsSink, +) : TopicMonitor(statisticsEventProducer) { + + @KafkaListener(topicPattern = "fhir.obds.Condition.*") + override fun handleTopicRecord( + @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String, + @Header(KafkaHeaders.RECEIVED_TIMESTAMP) timestamp: Long, + @Header(KafkaHeaders.RECEIVED_KEY) key: String, + @Payload payload: String, + ) { + try { + val ctx = FhirContext.forR4() + val parser = ctx.newJsonParser() + + val bundle = parser.parseResource(Bundle::class.java, payload) + val firstEntry = bundle.entry.firstOrNull() ?: return + + if (firstEntry.resource.fhirType() == "Condition") { + val condition = firstEntry.resource as org.hl7.fhir.r4.model.Condition + val updated = conditionRepository.saveIfNotOlderNewerVersion( + Condition( + ConditionId(condition.id), + 0, + condition.code.coding.first { "http://fhir.de/CodeSystem/bfarm/icd-10-gm" == it.system }.code + ) + ) + + if (updated) { + sendUpdatedStatistics(fetchStatistics("obdsfhir", conditionRepository)) + } + } + } catch (e: Exception) { + // Ignore + } + } +} \ No newline at end of file