From ba21d029d13a1f423f36db464df76330a4c30eea Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 27 Aug 2025 15:07:43 +0200 Subject: [PATCH] fix: add missing requestId to KafkaMtbFileSender (#142) --- .../dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt | 7 +++---- .../dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt | 2 ++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 8be0a1c..d45fc51 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -27,7 +27,6 @@ import dev.pcvolkmer.mv64e.mtb.Mtb import dev.pcvolkmer.mv64e.mtb.MvhMetadata import org.apache.kafka.clients.producer.ProducerRecord import org.slf4j.LoggerFactory -import org.springframework.http.MediaType import org.springframework.kafka.core.KafkaTemplate import org.springframework.retry.support.RetryTemplate @@ -47,8 +46,9 @@ class KafkaMtbFileSender( ProducerRecord( kafkaProperties.outputTopic, key(request), - objectMapper.writeValueAsString(request) + objectMapper.writeValueAsString(request), ) + record.headers().add("requestId", request.requestId.value.toByteArray()) when (request) { is DnpmV2MtbFileRequest -> record.headers() .add( @@ -82,7 +82,6 @@ class KafkaMtbFileSender( ProducerRecord( kafkaProperties.outputTopic, key(request), - // Always use old BwhcV1FileRequest with Consent REJECT objectMapper.writeValueAsString( DnpmV2MtbFileRequest( request.requestId, @@ -90,7 +89,7 @@ class KafkaMtbFileSender( ) ) ) - + record.headers().add("requestId", request.requestId.value.toByteArray()) val result = kafkaTemplate.send(record) if (result.get() != null) { logger.debug("Sent deletion request via KafkaMtbFileSender") diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt index 1e9c853..d13d5e1 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -163,6 +163,8 @@ class KafkaMtbFileSenderTest { assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}") assertThat(captor.firstValue.headers().headers("contentType")).isNotNull assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value()).isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray()) + assertThat(captor.firstValue.headers().headers("requestId")).isNotNull + assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value()).isEqualTo(TEST_REQUEST_ID.value.toByteArray()) assertThat(captor.firstValue.value()).isNotNull assertThat(captor.firstValue.value()).isEqualTo(objectMapper.writeValueAsString(dnmpV2kafkaRecordData(TEST_REQUEST_ID))) }