mirror of
https://github.com/pcvolkmer/etl-processor.git
synced 2025-04-19 17:26:51 +00:00
commit
0083e75940
@ -32,8 +32,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy
|
||||
import org.springframework.retry.support.RetryTemplate
|
||||
import org.springframework.retry.support.RetryTemplateBuilder
|
||||
import org.springframework.scheduling.annotation.EnableScheduling
|
||||
import reactor.core.publisher.Sinks
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.toJavaDuration
|
||||
|
||||
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(
|
||||
@ -89,5 +95,14 @@ class AppConfiguration {
|
||||
})
|
||||
}
|
||||
|
||||
@Bean
|
||||
fun retryTemplate(): RetryTemplate {
|
||||
return RetryTemplateBuilder()
|
||||
.notRetryOn(IllegalArgumentException::class.java)
|
||||
.fixedBackoff(5.seconds.toJavaDuration())
|
||||
.customPolicy(SimpleRetryPolicy(3))
|
||||
.build()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -37,6 +37,7 @@ import org.springframework.kafka.core.ConsumerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.listener.ContainerProperties
|
||||
import org.springframework.kafka.listener.KafkaMessageListenerContainer
|
||||
import org.springframework.retry.support.RetryTemplate
|
||||
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(
|
||||
@ -53,10 +54,11 @@ class AppKafkaConfiguration {
|
||||
fun kafkaMtbFileSender(
|
||||
kafkaTemplate: KafkaTemplate<String, String>,
|
||||
kafkaTargetProperties: KafkaTargetProperties,
|
||||
retryTemplate: RetryTemplate,
|
||||
objectMapper: ObjectMapper
|
||||
): MtbFileSender {
|
||||
logger.info("Selected 'KafkaMtbFileSender'")
|
||||
return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
|
||||
return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
@ -30,6 +30,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.core.annotation.Order
|
||||
import org.springframework.retry.support.RetryTemplate
|
||||
import org.springframework.web.client.RestTemplate
|
||||
|
||||
@Configuration
|
||||
@ -51,9 +52,13 @@ class AppRestConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender {
|
||||
fun restMtbFileSender(
|
||||
restTemplate: RestTemplate,
|
||||
restTargetProperties: RestTargetProperties,
|
||||
retryTemplate: RetryTemplate
|
||||
): MtbFileSender {
|
||||
logger.info("Selected 'RestMtbFileSender'")
|
||||
return RestMtbFileSender(restTemplate, restTargetProperties)
|
||||
return RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
@ -1,7 +1,7 @@
|
||||
/*
|
||||
* This file is part of ETL-Processor
|
||||
*
|
||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published
|
||||
@ -26,10 +26,12 @@ import dev.dnpm.etl.processor.config.KafkaTargetProperties
|
||||
import dev.dnpm.etl.processor.monitoring.RequestStatus
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.retry.support.RetryTemplate
|
||||
|
||||
class KafkaMtbFileSender(
|
||||
private val kafkaTemplate: KafkaTemplate<String, String>,
|
||||
private val kafkaTargetProperties: KafkaTargetProperties,
|
||||
private val retryTemplate: RetryTemplate,
|
||||
private val objectMapper: ObjectMapper
|
||||
) : MtbFileSender {
|
||||
|
||||
@ -37,16 +39,18 @@ class KafkaMtbFileSender(
|
||||
|
||||
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
|
||||
return try {
|
||||
val result = kafkaTemplate.send(
|
||||
kafkaTargetProperties.topic,
|
||||
key(request),
|
||||
objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
|
||||
)
|
||||
if (result.get() != null) {
|
||||
logger.debug("Sent file via KafkaMtbFileSender")
|
||||
MtbFileSender.Response(RequestStatus.UNKNOWN)
|
||||
} else {
|
||||
MtbFileSender.Response(RequestStatus.ERROR)
|
||||
return retryTemplate.execute<MtbFileSender.Response, Exception> {
|
||||
val result = kafkaTemplate.send(
|
||||
kafkaTargetProperties.topic,
|
||||
key(request),
|
||||
objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
|
||||
)
|
||||
if (result.get() != null) {
|
||||
logger.debug("Sent file via KafkaMtbFileSender")
|
||||
MtbFileSender.Response(RequestStatus.UNKNOWN)
|
||||
} else {
|
||||
MtbFileSender.Response(RequestStatus.ERROR)
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logger.error("An error occurred sending to kafka", e)
|
||||
@ -65,17 +69,19 @@ class KafkaMtbFileSender(
|
||||
.build()
|
||||
|
||||
return try {
|
||||
val result = kafkaTemplate.send(
|
||||
kafkaTargetProperties.topic,
|
||||
key(request),
|
||||
objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
|
||||
)
|
||||
return retryTemplate.execute<MtbFileSender.Response, Exception> {
|
||||
val result = kafkaTemplate.send(
|
||||
kafkaTargetProperties.topic,
|
||||
key(request),
|
||||
objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
|
||||
)
|
||||
|
||||
if (result.get() != null) {
|
||||
logger.debug("Sent deletion request via KafkaMtbFileSender")
|
||||
MtbFileSender.Response(RequestStatus.UNKNOWN)
|
||||
} else {
|
||||
MtbFileSender.Response(RequestStatus.ERROR)
|
||||
if (result.get() != null) {
|
||||
logger.debug("Sent deletion request via KafkaMtbFileSender")
|
||||
MtbFileSender.Response(RequestStatus.UNKNOWN)
|
||||
} else {
|
||||
MtbFileSender.Response(RequestStatus.ERROR)
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
logger.error("An error occurred sending to kafka", e)
|
||||
|
@ -1,7 +1,7 @@
|
||||
/*
|
||||
* This file is part of ETL-Processor
|
||||
*
|
||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published
|
||||
@ -25,32 +25,39 @@ import org.slf4j.LoggerFactory
|
||||
import org.springframework.http.HttpEntity
|
||||
import org.springframework.http.HttpHeaders
|
||||
import org.springframework.http.MediaType
|
||||
import org.springframework.retry.support.RetryTemplate
|
||||
import org.springframework.web.client.RestClientException
|
||||
import org.springframework.web.client.RestTemplate
|
||||
|
||||
class RestMtbFileSender(
|
||||
private val restTemplate: RestTemplate,
|
||||
private val restTargetProperties: RestTargetProperties
|
||||
private val restTargetProperties: RestTargetProperties,
|
||||
private val retryTemplate: RetryTemplate
|
||||
) : MtbFileSender {
|
||||
|
||||
private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java)
|
||||
|
||||
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
|
||||
try {
|
||||
val headers = HttpHeaders()
|
||||
headers.contentType = MediaType.APPLICATION_JSON
|
||||
val entityReq = HttpEntity(request.mtbFile, headers)
|
||||
val response = restTemplate.postForEntity(
|
||||
"${restTargetProperties.uri}/MTBFile",
|
||||
entityReq,
|
||||
String::class.java
|
||||
)
|
||||
if (!response.statusCode.is2xxSuccessful) {
|
||||
logger.warn("Error sending to remote system: {}", response.body)
|
||||
return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
|
||||
return retryTemplate.execute<MtbFileSender.Response, Exception> {
|
||||
val headers = HttpHeaders()
|
||||
headers.contentType = MediaType.APPLICATION_JSON
|
||||
val entityReq = HttpEntity(request.mtbFile, headers)
|
||||
val response = restTemplate.postForEntity(
|
||||
"${restTargetProperties.uri}/MTBFile",
|
||||
entityReq,
|
||||
String::class.java
|
||||
)
|
||||
if (!response.statusCode.is2xxSuccessful) {
|
||||
logger.warn("Error sending to remote system: {}", response.body)
|
||||
return@execute MtbFileSender.Response(
|
||||
response.statusCode.asRequestStatus(),
|
||||
"Status-Code: ${response.statusCode.value()}"
|
||||
)
|
||||
}
|
||||
logger.debug("Sent file via RestMtbFileSender")
|
||||
return@execute MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
|
||||
}
|
||||
logger.debug("Sent file via RestMtbFileSender")
|
||||
return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
|
||||
} catch (e: IllegalArgumentException) {
|
||||
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
|
||||
} catch (e: RestClientException) {
|
||||
@ -62,16 +69,18 @@ class RestMtbFileSender(
|
||||
|
||||
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
|
||||
try {
|
||||
val headers = HttpHeaders()
|
||||
headers.contentType = MediaType.APPLICATION_JSON
|
||||
val entityReq = HttpEntity(null, headers)
|
||||
restTemplate.delete(
|
||||
"${restTargetProperties.uri}/Patient/${request.patientId}",
|
||||
entityReq,
|
||||
String::class.java
|
||||
)
|
||||
logger.debug("Sent file via RestMtbFileSender")
|
||||
return MtbFileSender.Response(RequestStatus.SUCCESS)
|
||||
return retryTemplate.execute<MtbFileSender.Response, Exception> {
|
||||
val headers = HttpHeaders()
|
||||
headers.contentType = MediaType.APPLICATION_JSON
|
||||
val entityReq = HttpEntity(null, headers)
|
||||
restTemplate.delete(
|
||||
"${restTargetProperties.uri}/Patient/${request.patientId}",
|
||||
entityReq,
|
||||
String::class.java
|
||||
)
|
||||
logger.debug("Sent file via RestMtbFileSender")
|
||||
return@execute MtbFileSender.Response(RequestStatus.SUCCESS)
|
||||
}
|
||||
} catch (e: IllegalArgumentException) {
|
||||
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
|
||||
} catch (e: RestClientException) {
|
||||
|
@ -1,7 +1,7 @@
|
||||
/*
|
||||
* This file is part of ETL-Processor
|
||||
*
|
||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published
|
||||
@ -35,6 +35,8 @@ import org.mockito.junit.jupiter.MockitoExtension
|
||||
import org.mockito.kotlin.*
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.support.SendResult
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy
|
||||
import org.springframework.retry.support.RetryTemplateBuilder
|
||||
import java.util.concurrent.CompletableFuture.completedFuture
|
||||
import java.util.concurrent.ExecutionException
|
||||
|
||||
@ -52,10 +54,12 @@ class KafkaMtbFileSenderTest {
|
||||
@Mock kafkaTemplate: KafkaTemplate<String, String>
|
||||
) {
|
||||
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
|
||||
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build()
|
||||
|
||||
this.objectMapper = ObjectMapper()
|
||||
this.kafkaTemplate = kafkaTemplate
|
||||
|
||||
this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
|
||||
this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ -118,6 +122,58 @@ class KafkaMtbFileSenderTest {
|
||||
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED)))
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("requestWithResponseSource")
|
||||
fun shouldRetryOnMtbFileKafkaSendError(testData: TestData) {
|
||||
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
|
||||
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
|
||||
this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper)
|
||||
|
||||
doAnswer {
|
||||
if (null != testData.exception) {
|
||||
throw testData.exception
|
||||
}
|
||||
completedFuture(SendResult<String, String>(null, null))
|
||||
}.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
|
||||
|
||||
kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE)))
|
||||
|
||||
val expectedCount = when (testData.exception) {
|
||||
// OK - No Retry
|
||||
null -> times(1)
|
||||
// Request failed - Retry max 3 times
|
||||
else -> times(3)
|
||||
}
|
||||
|
||||
verify(kafkaTemplate, expectedCount).send(anyString(), anyString(), anyString())
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("requestWithResponseSource")
|
||||
fun shouldRetryOnDeleteKafkaSendError(testData: TestData) {
|
||||
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
|
||||
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
|
||||
this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper)
|
||||
|
||||
doAnswer {
|
||||
if (null != testData.exception) {
|
||||
throw testData.exception
|
||||
}
|
||||
completedFuture(SendResult<String, String>(null, null))
|
||||
}.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
|
||||
|
||||
kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
|
||||
|
||||
val expectedCount = when (testData.exception) {
|
||||
// OK - No Retry
|
||||
null -> times(1)
|
||||
// Request failed - Retry max 3 times
|
||||
else -> times(3)
|
||||
}
|
||||
|
||||
verify(kafkaTemplate, expectedCount).send(anyString(), anyString(), anyString())
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun mtbFile(consentStatus: Consent.Status): MtbFile {
|
||||
return if (consentStatus == Consent.Status.ACTIVE) {
|
||||
|
@ -1,7 +1,7 @@
|
||||
/*
|
||||
* This file is part of ETL-Processor
|
||||
*
|
||||
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published
|
||||
@ -28,6 +28,9 @@ import org.junit.jupiter.params.ParameterizedTest
|
||||
import org.junit.jupiter.params.provider.MethodSource
|
||||
import org.springframework.http.HttpMethod
|
||||
import org.springframework.http.HttpStatus
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy
|
||||
import org.springframework.retry.support.RetryTemplateBuilder
|
||||
import org.springframework.test.web.client.ExpectedCount
|
||||
import org.springframework.test.web.client.MockRestServiceServer
|
||||
import org.springframework.test.web.client.match.MockRestRequestMatchers.method
|
||||
import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo
|
||||
@ -44,10 +47,11 @@ class RestMtbFileSenderTest {
|
||||
fun setup() {
|
||||
val restTemplate = RestTemplate()
|
||||
val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
|
||||
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build()
|
||||
|
||||
this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
|
||||
|
||||
this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties)
|
||||
this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ -80,6 +84,64 @@ class RestMtbFileSenderTest {
|
||||
assertThat(response.body).isEqualTo(requestWithResponse.response.body)
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("mtbFileRequestWithResponseSource")
|
||||
fun shouldRetryOnMtbFileHttpRequestError(requestWithResponse: RequestWithResponse) {
|
||||
val restTemplate = RestTemplate()
|
||||
val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
|
||||
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
|
||||
|
||||
this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
|
||||
this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
|
||||
|
||||
val expectedCount = when (requestWithResponse.httpStatus) {
|
||||
// OK - No Retry
|
||||
HttpStatus.OK, HttpStatus.CREATED -> ExpectedCount.max(1)
|
||||
// Request failed - Retry max 3 times
|
||||
else -> ExpectedCount.max(3)
|
||||
}
|
||||
|
||||
this.mockRestServiceServer.expect(expectedCount) {
|
||||
method(HttpMethod.POST)
|
||||
requestTo("/mtbfile")
|
||||
}.andRespond {
|
||||
withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it)
|
||||
}
|
||||
|
||||
val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile))
|
||||
assertThat(response.status).isEqualTo(requestWithResponse.response.status)
|
||||
assertThat(response.body).isEqualTo(requestWithResponse.response.body)
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("deleteRequestWithResponseSource")
|
||||
fun shouldRetryOnDeleteHttpRequestError(requestWithResponse: RequestWithResponse) {
|
||||
val restTemplate = RestTemplate()
|
||||
val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
|
||||
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
|
||||
|
||||
this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
|
||||
this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties, retryTemplate)
|
||||
|
||||
val expectedCount = when (requestWithResponse.httpStatus) {
|
||||
// OK - No Retry
|
||||
HttpStatus.OK, HttpStatus.CREATED -> ExpectedCount.max(1)
|
||||
// Request failed - Retry max 3 times
|
||||
else -> ExpectedCount.max(3)
|
||||
}
|
||||
|
||||
this.mockRestServiceServer.expect(expectedCount) {
|
||||
method(HttpMethod.DELETE)
|
||||
requestTo("/mtbfile")
|
||||
}.andRespond {
|
||||
withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it)
|
||||
}
|
||||
|
||||
val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
|
||||
assertThat(response.status).isEqualTo(requestWithResponse.response.status)
|
||||
assertThat(response.body).isEqualTo(requestWithResponse.response.body)
|
||||
}
|
||||
|
||||
companion object {
|
||||
data class RequestWithResponse(
|
||||
val httpStatus: HttpStatus,
|
||||
|
Loading…
x
Reference in New Issue
Block a user