1
0
mirror of https://github.com/pcvolkmer/onco-analytics-monitor.git synced 2025-04-19 19:16:52 +00:00

refactor: put FHIR data extraction into abstract class

This commit is contained in:
Paul-Christian Volkmer 2024-08-08 19:15:50 +02:00
parent 64818ab7b0
commit 48677250b4
6 changed files with 131 additions and 61 deletions

View File

@ -0,0 +1,44 @@
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
import ca.uhn.fhir.context.FhirContext
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics
import org.hl7.fhir.r4.model.Bundle
import reactor.core.publisher.Sinks
/**
* Abstract class with common methods for FHIR TopicMonitors
*
* @property statisticsEventProducer The event producer/sink to notify about saved condition
*
* @author Paul-Christian Volkmer
* @since 0.1.0
*/
abstract class AbstractFhirTopicMonitor(statisticsEventProducer: Sinks.Many<Statistics>) :
AbstractTopicMonitor(statisticsEventProducer) {
private val fhirContext = FhirContext.forR4()
/**
* Handle parsable FHIR resource
*
* @param payload The string representation of the FHIR resource
* @param handler The handler function to define what to do if payload contains usable FHIR condition
*/
fun handleUsableFhirPayload(payload: String, handler: (condition: Condition) -> Unit) {
val bundle = fhirContext.newJsonParser().parseResource(Bundle::class.java, payload)
val firstEntry = bundle.entry.firstOrNull() ?: return
if (firstEntry.resource.fhirType() == "Condition") {
val condition = firstEntry.resource as org.hl7.fhir.r4.model.Condition
handler(
Condition(
ConditionId(condition.id),
condition.code.coding.first { "http://fhir.de/CodeSystem/bfarm/icd-10-gm" == it.system }.code
)
)
}
}
}

View File

@ -0,0 +1,40 @@
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics
import org.apache.kafka.common.TopicPartition
import org.springframework.kafka.listener.ConsumerSeekAware
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
import reactor.core.publisher.Sinks
/**
* Abstract class with common methods for all TopicMonitors
*
* This implements ConsumerSeekAware to seek any topic to the beginning and load all available topic records from start.
*
* @property statisticsEventProducer The event producer/sink to notify about saved condition
*
* @author Paul-Christian Volkmer
* @since 0.1.0
*
* @see ConsumerSeekAware
*/
abstract class AbstractTopicMonitor(private val statisticsEventProducer: Sinks.Many<Statistics>) : TopicMonitor,
ConsumerSeekAware {
/**
* This will send new/updated statistics
*
* @param statistics The statistics to be sent/updated
*/
fun sendUpdatedStatistics(statistics: Statistics) {
statisticsEventProducer.emitNext(statistics) { _, _ -> false }
}
/**
* This will seek assigned Kafka Partitions back to the beginning for all inheriting classes
*/
override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>, callback: ConsumerSeekCallback) {
callback.seekToBeginning(assignments.keys)
}
}

View File

