1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-07-01 22:22:53 +00:00

feat: implement KafkaInputListener

This commit is contained in:
2024-02-29 09:19:32 +01:00
parent 46ddaf10f7
commit 3e45bf8494
3 changed files with 18 additions and 5 deletions

View File

@ -26,6 +26,7 @@ import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.RestMtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
import dev.dnpm.etl.processor.services.RequestProcessor
import dev.dnpm.etl.processor.services.TokenRepository import dev.dnpm.etl.processor.services.TokenRepository
import dev.dnpm.etl.processor.services.TokenService import dev.dnpm.etl.processor.services.TokenService
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
@ -144,6 +145,7 @@ class AppConfigurationTest {
"app.kafka.group-id=test" "app.kafka.group-id=test"
] ]
) )
@MockBean(RequestProcessor::class)
inner class AppConfigurationUsingKafkaInputTest(private val context: ApplicationContext) { inner class AppConfigurationUsingKafkaInputTest(private val context: ApplicationContext) {
@Test @Test

View File

@ -25,6 +25,7 @@ import dev.dnpm.etl.processor.monitoring.ConnectionCheckService
import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService
import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.services.RequestProcessor
import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
@ -97,9 +98,9 @@ class AppKafkaConfiguration {
@Bean @Bean
@ConditionalOnProperty(value = ["app.kafka.input-topic"]) @ConditionalOnProperty(value = ["app.kafka.input-topic"])
fun kafkaInputListener( fun kafkaInputListener(
applicationEventPublisher: ApplicationEventPublisher, requestProcessor: RequestProcessor
): KafkaInputListener { ): KafkaInputListener {
return KafkaInputListener(applicationEventPublisher) return KafkaInputListener(requestProcessor)
} }
@Bean @Bean

View File

@ -19,15 +19,25 @@
package dev.dnpm.etl.processor.input package dev.dnpm.etl.processor.input
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.services.RequestProcessor
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.context.ApplicationEventPublisher import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.listener.MessageListener
class KafkaInputListener( class KafkaInputListener(
private val applicationEventPublisher: ApplicationEventPublisher private val requestProcessor: RequestProcessor
) : MessageListener<String, MtbFile> { ) : MessageListener<String, MtbFile> {
private val logger = LoggerFactory.getLogger(KafkaInputListener::class.java)
override fun onMessage(data: ConsumerRecord<String, MtbFile>) { override fun onMessage(data: ConsumerRecord<String, MtbFile>) {
TODO("Not yet implemented") if (data.value().consent.status == Consent.Status.ACTIVE) {
logger.debug("Accepted MTB File for processing")
requestProcessor.processMtbFile(data.value())
} else {
logger.debug("Accepted MTB File and process deletion")
requestProcessor.processDeletion(data.value().patient.id)
}
} }
} }