diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt index b4e4b92..d37c251 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.input.KafkaInputListener import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender @@ -78,8 +79,8 @@ class AppConfigurationTest { @TestPropertySource( properties = [ "app.kafka.servers=localhost:9092", - "app.kafka.topic=test", - "app.kafka.response-topic=test-response", + "app.kafka.output-topic=test", + "app.kafka.output-response-topic=test-response", "app.kafka.group-id=test" ] ) @@ -99,8 +100,8 @@ class AppConfigurationTest { properties = [ "app.rest.uri=http://localhost:9000", "app.kafka.servers=localhost:9092", - "app.kafka.topic=test", - "app.kafka.response-topic=test-response", + "app.kafka.output-topic=test", + "app.kafka.output-response-topic=test-response", "app.kafka.group-id=test" ] ) @@ -114,6 +115,43 @@ class AppConfigurationTest { } + @Nested + @TestPropertySource( + properties = [ + "app.kafka.servers=localhost:9092", + "app.kafka.output-topic=test", + "app.kafka.output-response-topic=test-response", + "app.kafka.group-id=test" + ] + ) + inner class AppConfigurationWithoutKafkaInputTest(private val context: ApplicationContext) { + + @Test + fun shouldNotUseKafkaInputListener() { + assertThrows { context.getBean(KafkaInputListener::class.java) } + } + + } + + @Nested + @TestPropertySource( + properties = [ + "app.kafka.servers=localhost:9092", + "app.kafka.input-topic=test_input", + "app.kafka.output-topic=test", + "app.kafka.output-response-topic=test-response", + "app.kafka.group-id=test" + ] + ) + inner class AppConfigurationUsingKafkaInputTest(private val context: ApplicationContext) { + + @Test + fun shouldUseKafkaInputListener() { + assertThat(context.getBean(KafkaInputListener::class.java)).isNotNull + } + + } + @Nested @TestPropertySource( properties = [ diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 2290d44..2eea92e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -72,10 +72,21 @@ data class RestTargetProperties( } } -@ConfigurationProperties(KafkaTargetProperties.NAME) -data class KafkaTargetProperties( - val topic: String = "etl-processor", - val responseTopic: String = "${topic}_response", +@ConfigurationProperties(KafkaProperties.NAME) +data class KafkaProperties( + val inputTopic: String?, + val outputTopic: String = "etl-processor", + @get:DeprecatedConfigurationProperty( + reason = "Deprecated", + replacement = "outputTopic" + ) + val topic: String = outputTopic, + val outputResponseTopic: String = "${outputTopic}_response", + @get:DeprecatedConfigurationProperty( + reason = "Deprecated", + replacement = "outputResponseTopic" + ) + val responseTopic: String = outputResponseTopic, val groupId: String = "${topic}_group", val servers: String = "" ) { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 68b86b2..0977527 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.input.KafkaInputListener import dev.dnpm.etl.processor.monitoring.ConnectionCheckService import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService import dev.dnpm.etl.processor.output.KafkaMtbFileSender @@ -42,9 +43,9 @@ import reactor.core.publisher.Sinks @Configuration @EnableConfigurationProperties( - value = [KafkaTargetProperties::class] + value = [KafkaProperties::class] ) -@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +@ConditionalOnProperty(value = ["app.kafka.servers"]) @ConditionalOnMissingBean(MtbFileSender::class) @Order(-5) class AppKafkaConfiguration { @@ -54,21 +55,21 @@ class AppKafkaConfiguration { @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, - kafkaTargetProperties: KafkaTargetProperties, + kafkaProperties: KafkaProperties, retryTemplate: RetryTemplate, objectMapper: ObjectMapper ): MtbFileSender { logger.info("Selected 'KafkaMtbFileSender'") - return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper) + return KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper) } @Bean - fun kafkaListenerContainer( + fun kafkaResponseListenerContainer( consumerFactory: ConsumerFactory, - kafkaTargetProperties: KafkaTargetProperties, + kafkaProperties: KafkaProperties, kafkaResponseProcessor: KafkaResponseProcessor ): KafkaMessageListenerContainer { - val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic) + val containerProperties = ContainerProperties(kafkaProperties.responseTopic) containerProperties.messageListener = kafkaResponseProcessor return KafkaMessageListenerContainer(consumerFactory, containerProperties) } @@ -81,6 +82,26 @@ class AppKafkaConfiguration { return KafkaResponseProcessor(applicationEventPublisher, objectMapper) } + @Bean + @ConditionalOnProperty(value = ["app.kafka.input-topic"]) + fun kafkaInputListenerContainer( + consumerFactory: ConsumerFactory, + kafkaProperties: KafkaProperties, + kafkaInputListener: KafkaInputListener + ): KafkaMessageListenerContainer { + val containerProperties = ContainerProperties(kafkaProperties.inputTopic) + containerProperties.messageListener = kafkaInputListener + return KafkaMessageListenerContainer(consumerFactory, containerProperties) + } + + @Bean + @ConditionalOnProperty(value = ["app.kafka.input-topic"]) + fun kafkaInputListener( + applicationEventPublisher: ApplicationEventPublisher, + ): KafkaInputListener { + return KafkaInputListener(applicationEventPublisher) + } + @Bean fun connectionCheckService(consumerFactory: ConsumerFactory, configsUpdateProducer: Sinks.Many): ConnectionCheckService { return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt new file mode 100644 index 0000000..ee6b56e --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt @@ -0,0 +1,33 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.input + +import de.ukw.ccc.bwhc.dto.MtbFile +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.context.ApplicationEventPublisher +import org.springframework.kafka.listener.MessageListener + +class KafkaInputListener( + private val applicationEventPublisher: ApplicationEventPublisher +) : MessageListener { + override fun onMessage(data: ConsumerRecord) { + TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 01c7d43..09edd31 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -22,7 +22,7 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.config.KafkaProperties import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -30,7 +30,7 @@ import org.springframework.retry.support.RetryTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, - private val kafkaTargetProperties: KafkaTargetProperties, + private val kafkaProperties: KafkaProperties, private val retryTemplate: RetryTemplate, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -41,7 +41,7 @@ class KafkaMtbFileSender( return try { return retryTemplate.execute { val result = kafkaTemplate.send( - kafkaTargetProperties.topic, + kafkaProperties.topic, key(request), objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) @@ -71,7 +71,7 @@ class KafkaMtbFileSender( return try { return retryTemplate.execute { val result = kafkaTemplate.send( - kafkaTargetProperties.topic, + kafkaProperties.topic, key(request), objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) ) @@ -90,7 +90,7 @@ class KafkaMtbFileSender( } override fun endpoint(): String { - return "${this.kafkaTargetProperties.servers} (${this.kafkaTargetProperties.topic}/${this.kafkaTargetProperties.responseTopic})" + return "${this.kafkaProperties.servers} (${this.kafkaProperties.topic}/${this.kafkaProperties.responseTopic})" } private fun key(request: MtbFileSender.MtbFileRequest): String { diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt index d0f7c30..ff5b1ae 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -21,7 +21,7 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.* -import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.config.KafkaProperties import dev.dnpm.etl.processor.monitoring.RequestStatus import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach @@ -53,13 +53,13 @@ class KafkaMtbFileSenderTest { fun setup( @Mock kafkaTemplate: KafkaTemplate ) { - val kafkaTargetProperties = KafkaTargetProperties("testtopic") + val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build() this.objectMapper = ObjectMapper() this.kafkaTemplate = kafkaTemplate - this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper) + this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper) } @ParameterizedTest @@ -125,9 +125,9 @@ class KafkaMtbFileSenderTest { @ParameterizedTest @MethodSource("requestWithResponseSource") fun shouldRetryOnMtbFileKafkaSendError(testData: TestData) { - val kafkaTargetProperties = KafkaTargetProperties("testtopic") + val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build() - this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper) + this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaProperties, retryTemplate, this.objectMapper) doAnswer { if (null != testData.exception) { @@ -151,9 +151,9 @@ class KafkaMtbFileSenderTest { @ParameterizedTest @MethodSource("requestWithResponseSource") fun shouldRetryOnDeleteKafkaSendError(testData: TestData) { - val kafkaTargetProperties = KafkaTargetProperties("testtopic") + val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build() - this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper) + this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaProperties, retryTemplate, this.objectMapper) doAnswer { if (null != testData.exception) {