@ -1,12 +1,8 @@
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
import ca.uhn.fhir.context.FhirContext
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics
import org.hl7.fhir.r4.model.Bundle
import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.kafka.annotation.KafkaListener import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders import org.springframework.kafka.support.KafkaHeaders
@ -14,12 +10,20 @@ import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
/**
* FHIR TopicMonitor to listen to Kafka Topics matching 'fhir.obds.Condition.*'
*
* @property statisticsEventProducer The event producer/sink to notify about saved condition
*
* @author Paul-Christian Volkmer
* @since 0.1.0
*/
@Component @Component
class FhirObdsTopicMonitor( class FhirObdsTopicMonitor(
@Qualifier("fhirObdsConditionRepository") @Qualifier("fhirObdsConditionRepository")
private val conditionRepository: ConditionRepository, private val conditionRepository: ConditionRepository,
statisticsEventProducer: StatisticsSink, statisticsEventProducer: StatisticsSink,
) : TopicMonitor(statisticsEventProducer) { ) : AbstractFhirTopicMonitor(statisticsEventProducer) {
@KafkaListener(topicPattern = "fhir.obds.Condition.*") @KafkaListener(topicPattern = "fhir.obds.Condition.*")
override fun handleTopicRecord( override fun handleTopicRecord(
@ -29,22 +33,8 @@ class FhirObdsTopicMonitor(
@Payload payload: String, @Payload payload: String,
) { ) {
try { try {
val ctx = FhirContext.forR4() this.handleUsableFhirPayload(payload) { condition ->
val parser = ctx.newJsonParser() if (conditionRepository.save(condition)) {
val bundle = parser.parseResource(Bundle::class.java, payload)
val firstEntry = bundle.entry.firstOrNull() ?: return
if (firstEntry.resource.fhirType() == "Condition") {
val condition = firstEntry.resource as org.hl7.fhir.r4.model.Condition
val updated = conditionRepository.save(
Condition(
ConditionId(condition.id),
condition.code.coding.first { "http://fhir.de/CodeSystem/bfarm/icd-10-gm" == it.system }.code
)
)
if (updated) {
sendUpdatedStatistics(fetchStatistics("fhirobds", conditionRepository)) sendUpdatedStatistics(fetchStatistics("fhirobds", conditionRepository))
} }
} }

View File

@ -1,12 +1,8 @@
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
import ca.uhn.fhir.context.FhirContext
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository
import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics
import org.hl7.fhir.r4.model.Bundle
import org.springframework.beans.factory.annotation.Qualifier import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.kafka.annotation.KafkaListener import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.KafkaHeaders import org.springframework.kafka.support.KafkaHeaders
@ -14,12 +10,20 @@ import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload import org.springframework.messaging.handler.annotation.Payload
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
/**
* FHIR TopicMonitor to listen to Kafka Topics matching 'fhir.pseudonymized.*'
*
* @property statisticsEventProducer The event producer/sink to notify about saved condition
*
* @author Paul-Christian Volkmer
* @since 0.1.0
*/
@Component @Component
class FhirPseudonymizedTopicMonitor( class FhirPseudonymizedTopicMonitor(
@Qualifier("fhirPseudonymizedConditionRepository") @Qualifier("fhirPseudonymizedConditionRepository")
private val conditionRepository: ConditionRepository, private val conditionRepository: ConditionRepository,
statisticsEventProducer: StatisticsSink, statisticsEventProducer: StatisticsSink,
) : TopicMonitor(statisticsEventProducer) { ) : AbstractFhirTopicMonitor(statisticsEventProducer) {
@KafkaListener(topicPattern = "fhir.pseudonymized.*") @KafkaListener(topicPattern = "fhir.pseudonymized.*")
override fun handleTopicRecord( override fun handleTopicRecord(
@ -29,22 +33,8 @@ class FhirPseudonymizedTopicMonitor(
@Payload payload: String, @Payload payload: String,
) { ) {
try { try {
val ctx = FhirContext.forR4() this.handleUsableFhirPayload(payload) { condition ->
val parser = ctx.newJsonParser() if (conditionRepository.save(condition)) {
val bundle = parser.parseResource(Bundle::class.java, payload)
val firstEntry = bundle.entry.firstOrNull() ?: return
if (firstEntry.resource.fhirType() == "Condition") {
val condition = firstEntry.resource as org.hl7.fhir.r4.model.Condition
val updated = conditionRepository.save(
Condition(
ConditionId(condition.id),
condition.code.coding.first { "http://fhir.de/CodeSystem/bfarm/icd-10-gm" == it.system }.code
)
)
if (updated) {
sendUpdatedStatistics(fetchStatistics("fhirpseudonymized", conditionRepository)) sendUpdatedStatistics(fetchStatistics("fhirpseudonymized", conditionRepository))
} }
} }

View File

@ -19,13 +19,21 @@ import org.xml.sax.InputSource
import java.io.StringReader import java.io.StringReader
import javax.xml.xpath.XPathFactory import javax.xml.xpath.XPathFactory
/**
* oBDS TopicMonitor to listen to Kafka Topics matching 'onkostar.MELDUNG_EXPORT.*'
*
* @property statisticsEventProducer The event producer/sink to notify about saved condition
*
* @author Paul-Christian Volkmer
* @since 0.1.0
*/
@Component @Component
class ObdsXmlTopicMonitor( class ObdsXmlTopicMonitor(
@Qualifier("obdsXmlConditionRepository") @Qualifier("obdsXmlConditionRepository")
private val conditionRepository: ConditionRepository, private val conditionRepository: ConditionRepository,
private val objectMapper: ObjectMapper, private val objectMapper: ObjectMapper,
statisticsEventProducer: StatisticsSink, statisticsEventProducer: StatisticsSink,
) : TopicMonitor(statisticsEventProducer) { ) : AbstractTopicMonitor(statisticsEventProducer) {
@KafkaListener(topicPattern = "onkostar.MELDUNG_EXPORT.*") @KafkaListener(topicPattern = "onkostar.MELDUNG_EXPORT.*")
override fun handleTopicRecord( override fun handleTopicRecord(

View File

@ -1,21 +1,19 @@
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics /**
import org.apache.kafka.common.TopicPartition * TopicMonitor to listen to Kafka Topics
import org.springframework.kafka.listener.ConsumerSeekAware *
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback * @author Paul-Christian Volkmer
import reactor.core.publisher.Sinks * @since 0.1.0
*/
abstract class TopicMonitor(private val statisticsEventProducer: Sinks.Many<Statistics>) : ConsumerSeekAware { interface TopicMonitor {
/**
abstract fun handleTopicRecord(topic: String, timestamp: Long, key: String, payload: String) * Handle incoming Kafka Record
*
fun sendUpdatedStatistics(statistics: Statistics) { * @param topic The exact name of the topic
statisticsEventProducer.emitNext(statistics) { _, _ -> false } * @param timestamp The timestamp the record has been published
} * @param key The records key
* @param payload The records payload
override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>, callback: ConsumerSeekCallback) { */
callback.seekToBeginning(assignments.keys) fun handleTopicRecord(topic: String, timestamp: Long, key: String, payload: String)
}
} }