diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index b6bedf5..42632aa 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -32,8 +32,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.retry.policy.SimpleRetryPolicy +import org.springframework.retry.support.RetryTemplate +import org.springframework.retry.support.RetryTemplateBuilder import org.springframework.scheduling.annotation.EnableScheduling import reactor.core.publisher.Sinks +import kotlin.time.Duration.Companion.seconds +import kotlin.time.toJavaDuration + @Configuration @EnableConfigurationProperties( @@ -89,5 +95,14 @@ class AppConfiguration { }) } + @Bean + fun retryTemplate(): RetryTemplate { + return RetryTemplateBuilder() + .notRetryOn(IllegalArgumentException::class.java) + .fixedBackoff(5.seconds.toJavaDuration()) + .customPolicy(SimpleRetryPolicy(3)) + .build() + } + } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index c8fbdf5..15ed798 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -37,6 +37,7 @@ import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.listener.ContainerProperties import org.springframework.kafka.listener.KafkaMessageListenerContainer +import org.springframework.retry.support.RetryTemplate @Configuration @EnableConfigurationProperties( @@ -53,10 +54,11 @@ class AppKafkaConfiguration { fun kafkaMtbFileSender( kafkaTemplate: KafkaTemplate, kafkaTargetProperties: KafkaTargetProperties, + retryTemplate: RetryTemplate, objectMapper: ObjectMapper ): MtbFileSender { logger.info("Selected 'KafkaMtbFileSender'") - return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) + return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper) } @Bean diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt index 2596e1c..64e91e7 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -30,6 +30,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order +import org.springframework.retry.support.RetryTemplate import org.springframework.web.client.RestTemplate @Configuration @@ -51,9 +52,13 @@ class AppRestConfiguration { } @Bean - fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender { + fun restMtbFileSender( + restTemplate: RestTemplate, + restTargetProperties: RestTargetProperties, + retryTemplate: RetryTemplate + ): MtbFileSender { logger.info("Selected 'RestMtbFileSender'") - return RestMtbFileSender(restTemplate, restTargetProperties) + return RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate) } @Bean diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 5772faf..8c244b8 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -1,7 +1,7 @@ /* * This file is part of ETL-Processor * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published @@ -26,10 +26,12 @@ import dev.dnpm.etl.processor.config.KafkaTargetProperties import dev.dnpm.etl.processor.monitoring.RequestStatus import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate +import org.springframework.retry.support.RetryTemplate class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, private val kafkaTargetProperties: KafkaTargetProperties, + private val retryTemplate: RetryTemplate, private val objectMapper: ObjectMapper ) : MtbFileSender { @@ -37,16 +39,18 @@ class KafkaMtbFileSender( override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { return try { - val result = kafkaTemplate.send( - kafkaTargetProperties.topic, - key(request), - objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) - ) - if (result.get() != null) { - logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(RequestStatus.UNKNOWN) - } else { - MtbFileSender.Response(RequestStatus.ERROR) + return retryTemplate.execute { + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), + objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) + ) + if (result.get() != null) { + logger.debug("Sent file via KafkaMtbFileSender") + MtbFileSender.Response(RequestStatus.UNKNOWN) + } else { + MtbFileSender.Response(RequestStatus.ERROR) + } } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) @@ -65,17 +69,19 @@ class KafkaMtbFileSender( .build() return try { - val result = kafkaTemplate.send( - kafkaTargetProperties.topic, - key(request), - objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) - ) + return retryTemplate.execute { + val result = kafkaTemplate.send( + kafkaTargetProperties.topic, + key(request), + objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) + ) - if (result.get() != null) { - logger.debug("Sent deletion request via KafkaMtbFileSender") - MtbFileSender.Response(RequestStatus.UNKNOWN) - } else { - MtbFileSender.Response(RequestStatus.ERROR) + if (result.get() != null) { + logger.debug("Sent deletion request via KafkaMtbFileSender") + MtbFileSender.Response(RequestStatus.UNKNOWN) + } else { + MtbFileSender.Response(RequestStatus.ERROR) + } } } catch (e: Exception) { logger.error("An error occurred sending to kafka", e) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt index 1c59f5c..5a4ae9e 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSender.kt @@ -1,7 +1,7 @@ /* * This file is part of ETL-Processor * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published @@ -25,32 +25,39 @@ import org.slf4j.LoggerFactory import org.springframework.http.HttpEntity import org.springframework.http.HttpHeaders import org.springframework.http.MediaType +import org.springframework.retry.support.RetryTemplate import org.springframework.web.client.RestClientException import org.springframework.web.client.RestTemplate class RestMtbFileSender( private val restTemplate: RestTemplate, - private val restTargetProperties: RestTargetProperties + private val restTargetProperties: RestTargetProperties, + private val retryTemplate: RetryTemplate ) : MtbFileSender { private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java) override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response { try { - val headers = HttpHeaders() - headers.contentType = MediaType.APPLICATION_JSON - val entityReq = HttpEntity(request.mtbFile, headers) - val response = restTemplate.postForEntity( - "${restTargetProperties.uri}/MTBFile", - entityReq, - String::class.java - ) - if (!response.statusCode.is2xxSuccessful) { - logger.warn("Error sending to remote system: {}", response.body) - return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}") + return retryTemplate.execute { + val headers = HttpHeaders() + headers.contentType = MediaType.APPLICATION_JSON + val entityReq = HttpEntity(request.mtbFile, headers) + val response = restTemplate.postForEntity( + "${restTargetProperties.uri}/MTBFile", + entityReq, + String::class.java + ) + if (!response.statusCode.is2xxSuccessful) { + logger.warn("Error sending to remote system: {}", response.body) + return@execute MtbFileSender.Response( + response.statusCode.asRequestStatus(), + "Status-Code: ${response.statusCode.value()}" + ) + } + logger.debug("Sent file via RestMtbFileSender") + return@execute MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty()) } - logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty()) } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { @@ -62,16 +69,18 @@ class RestMtbFileSender( override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response { try { - val headers = HttpHeaders() - headers.contentType = MediaType.APPLICATION_JSON - val entityReq = HttpEntity(null, headers) - restTemplate.delete( - "${restTargetProperties.uri}/Patient/${request.patientId}", - entityReq, - String::class.java - ) - logger.debug("Sent file via RestMtbFileSender") - return MtbFileSender.Response(RequestStatus.SUCCESS) + return retryTemplate.execute { + val headers = HttpHeaders() + headers.contentType = MediaType.APPLICATION_JSON + val entityReq = HttpEntity(null, headers) + restTemplate.delete( + "${restTargetProperties.uri}/Patient/${request.patientId}", + entityReq, + String::class.java + ) + logger.debug("Sent file via RestMtbFileSender") + return@execute MtbFileSender.Response(RequestStatus.SUCCESS) + } } catch (e: IllegalArgumentException) { logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!) } catch (e: RestClientException) { diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt index 3ec9757..9945538 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -1,7 +1,7 @@ /* * This file is part of ETL-Processor * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published @@ -35,6 +35,8 @@ import org.mockito.junit.jupiter.MockitoExtension import org.mockito.kotlin.* import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.support.SendResult +import org.springframework.retry.policy.SimpleRetryPolicy +import org.springframework.retry.support.RetryTemplateBuilder import java.util.concurrent.CompletableFuture.completedFuture import java.util.concurrent.ExecutionException @@ -52,10 +54,12 @@ class KafkaMtbFileSenderTest { @Mock kafkaTemplate: KafkaTemplate ) { val kafkaTargetProperties = KafkaTargetProperties("testtopic") + val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build() + this.objectMapper = ObjectMapper() this.kafkaTemplate = kafkaTemplate - this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper) + this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper) } @ParameterizedTest diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt index 0cad285..e39c61b 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/RestMtbFileSenderTest.kt @@ -1,7 +1,7 @@ /* * This file is part of ETL-Processor * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors + * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published @@ -28,6 +28,8 @@ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource import org.springframework.http.HttpMethod import org.springframework.http.HttpStatus +import org.springframework.retry.policy.SimpleRetryPolicy +import org.springframework.retry.support.RetryTemplateBuilder import org.springframework.test.web.client.MockRestServiceServer import org.springframework.test.web.client.match.MockRestRequestMatchers.method import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo @@ -44,10 +46,11 @@ class RestMtbFileSenderTest { fun setup() { val restTemplate = RestTemplate() val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile") + val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build() this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate) - this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties) + this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate) } @ParameterizedTest