diff --git a/build.gradle.kts b/build.gradle.kts
index e29435a..0cbf77e 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -32,11 +32,16 @@ dependencies {
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.springframework.boot:spring-boot-starter-thymeleaf")
implementation("org.springframework.boot:spring-boot-starter-web")
+ implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.springframework.kafka:spring-kafka")
+ implementation("org.flywaydb:flyway-mysql")
implementation("commons-codec:commons-codec")
implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
+ runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
+ runtimeOnly("org.postgresql:postgresql")
developmentOnly("org.springframework.boot:spring-boot-devtools")
+ developmentOnly("org.springframework.boot:spring-boot-docker-compose")
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
providedRuntime("org.springframework.boot:spring-boot-starter-tomcat")
testImplementation("org.springframework.boot:spring-boot-starter-test")
diff --git a/dev-compose.yml b/dev-compose.yml
new file mode 100644
index 0000000..cebe942
--- /dev/null
+++ b/dev-compose.yml
@@ -0,0 +1,10 @@
+services:
+ mariadb:
+ image: mariadb:10
+ ports:
+ - "13306:3306"
+ environment:
+ MARIADB_DATABASE: dev
+ MARIADB_USER: dev
+ MARIADB_PASSWORD: dev
+ MARIADB_ROOT_PASSWORD: dev
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
new file mode 100644
index 0000000..80ddde8
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
@@ -0,0 +1,44 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package dev.dnpm.etl.processor.monitoring
+
+import org.springframework.data.annotation.Id
+import org.springframework.data.relational.core.mapping.Table
+import org.springframework.data.repository.CrudRepository
+import java.time.Instant
+import java.util.*
+
+typealias RequestId = UUID
+
+@Table("request")
+data class Request(
+ @Id val id: Long? = null,
+ val uuid: RequestId = RequestId.randomUUID(),
+ val patientId: String,
+ val fingerprint: String,
+ val status: RequestStatus,
+ val processedAt: Instant = Instant.now()
+)
+
+interface RequestRepository : CrudRepository {
+
+ fun findByPatientIdOrderByProcessedAtDesc(patientId: String): List
+
+}
\ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/RequestStatus.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/RequestStatus.kt
new file mode 100644
index 0000000..8c19e86
--- /dev/null
+++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/RequestStatus.kt
@@ -0,0 +1,28 @@
+/*
+ * This file is part of ETL-Processor
+ *
+ * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package dev.dnpm.etl.processor.monitoring
+
+enum class RequestStatus(val value: String) {
+ SUCCESS("success"),
+ WARNING("warning"),
+ ERROR("error"),
+ UNKNOWN("unknown"),
+ DUPLICATION("duplication")
+}
\ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
index 0983e35..acec07a 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt
@@ -21,7 +21,6 @@ package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.MtbFile
-import dev.dnpm.etl.processor.config.KafkaTargetProperties
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
@@ -32,14 +31,14 @@ class KafkaMtbFileSender(
private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java)
- override fun send(mtbFile: MtbFile): Boolean {
+ override fun send(mtbFile: MtbFile): MtbFileSender.ResponseStatus {
return try {
kafkaTemplate.sendDefault(objectMapper.writeValueAsString(mtbFile))
logger.debug("Sent file via KafkaMtbFileSender")
- true
+ MtbFileSender.ResponseStatus.UNKNOWN
} catch (e: Exception) {
logger.error("An error occured sending to kafka", e)
- false
+ MtbFileSender.ResponseStatus.ERROR
}
}
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
index f8279db..a085f04 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/MtbFileSender.kt
@@ -22,7 +22,12 @@ package dev.dnpm.etl.processor.output
import de.ukw.ccc.bwhc.dto.MtbFile
interface MtbFileSender {
+ fun send(mtbFile: MtbFile): ResponseStatus
- fun send(mtbFile: MtbFile): Boolean
-
+ enum class ResponseStatus {
+ SUCCESS,
+ WARNING,
+ ERROR,
+ UNKNOWN
+ }
}
\ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
index 018c7d6..d3b58fb 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt
@@ -34,7 +34,7 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
private val restTemplate = RestTemplate()
- override fun send(mtbFile: MtbFile): Boolean {
+ override fun send(mtbFile: MtbFile): MtbFileSender.ResponseStatus {
try {
val headers = HttpHeaders()
headers.contentType = MediaType.APPLICATION_JSON
@@ -46,17 +46,21 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
)
if (!response.statusCode.is2xxSuccessful) {
logger.warn("Error sending to remote system: {}", response.body)
- return false
+ return MtbFileSender.ResponseStatus.ERROR
}
logger.debug("Sent file via RestMtbFileSender")
- return true
+ return if (response.body?.contains("warning") == true) {
+ MtbFileSender.ResponseStatus.WARNING
+ } else {
+ MtbFileSender.ResponseStatus.SUCCESS
+ }
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
- return false
+ return MtbFileSender.ResponseStatus.ERROR
}
}
\ No newline at end of file
diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt
index 900b8c8..7276561 100644
--- a/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt
+++ b/src/main/kotlin/dev/dnpm/etl/processor/web/MtbFileController.kt
@@ -19,11 +19,17 @@
package dev.dnpm.etl.processor.web
+import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.MtbFile
+import dev.dnpm.etl.processor.monitoring.Request
+import dev.dnpm.etl.processor.monitoring.RequestRepository
+import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.output.MtbFileSender
-import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
+import org.apache.commons.codec.binary.Base32
+import org.apache.commons.codec.digest.DigestUtils
import org.slf4j.LoggerFactory
+import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RestController
@@ -31,22 +37,78 @@ import org.springframework.web.bind.annotation.RestController
@RestController
class MtbFileController(
private val pseudonymizeService: PseudonymizeService,
- private val senders: List
+ private val senders: List,
+ private val requestRepository: RequestRepository,
+ private val objectMapper: ObjectMapper
) {
private val logger = LoggerFactory.getLogger(MtbFileController::class.java)
@PostMapping(path = ["/mtbfile"])
- fun mtbFile(@RequestBody mtbFile: MtbFile) {
+ fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity {
val pseudonymized = pseudonymizeService.pseudonymize(mtbFile)
- senders.forEach {
- val success = it.send(pseudonymized)
- if (success) {
- logger.info("Sent file for Patient '{}' using '{}'", pseudonymized.patient.id, it.javaClass.simpleName)
+
+ val lastRequestForPatient =
+ requestRepository.findByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id).firstOrNull()
+
+ if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) {
+ requestRepository.save(
+ Request(
+ patientId = pseudonymized.patient.id,
+ fingerprint = fingerprint(mtbFile),
+ status = RequestStatus.DUPLICATION
+ )
+ )
+ return ResponseEntity.noContent().build()
+ }
+
+ val responses = senders.map {
+ val responseStatus = it.send(pseudonymized)
+ if (responseStatus == MtbFileSender.ResponseStatus.SUCCESS || responseStatus == MtbFileSender.ResponseStatus.WARNING) {
+ logger.info(
+ "Sent file for Patient '{}' using '{}'",
+ pseudonymized.patient.id,
+ it.javaClass.simpleName
+ )
} else {
- logger.error("Error sending file for Patient '{}' using '{}'", pseudonymized.patient.id, it.javaClass.simpleName)
+ logger.error(
+ "Error sending file for Patient '{}' using '{}'",
+ pseudonymized.patient.id,
+ it.javaClass.simpleName
+ )
}
+ responseStatus
+ }
+
+ val requestStatus = if (responses.contains(MtbFileSender.ResponseStatus.ERROR)) {
+ RequestStatus.ERROR
+ } else if (responses.contains(MtbFileSender.ResponseStatus.WARNING)) {
+ RequestStatus.WARNING
+ } else if (responses.contains(MtbFileSender.ResponseStatus.SUCCESS)) {
+ RequestStatus.SUCCESS
+ } else {
+ RequestStatus.UNKNOWN
+ }
+
+ requestRepository.save(
+ Request(
+ patientId = pseudonymized.patient.id,
+ fingerprint = fingerprint(mtbFile),
+ status = requestStatus
+ )
+ )
+
+ return if (requestStatus == RequestStatus.ERROR) {
+ ResponseEntity.unprocessableEntity().build()
+ } else {
+ ResponseEntity.noContent().build()
}
}
+ private fun fingerprint(mtbFile: MtbFile): String {
+ return Base32().encodeAsString(DigestUtils.sha256(objectMapper.writeValueAsString(mtbFile)))
+ .replace("=", "")
+ .lowercase()
+ }
+
}
\ No newline at end of file
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index da543bd..39acb37 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -2,4 +2,6 @@ spring:
kafka:
bootstrap-servers: ${app.kafka.servers}
template:
- default-topic: ${app.kafka.topic}
\ No newline at end of file
+ default-topic: ${app.kafka.topic}
+ flyway:
+ locations: "classpath:db/migration/{vendor}"
\ No newline at end of file
diff --git a/src/main/resources/db/migration/mariadb/V0_1_0__Init.sql b/src/main/resources/db/migration/mariadb/V0_1_0__Init.sql
new file mode 100644
index 0000000..0683195
--- /dev/null
+++ b/src/main/resources/db/migration/mariadb/V0_1_0__Init.sql
@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS request
+(
+ id int auto_increment primary key,
+ uuid varchar(255) not null,
+ patient_id varchar(255) not null,
+ fingerprint varchar(255) not null,
+ status varchar(16) not null,
+ processed_at datetime default utc_timestamp() not null
+);
\ No newline at end of file