mirror of
https://github.com/pcvolkmer/onco-analytics-monitor.git
synced 2025-07-03 09:12:54 +00:00
Initial commit
This commit is contained in:
@ -0,0 +1,34 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor
|
||||
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionInMemoryRepository
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication
|
||||
import org.springframework.boot.runApplication
|
||||
import org.springframework.context.annotation.Bean
|
||||
import reactor.core.publisher.Sinks
|
||||
|
||||
typealias StatisticsSink = Sinks.Many<Statistics>
|
||||
|
||||
@SpringBootApplication
|
||||
class OncoAnalyticsMonitorApplication {
|
||||
|
||||
@Bean
|
||||
fun statisticsEventProducer(): StatisticsSink {
|
||||
return Sinks.many().multicast().onBackpressureBuffer()
|
||||
}
|
||||
|
||||
@Bean
|
||||
fun obdsXmlConditionRepository(): ConditionInMemoryRepository {
|
||||
return ConditionInMemoryRepository()
|
||||
}
|
||||
|
||||
@Bean
|
||||
fun obdsFhirConditionRepository(): ConditionInMemoryRepository {
|
||||
return ConditionInMemoryRepository()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun main(args: Array<String>) {
|
||||
runApplication<OncoAnalyticsMonitorApplication>(*args)
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.conditions
|
||||
|
||||
import org.apache.commons.codec.digest.DigestUtils
|
||||
|
||||
@JvmInline
|
||||
value class ConditionId(val value: String)
|
||||
|
||||
data class Condition(val id: ConditionId, val version: Int, val icd10: String) {
|
||||
companion object {
|
||||
fun generateConditionId(patientId: String, tumorId: String): ConditionId {
|
||||
return ConditionId(DigestUtils.sha256Hex("$patientId-$tumorId"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ConditionInMemoryRepository {
|
||||
|
||||
private val conditions = mutableMapOf<ConditionId, Condition>()
|
||||
|
||||
fun saveIfNewerVersion(condition: Condition): Boolean {
|
||||
if ((this.conditions[condition.id]?.version ?: 0) < condition.version) {
|
||||
this.conditions[condition.id] = condition
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
fun findAll(): List<Condition> {
|
||||
return conditions.values.toList()
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,8 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.conditions
|
||||
|
||||
data class Statistics(
|
||||
val name: String,
|
||||
val entries: List<StatisticsEntry>
|
||||
)
|
||||
|
||||
data class StatisticsEntry(val name: String, val count: Int)
|
@ -0,0 +1,75 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor
|
||||
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionInMemoryRepository
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.StatisticsEntry
|
||||
|
||||
val allKeys = listOf(
|
||||
"C00-C14",
|
||||
"C15",
|
||||
"C16",
|
||||
"C18-C21",
|
||||
"C22",
|
||||
"C23-C24",
|
||||
"C25",
|
||||
"C32",
|
||||
"C33-C34",
|
||||
"C50, D05",
|
||||
"C53, D06",
|
||||
"C54-C55",
|
||||
"C56, D39.1",
|
||||
"C61",
|
||||
"C62",
|
||||
"C64",
|
||||
"C67, D09.0, D41.4",
|
||||
"C70-C72",
|
||||
"C73",
|
||||
"C81",
|
||||
"C82-C88, C96",
|
||||
"C90",
|
||||
"C91-C95",
|
||||
"Other"
|
||||
)
|
||||
|
||||
fun fetchStatistics(name: String, source: ConditionInMemoryRepository): Statistics {
|
||||
fun mapIcd10Code(code: String): String {
|
||||
val c = when (code) {
|
||||
"D39.1", "D09.0", "D41.4" -> code
|
||||
else -> code.split('.').first()
|
||||
}
|
||||
|
||||
return when (c) {
|
||||
"C00", "C01", "C02", "C03", "C04", "C05", "C06", "C07", "C08", "C09", "C10", "C11", "C12", "C13", "C14" -> "C00-C14"
|
||||
"C15" -> "C15"
|
||||
"C16" -> "C16"
|
||||
"C18", "C19", "C20", "C21" -> "C18-C21"
|
||||
"C22" -> "C22"
|
||||
"C23", "C24" -> "C23-C24"
|
||||
"C25" -> "C25"
|
||||
"C32" -> "C32"
|
||||
"C33", "C34" -> "C33-C34"
|
||||
"C43" -> "C43"
|
||||
"C50", "D05" -> "C50, D05"
|
||||
"C53", "D06" -> "C53, D06"
|
||||
"C54", "C55" -> "C54-C55"
|
||||
"C56", "D39.1" -> "C56, D39.1"
|
||||
"C61" -> "C61"
|
||||
"C62" -> "C62"
|
||||
"C64" -> "C64"
|
||||
"C67", "D09.0", "D41.4" -> "C67, D09.0, D41.4"
|
||||
"C70", "C71", "C72" -> "C70-C72"
|
||||
"C73" -> "C73"
|
||||
"C81" -> "C81"
|
||||
"C82", "C83", "C84", "C85", "C86", "C87", "C88", "C96" -> "C82-C88, C96"
|
||||
"C90" -> "C90"
|
||||
"C91", "C92", "C93", "C94", "C95" -> "C91-C95"
|
||||
else -> "Other"
|
||||
}
|
||||
}
|
||||
|
||||
val entries = source.findAll()
|
||||
.groupBy { mapIcd10Code(it.icd10) }
|
||||
.mapValues { it.value.size }
|
||||
|
||||
return Statistics(name, allKeys.map { StatisticsEntry(it, 0) }.map { StatisticsEntry(it.name, entries.getOrDefault(it.name, 0)) })
|
||||
}
|
@ -0,0 +1,79 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonAlias
|
||||
import com.fasterxml.jackson.annotation.JsonCreator
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Condition
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionInMemoryRepository
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics
|
||||
import org.springframework.beans.factory.annotation.Qualifier
|
||||
import org.springframework.kafka.annotation.KafkaListener
|
||||
import org.springframework.kafka.support.KafkaHeaders
|
||||
import org.springframework.messaging.handler.annotation.Header
|
||||
import org.springframework.messaging.handler.annotation.Payload
|
||||
import org.springframework.stereotype.Component
|
||||
import org.xml.sax.InputSource
|
||||
import java.io.StringReader
|
||||
import javax.xml.xpath.XPathFactory
|
||||
|
||||
@Component
|
||||
class ObdsXmlTopicMonitor(
|
||||
@Qualifier("obdsXmlConditionRepository")
|
||||
private val conditionRepository: ConditionInMemoryRepository,
|
||||
private val objectMapper: ObjectMapper,
|
||||
statisticsEventProducer: StatisticsSink,
|
||||
) : TopicMonitor(statisticsEventProducer) {
|
||||
|
||||
@KafkaListener(topicPattern = "input.*")
|
||||
override fun handleTopicRecord(
|
||||
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
|
||||
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) timestamp: Long,
|
||||
@Header(KafkaHeaders.RECEIVED_KEY) key: String,
|
||||
@Payload payload: String,
|
||||
) {
|
||||
try {
|
||||
val p = objectMapper.readValue(payload, Record::class.java)
|
||||
val xPath = XPathFactory.newDefaultInstance().newXPath()
|
||||
|
||||
// Use local-name() due to XML namespace
|
||||
val patientId = xPath.evaluate(
|
||||
"//*[local-name()='Patienten_Stammdaten']/@Patient_ID",
|
||||
InputSource(StringReader(p.payload.data))
|
||||
)
|
||||
val tumorId = xPath.evaluate(
|
||||
"//*[local-name()='Tumorzuordnung']/@Tumor_ID",
|
||||
InputSource(StringReader(p.payload.data))
|
||||
)
|
||||
val icd10 = xPath.evaluate(
|
||||
"//*[local-name()='Primaertumor_ICD_Code']/text()",
|
||||
InputSource(StringReader(p.payload.data))
|
||||
)
|
||||
|
||||
val updated = conditionRepository.saveIfNewerVersion(
|
||||
Condition(
|
||||
Condition.generateConditionId(patientId, tumorId),
|
||||
p.payload.version,
|
||||
icd10
|
||||
)
|
||||
)
|
||||
|
||||
if (updated) {
|
||||
sendUpdatedStatistics(fetchStatistics("obdsxml", conditionRepository))
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
class Record @JsonCreator constructor(val payload: RecordPayload)
|
||||
|
||||
class RecordPayload @JsonCreator constructor(
|
||||
@JsonAlias("ID") val id: Int,
|
||||
@JsonAlias("YEAR") val year: String,
|
||||
@JsonAlias("VERSIONSNUMMER") val version: Int,
|
||||
@JsonAlias("XML_DATEN") val data: String
|
||||
)
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.topiclisteners
|
||||
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.springframework.kafka.listener.ConsumerSeekAware
|
||||
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback
|
||||
import reactor.core.publisher.Sinks
|
||||
|
||||
abstract class TopicMonitor(private val statisticsEventProducer: Sinks.Many<Statistics>) : ConsumerSeekAware {
|
||||
|
||||
abstract fun handleTopicRecord(topic: String, timestamp: Long, key: String, payload: String)
|
||||
|
||||
fun sendUpdatedStatistics(statistics: Statistics) {
|
||||
statisticsEventProducer.emitNext(statistics) { _, _ -> false }
|
||||
}
|
||||
|
||||
override fun onPartitionsAssigned(assignments: MutableMap<TopicPartition, Long>, callback: ConsumerSeekCallback) {
|
||||
callback.seekToBeginning(assignments.keys)
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,23 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.web
|
||||
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics
|
||||
import org.springframework.http.MediaType
|
||||
import org.springframework.http.codec.ServerSentEvent
|
||||
import org.springframework.web.bind.annotation.GetMapping
|
||||
import org.springframework.web.bind.annotation.RestController
|
||||
import reactor.core.publisher.Flux
|
||||
|
||||
@RestController
|
||||
class EventStreamController(
|
||||
private val statisticsEventProducer: StatisticsSink,
|
||||
) {
|
||||
|
||||
@GetMapping(path = ["/events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
|
||||
fun eventStream(): Flux<ServerSentEvent<Statistics>> {
|
||||
return statisticsEventProducer.asFlux().map {
|
||||
ServerSentEvent.builder(it).event(it.name).build()
|
||||
}.doOnComplete { println("X") }
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,14 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.web
|
||||
|
||||
import org.springframework.stereotype.Controller
|
||||
import org.springframework.web.bind.annotation.GetMapping
|
||||
|
||||
@Controller
|
||||
class HomeController {
|
||||
|
||||
@GetMapping
|
||||
fun index(): String {
|
||||
return "index"
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package dev.pcvolkmer.oncoanalytics.monitor.web
|
||||
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.ConditionInMemoryRepository
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics
|
||||
import dev.pcvolkmer.oncoanalytics.monitor.fetchStatistics
|
||||
import org.springframework.beans.factory.annotation.Qualifier
|
||||
import org.springframework.web.bind.annotation.GetMapping
|
||||
import org.springframework.web.bind.annotation.RequestMapping
|
||||
import org.springframework.web.bind.annotation.RestController
|
||||
|
||||
@RestController
|
||||
@RequestMapping(path = ["/statistics"])
|
||||
class StatisticsController(
|
||||
@Qualifier("obdsXmlConditionRepository")
|
||||
private val obdsXmlConditionRepository: ConditionInMemoryRepository,
|
||||
@Qualifier("obdsFhirConditionRepository")
|
||||
private val obdsFhirConditionRepository: ConditionInMemoryRepository,
|
||||
) {
|
||||
|
||||
@GetMapping(path = ["obdsxml"])
|
||||
fun obdsxmlStatistics(): Statistics {
|
||||
return fetchStatistics("obdsxml", obdsXmlConditionRepository)
|
||||
}
|
||||
|
||||
@GetMapping(path = ["obdsfhir"])
|
||||
fun obdsfhirStatistics(): Statistics {
|
||||
return fetchStatistics("obdfhir", obdsFhirConditionRepository)
|
||||
}
|
||||
|
||||
}
|
8
src/main/resources/application-dev.yml
Normal file
8
src/main/resources/application-dev.yml
Normal file
@ -0,0 +1,8 @@
|
||||
spring:
|
||||
application:
|
||||
name: onco-analytics-monitor
|
||||
|
||||
docker:
|
||||
compose:
|
||||
file: dev-compose.yml
|
||||
|
18
src/main/resources/application.yml
Normal file
18
src/main/resources/application.yml
Normal file
@ -0,0 +1,18 @@
|
||||
spring:
|
||||
application:
|
||||
name: onco-analytics-monitor
|
||||
|
||||
kafka:
|
||||
consumer:
|
||||
group-id: ${spring.application.name}
|
||||
|
||||
web:
|
||||
resources:
|
||||
cache:
|
||||
cachecontrol:
|
||||
max-age: 1d
|
||||
chain:
|
||||
strategy:
|
||||
content:
|
||||
enabled: true
|
||||
paths: /**/*.js,/**/*.css,/**/*.svg,/**/*.jpeg
|
BIN
src/main/resources/static/images/db.png
Normal file
BIN
src/main/resources/static/images/db.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 3.0 KiB |
BIN
src/main/resources/static/images/job.png
Normal file
BIN
src/main/resources/static/images/job.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 1.9 KiB |
BIN
src/main/resources/static/images/kafka.png
Normal file
BIN
src/main/resources/static/images/kafka.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 3.3 KiB |
BIN
src/main/resources/static/images/topic.png
Normal file
BIN
src/main/resources/static/images/topic.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 985 B |
185
src/main/resources/templates/index.html
Normal file
185
src/main/resources/templates/index.html
Normal file
@ -0,0 +1,185 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>onco-analytics-monitor</title>
|
||||
<style>
|
||||
body {
|
||||
margin: 0;
|
||||
background-color: #f5f5f5;
|
||||
}
|
||||
|
||||
header {
|
||||
margin: 1em 0 6em 0;
|
||||
text-align: left;
|
||||
}
|
||||
|
||||
.statistics table {
|
||||
border-collapse: collapse;
|
||||
margin: 3em auto 0;
|
||||
background-color: white;
|
||||
}
|
||||
|
||||
td, th {
|
||||
text-align: left;
|
||||
padding: 2px;
|
||||
}
|
||||
|
||||
tr {
|
||||
border-collapse: collapse;
|
||||
border-spacing: 0;
|
||||
}
|
||||
|
||||
tr th {
|
||||
background-color: #ccc;
|
||||
}
|
||||
|
||||
tr:nth-child(even) td {
|
||||
background-color: #eee
|
||||
}
|
||||
|
||||
tr > td:last-of-type {
|
||||
text-align: right;
|
||||
}
|
||||
|
||||
.content {
|
||||
width: fit-content;
|
||||
height: 100vh;
|
||||
margin: 0 auto;
|
||||
padding: 1em;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
.step {
|
||||
display: inline-block;
|
||||
width: fit-content;
|
||||
vertical-align: top;
|
||||
}
|
||||
|
||||
.step > .item {
|
||||
display: inline-block;
|
||||
min-width: 6em;
|
||||
padding: .4em .4em 2em;
|
||||
border: 1px solid transparent;
|
||||
}
|
||||
|
||||
.step .item:has(.statistics) {
|
||||
width: 14em;
|
||||
|
||||
background: white;
|
||||
border-radius: 6em 6em 1em 1em;
|
||||
border: 1px solid #ddd;
|
||||
box-shadow: 1px 1px 2px #ddd;
|
||||
}
|
||||
|
||||
.step:before, .step:after {
|
||||
content: "";
|
||||
margin: 1.5em 0;
|
||||
width: 1em;
|
||||
height: 2px;
|
||||
background: black;
|
||||
display: inline-block;
|
||||
}
|
||||
|
||||
.step:first-of-type:before, .step:last-of-type:after {
|
||||
display: none;
|
||||
}
|
||||
|
||||
.step .logo {
|
||||
display: block;
|
||||
width: 3em;
|
||||
height: 3em;
|
||||
vertical-align: middle;
|
||||
margin: 1em auto;
|
||||
}
|
||||
|
||||
.step .description {
|
||||
font-size: small;
|
||||
font-weight: bold;
|
||||
display: block;
|
||||
text-align: center;
|
||||
}
|
||||
|
||||
.step > .statistics {
|
||||
display: block;
|
||||
vertical-align: middle;
|
||||
font-family: monospace;
|
||||
border: 1px solid gray;
|
||||
border-left: none;
|
||||
border-radius: 1em;
|
||||
padding: 1em;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
<main class="content">
|
||||
|
||||
<header>
|
||||
<h1>onco-analytics-monitor</h1>
|
||||
<p>
|
||||
Überwachung der einzelnen Kafka Topics und enthaltener Conditions - aufgeteilt nach ICD10-Gruppe.
|
||||
</p>
|
||||
</header>
|
||||
|
||||
<div class="step">
|
||||
<div class="item">
|
||||
<img class="logo" th:src="@{/images/db.png}" alt="db"/>
|
||||
<div class="description">Onkostar Database</div>
|
||||
</div>
|
||||
</div><div class="step">
|
||||
<div class="item">
|
||||
<img class="logo" th:src="@{/images/kafka.png}" alt="kafka"/>
|
||||
<div class="description">Kafka Connect</div>
|
||||
</div>
|
||||
</div><div class="step">
|
||||
<div class="item">
|
||||
<img class="logo" th:src="@{/images/topic.png}" alt="topic"/>
|
||||
<div class="description">Kafka Topic oBDS XML</div>
|
||||
<div class="statistics">
|
||||
<table id="obdsxml"></table>
|
||||
</div>
|
||||
</div>
|
||||
</div><div class="step">
|
||||
<div class="item">
|
||||
<img class="logo" th:src="@{/images/job.png}" alt="job"/>
|
||||
<div class="description">oBDS-to-fhir</div>
|
||||
</div>
|
||||
</div><div class="step">
|
||||
<div class="item">
|
||||
<img class="logo" th:src="@{/images/topic.png}" alt="topic"/>
|
||||
<div class="description">Kafka Topic oBDS FHIR</div>
|
||||
<div class="statistics">
|
||||
<table id="obdsfhir"></table>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
</main>
|
||||
|
||||
<script>
|
||||
function updateData(data, elemName) {
|
||||
let elem = document.getElementById(elemName);
|
||||
|
||||
elem.innerHTML = '<thead><tr><th>ICD10-Gruppe</th><th>Anzahl</th></tr></thead>'
|
||||
+ Array.from(data.entries).map(entry => `<tr><td>${entry.name}</td><td>${entry.count}</td></tr>`).join('');
|
||||
}
|
||||
|
||||
fetch('/statistics/obdsxml').then(res => res.json()).then(data => updateData(data, 'obdsxml'));
|
||||
fetch('/statistics/obdsfhir').then(res => res.json()).then(data => updateData(data, 'obdsfhir'));
|
||||
|
||||
window.addEventListener('load', () => {
|
||||
const evtSource = new EventSource('/events');
|
||||
|
||||
evtSource.addEventListener('obdsxml', (event) => {
|
||||
updateData(JSON.parse(event.data), 'obdsxml')
|
||||
});
|
||||
|
||||
evtSource.addEventListener('obdsfhir', (event) => {
|
||||
updateData(JSON.parse(event.data), 'obdsfhir')
|
||||
});
|
||||
});
|
||||
</script>
|
||||
|
||||
</body>
|
||||
</html>
|
Reference in New Issue
Block a user