From 50a6d66718abbf08368c41ec9c0df536cf5771c1 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Mon, 19 Feb 2024 17:06:02 +0100 Subject: [PATCH 1/4] feat: new kafka config due to kafka input --- .../processor/config/AppConfigurationTest.kt | 46 +++++++++++++++++-- .../processor/config/AppConfigProperties.kt | 19 ++++++-- .../processor/config/AppKafkaConfiguration.kt | 35 +++++++++++--- .../etl/processor/input/KafkaInputListener.kt | 33 +++++++++++++ .../processor/output/KafkaMtbFileSender.kt | 10 ++-- .../output/KafkaMtbFileSenderTest.kt | 14 +++--- 6 files changed, 130 insertions(+), 27 deletions(-) create mode 100644 src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt index b4e4b92..d37c251 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.input.KafkaInputListener import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender @@ -78,8 +79,8 @@ class AppConfigurationTest { @TestPropertySource( properties = [ "app.kafka.servers=localhost:9092", - "app.kafka.topic=test", - "app.kafka.response-topic=test-response", + "app.kafka.output-topic=test", + "app.kafka.output-response-topic=test-response", "app.kafka.group-id=test" ] ) @@ -99,8 +100,8 @@ class AppConfigurationTest { properties = [ "app.rest.uri=http://localhost:9000", "app.kafka.servers=localhost:9092", - "app.kafka.topic=test", - "app.kafka.response-topic=test-response", + "app.kafka.output-topic=test", + "app.kafka.output-response-topic=test-response", "app.kafka.group-id=test" ] ) @@ -114,6 +115,43 @@ class AppConfigurationTest { } + @Nested + @TestPropertySource( + properties = [ + "app.kafka.servers=localhost:9092", + "app.kafka.output-topic=test", + "app.kafka.output-response-topic=test-response", + "app.kafka.group-id=test" + ] + ) + inner class AppConfigurationWithoutKafkaInputTest(private val context: ApplicationContext) { + + @Test + fun shouldNotUseKafkaInputListener() { + assertThrows { context.getBean(KafkaInputListener::class.java) } + } + + } + + @Nested + @TestPropertySource( + properties = [ + "app.kafka.servers=localhost:9092", + "app.kafka.input-topic=test_input", + "app.kafka.output-topic=test", + "app.kafka.output-response-topic=test-response", + "app.kafka.group-id=test" + ] + ) + inner class AppConfigurationUsingKafkaInputTest(private val context: ApplicationContext) { + + @Test + fun shouldUseKafkaInputListener() { + assertThat(context.getBean(KafkaInputListener::class.java)).isNotNull + } + + } + @Nested @TestPropertySource( properties = [ diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 2290d44..2eea92e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -72,10 +72,21 @@ data class RestTargetProperties( } } -@ConfigurationProperties(KafkaTargetProperties.NAME) -data class KafkaTargetProperties( - val topic: String = "etl-processor", - val responseTopic: String = "${topic}_response", +@ConfigurationProperties(KafkaProperties.NAME) +data class KafkaProperties( + val inputTopic: String?, + val outputTopic: String = "etl-processor", + @get:DeprecatedConfigurationProperty( + reason = "Deprecated", + replacement = "outputTopic" + ) + val topic: String = outputTopic, + val outputResponseTopic: String = "${outputTopic}_response", + @get:DeprecatedConfigurationProperty( + reason = "Deprecated", + replacement = "outputResponseTopic" + ) + val responseTopic: String = outputResponseTopic, val groupId: String = "${topic}_group", val servers: String = "" ) { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 68b86b2..0977527 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -20,6 +20,7 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper +import dev.dnpm.etl.processor.input.KafkaInputListener import dev.dnpm.etl.processor.monitoring.ConnectionCheckService import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService import dev.dnpm.etl.processor.output.KafkaMtbFileSender @@ -42,9 +43,9 @@ import reactor.core.publisher.Sinks @Configuration @EnableConfigurationProperties( - value = [KafkaTargetProperties::class] + value = [KafkaProperties::class] ) -@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"]) +@ConditionalOnProperty(value = ["app.kafka.servers"]) @ConditionalOnMissingBean(MtbFileSender::class) @Order(-5) class AppKafkaConfiguration { @@ -54,21 +55,21 @@ class AppKafkaConfiguration { @Bean fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, - kafkaTargetProperties: KafkaTargetProperties, + kafkaProperties: KafkaProperties, retryTemplate: RetryTemplate, objectMapper: ObjectMapper ): MtbFileSender { logger.info("Selected 'KafkaMtbFileSender'") - return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper) + return KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper) } @Bean - fun kafkaListenerContainer( + fun kafkaResponseListenerContainer( consumerFactory: ConsumerFactory, - kafkaTargetProperties: KafkaTargetProperties, + kafkaProperties: KafkaProperties, kafkaResponseProcessor: KafkaResponseProcessor ): KafkaMessageListenerContainer { - val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic) + val containerProperties = ContainerProperties(kafkaProperties.responseTopic) containerProperties.messageListener = kafkaResponseProcessor return KafkaMessageListenerContainer(consumerFactory, containerProperties) } @@ -81,6 +82,26 @@ class AppKafkaConfiguration { return KafkaResponseProcessor(applicationEventPublisher, objectMapper) } + @Bean + @ConditionalOnProperty(value = ["app.kafka.input-topic"]) + fun kafkaInputListenerContainer( + consumerFactory: ConsumerFactory, + kafkaProperties: KafkaProperties, + kafkaInputListener: KafkaInputListener + ): KafkaMessageListenerContainer { + val containerProperties = ContainerProperties(kafkaProperties.inputTopic) + containerProperties.messageListener = kafkaInputListener + return KafkaMessageListenerContainer(consumerFactory, containerProperties) + } + + @Bean + @ConditionalOnProperty(value = ["app.kafka.input-topic"]) + fun kafkaInputListener( + applicationEventPublisher: ApplicationEventPublisher, + ): KafkaInputListener { + return KafkaInputListener(applicationEventPublisher) + } + @Bean fun connectionCheckService(consumerFactory: ConsumerFactory, configsUpdateProducer: Sinks.Many): ConnectionCheckService { return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt new file mode 100644 index 0000000..ee6b56e --- /dev/null +++ b/src/main/kotlin/dev/dnpm/etl/processor/input/KafkaInputListener.kt @@ -0,0 +1,33 @@ +/* + * This file is part of ETL-Processor + * + * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package dev.dnpm.etl.processor.input + +import de.ukw.ccc.bwhc.dto.MtbFile +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.context.ApplicationEventPublisher +import org.springframework.kafka.listener.MessageListener + +class KafkaInputListener( + private val applicationEventPublisher: ApplicationEventPublisher +) : MessageListener { + override fun onMessage(data: ConsumerRecord) { + TODO("Not yet implemented") + } +} \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 01c7d43..09edd31 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -22,7 +22,7 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.Consent import de.ukw.ccc.bwhc.dto.MtbFile -import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.config.KafkaProperties import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate @@ -30,7 +30,7 @@ import org.springframework.retry.support.RetryTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, - private val kafkaTargetProperties: KafkaTargetProperties, + private val kafkaProperties: KafkaProperties, private val retryTemplate: RetryTemplate, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -41,7 +41,7 @@ class KafkaMtbFileSender( return try { return retryTemplate.execute { val result = kafkaTemplate.send( - kafkaTargetProperties.topic, + kafkaProperties.topic, key(request), objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) @@ -71,7 +71,7 @@ class KafkaMtbFileSender( return try { return retryTemplate.execute { val result = kafkaTemplate.send( - kafkaTargetProperties.topic, + kafkaProperties.topic, key(request), objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) ) @@ -90,7 +90,7 @@ class KafkaMtbFileSender( } override fun endpoint(): String { - return "${this.kafkaTargetProperties.servers} (${this.kafkaTargetProperties.topic}/${this.kafkaTargetProperties.responseTopic})" + return "${this.kafkaProperties.servers} (${this.kafkaProperties.topic}/${this.kafkaProperties.responseTopic})" } private fun key(request: MtbFileSender.MtbFileRequest): String { diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt index d0f7c30..ff5b1ae 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -21,7 +21,7 @@ package dev.dnpm.etl.processor.output import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.* -import dev.dnpm.etl.processor.config.KafkaTargetProperties +import dev.dnpm.etl.processor.config.KafkaProperties import dev.dnpm.etl.processor.monitoring.RequestStatus import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach @@ -53,13 +53,13 @@ class KafkaMtbFileSenderTest { fun setup( @Mock kafkaTemplate: KafkaTemplate ) { - val kafkaTargetProperties = KafkaTargetProperties("testtopic") + val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build() this.objectMapper = ObjectMapper() this.kafkaTemplate = kafkaTemplate - this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper) + this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper) } @ParameterizedTest @@ -125,9 +125,9 @@ class KafkaMtbFileSenderTest { @ParameterizedTest @MethodSource("requestWithResponseSource") fun shouldRetryOnMtbFileKafkaSendError(testData: TestData) { - val kafkaTargetProperties = KafkaTargetProperties("testtopic") + val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build() - this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper) + this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaProperties, retryTemplate, this.objectMapper) doAnswer { if (null != testData.exception) { @@ -151,9 +151,9 @@ class KafkaMtbFileSenderTest { @ParameterizedTest @MethodSource("requestWithResponseSource") fun shouldRetryOnDeleteKafkaSendError(testData: TestData) { - val kafkaTargetProperties = KafkaTargetProperties("testtopic") + val kafkaProperties = KafkaProperties("testtopic") val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build() - this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper) + this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaProperties, retryTemplate, this.objectMapper) doAnswer { if (null != testData.exception) { From 50b8f7bbd4107fb228d26101a11e67067493b70b Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 29 Feb 2024 08:26:54 +0100 Subject: [PATCH 2/4] feat: use global RetryTemplate --- .../pseudonym/GpasPseudonymGenerator.java | 72 +++++-------------- .../etl/processor/config/AppConfiguration.kt | 23 ++++-- 2 files changed, 35 insertions(+), 60 deletions(-) diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java index 62e702f..3d367bc 100644 --- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java @@ -22,21 +22,6 @@ package dev.dnpm.etl.processor.pseudonym; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.parser.IParser; import dev.dnpm.etl.processor.config.GPasConfigProperties; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.ConnectException; -import java.security.KeyManagementException; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.security.cert.CertificateFactory; -import java.security.cert.X509Certificate; -import java.util.Base64; -import java.util.HashMap; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; import org.apache.commons.lang3.StringUtils; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClients; @@ -54,35 +39,39 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; +import org.springframework.http.*; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; -import org.springframework.retry.RetryCallback; -import org.springframework.retry.RetryContext; -import org.springframework.retry.RetryListener; -import org.springframework.retry.RetryPolicy; -import org.springframework.retry.backoff.ExponentialBackOffPolicy; -import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; -import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.Base64; + public class GpasPseudonymGenerator implements Generator { private final static FhirContext r4Context = FhirContext.forR4(); private final String gPasUrl; private final String psnTargetDomain; private final HttpHeaders httpHeader; - private final RetryTemplate retryTemplate = defaultTemplate(); + private final RetryTemplate retryTemplate; private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class); private SSLContext customSslContext; private RestTemplate restTemplate; - public GpasPseudonymGenerator(GPasConfigProperties gpasCfg) { + public GpasPseudonymGenerator(GPasConfigProperties gpasCfg, RetryTemplate retryTemplate) { + this.retryTemplate = retryTemplate; this.gPasUrl = gpasCfg.getUri(); this.psnTargetDomain = gpasCfg.getTarget(); @@ -202,31 +191,6 @@ public class GpasPseudonymGenerator implements Generator { return headers; } - protected RetryTemplate defaultTemplate() { - RetryTemplate retryTemplate = new RetryTemplate(); - ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); - backOffPolicy.setInitialInterval(1000); - backOffPolicy.setMultiplier(1.25); - retryTemplate.setBackOffPolicy(backOffPolicy); - HashMap, Boolean> retryableExceptions = new HashMap<>(); - retryableExceptions.put(RestClientException.class, true); - retryableExceptions.put(ConnectException.class, true); - RetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions); - retryTemplate.setRetryPolicy(retryPolicy); - - retryTemplate.registerListener(new RetryListener() { - @Override - public void onError(RetryContext context, - RetryCallback callback, Throwable throwable) { - log.warn("HTTP Error occurred: {}. Retrying {}", throwable.getMessage(), - context.getRetryCount()); - RetryListener.super.onError(context, callback, throwable); - } - }); - - return retryTemplate; - } - /** * Read SSL root certificate and return SSLContext * diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index 92965a6..71911fc 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -35,13 +35,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.retry.RetryCallback +import org.springframework.retry.RetryContext +import org.springframework.retry.RetryListener import org.springframework.retry.policy.SimpleRetryPolicy import org.springframework.retry.support.RetryTemplate import org.springframework.retry.support.RetryTemplateBuilder import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.security.crypto.password.PasswordEncoder import org.springframework.security.provisioning.InMemoryUserDetailsManager -import org.springframework.security.provisioning.UserDetailsManager import reactor.core.publisher.Sinks import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @@ -62,8 +64,8 @@ class AppConfiguration { @ConditionalOnProperty(value = ["app.pseudonymize.generator"], havingValue = "GPAS") @Bean - fun gpasPseudonymGenerator(configProperties: GPasConfigProperties): Generator { - return GpasPseudonymGenerator(configProperties) + fun gpasPseudonymGenerator(configProperties: GPasConfigProperties, retryTemplate: RetryTemplate): Generator { + return GpasPseudonymGenerator(configProperties, retryTemplate) } @ConditionalOnProperty(value = ["app.pseudonymize.generator"], havingValue = "BUILDIN", matchIfMissing = true) @@ -75,8 +77,8 @@ class AppConfiguration { @ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "GPAS") @ConditionalOnMissingBean @Bean - fun gpasPseudonymGeneratorOnDeprecatedProperty(configProperties: GPasConfigProperties): Generator { - return GpasPseudonymGenerator(configProperties) + fun gpasPseudonymGeneratorOnDeprecatedProperty(configProperties: GPasConfigProperties, retryTemplate: RetryTemplate): Generator { + return GpasPseudonymGenerator(configProperties, retryTemplate) } @ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "BUILDIN") @@ -114,8 +116,17 @@ class AppConfiguration { fun retryTemplate(): RetryTemplate { return RetryTemplateBuilder() .notRetryOn(IllegalArgumentException::class.java) - .fixedBackoff(5.seconds.toJavaDuration()) + .exponentialBackoff(2.seconds.toJavaDuration(), 1.25, 5.seconds.toJavaDuration()) .customPolicy(SimpleRetryPolicy(3)) + .withListener(object : RetryListener { + override fun onError( + context: RetryContext, + callback: RetryCallback, + throwable: Throwable + ) { + logger.warn("Error occured: {}. Retrying {}", throwable.message, context.retryCount) + } + }) .build() } From 61e5273158406fac25efc0a715104e103a3ea540 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 29 Feb 2024 08:29:26 +0100 Subject: [PATCH 3/4] feat: add max-retry-attempts config option --- .../dev/dnpm/etl/processor/config/AppConfigProperties.kt | 3 ++- .../kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 2eea92e..430648e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -30,7 +30,8 @@ data class AppConfigProperties( replacement = "app.pseudonymize.generator" ) var pseudonymizer: PseudonymGenerator = PseudonymGenerator.BUILDIN, - var transformations: List = listOf() + var transformations: List = listOf(), + var maxRetryAttempts: Int = 3 ) { companion object { const val NAME = "app" diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index 71911fc..8fb9e19 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -113,11 +113,11 @@ class AppConfiguration { } @Bean - fun retryTemplate(): RetryTemplate { + fun retryTemplate(configProperties: AppConfigProperties): RetryTemplate { return RetryTemplateBuilder() .notRetryOn(IllegalArgumentException::class.java) .exponentialBackoff(2.seconds.toJavaDuration(), 1.25, 5.seconds.toJavaDuration()) - .customPolicy(SimpleRetryPolicy(3)) + .customPolicy(SimpleRetryPolicy(configProperties.maxRetryAttempts)) .withListener(object : RetryListener { override fun onError( context: RetryContext, From 408b121f269b49da1b2dbfa4c3d7190b6df6b010 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 29 Feb 2024 08:47:17 +0100 Subject: [PATCH 4/4] test: add test for max retry attempts --- .../processor/config/AppConfigurationTest.kt | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt index d37c251..46a756b 100644 --- a/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt +++ b/src/integrationTest/kotlin/dev/dnpm/etl/processor/config/AppConfigurationTest.kt @@ -38,6 +38,7 @@ import org.springframework.boot.test.context.SpringBootTest import org.springframework.boot.test.mock.mockito.MockBean import org.springframework.boot.test.mock.mockito.MockBeans import org.springframework.context.ApplicationContext +import org.springframework.retry.support.RetryTemplate import org.springframework.security.crypto.password.PasswordEncoder import org.springframework.security.provisioning.InMemoryUserDetailsManager import org.springframework.test.context.ContextConfiguration @@ -276,4 +277,30 @@ class AppConfigurationTest { } + @Nested + @TestPropertySource( + properties = [ + "app.rest.uri=http://localhost:9000", + "app.max-retry-attempts=5" + ] + ) + inner class AppConfigurationRetryTest(private val context: ApplicationContext) { + + private val maxRetryAttempts = 5 + + @Test + fun shouldUseRetryTemplateWithConfiguredMaxAttempts() { + val retryTemplate = context.getBean(RetryTemplate::class.java) + assertThat(retryTemplate).isNotNull + + assertThrows { + retryTemplate.execute { + assertThat(it.retryCount).isLessThan(maxRetryAttempts) + throw RuntimeException() + } + } + } + + } + } \ No newline at end of file