mirror of
https://github.com/pcvolkmer/etl-processor.git
synced 2025-04-20 01:36:50 +00:00
Add request ID to Kafka key
This commit is contained in:
parent
2d140d94b2
commit
aed5f15d2d
@ -20,7 +20,6 @@
|
|||||||
package dev.dnpm.etl.processor.output
|
package dev.dnpm.etl.processor.output
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import de.ukw.ccc.bwhc.dto.MtbFile
|
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
import org.springframework.kafka.core.KafkaTemplate
|
||||||
|
|
||||||
@ -31,14 +30,11 @@ class KafkaMtbFileSender(
|
|||||||
|
|
||||||
private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java)
|
private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java)
|
||||||
|
|
||||||
override fun send(mtbFile: MtbFile): MtbFileSender.Response {
|
override fun send(request: MtbFileSender.Request): MtbFileSender.Response {
|
||||||
return try {
|
return try {
|
||||||
val result = kafkaTemplate.sendDefault(
|
val result = kafkaTemplate.sendDefault(
|
||||||
String.format(
|
header(request),
|
||||||
"{\"pid\": \"%s\", \"eid\": \"%s\"}",
|
objectMapper.writeValueAsString(request.mtbFile)
|
||||||
mtbFile.patient.id,
|
|
||||||
mtbFile.episode.id
|
|
||||||
), objectMapper.writeValueAsString(mtbFile)
|
|
||||||
)
|
)
|
||||||
if (result.get() != null) {
|
if (result.get() != null) {
|
||||||
logger.debug("Sent file via KafkaMtbFileSender")
|
logger.debug("Sent file via KafkaMtbFileSender")
|
||||||
@ -53,4 +49,9 @@ class KafkaMtbFileSender(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun header(request: MtbFileSender.Request): String {
|
||||||
|
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
|
||||||
|
"\"eid\": \"${request.mtbFile.episode.id}\", " +
|
||||||
|
"\"requestId\": \"${request.requestId}\"}"
|
||||||
|
}
|
||||||
}
|
}
|
@ -22,10 +22,12 @@ package dev.dnpm.etl.processor.output
|
|||||||
import de.ukw.ccc.bwhc.dto.MtbFile
|
import de.ukw.ccc.bwhc.dto.MtbFile
|
||||||
|
|
||||||
interface MtbFileSender {
|
interface MtbFileSender {
|
||||||
fun send(mtbFile: MtbFile): Response
|
fun send(request: Request): Response
|
||||||
|
|
||||||
data class Response(val status: ResponseStatus, val reason: String = "")
|
data class Response(val status: ResponseStatus, val reason: String = "")
|
||||||
|
|
||||||
|
data class Request(val requestId: String, val mtbFile: MtbFile)
|
||||||
|
|
||||||
enum class ResponseStatus {
|
enum class ResponseStatus {
|
||||||
SUCCESS,
|
SUCCESS,
|
||||||
WARNING,
|
WARNING,
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
|
|
||||||
package dev.dnpm.etl.processor.output
|
package dev.dnpm.etl.processor.output
|
||||||
|
|
||||||
import de.ukw.ccc.bwhc.dto.MtbFile
|
|
||||||
import dev.dnpm.etl.processor.config.RestTargetProperties
|
import dev.dnpm.etl.processor.config.RestTargetProperties
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import org.springframework.http.HttpEntity
|
import org.springframework.http.HttpEntity
|
||||||
@ -34,11 +33,11 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
|
|||||||
|
|
||||||
private val restTemplate = RestTemplate()
|
private val restTemplate = RestTemplate()
|
||||||
|
|
||||||
override fun send(mtbFile: MtbFile): MtbFileSender.Response {
|
override fun send(request: MtbFileSender.Request): MtbFileSender.Response {
|
||||||
try {
|
try {
|
||||||
val headers = HttpHeaders()
|
val headers = HttpHeaders()
|
||||||
headers.contentType = MediaType.APPLICATION_JSON
|
headers.contentType = MediaType.APPLICATION_JSON
|
||||||
val entityReq = HttpEntity(mtbFile, headers)
|
val entityReq = HttpEntity(request.mtbFile, headers)
|
||||||
val response = restTemplate.postForEntity(
|
val response = restTemplate.postForEntity(
|
||||||
restTargetProperties.uri!!,
|
restTargetProperties.uri!!,
|
||||||
entityReq,
|
entityReq,
|
||||||
|
@ -35,6 +35,7 @@ import org.springframework.web.bind.annotation.PostMapping
|
|||||||
import org.springframework.web.bind.annotation.RequestBody
|
import org.springframework.web.bind.annotation.RequestBody
|
||||||
import org.springframework.web.bind.annotation.RestController
|
import org.springframework.web.bind.annotation.RestController
|
||||||
import reactor.core.publisher.Sinks
|
import reactor.core.publisher.Sinks
|
||||||
|
import java.util.UUID
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
class MtbFileController(
|
class MtbFileController(
|
||||||
@ -70,8 +71,10 @@ class MtbFileController(
|
|||||||
return ResponseEntity.noContent().build()
|
return ResponseEntity.noContent().build()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val request = MtbFileSender.Request(UUID.randomUUID().toString(), pseudonymized)
|
||||||
|
|
||||||
val responses = senders.map {
|
val responses = senders.map {
|
||||||
val responseStatus = it.send(pseudonymized)
|
val responseStatus = it.send(request)
|
||||||
if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) {
|
if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) {
|
||||||
logger.info(
|
logger.info(
|
||||||
"Sent file for Patient '{}' using '{}'",
|
"Sent file for Patient '{}' using '{}'",
|
||||||
@ -100,9 +103,10 @@ class MtbFileController(
|
|||||||
|
|
||||||
requestRepository.save(
|
requestRepository.save(
|
||||||
Request(
|
Request(
|
||||||
patientId = pseudonymized.patient.id,
|
uuid = request.requestId,
|
||||||
|
patientId = request.mtbFile.patient.id,
|
||||||
pid = pid,
|
pid = pid,
|
||||||
fingerprint = fingerprint(mtbFile),
|
fingerprint = fingerprint(request.mtbFile),
|
||||||
status = requestStatus,
|
status = requestStatus,
|
||||||
report = when (requestStatus) {
|
report = when (requestStatus) {
|
||||||
RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
|
RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user