mirror of
https://github.com/pcvolkmer/etl-processor.git
synced 2025-04-19 17:26:51 +00:00
Add request and status logging
This commit is contained in:
parent
05149bac0b
commit
2929bb26ac
@ -32,11 +32,16 @@ dependencies {
|
||||
implementation("org.jetbrains.kotlin:kotlin-reflect")
|
||||
implementation("org.springframework.boot:spring-boot-starter-thymeleaf")
|
||||
implementation("org.springframework.boot:spring-boot-starter-web")
|
||||
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
|
||||
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
|
||||
implementation("org.springframework.kafka:spring-kafka")
|
||||
implementation("org.flywaydb:flyway-mysql")
|
||||
implementation("commons-codec:commons-codec")
|
||||
implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
|
||||
runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
|
||||
runtimeOnly("org.postgresql:postgresql")
|
||||
developmentOnly("org.springframework.boot:spring-boot-devtools")
|
||||
developmentOnly("org.springframework.boot:spring-boot-docker-compose")
|
||||
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
|
||||
providedRuntime("org.springframework.boot:spring-boot-starter-tomcat")
|
||||
testImplementation("org.springframework.boot:spring-boot-starter-test")
|
||||
|
10
dev-compose.yml
Normal file
10
dev-compose.yml
Normal file
@ -0,0 +1,10 @@
|
||||
services:
|
||||
mariadb:
|
||||
image: mariadb:10
|
||||
ports:
|
||||
- "13306:3306"
|
||||
environment:
|
||||
MARIADB_DATABASE: dev
|
||||
MARIADB_USER: dev
|
||||
MARIADB_PASSWORD: dev
|
||||
MARIADB_ROOT_PASSWORD: dev
|
44
src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
Normal file
44
src/main/kotlin/dev/dnpm/etl/processor/monitoring/Request.kt
Normal file
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* This file is part of ETL-Processor
|
||||
*
|
||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package dev.dnpm.etl.processor.monitoring
|
||||
|
||||
import org.springframework.data.annotation.Id
|
||||
import org.springframework.data.relational.core.mapping.Table
|
||||
import org.springframework.data.repository.CrudRepository
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
typealias RequestId = UUID
|
||||
|
||||
@Table("request")
|
||||
data class Request(
|
||||
@Id val id: Long? = null,
|
||||
val uuid: RequestId = RequestId.randomUUID(),
|
||||
val patientId: String,
|
||||
val fingerprint: String,
|
||||
val status: RequestStatus,
|
||||
val processedAt: Instant = Instant.now()
|
||||
)
|
||||
|
||||
interface RequestRepository : CrudRepository<Request, Long> {
|
||||
|
||||
fun findByPatientIdOrderByProcessedAtDesc(patientId: String): List<Request>
|
||||
|
||||
}
|
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* This file is part of ETL-Processor
|
||||
*
|
||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published
|
||||
* by the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package dev.dnpm.etl.processor.monitoring
|
||||
|
||||
enum class RequestStatus(val value: String) {
|
||||
SUCCESS("success"),
|
||||
WARNING("warning"),
|
||||
ERROR("error"),
|
||||
UNKNOWN("unknown"),
|
||||
DUPLICATION("duplication")
|
||||
}
|
@ -21,7 +21,6 @@ package dev.dnpm.etl.processor.output
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.ukw.ccc.bwhc.dto.MtbFile
|
||||
import dev.dnpm.etl.processor.config.KafkaTargetProperties
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
|
||||
@ -32,14 +31,14 @@ class KafkaMtbFileSender(
|
||||
|
||||
private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java)
|
||||
|
||||
override fun send(mtbFile: MtbFile): Boolean {
|
||||
override fun send(mtbFile: MtbFile): MtbFileSender.ResponseStatus {
|
||||
return try {
|
||||
kafkaTemplate.sendDefault(objectMapper.writeValueAsString(mtbFile))
|
||||
logger.debug("Sent file via KafkaMtbFileSender")
|
||||
true
|
||||
MtbFileSender.ResponseStatus.UNKNOWN
|
||||
} catch (e: Exception) {
|
||||
logger.error("An error occured sending to kafka", e)
|
||||
false
|
||||
MtbFileSender.ResponseStatus.ERROR
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,12 @@ package dev.dnpm.etl.processor.output
|
||||
import de.ukw.ccc.bwhc.dto.MtbFile
|
||||
|
||||
interface MtbFileSender {
|
||||
fun send(mtbFile: MtbFile): ResponseStatus
|
||||
|
||||
fun send(mtbFile: MtbFile): Boolean
|
||||
|
||||
enum class ResponseStatus {
|
||||
SUCCESS,
|
||||
WARNING,
|
||||
ERROR,
|
||||
UNKNOWN
|
||||
}
|
||||
}
|
@ -34,7 +34,7 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
|
||||
|
||||
private val restTemplate = RestTemplate()
|
||||
|
||||
override fun send(mtbFile: MtbFile): Boolean {
|
||||
override fun send(mtbFile: MtbFile): MtbFileSender.ResponseStatus {
|
||||
try {
|
||||
val headers = HttpHeaders()
|
||||
headers.contentType = MediaType.APPLICATION_JSON
|
||||
@ -46,17 +46,21 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
|
||||
)
|
||||
if (!response.statusCode.is2xxSuccessful) {
|
||||
logger.warn("Error sending to remote system: {}", response.body)
|
||||
return false
|
||||
return MtbFileSender.ResponseStatus.ERROR
|
||||
}
|
||||
logger.debug("Sent file via RestMtbFileSender")
|
||||
return true
|
||||
return if (response.body?.contains("warning") == true) {
|
||||
MtbFileSender.ResponseStatus.WARNING
|
||||
} else {
|
||||
MtbFileSender.ResponseStatus.SUCCESS
|
||||
}
|
||||
} catch (e: IllegalArgumentException) {
|
||||
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
|
||||
} catch (e: RestClientException) {
|
||||
logger.info(restTargetProperties.uri!!.toString())
|
||||
logger.error("Cannot send data to remote system", e)
|
||||
}
|
||||
return false
|
||||
return MtbFileSender.ResponseStatus.ERROR
|
||||
}
|
||||
|
||||
}
|
@ -19,11 +19,17 @@
|
||||
|
||||
package dev.dnpm.etl.processor.web
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import de.ukw.ccc.bwhc.dto.MtbFile
|
||||
import dev.dnpm.etl.processor.monitoring.Request
|
||||
import dev.dnpm.etl.processor.monitoring.RequestRepository
|
||||
import dev.dnpm.etl.processor.monitoring.RequestStatus
|
||||
import dev.dnpm.etl.processor.output.MtbFileSender
|
||||
import dev.dnpm.etl.processor.output.RestMtbFileSender
|
||||
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
|
||||
import org.apache.commons.codec.binary.Base32
|
||||
import org.apache.commons.codec.digest.DigestUtils
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.http.ResponseEntity
|
||||
import org.springframework.web.bind.annotation.PostMapping
|
||||
import org.springframework.web.bind.annotation.RequestBody
|
||||
import org.springframework.web.bind.annotation.RestController
|
||||
@ -31,22 +37,78 @@ import org.springframework.web.bind.annotation.RestController
|
||||
@RestController
|
||||
class MtbFileController(
|
||||
private val pseudonymizeService: PseudonymizeService,
|
||||
private val senders: List<MtbFileSender>
|
||||
private val senders: List<MtbFileSender>,
|
||||
private val requestRepository: RequestRepository,
|
||||
private val objectMapper: ObjectMapper
|
||||
) {
|
||||
|
||||
private val logger = LoggerFactory.getLogger(MtbFileController::class.java)
|
||||
|
||||
@PostMapping(path = ["/mtbfile"])
|
||||
fun mtbFile(@RequestBody mtbFile: MtbFile) {
|
||||
fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity<Void> {
|
||||
val pseudonymized = pseudonymizeService.pseudonymize(mtbFile)
|
||||
senders.forEach {
|
||||
val success = it.send(pseudonymized)
|
||||
if (success) {
|
||||
logger.info("Sent file for Patient '{}' using '{}'", pseudonymized.patient.id, it.javaClass.simpleName)
|
||||
|
||||
val lastRequestForPatient =
|
||||
requestRepository.findByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id).firstOrNull()
|
||||
|
||||
if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) {
|
||||
requestRepository.save(
|
||||
Request(
|
||||
patientId = pseudonymized.patient.id,
|
||||
fingerprint = fingerprint(mtbFile),
|
||||
status = RequestStatus.DUPLICATION
|
||||
)
|
||||
)
|
||||
return ResponseEntity.noContent().build()
|
||||
}
|
||||
|
||||
val responses = senders.map {
|
||||
val responseStatus = it.send(pseudonymized)
|
||||
if (responseStatus == MtbFileSender.ResponseStatus.SUCCESS || responseStatus == MtbFileSender.ResponseStatus.WARNING) {
|
||||
logger.info(
|
||||
"Sent file for Patient '{}' using '{}'",
|
||||
pseudonymized.patient.id,
|
||||
it.javaClass.simpleName
|
||||
)
|
||||
} else {
|
||||
logger.error("Error sending file for Patient '{}' using '{}'", pseudonymized.patient.id, it.javaClass.simpleName)
|
||||
logger.error(
|
||||
"Error sending file for Patient '{}' using '{}'",
|
||||
pseudonymized.patient.id,
|
||||
it.javaClass.simpleName
|
||||
)
|
||||
}
|
||||
responseStatus
|
||||
}
|
||||
|
||||
val requestStatus = if (responses.contains(MtbFileSender.ResponseStatus.ERROR)) {
|
||||
RequestStatus.ERROR
|
||||
} else if (responses.contains(MtbFileSender.ResponseStatus.WARNING)) {
|
||||
RequestStatus.WARNING
|
||||
} else if (responses.contains(MtbFileSender.ResponseStatus.SUCCESS)) {
|
||||
RequestStatus.SUCCESS
|
||||
} else {
|
||||
RequestStatus.UNKNOWN
|
||||
}
|
||||
|
||||
requestRepository.save(
|
||||
Request(
|
||||
patientId = pseudonymized.patient.id,
|
||||
fingerprint = fingerprint(mtbFile),
|
||||
status = requestStatus
|
||||
)
|
||||
)
|
||||
|
||||
return if (requestStatus == RequestStatus.ERROR) {
|
||||
ResponseEntity.unprocessableEntity().build()
|
||||
} else {
|
||||
ResponseEntity.noContent().build()
|
||||
}
|
||||
}
|
||||
|
||||
private fun fingerprint(mtbFile: MtbFile): String {
|
||||
return Base32().encodeAsString(DigestUtils.sha256(objectMapper.writeValueAsString(mtbFile)))
|
||||
.replace("=", "")
|
||||
.lowercase()
|
||||
}
|
||||
|
||||
}
|
@ -2,4 +2,6 @@ spring:
|
||||
kafka:
|
||||
bootstrap-servers: ${app.kafka.servers}
|
||||
template:
|
||||
default-topic: ${app.kafka.topic}
|
||||
default-topic: ${app.kafka.topic}
|
||||
flyway:
|
||||
locations: "classpath:db/migration/{vendor}"
|
9
src/main/resources/db/migration/mariadb/V0_1_0__Init.sql
Normal file
9
src/main/resources/db/migration/mariadb/V0_1_0__Init.sql
Normal file
@ -0,0 +1,9 @@
|
||||
CREATE TABLE IF NOT EXISTS request
|
||||
(
|
||||
id int auto_increment primary key,
|
||||
uuid varchar(255) not null,
|
||||
patient_id varchar(255) not null,
|
||||
fingerprint varchar(255) not null,
|
||||
status varchar(16) not null,
|
||||
processed_at datetime default utc_timestamp() not null
|
||||
);
|
Loading…
x
Reference in New Issue
Block a user