1
0
mirror of https://github.com/pcvolkmer/onco-analytics-monitor.git synced 2025-04-19 11:06:52 +00:00

feat: use oBDS FHIR topics

This commit is contained in:
Paul-Christian Volkmer 2024-08-05 21:27:39 +02:00
parent d6c651468a
commit 550da3d1f7
3 changed files with 65 additions and 0 deletions

View File

@ -35,6 +35,7 @@ dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
implementation("org.springframework.kafka:spring-kafka")
implementation("commons-codec:commons-codec")
implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:7.2.2")
developmentOnly("org.springframework.boot:spring-boot-devtools")
developmentOnly("org.springframework.boot:spring-boot-docker-compose")
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")

View File

@ -25,6 +25,14 @@ class ConditionInMemoryRepository {
return false
}
fun saveIfNotOlderNewerVersion(condition: Condition): Boolean {
if ((this.conditions[condition.id]?.version ?: 0) <= condition.version) {
this.conditions[condition.id] = condition
return true
}
return false
}
fun findAll(): List<Condition> {
return conditions.values.toList()
}

View File

@ -0,0 +1,56 @@
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
import ca.uhn.fhir.context.FhirContext
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionInMemoryRepository
import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics
import org.hl7.fhir.r4.model.Bundle
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component
@Component
class ObdsFhirTopicMonitor(
@Qualifier("obdsFhirConditionRepository")
private val conditionRepository: ConditionInMemoryRepository,
statisticsEventProducer: StatisticsSink,
) : TopicMonitor(statisticsEventProducer) {
@KafkaListener(topicPattern = "fhir.obds.Condition.*")
override fun handleTopicRecord(
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) timestamp: Long,
@Header(KafkaHeaders.RECEIVED_KEY) key: String,
@Payload payload: String,
) {
try {
val ctx = FhirContext.forR4()
val parser = ctx.newJsonParser()
val bundle = parser.parseResource(Bundle::class.java, payload)
val firstEntry = bundle.entry.firstOrNull() ?: return
if (firstEntry.resource.fhirType() == "Condition") {
val condition = firstEntry.resource as org.hl7.fhir.r4.model.Condition
val updated = conditionRepository.saveIfNotOlderNewerVersion(
Condition(
ConditionId(condition.id),
0,
condition.code.coding.first { "http://fhir.de/CodeSystem/bfarm/icd-10-gm" == it.system }.code
)
)
if (updated) {
sendUpdatedStatistics(fetchStatistics("obdsfhir", conditionRepository))
}
}
} catch (e: Exception) {
// Ignore
}
}
}