mirror of
https://github.com/pcvolkmer/etl-processor.git
synced 2025-04-20 01:36:50 +00:00
feat #17: initial support for request retry
This commit is contained in:
parent
8a6f9a6e02
commit
4a9cffbaa5
@ -32,8 +32,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
|
|||||||
import org.springframework.boot.context.properties.EnableConfigurationProperties
|
import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||||
import org.springframework.context.annotation.Bean
|
import org.springframework.context.annotation.Bean
|
||||||
import org.springframework.context.annotation.Configuration
|
import org.springframework.context.annotation.Configuration
|
||||||
|
import org.springframework.retry.policy.SimpleRetryPolicy
|
||||||
|
import org.springframework.retry.support.RetryTemplate
|
||||||
|
import org.springframework.retry.support.RetryTemplateBuilder
|
||||||
import org.springframework.scheduling.annotation.EnableScheduling
|
import org.springframework.scheduling.annotation.EnableScheduling
|
||||||
import reactor.core.publisher.Sinks
|
import reactor.core.publisher.Sinks
|
||||||
|
import kotlin.time.Duration.Companion.seconds
|
||||||
|
import kotlin.time.toJavaDuration
|
||||||
|
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@EnableConfigurationProperties(
|
@EnableConfigurationProperties(
|
||||||
@ -89,5 +95,14 @@ class AppConfiguration {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
fun retryTemplate(): RetryTemplate {
|
||||||
|
return RetryTemplateBuilder()
|
||||||
|
.notRetryOn(IllegalArgumentException::class.java)
|
||||||
|
.fixedBackoff(5.seconds.toJavaDuration())
|
||||||
|
.customPolicy(SimpleRetryPolicy(3))
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,6 +37,7 @@ import org.springframework.kafka.core.ConsumerFactory
|
|||||||
import org.springframework.kafka.core.KafkaTemplate
|
import org.springframework.kafka.core.KafkaTemplate
|
||||||
import org.springframework.kafka.listener.ContainerProperties
|
import org.springframework.kafka.listener.ContainerProperties
|
||||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
||||||
|
import org.springframework.retry.support.RetryTemplate
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@EnableConfigurationProperties(
|
@EnableConfigurationProperties(
|
||||||
@ -53,10 +54,11 @@ class AppKafkaConfiguration {
|
|||||||
fun kafkaMtbFileSender(
|
fun kafkaMtbFileSender(
|
||||||
kafkaTemplate: KafkaTemplate<String, String>,
|
kafkaTemplate: KafkaTemplate<String, String>,
|
||||||
kafkaTargetProperties: KafkaTargetProperties,
|
kafkaTargetProperties: KafkaTargetProperties,
|
||||||
|
retryTemplate: RetryTemplate,
|
||||||
objectMapper: ObjectMapper
|
objectMapper: ObjectMapper
|
||||||
): MtbFileSender {
|
): MtbFileSender {
|
||||||
logger.info("Selected 'KafkaMtbFileSender'")
|
logger.info("Selected 'KafkaMtbFileSender'")
|
||||||
return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
|
return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
@ -30,6 +30,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
|||||||
import org.springframework.context.annotation.Bean
|
import org.springframework.context.annotation.Bean
|
||||||
import org.springframework.context.annotation.Configuration
|
import org.springframework.context.annotation.Configuration
|
||||||
import org.springframework.core.annotation.Order
|
import org.springframework.core.annotation.Order
|
||||||
|
import org.springframework.retry.support.RetryTemplate
|
||||||
import org.springframework.web.client.RestTemplate
|
import org.springframework.web.client.RestTemplate
|
||||||
|
|
||||||
@Configuration
|
@Configuration
|
||||||
@ -51,9 +52,13 @@ class AppRestConfiguration {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender {
|
fun restMtbFileSender(
|
||||||
|
restTemplate: RestTemplate,
|
||||||
|
restTargetProperties: RestTargetProperties,
|
||||||
|
retryTemplate: RetryTemplate
|
||||||
|
): MtbFileSender {
|
||||||
logger.info("Selected 'RestMtbFileSender'")
|
logger.info("Selected 'RestMtbFileSender'")
|
||||||
return RestMtbFileSender(restTemplate, restTargetProperties)
|
return RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
/*
|
/*
|
||||||
* This file is part of ETL-Processor
|
* This file is part of ETL-Processor
|
||||||
*
|
*
|
||||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||||
*
|
*
|
||||||
* This program is free software: you can redistribute it and/or modify
|
* This program is free software: you can redistribute it and/or modify
|
||||||
* it under the terms of the GNU Affero General Public License as published
|
* it under the terms of the GNU Affero General Public License as published
|
||||||
@ -26,10 +26,12 @@ import dev.dnpm.etl.processor.config.KafkaTargetProperties
|
|||||||
import dev.dnpm.etl.processor.monitoring.RequestStatus
|
import dev.dnpm.etl.processor.monitoring.RequestStatus
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
import org.springframework.kafka.core.KafkaTemplate
|
||||||
|
import org.springframework.retry.support.RetryTemplate
|
||||||
|
|
||||||
class KafkaMtbFileSender(
|
class KafkaMtbFileSender(
|
||||||
private val kafkaTemplate: KafkaTemplate<String, String>,
|
private val kafkaTemplate: KafkaTemplate<String, String>,
|
||||||
private val kafkaTargetProperties: KafkaTargetProperties,
|
private val kafkaTargetProperties: KafkaTargetProperties,
|
||||||
|
private val retryTemplate: RetryTemplate,
|
||||||
private val objectMapper: ObjectMapper
|
private val objectMapper: ObjectMapper
|
||||||
) : MtbFileSender {
|
) : MtbFileSender {
|
||||||
|
|
||||||
@ -37,6 +39,7 @@ class KafkaMtbFileSender(
|
|||||||
|
|
||||||
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
|
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
|
||||||
return try {
|
return try {
|
||||||
|
return retryTemplate.execute<MtbFileSender.Response, Exception> {
|
||||||
val result = kafkaTemplate.send(
|
val result = kafkaTemplate.send(
|
||||||
kafkaTargetProperties.topic,
|
kafkaTargetProperties.topic,
|
||||||
key(request),
|
key(request),
|
||||||
@ -48,6 +51,7 @@ class KafkaMtbFileSender(
|
|||||||
} else {
|
} else {
|
||||||
MtbFileSender.Response(RequestStatus.ERROR)
|
MtbFileSender.Response(RequestStatus.ERROR)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.error("An error occurred sending to kafka", e)
|
logger.error("An error occurred sending to kafka", e)
|
||||||
MtbFileSender.Response(RequestStatus.ERROR)
|
MtbFileSender.Response(RequestStatus.ERROR)
|
||||||
@ -65,6 +69,7 @@ class KafkaMtbFileSender(
|
|||||||
.build()
|
.build()
|
||||||
|
|
||||||
return try {
|
return try {
|
||||||
|
return retryTemplate.execute<MtbFileSender.Response, Exception> {
|
||||||
val result = kafkaTemplate.send(
|
val result = kafkaTemplate.send(
|
||||||
kafkaTargetProperties.topic,
|
kafkaTargetProperties.topic,
|
||||||
key(request),
|
key(request),
|
||||||
@ -77,6 +82,7 @@ class KafkaMtbFileSender(
|
|||||||
} else {
|
} else {
|
||||||
MtbFileSender.Response(RequestStatus.ERROR)
|
MtbFileSender.Response(RequestStatus.ERROR)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
logger.error("An error occurred sending to kafka", e)
|
logger.error("An error occurred sending to kafka", e)
|
||||||
MtbFileSender.Response(RequestStatus.ERROR)
|
MtbFileSender.Response(RequestStatus.ERROR)
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
/*
|
/*
|
||||||
* This file is part of ETL-Processor
|
* This file is part of ETL-Processor
|
||||||
*
|
*
|
||||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||||
*
|
*
|
||||||
* This program is free software: you can redistribute it and/or modify
|
* This program is free software: you can redistribute it and/or modify
|
||||||
* it under the terms of the GNU Affero General Public License as published
|
* it under the terms of the GNU Affero General Public License as published
|
||||||
@ -25,18 +25,21 @@ import org.slf4j.LoggerFactory
|
|||||||
import org.springframework.http.HttpEntity
|
import org.springframework.http.HttpEntity
|
||||||
import org.springframework.http.HttpHeaders
|
import org.springframework.http.HttpHeaders
|
||||||
import org.springframework.http.MediaType
|
import org.springframework.http.MediaType
|
||||||
|
import org.springframework.retry.support.RetryTemplate
|
||||||
import org.springframework.web.client.RestClientException
|
import org.springframework.web.client.RestClientException
|
||||||
import org.springframework.web.client.RestTemplate
|
import org.springframework.web.client.RestTemplate
|
||||||
|
|
||||||
class RestMtbFileSender(
|
class RestMtbFileSender(
|
||||||
private val restTemplate: RestTemplate,
|
private val restTemplate: RestTemplate,
|
||||||
private val restTargetProperties: RestTargetProperties
|
private val restTargetProperties: RestTargetProperties,
|
||||||
|
private val retryTemplate: RetryTemplate
|
||||||
) : MtbFileSender {
|
) : MtbFileSender {
|
||||||
|
|
||||||
private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java)
|
private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java)
|
||||||
|
|
||||||
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
|
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
|
||||||
try {
|
try {
|
||||||
|
return retryTemplate.execute<MtbFileSender.Response, Exception> {
|
||||||
val headers = HttpHeaders()
|
val headers = HttpHeaders()
|
||||||
headers.contentType = MediaType.APPLICATION_JSON
|
headers.contentType = MediaType.APPLICATION_JSON
|
||||||
val entityReq = HttpEntity(request.mtbFile, headers)
|
val entityReq = HttpEntity(request.mtbFile, headers)
|
||||||
@ -47,10 +50,14 @@ class RestMtbFileSender(
|
|||||||
)
|
)
|
||||||
if (!response.statusCode.is2xxSuccessful) {
|
if (!response.statusCode.is2xxSuccessful) {
|
||||||
logger.warn("Error sending to remote system: {}", response.body)
|
logger.warn("Error sending to remote system: {}", response.body)
|
||||||
return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
|
return@execute MtbFileSender.Response(
|
||||||
|
response.statusCode.asRequestStatus(),
|
||||||
|
"Status-Code: ${response.statusCode.value()}"
|
||||||
|
)
|
||||||
}
|
}
|
||||||
logger.debug("Sent file via RestMtbFileSender")
|
logger.debug("Sent file via RestMtbFileSender")
|
||||||
return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
|
return@execute MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
|
||||||
|
}
|
||||||
} catch (e: IllegalArgumentException) {
|
} catch (e: IllegalArgumentException) {
|
||||||
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
|
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
|
||||||
} catch (e: RestClientException) {
|
} catch (e: RestClientException) {
|
||||||
@ -62,6 +69,7 @@ class RestMtbFileSender(
|
|||||||
|
|
||||||
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
|
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
|
||||||
try {
|
try {
|
||||||
|
return retryTemplate.execute<MtbFileSender.Response, Exception> {
|
||||||
val headers = HttpHeaders()
|
val headers = HttpHeaders()
|
||||||
headers.contentType = MediaType.APPLICATION_JSON
|
headers.contentType = MediaType.APPLICATION_JSON
|
||||||
val entityReq = HttpEntity(null, headers)
|
val entityReq = HttpEntity(null, headers)
|
||||||
@ -71,7 +79,8 @@ class RestMtbFileSender(
|
|||||||
String::class.java
|
String::class.java
|
||||||
)
|
)
|
||||||
logger.debug("Sent file via RestMtbFileSender")
|
logger.debug("Sent file via RestMtbFileSender")
|
||||||
return MtbFileSender.Response(RequestStatus.SUCCESS)
|
return@execute MtbFileSender.Response(RequestStatus.SUCCESS)
|
||||||
|
}
|
||||||
} catch (e: IllegalArgumentException) {
|
} catch (e: IllegalArgumentException) {
|
||||||
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
|
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
|
||||||
} catch (e: RestClientException) {
|
} catch (e: RestClientException) {
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
/*
|
/*
|
||||||
* This file is part of ETL-Processor
|
* This file is part of ETL-Processor
|
||||||
*
|
*
|
||||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||||
*
|
*
|
||||||
* This program is free software: you can redistribute it and/or modify
|
* This program is free software: you can redistribute it and/or modify
|
||||||
* it under the terms of the GNU Affero General Public License as published
|
* it under the terms of the GNU Affero General Public License as published
|
||||||
@ -35,6 +35,8 @@ import org.mockito.junit.jupiter.MockitoExtension
|
|||||||
import org.mockito.kotlin.*
|
import org.mockito.kotlin.*
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
import org.springframework.kafka.core.KafkaTemplate
|
||||||
import org.springframework.kafka.support.SendResult
|
import org.springframework.kafka.support.SendResult
|
||||||
|
import org.springframework.retry.policy.SimpleRetryPolicy
|
||||||
|
import org.springframework.retry.support.RetryTemplateBuilder
|
||||||
import java.util.concurrent.CompletableFuture.completedFuture
|
import java.util.concurrent.CompletableFuture.completedFuture
|
||||||
import java.util.concurrent.ExecutionException
|
import java.util.concurrent.ExecutionException
|
||||||
|
|
||||||
@ -52,10 +54,12 @@ class KafkaMtbFileSenderTest {
|
|||||||
@Mock kafkaTemplate: KafkaTemplate<String, String>
|
@Mock kafkaTemplate: KafkaTemplate<String, String>
|
||||||
) {
|
) {
|
||||||
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
|
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
|
||||||
|
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build()
|
||||||
|
|
||||||
this.objectMapper = ObjectMapper()
|
this.objectMapper = ObjectMapper()
|
||||||
this.kafkaTemplate = kafkaTemplate
|
this.kafkaTemplate = kafkaTemplate
|
||||||
|
|
||||||
this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
|
this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
/*
|
/*
|
||||||
* This file is part of ETL-Processor
|
* This file is part of ETL-Processor
|
||||||
*
|
*
|
||||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||||
*
|
*
|
||||||
* This program is free software: you can redistribute it and/or modify
|
* This program is free software: you can redistribute it and/or modify
|
||||||
* it under the terms of the GNU Affero General Public License as published
|
* it under the terms of the GNU Affero General Public License as published
|
||||||
@ -28,6 +28,8 @@ import org.junit.jupiter.params.ParameterizedTest
|
|||||||
import org.junit.jupiter.params.provider.MethodSource
|
import org.junit.jupiter.params.provider.MethodSource
|
||||||
import org.springframework.http.HttpMethod
|
import org.springframework.http.HttpMethod
|
||||||
import org.springframework.http.HttpStatus
|
import org.springframework.http.HttpStatus
|
||||||
|
import org.springframework.retry.policy.SimpleRetryPolicy
|
||||||
|
import org.springframework.retry.support.RetryTemplateBuilder
|
||||||
import org.springframework.test.web.client.MockRestServiceServer
|
import org.springframework.test.web.client.MockRestServiceServer
|
||||||
import org.springframework.test.web.client.match.MockRestRequestMatchers.method
|
import org.springframework.test.web.client.match.MockRestRequestMatchers.method
|
||||||
import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo
|
import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo
|
||||||
@ -44,10 +46,11 @@ class RestMtbFileSenderTest {
|
|||||||
fun setup() {
|
fun setup() {
|
||||||
val restTemplate = RestTemplate()
|
val restTemplate = RestTemplate()
|
||||||
val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
|
val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
|
||||||
|
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build()
|
||||||
|
|
||||||
this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
|
this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
|
||||||
|
|
||||||
this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties)
|
this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
|
Loading…
x
Reference in New Issue
Block a user