mirror of
https://github.com/pcvolkmer/mv64e-etl-processor
synced 2025-09-13 09:02:50 +00:00
fix: mime type representation in kafka header (#139)
This commit is contained in:
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
|
|||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import org.springframework.http.MediaType
|
import org.springframework.http.MediaType
|
||||||
import org.springframework.kafka.listener.MessageListener
|
import org.springframework.kafka.listener.MessageListener
|
||||||
|
import java.nio.charset.Charset
|
||||||
|
|
||||||
class KafkaInputListener(
|
class KafkaInputListener(
|
||||||
private val requestProcessor: RequestProcessor,
|
private val requestProcessor: RequestProcessor,
|
||||||
@@ -49,19 +50,16 @@ class KafkaInputListener(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun guessMimeType(record: ConsumerRecord<String, String>): String {
|
private fun guessMimeType(record: ConsumerRecord<String, String>): String? {
|
||||||
if (record.headers().headers("contentType").toList().isEmpty()) {
|
if (record.headers().headers("contentType").toList().isEmpty()) {
|
||||||
// Fallback if no contentType set (old behavior)
|
// Fallback if no contentType set (old behavior)
|
||||||
return MediaType.APPLICATION_JSON_VALUE
|
return MediaType.APPLICATION_JSON_VALUE
|
||||||
}
|
}
|
||||||
|
|
||||||
return record.headers().headers("contentType")?.firstOrNull()?.value().contentToString()
|
return record.headers().headers("contentType")?.firstOrNull()?.value()?.toString(Charset.forName("UTF-8"))
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun handleDnpmV2Message(record: ConsumerRecord<String, String>) {
|
private fun handleDnpmV2Message(record: ConsumerRecord<String, String>) {
|
||||||
// Do not handle DNPM-V2 for now
|
|
||||||
logger.warn("Ignoring MTB File in DNPM V2 format: Not implemented yet")
|
|
||||||
|
|
||||||
val mtbFile = objectMapper.readValue(record.value(), Mtb::class.java)
|
val mtbFile = objectMapper.readValue(record.value(), Mtb::class.java)
|
||||||
val patientId = PatientId(mtbFile.patient.id)
|
val patientId = PatientId(mtbFile.patient.id)
|
||||||
val firstRequestIdHeader = record.headers().headers("requestId")?.firstOrNull()
|
val firstRequestIdHeader = record.headers().headers("requestId")?.firstOrNull()
|
||||||
|
@@ -244,7 +244,14 @@ class KafkaInputListenerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun shouldNotProcessDnpmV2Request() {
|
fun shouldProcessDnpmV2Request() {
|
||||||
|
whenever(consentEvaluator.check(any())).thenReturn(
|
||||||
|
ConsentEvaluation(
|
||||||
|
TtpConsentStatus.BROAD_CONSENT_GIVEN,
|
||||||
|
false
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
val mtbFile = Mtb.builder()
|
val mtbFile = Mtb.builder()
|
||||||
.patient(Patient.builder().id("DUMMY_12345678").build())
|
.patient(Patient.builder().id("DUMMY_12345678").build())
|
||||||
.metadata(
|
.metadata(
|
||||||
@@ -285,7 +292,7 @@ class KafkaInputListenerTest {
|
|||||||
Optional.empty()
|
Optional.empty()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
verify(requestProcessor, times(0)).processDeletion(
|
verify(requestProcessor, times(1)).processDeletion(
|
||||||
anyValueClass(), anyValueClass(), eq(
|
anyValueClass(), anyValueClass(), eq(
|
||||||
TtpConsentStatus.UNKNOWN_CHECK_FILE
|
TtpConsentStatus.UNKNOWN_CHECK_FILE
|
||||||
)
|
)
|
||||||
|
Reference in New Issue
Block a user