mirror of
https://github.com/pcvolkmer/mv64e-etl-processor
synced 2025-09-13 09:02:50 +00:00
fix: add missing requestId to KafkaMtbFileSender (#142)
This commit is contained in:
@@ -27,7 +27,6 @@ import dev.pcvolkmer.mv64e.mtb.Mtb
|
|||||||
import dev.pcvolkmer.mv64e.mtb.MvhMetadata
|
import dev.pcvolkmer.mv64e.mtb.MvhMetadata
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord
|
import org.apache.kafka.clients.producer.ProducerRecord
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import org.springframework.http.MediaType
|
|
||||||
import org.springframework.kafka.core.KafkaTemplate
|
import org.springframework.kafka.core.KafkaTemplate
|
||||||
import org.springframework.retry.support.RetryTemplate
|
import org.springframework.retry.support.RetryTemplate
|
||||||
|
|
||||||
@@ -47,8 +46,9 @@ class KafkaMtbFileSender(
|
|||||||
ProducerRecord(
|
ProducerRecord(
|
||||||
kafkaProperties.outputTopic,
|
kafkaProperties.outputTopic,
|
||||||
key(request),
|
key(request),
|
||||||
objectMapper.writeValueAsString(request)
|
objectMapper.writeValueAsString(request),
|
||||||
)
|
)
|
||||||
|
record.headers().add("requestId", request.requestId.value.toByteArray())
|
||||||
when (request) {
|
when (request) {
|
||||||
is DnpmV2MtbFileRequest -> record.headers()
|
is DnpmV2MtbFileRequest -> record.headers()
|
||||||
.add(
|
.add(
|
||||||
@@ -82,7 +82,6 @@ class KafkaMtbFileSender(
|
|||||||
ProducerRecord(
|
ProducerRecord(
|
||||||
kafkaProperties.outputTopic,
|
kafkaProperties.outputTopic,
|
||||||
key(request),
|
key(request),
|
||||||
// Always use old BwhcV1FileRequest with Consent REJECT
|
|
||||||
objectMapper.writeValueAsString(
|
objectMapper.writeValueAsString(
|
||||||
DnpmV2MtbFileRequest(
|
DnpmV2MtbFileRequest(
|
||||||
request.requestId,
|
request.requestId,
|
||||||
@@ -90,7 +89,7 @@ class KafkaMtbFileSender(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
record.headers().add("requestId", request.requestId.value.toByteArray())
|
||||||
val result = kafkaTemplate.send(record)
|
val result = kafkaTemplate.send(record)
|
||||||
if (result.get() != null) {
|
if (result.get() != null) {
|
||||||
logger.debug("Sent deletion request via KafkaMtbFileSender")
|
logger.debug("Sent deletion request via KafkaMtbFileSender")
|
||||||
|
@@ -163,6 +163,8 @@ class KafkaMtbFileSenderTest {
|
|||||||
assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}")
|
assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}")
|
||||||
assertThat(captor.firstValue.headers().headers("contentType")).isNotNull
|
assertThat(captor.firstValue.headers().headers("contentType")).isNotNull
|
||||||
assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value()).isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray())
|
assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value()).isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray())
|
||||||
|
assertThat(captor.firstValue.headers().headers("requestId")).isNotNull
|
||||||
|
assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value()).isEqualTo(TEST_REQUEST_ID.value.toByteArray())
|
||||||
assertThat(captor.firstValue.value()).isNotNull
|
assertThat(captor.firstValue.value()).isNotNull
|
||||||
assertThat(captor.firstValue.value()).isEqualTo(objectMapper.writeValueAsString(dnmpV2kafkaRecordData(TEST_REQUEST_ID)))
|
assertThat(captor.firstValue.value()).isEqualTo(objectMapper.writeValueAsString(dnmpV2kafkaRecordData(TEST_REQUEST_ID)))
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user