1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-04-19 17:26:51 +00:00

Explicit producer topic configuration

This commit is contained in:
Paul-Christian Volkmer 2023-08-03 16:04:31 +02:00
parent 3dcee41569
commit ec76c775d9
4 changed files with 28 additions and 12 deletions

View File

@ -43,9 +43,10 @@ class AppKafkaConfiguration {
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
kafkaTargetProperties: KafkaTargetProperties,
objectMapper: ObjectMapper
): MtbFileSender {
return KafkaMtbFileSender(kafkaTemplate, objectMapper)
return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
}
@Bean
@ -54,7 +55,7 @@ class AppKafkaConfiguration {
kafkaTargetProperties: KafkaTargetProperties,
kafkaResponseProcessor: KafkaResponseProcessor
): KafkaMessageListenerContainer<String, String> {
val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic);
val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic)
containerProperties.messageListener = kafkaResponseProcessor
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}

View File

@ -22,11 +22,13 @@ package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.config.KafkaTargetProperties
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
private val kafkaTargetProperties: KafkaTargetProperties,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@ -34,13 +36,14 @@ class KafkaMtbFileSender(
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
return try {
val result = kafkaTemplate.sendDefault(
header(request),
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
objectMapper.writeValueAsString(request.mtbFile)
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
}
@ -61,14 +64,15 @@ class KafkaMtbFileSender(
.build()
return try {
val result = kafkaTemplate.sendDefault(
header(request),
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
objectMapper.writeValueAsString(dummyMtbFile)
)
if (result.get() != null) {
logger.debug("Sent deletion request via KafkaMtbFileSender")
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
}
@ -78,13 +82,13 @@ class KafkaMtbFileSender(
}
}
private fun header(request: MtbFileSender.MtbFileRequest): String {
private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
"\"eid\": \"${request.mtbFile.episode.id}\", " +
"\"requestId\": \"${request.requestId}\"}"
}
private fun header(request: MtbFileSender.DeleteRequest): String {
private fun key(request: MtbFileSender.DeleteRequest): String {
return "{\"pid\": \"${request.patientId}\", " +
"\"requestId\": \"${request.requestId}\"}"
}

View File

@ -42,6 +42,9 @@ class KafkaResponseProcessor(
val responseKey = objectMapper.readValue(data.key(), ResponseKey::class.java)
requestRepository.findByUuidEquals(responseKey.requestId).ifPresent {
val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
println("${responseBody.statusCode}")
when (responseBody.statusCode) {
200 -> {
it.status = RequestStatus.SUCCESS
@ -69,6 +72,16 @@ class KafkaResponseProcessor(
requestRepository.save(it)
}
in 900..999 -> {
it.status = RequestStatus.ERROR
it.processedAt = Instant.ofEpochMilli(data.timestamp())
it.report = Report(
"Fehler bei der Datenübertragung, keine Verbindung zum bwHC-Backend möglich",
objectMapper.writeValueAsString(responseBody.statusBody)
)
requestRepository.save(it)
}
else -> {
logger.error("Cannot process Kafka response: Unknown response code!")
}

View File

@ -1,8 +1,6 @@
spring:
kafka:
bootstrap-servers: ${app.kafka.servers}
template:
default-topic: ${app.kafka.topic}
consumer:
group-id: ${app.kafka.group-id}
flyway: