1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-07-04 15:32:55 +00:00

test: add test for incoming kafka message processing

This commit is contained in:
2024-02-29 12:49:06 +01:00
parent 3e45bf8494
commit 952ad8c0cf
3 changed files with 91 additions and 8 deletions

View File

@ -98,9 +98,10 @@ class AppKafkaConfiguration {
@Bean
@ConditionalOnProperty(value = ["app.kafka.input-topic"])
fun kafkaInputListener(
requestProcessor: RequestProcessor
requestProcessor: RequestProcessor,
objectMapper: ObjectMapper
): KafkaInputListener {
return KafkaInputListener(requestProcessor)
return KafkaInputListener(requestProcessor, objectMapper)
}
@Bean

View File

@ -19,6 +19,7 @@
package dev.dnpm.etl.processor.input
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.services.RequestProcessor
@ -27,17 +28,19 @@ import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.MessageListener
class KafkaInputListener(
private val requestProcessor: RequestProcessor
) : MessageListener<String, MtbFile> {
private val requestProcessor: RequestProcessor,
private val objectMapper: ObjectMapper
) : MessageListener<String, String> {
private val logger = LoggerFactory.getLogger(KafkaInputListener::class.java)
override fun onMessage(data: ConsumerRecord<String, MtbFile>) {
if (data.value().consent.status == Consent.Status.ACTIVE) {
override fun onMessage(data: ConsumerRecord<String, String>) {
val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java)
if (mtbFile.consent.status == Consent.Status.ACTIVE) {
logger.debug("Accepted MTB File for processing")
requestProcessor.processMtbFile(data.value())
requestProcessor.processMtbFile(mtbFile)
} else {
logger.debug("Accepted MTB File and process deletion")
requestProcessor.processDeletion(data.value().patient.id)
requestProcessor.processDeletion(mtbFile.patient.id)
}
}
}