From 8dc82225a4cd45a315fac3efe4d76513e6d536fc Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 16 Aug 2023 15:25:46 +0200 Subject: [PATCH] Issue #7: Send and expect requestId in record body, not in record key (#8) --- .../processor/output/KafkaMtbFileSender.kt | 12 ++-- .../processor/services/ResponseProcessor.kt | 2 +- .../services/kafka/KafkaResponseProcessor.kt | 58 ++++++++----------- src/main/resources/application-dev.yml | 2 +- .../output/KafkaMtbFileSenderTest.kt | 12 ++-- .../kafka/KafkaResponseProcessorTest.kt | 54 +++++++++++++---- 6 files changed, 82 insertions(+), 58 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index e7f9769..5772faf 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -40,7 +40,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(request.mtbFile) + objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile)) ) if (result.get() != null) { logger.debug("Sent file via KafkaMtbFileSender") @@ -68,7 +68,7 @@ class KafkaMtbFileSender( val result = kafkaTemplate.send( kafkaTargetProperties.topic, key(request), - objectMapper.writeValueAsString(dummyMtbFile) + objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile)) ) if (result.get() != null) { @@ -85,12 +85,12 @@ class KafkaMtbFileSender( private fun key(request: MtbFileSender.MtbFileRequest): String { return "{\"pid\": \"${request.mtbFile.patient.id}\", " + - "\"eid\": \"${request.mtbFile.episode.id}\", " + - "\"requestId\": \"${request.requestId}\"}" + "\"eid\": \"${request.mtbFile.episode.id}\"}" } private fun key(request: MtbFileSender.DeleteRequest): String { - return "{\"pid\": \"${request.patientId}\", " + - "\"requestId\": \"${request.requestId}\"}" + return "{\"pid\": \"${request.patientId}\"}" } + + data class Data(val requestId: String, val content: MtbFile) } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt index 677443a..4048348 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/ResponseProcessor.kt @@ -71,7 +71,7 @@ class ResponseProcessor( } else -> { - logger.error("Cannot process response: Unknown response code!") + logger.error("Cannot process response: Unknown response!") return@ifPresentOrElse } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt index 68e3611..a29010f 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessor.kt @@ -41,50 +41,40 @@ class KafkaResponseProcessor( override fun onMessage(data: ConsumerRecord) { try { - Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) + Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java)) } catch (e: Exception) { + logger.error("Cannot process Kafka response", e) Optional.empty() - }.ifPresentOrElse({ responseKey -> - val event = try { - val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - responseBody.statusCode.asRequestStatus(), - when (responseBody.statusCode.asRequestStatus()) { - RequestStatus.SUCCESS -> { - Optional.empty() - } - - RequestStatus.WARNING, RequestStatus.ERROR -> { - Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) - } - - else -> { - logger.error("Kafka response: Unknown response code!") - Optional.empty() - } + }.ifPresentOrElse({ responseBody -> + val event = ResponseEvent( + responseBody.requestId, + Instant.ofEpochMilli(data.timestamp()), + responseBody.statusCode.asRequestStatus(), + when (responseBody.statusCode.asRequestStatus()) { + RequestStatus.SUCCESS -> { + Optional.empty() } - ) - } catch (e: Exception) { - logger.error("Cannot process Kafka response", e) - ResponseEvent( - responseKey.requestId, - Instant.ofEpochMilli(data.timestamp()), - RequestStatus.ERROR, - Optional.of("Cannot process Kafka response") - ) - } + + RequestStatus.WARNING, RequestStatus.ERROR -> { + Optional.of(objectMapper.writeValueAsString(responseBody.statusBody)) + } + + else -> { + logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode) + Optional.empty() + } + } + ) eventPublisher.publishEvent(event) }, { - logger.error("No response key in Kafka response") + logger.error("No requestId in Kafka response") }) } - data class ResponseKey(val requestId: String) - data class ResponseBody( + @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String, @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map ) + } \ No newline at end of file diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index b1cc2fc..a60cd8a 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -11,7 +11,7 @@ app: # otherwise connection will not be available kafka: topic: test - response-topic: test-response + response-topic: test_response servers: kafka:9092 server: diff --git a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt index 14bdd5d..3ec9757 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSenderTest.kt @@ -97,9 +97,9 @@ class KafkaMtbFileSenderTest { val captor = argumentCaptor() verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) assertThat(captor.firstValue).isNotNull - assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}") + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}") assertThat(captor.secondValue).isNotNull - assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE))) + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE))) } @Test @@ -113,9 +113,9 @@ class KafkaMtbFileSenderTest { val captor = argumentCaptor() verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) assertThat(captor.firstValue).isNotNull - assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}") + assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}") assertThat(captor.secondValue).isNotNull - assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED))) + assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED))) } companion object { @@ -154,6 +154,10 @@ class KafkaMtbFileSenderTest { }.build() } + fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data { + return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus)) + } + data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null) @JvmStatic diff --git a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt index 6d83146..95bf41b 100644 --- a/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt +++ b/src/test/kotlin/dev/dnpm/etl/processor/services/kafka/KafkaResponseProcessorTest.kt @@ -46,7 +46,7 @@ class KafkaResponseProcessorTest { private lateinit var kafkaResponseProcessor: KafkaResponseProcessor private fun createKafkaRecord( - requestId: String? = null, + requestId: String, statusCode: Int = 200, statusBody: Map? = mapOf() ): ConsumerRecord { @@ -54,15 +54,11 @@ class KafkaResponseProcessorTest { "test-topic", 0, 0, - if (requestId == null) { - null - } else { - this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId)) - }, + null, if (statusBody == null) { "" } else { - this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody)) + this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody)) } ) } @@ -78,17 +74,51 @@ class KafkaResponseProcessorTest { } @Test - fun shouldNotProcessRecordsWithoutValidKey() { - this.kafkaResponseProcessor.onMessage(createKafkaRecord(null, 200)) + fun shouldNotProcessRecordsWithoutRequestIdInBody() { + val record = ConsumerRecord( + "test-topic", + 0, + 0, + null, + """ + { + "statusCode": 200, + "statusBody": {} + } + """.trimIndent() + ) - verify(eventPublisher, never()).publishEvent(any()) + this.kafkaResponseProcessor.onMessage(record) + + verify(eventPublisher, never()).publishEvent(any()) } @Test - fun shouldNotProcessRecordsWithoutValidBody() { + fun shouldProcessRecordsWithAliasNames() { + val record = ConsumerRecord( + "test-topic", + 0, + 0, + null, + """ + { + "request_id": "test0123456789", + "status_code": 200, + "status_body": {} + } + """.trimIndent() + ) + + this.kafkaResponseProcessor.onMessage(record) + + verify(eventPublisher, times(1)).publishEvent(any()) + } + + @Test + fun shouldNotProcessRecordsWithoutValidStatusBody() { this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null)) - verify(eventPublisher, never()).publishEvent(any()) + verify(eventPublisher, never()).publishEvent(any()) } @ParameterizedTest