diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/AbstractFhirTopicMonitor.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/AbstractFhirTopicMonitor.kt new file mode 100644 index 0000000..76dce8c --- /dev/null +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/AbstractFhirTopicMonitor.kt @@ -0,0 +1,44 @@ +package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners + +import ca.uhn.fhir.context.FhirContext +import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition +import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId +import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics +import org.hl7.fhir.r4.model.Bundle +import reactor.core.publisher.Sinks + +/** + * Abstract class with common methods for FHIR TopicMonitors + * + * @property statisticsEventProducer The event producer/sink to notify about saved condition + * + * @author Paul-Christian Volkmer + * @since 0.1.0 + */ +abstract class AbstractFhirTopicMonitor(statisticsEventProducer: Sinks.Many) : + AbstractTopicMonitor(statisticsEventProducer) { + + private val fhirContext = FhirContext.forR4() + + /** + * Handle parsable FHIR resource + * + * @param payload The string representation of the FHIR resource + * @param handler The handler function to define what to do if payload contains usable FHIR condition + */ + fun handleUsableFhirPayload(payload: String, handler: (condition: Condition) -> Unit) { + val bundle = fhirContext.newJsonParser().parseResource(Bundle::class.java, payload) + val firstEntry = bundle.entry.firstOrNull() ?: return + + if (firstEntry.resource.fhirType() == "Condition") { + val condition = firstEntry.resource as org.hl7.fhir.r4.model.Condition + handler( + Condition( + ConditionId(condition.id), + condition.code.coding.first { "http://fhir.de/CodeSystem/bfarm/icd-10-gm" == it.system }.code + ) + ) + } + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/AbstractTopicMonitor.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/AbstractTopicMonitor.kt new file mode 100644 index 0000000..de6d8c1 --- /dev/null +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/AbstractTopicMonitor.kt @@ -0,0 +1,40 @@ +package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners + +import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics +import org.apache.kafka.common.TopicPartition +import org.springframework.kafka.listener.ConsumerSeekAware +import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback +import reactor.core.publisher.Sinks + +/** + * Abstract class with common methods for all TopicMonitors + * + * This implements ConsumerSeekAware to seek any topic to the beginning and load all available topic records from start. + * + * @property statisticsEventProducer The event producer/sink to notify about saved condition + * + * @author Paul-Christian Volkmer + * @since 0.1.0 + * + * @see ConsumerSeekAware + */ +abstract class AbstractTopicMonitor(private val statisticsEventProducer: Sinks.Many) : TopicMonitor, + ConsumerSeekAware { + + /** + * This will send new/updated statistics + * + * @param statistics The statistics to be sent/updated + */ + fun sendUpdatedStatistics(statistics: Statistics) { + statisticsEventProducer.emitNext(statistics) { _, _ -> false } + } + + /** + * This will seek assigned Kafka Partitions back to the beginning for all inheriting classes + */ + override fun onPartitionsAssigned(assignments: MutableMap, callback: ConsumerSeekCallback) { + callback.seekToBeginning(assignments.keys) + } + +} \ No newline at end of file diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/FhirObdsTopicMonitor.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/FhirObdsTopicMonitor.kt index 3d36698..7037252 100644 --- a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/FhirObdsTopicMonitor.kt +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/FhirObdsTopicMonitor.kt @@ -1,12 +1,8 @@ package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners -import ca.uhn.fhir.context.FhirContext import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink -import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition -import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics -import org.hl7.fhir.r4.model.Bundle import org.springframework.beans.factory.annotation.Qualifier import org.springframework.kafka.annotation.KafkaListener import org.springframework.kafka.support.KafkaHeaders @@ -14,12 +10,20 @@ import org.springframework.messaging.handler.annotation.Header import org.springframework.messaging.handler.annotation.Payload import org.springframework.stereotype.Component +/** + * FHIR TopicMonitor to listen to Kafka Topics matching 'fhir.obds.Condition.*' + * + * @property statisticsEventProducer The event producer/sink to notify about saved condition + * + * @author Paul-Christian Volkmer + * @since 0.1.0 + */ @Component class FhirObdsTopicMonitor( @Qualifier("fhirObdsConditionRepository") private val conditionRepository: ConditionRepository, statisticsEventProducer: StatisticsSink, -) : TopicMonitor(statisticsEventProducer) { +) : AbstractFhirTopicMonitor(statisticsEventProducer) { @KafkaListener(topicPattern = "fhir.obds.Condition.*") override fun handleTopicRecord( @@ -29,22 +33,8 @@ class FhirObdsTopicMonitor( @Payload payload: String, ) { try { - val ctx = FhirContext.forR4() - val parser = ctx.newJsonParser() - - val bundle = parser.parseResource(Bundle::class.java, payload) - val firstEntry = bundle.entry.firstOrNull() ?: return - - if (firstEntry.resource.fhirType() == "Condition") { - val condition = firstEntry.resource as org.hl7.fhir.r4.model.Condition - val updated = conditionRepository.save( - Condition( - ConditionId(condition.id), - condition.code.coding.first { "http://fhir.de/CodeSystem/bfarm/icd-10-gm" == it.system }.code - ) - ) - - if (updated) { + this.handleUsableFhirPayload(payload) { condition -> + if (conditionRepository.save(condition)) { sendUpdatedStatistics(fetchStatistics("fhirobds", conditionRepository)) } } diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/FhirPseudonymizedTopicMonitor.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/FhirPseudonymizedTopicMonitor.kt index 67ab00f..53d886d 100644 --- a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/FhirPseudonymizedTopicMonitor.kt +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/FhirPseudonymizedTopicMonitor.kt @@ -1,12 +1,8 @@ package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners -import ca.uhn.fhir.context.FhirContext import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink -import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition -import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionId import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionRepository import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics -import org.hl7.fhir.r4.model.Bundle import org.springframework.beans.factory.annotation.Qualifier import org.springframework.kafka.annotation.KafkaListener import org.springframework.kafka.support.KafkaHeaders @@ -14,12 +10,20 @@ import org.springframework.messaging.handler.annotation.Header import org.springframework.messaging.handler.annotation.Payload import org.springframework.stereotype.Component +/** + * FHIR TopicMonitor to listen to Kafka Topics matching 'fhir.pseudonymized.*' + * + * @property statisticsEventProducer The event producer/sink to notify about saved condition + * + * @author Paul-Christian Volkmer + * @since 0.1.0 + */ @Component class FhirPseudonymizedTopicMonitor( @Qualifier("fhirPseudonymizedConditionRepository") private val conditionRepository: ConditionRepository, statisticsEventProducer: StatisticsSink, -) : TopicMonitor(statisticsEventProducer) { +) : AbstractFhirTopicMonitor(statisticsEventProducer) { @KafkaListener(topicPattern = "fhir.pseudonymized.*") override fun handleTopicRecord( @@ -29,22 +33,8 @@ class FhirPseudonymizedTopicMonitor( @Payload payload: String, ) { try { - val ctx = FhirContext.forR4() - val parser = ctx.newJsonParser() - - val bundle = parser.parseResource(Bundle::class.java, payload) - val firstEntry = bundle.entry.firstOrNull() ?: return - - if (firstEntry.resource.fhirType() == "Condition") { - val condition = firstEntry.resource as org.hl7.fhir.r4.model.Condition - val updated = conditionRepository.save( - Condition( - ConditionId(condition.id), - condition.code.coding.first { "http://fhir.de/CodeSystem/bfarm/icd-10-gm" == it.system }.code - ) - ) - - if (updated) { + this.handleUsableFhirPayload(payload) { condition -> + if (conditionRepository.save(condition)) { sendUpdatedStatistics(fetchStatistics("fhirpseudonymized", conditionRepository)) } } diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitor.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitor.kt index ffa2cc1..f692f5b 100644 --- a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitor.kt +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/ObdsXmlTopicMonitor.kt @@ -19,13 +19,21 @@ import org.xml.sax.InputSource import java.io.StringReader import javax.xml.xpath.XPathFactory +/** + * oBDS TopicMonitor to listen to Kafka Topics matching 'onkostar.MELDUNG_EXPORT.*' + * + * @property statisticsEventProducer The event producer/sink to notify about saved condition + * + * @author Paul-Christian Volkmer + * @since 0.1.0 + */ @Component class ObdsXmlTopicMonitor( @Qualifier("obdsXmlConditionRepository") private val conditionRepository: ConditionRepository, private val objectMapper: ObjectMapper, statisticsEventProducer: StatisticsSink, -) : TopicMonitor(statisticsEventProducer) { +) : AbstractTopicMonitor(statisticsEventProducer) { @KafkaListener(topicPattern = "onkostar.MELDUNG_EXPORT.*") override fun handleTopicRecord( diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/TopicMonitor.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/TopicMonitor.kt index 4ca0d34..7663ef9 100644 --- a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/TopicMonitor.kt +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/topiclisteners/TopicMonitor.kt @@ -1,21 +1,19 @@ package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners -import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics -import org.apache.kafka.common.TopicPartition -import org.springframework.kafka.listener.ConsumerSeekAware -import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback -import reactor.core.publisher.Sinks - -abstract class TopicMonitor(private val statisticsEventProducer: Sinks.Many) : ConsumerSeekAware { - - abstract fun handleTopicRecord(topic: String, timestamp: Long, key: String, payload: String) - - fun sendUpdatedStatistics(statistics: Statistics) { - statisticsEventProducer.emitNext(statistics) { _, _ -> false } - } - - override fun onPartitionsAssigned(assignments: MutableMap, callback: ConsumerSeekCallback) { - callback.seekToBeginning(assignments.keys) - } - +/** + * TopicMonitor to listen to Kafka Topics + * + * @author Paul-Christian Volkmer + * @since 0.1.0 + */ +interface TopicMonitor { + /** + * Handle incoming Kafka Record + * + * @param topic The exact name of the topic + * @param timestamp The timestamp the record has been published + * @param key The records key + * @param payload The records payload + */ + fun handleTopicRecord(topic: String, timestamp: Long, key: String, payload: String) } \ No newline at end of file