1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-07-02 22:42:55 +00:00

feat: new kafka config due to kafka input

This commit is contained in:
2024-02-19 17:06:02 +01:00
parent f5c80f6d81
commit 50a6d66718
6 changed files with 130 additions and 27 deletions

View File

@ -21,7 +21,7 @@ package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.*
import dev.dnpm.etl.processor.config.KafkaTargetProperties
import dev.dnpm.etl.processor.config.KafkaProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
@ -53,13 +53,13 @@ class KafkaMtbFileSenderTest {
fun setup(
@Mock kafkaTemplate: KafkaTemplate<String, String>
) {
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
val kafkaProperties = KafkaProperties("testtopic")
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(1)).build()
this.objectMapper = ObjectMapper()
this.kafkaTemplate = kafkaTemplate
this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, retryTemplate, objectMapper)
this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaProperties, retryTemplate, objectMapper)
}
@ParameterizedTest
@ -125,9 +125,9 @@ class KafkaMtbFileSenderTest {
@ParameterizedTest
@MethodSource("requestWithResponseSource")
fun shouldRetryOnMtbFileKafkaSendError(testData: TestData) {
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
val kafkaProperties = KafkaProperties("testtopic")
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper)
this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaProperties, retryTemplate, this.objectMapper)
doAnswer {
if (null != testData.exception) {
@ -151,9 +151,9 @@ class KafkaMtbFileSenderTest {
@ParameterizedTest
@MethodSource("requestWithResponseSource")
fun shouldRetryOnDeleteKafkaSendError(testData: TestData) {
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
val kafkaProperties = KafkaProperties("testtopic")
val retryTemplate = RetryTemplateBuilder().customPolicy(SimpleRetryPolicy(3)).build()
this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaTargetProperties, retryTemplate, this.objectMapper)
this.kafkaMtbFileSender = KafkaMtbFileSender(this.kafkaTemplate, kafkaProperties, retryTemplate, this.objectMapper)
doAnswer {
if (null != testData.exception) {