1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-04-20 01:36:50 +00:00

Issue #7: Send and expect requestId in record body, not in record key (#8)

This commit is contained in:
Paul-Christian Volkmer 2023-08-16 15:25:46 +02:00 committed by GitHub
parent 2eb5cc61b9
commit 8dc82225a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 82 additions and 58 deletions

View File

@ -40,7 +40,7 @@ class KafkaMtbFileSender(
val result = kafkaTemplate.send( val result = kafkaTemplate.send(
kafkaTargetProperties.topic, kafkaTargetProperties.topic,
key(request), key(request),
objectMapper.writeValueAsString(request.mtbFile) objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
) )
if (result.get() != null) { if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender") logger.debug("Sent file via KafkaMtbFileSender")
@ -68,7 +68,7 @@ class KafkaMtbFileSender(
val result = kafkaTemplate.send( val result = kafkaTemplate.send(
kafkaTargetProperties.topic, kafkaTargetProperties.topic,
key(request), key(request),
objectMapper.writeValueAsString(dummyMtbFile) objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
) )
if (result.get() != null) { if (result.get() != null) {
@ -85,12 +85,12 @@ class KafkaMtbFileSender(
private fun key(request: MtbFileSender.MtbFileRequest): String { private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " + return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
"\"eid\": \"${request.mtbFile.episode.id}\", " + "\"eid\": \"${request.mtbFile.episode.id}\"}"
"\"requestId\": \"${request.requestId}\"}"
} }
private fun key(request: MtbFileSender.DeleteRequest): String { private fun key(request: MtbFileSender.DeleteRequest): String {
return "{\"pid\": \"${request.patientId}\", " + return "{\"pid\": \"${request.patientId}\"}"
"\"requestId\": \"${request.requestId}\"}"
} }
data class Data(val requestId: String, val content: MtbFile)
} }

View File

@ -71,7 +71,7 @@ class ResponseProcessor(
} }
else -> { else -> {
logger.error("Cannot process response: Unknown response code!") logger.error("Cannot process response: Unknown response!")
return@ifPresentOrElse return@ifPresentOrElse
} }
} }

View File

@ -41,50 +41,40 @@ class KafkaResponseProcessor(
override fun onMessage(data: ConsumerRecord<String, String>) { override fun onMessage(data: ConsumerRecord<String, String>) {
try { try {
Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java)) Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java))
} catch (e: Exception) { } catch (e: Exception) {
logger.error("Cannot process Kafka response", e)
Optional.empty() Optional.empty()
}.ifPresentOrElse({ responseKey -> }.ifPresentOrElse({ responseBody ->
val event = try { val event = ResponseEvent(
val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java) responseBody.requestId,
ResponseEvent( Instant.ofEpochMilli(data.timestamp()),
responseKey.requestId, responseBody.statusCode.asRequestStatus(),
Instant.ofEpochMilli(data.timestamp()), when (responseBody.statusCode.asRequestStatus()) {
responseBody.statusCode.asRequestStatus(), RequestStatus.SUCCESS -> {
when (responseBody.statusCode.asRequestStatus()) { Optional.empty()
RequestStatus.SUCCESS -> {
Optional.empty()
}
RequestStatus.WARNING, RequestStatus.ERROR -> {
Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
}
else -> {
logger.error("Kafka response: Unknown response code!")
Optional.empty()
}
} }
)
} catch (e: Exception) { RequestStatus.WARNING, RequestStatus.ERROR -> {
logger.error("Cannot process Kafka response", e) Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
ResponseEvent( }
responseKey.requestId,
Instant.ofEpochMilli(data.timestamp()), else -> {
RequestStatus.ERROR, logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode)
Optional.of("Cannot process Kafka response") Optional.empty()
) }
} }
)
eventPublisher.publishEvent(event) eventPublisher.publishEvent(event)
}, { }, {
logger.error("No response key in Kafka response") logger.error("No requestId in Kafka response")
}) })
} }
data class ResponseKey(val requestId: String)
data class ResponseBody( data class ResponseBody(
@JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
@JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int, @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
@JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any> @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
) )
} }

View File

@ -11,7 +11,7 @@ app:
# otherwise connection will not be available # otherwise connection will not be available
kafka: kafka:
topic: test topic: test
response-topic: test-response response-topic: test_response
servers: kafka:9092 servers: kafka:9092
server: server:

View File

@ -97,9 +97,9 @@ class KafkaMtbFileSenderTest {
val captor = argumentCaptor<String>() val captor = argumentCaptor<String>()
verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
assertThat(captor.firstValue).isNotNull assertThat(captor.firstValue).isNotNull
assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}") assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}")
assertThat(captor.secondValue).isNotNull assertThat(captor.secondValue).isNotNull
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE))) assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE)))
} }
@Test @Test
@ -113,9 +113,9 @@ class KafkaMtbFileSenderTest {
val captor = argumentCaptor<String>() val captor = argumentCaptor<String>()
verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture()) verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
assertThat(captor.firstValue).isNotNull assertThat(captor.firstValue).isNotNull
assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}") assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}")
assertThat(captor.secondValue).isNotNull assertThat(captor.secondValue).isNotNull
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED))) assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED)))
} }
companion object { companion object {
@ -154,6 +154,10 @@ class KafkaMtbFileSenderTest {
}.build() }.build()
} }
fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data {
return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus))
}
data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null) data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null)
@JvmStatic @JvmStatic

View File

@ -46,7 +46,7 @@ class KafkaResponseProcessorTest {
private lateinit var kafkaResponseProcessor: KafkaResponseProcessor private lateinit var kafkaResponseProcessor: KafkaResponseProcessor
private fun createKafkaRecord( private fun createKafkaRecord(
requestId: String? = null, requestId: String,
statusCode: Int = 200, statusCode: Int = 200,
statusBody: Map<String, Any>? = mapOf() statusBody: Map<String, Any>? = mapOf()
): ConsumerRecord<String, String> { ): ConsumerRecord<String, String> {
@ -54,15 +54,11 @@ class KafkaResponseProcessorTest {
"test-topic", "test-topic",
0, 0,
0, 0,
if (requestId == null) { null,
null
} else {
this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId))
},
if (statusBody == null) { if (statusBody == null) {
"" ""
} else { } else {
this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody)) this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody))
} }
) )
} }
@ -78,17 +74,51 @@ class KafkaResponseProcessorTest {
} }
@Test @Test
fun shouldNotProcessRecordsWithoutValidKey() { fun shouldNotProcessRecordsWithoutRequestIdInBody() {
this.kafkaResponseProcessor.onMessage(createKafkaRecord(null, 200)) val record = ConsumerRecord<String, String>(
"test-topic",
0,
0,
null,
"""
{
"statusCode": 200,
"statusBody": {}
}
""".trimIndent()
)
verify(eventPublisher, never()).publishEvent(any()) this.kafkaResponseProcessor.onMessage(record)
verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
} }
@Test @Test
fun shouldNotProcessRecordsWithoutValidBody() { fun shouldProcessRecordsWithAliasNames() {
val record = ConsumerRecord<String, String>(
"test-topic",
0,
0,
null,
"""
{
"request_id": "test0123456789",
"status_code": 200,
"status_body": {}
}
""".trimIndent()
)
this.kafkaResponseProcessor.onMessage(record)
verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
}
@Test
fun shouldNotProcessRecordsWithoutValidStatusBody() {
this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null)) this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null))
verify(eventPublisher, never()).publishEvent(any()) verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
} }
@ParameterizedTest @ParameterizedTest