mirror of
https://github.com/pcvolkmer/etl-processor.git
synced 2025-04-19 17:26:51 +00:00
feat: use requestId from incoming Kafka Record Header
This commit is contained in:
parent
bed91439db
commit
fc1901211d
@ -35,12 +35,27 @@ class KafkaInputListener(
|
|||||||
|
|
||||||
override fun onMessage(data: ConsumerRecord<String, String>) {
|
override fun onMessage(data: ConsumerRecord<String, String>) {
|
||||||
val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java)
|
val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java)
|
||||||
|
val firstRequestIdHeader = data.headers().headers("requestId")?.firstOrNull()
|
||||||
|
val requestId = if (null != firstRequestIdHeader) {
|
||||||
|
String(firstRequestIdHeader.value())
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
}
|
||||||
|
|
||||||
if (mtbFile.consent.status == Consent.Status.ACTIVE) {
|
if (mtbFile.consent.status == Consent.Status.ACTIVE) {
|
||||||
logger.debug("Accepted MTB File for processing")
|
logger.debug("Accepted MTB File for processing")
|
||||||
|
if (requestId.isBlank()) {
|
||||||
requestProcessor.processMtbFile(mtbFile)
|
requestProcessor.processMtbFile(mtbFile)
|
||||||
|
} else {
|
||||||
|
requestProcessor.processMtbFile(mtbFile, requestId)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.debug("Accepted MTB File and process deletion")
|
logger.debug("Accepted MTB File and process deletion")
|
||||||
|
if (requestId.isBlank()) {
|
||||||
requestProcessor.processDeletion(mtbFile.patient.id)
|
requestProcessor.processDeletion(mtbFile.patient.id)
|
||||||
|
} else {
|
||||||
|
requestProcessor.processDeletion(mtbFile.patient.id, requestId)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -48,7 +48,10 @@ class RequestProcessor(
|
|||||||
) {
|
) {
|
||||||
|
|
||||||
fun processMtbFile(mtbFile: MtbFile) {
|
fun processMtbFile(mtbFile: MtbFile) {
|
||||||
val requestId = UUID.randomUUID().toString()
|
processMtbFile(mtbFile, UUID.randomUUID().toString())
|
||||||
|
}
|
||||||
|
|
||||||
|
fun processMtbFile(mtbFile: MtbFile, requestId: String) {
|
||||||
val pid = mtbFile.patient.id
|
val pid = mtbFile.patient.id
|
||||||
|
|
||||||
mtbFile pseudonymizeWith pseudonymizeService
|
mtbFile pseudonymizeWith pseudonymizeService
|
||||||
@ -103,8 +106,10 @@ class RequestProcessor(
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun processDeletion(patientId: String) {
|
fun processDeletion(patientId: String) {
|
||||||
val requestId = UUID.randomUUID().toString()
|
processDeletion(patientId, UUID.randomUUID().toString())
|
||||||
|
}
|
||||||
|
|
||||||
|
fun processDeletion(patientId: String, requestId: String) {
|
||||||
try {
|
try {
|
||||||
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)
|
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)
|
||||||
|
|
||||||
|
@ -25,6 +25,9 @@ import de.ukw.ccc.bwhc.dto.MtbFile
|
|||||||
import de.ukw.ccc.bwhc.dto.Patient
|
import de.ukw.ccc.bwhc.dto.Patient
|
||||||
import dev.dnpm.etl.processor.services.RequestProcessor
|
import dev.dnpm.etl.processor.services.RequestProcessor
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||||
|
import org.apache.kafka.common.header.internals.RecordHeader
|
||||||
|
import org.apache.kafka.common.header.internals.RecordHeaders
|
||||||
|
import org.apache.kafka.common.record.TimestampType
|
||||||
import org.junit.jupiter.api.BeforeEach
|
import org.junit.jupiter.api.BeforeEach
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import org.junit.jupiter.api.extension.ExtendWith
|
import org.junit.jupiter.api.extension.ExtendWith
|
||||||
@ -34,6 +37,7 @@ import org.mockito.junit.jupiter.MockitoExtension
|
|||||||
import org.mockito.kotlin.any
|
import org.mockito.kotlin.any
|
||||||
import org.mockito.kotlin.times
|
import org.mockito.kotlin.times
|
||||||
import org.mockito.kotlin.verify
|
import org.mockito.kotlin.verify
|
||||||
|
import java.util.*
|
||||||
|
|
||||||
@ExtendWith(MockitoExtension::class)
|
@ExtendWith(MockitoExtension::class)
|
||||||
class KafkaInputListenerTest {
|
class KafkaInputListenerTest {
|
||||||
@ -76,4 +80,33 @@ class KafkaInputListenerTest {
|
|||||||
verify(requestProcessor, times(1)).processDeletion(anyString())
|
verify(requestProcessor, times(1)).processDeletion(anyString())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun shouldProcessMtbFileRequestWithExistingRequestId() {
|
||||||
|
val mtbFile = MtbFile.builder()
|
||||||
|
.withPatient(Patient.builder().withId("DUMMY_12345678").build())
|
||||||
|
.withConsent(Consent.builder().withStatus(Consent.Status.ACTIVE).build())
|
||||||
|
.build()
|
||||||
|
|
||||||
|
val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
|
||||||
|
kafkaInputListener.onMessage(
|
||||||
|
ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty())
|
||||||
|
)
|
||||||
|
|
||||||
|
verify(requestProcessor, times(1)).processMtbFile(any(), anyString())
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun shouldProcessDeleteRequestWithExistingRequestId() {
|
||||||
|
val mtbFile = MtbFile.builder()
|
||||||
|
.withPatient(Patient.builder().withId("DUMMY_12345678").build())
|
||||||
|
.withConsent(Consent.builder().withStatus(Consent.Status.REJECTED).build())
|
||||||
|
.build()
|
||||||
|
|
||||||
|
val headers = RecordHeaders(listOf(RecordHeader("requestId", UUID.randomUUID().toString().toByteArray())))
|
||||||
|
kafkaInputListener.onMessage(
|
||||||
|
ConsumerRecord("testtopic", 0, 0, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, "", this.objectMapper.writeValueAsString(mtbFile), headers, Optional.empty())
|
||||||
|
)
|
||||||
|
verify(requestProcessor, times(1)).processDeletion(anyString(), anyString())
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user