1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-04-19 17:26:51 +00:00

Merge pull request #46 from CCC-MF/issue_42_kafka

Dateneingang über Apache-Kafka als Alternative zu HTTP-Request
This commit is contained in:
Paul-Christian Volkmer 2024-02-29 13:15:57 +01:00 committed by GitHub
commit 4568f491f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 103 additions and 7 deletions

View File

@ -26,6 +26,7 @@ import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
import dev.dnpm.etl.processor.services.RequestProcessor
import dev.dnpm.etl.processor.services.TokenRepository
import dev.dnpm.etl.processor.services.TokenService
import org.assertj.core.api.Assertions.assertThat
@ -144,6 +145,7 @@ class AppConfigurationTest {
"app.kafka.group-id=test"
]
)
@MockBean(RequestProcessor::class)
inner class AppConfigurationUsingKafkaInputTest(private val context: ApplicationContext) {
@Test

View File

@ -25,6 +25,7 @@ import dev.dnpm.etl.processor.monitoring.ConnectionCheckService
import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.services.RequestProcessor
import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
@ -97,9 +98,10 @@ class AppKafkaConfiguration {
@Bean
@ConditionalOnProperty(value = ["app.kafka.input-topic"])
fun kafkaInputListener(
applicationEventPublisher: ApplicationEventPublisher,
requestProcessor: RequestProcessor,
objectMapper: ObjectMapper
): KafkaInputListener {
return KafkaInputListener(applicationEventPublisher)
return KafkaInputListener(requestProcessor, objectMapper)
}
@Bean

View File

@ -19,15 +19,28 @@
package dev.dnpm.etl.processor.input
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.services.RequestProcessor
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.context.ApplicationEventPublisher
import org.slf4j.LoggerFactory
import org.springframework.kafka.listener.MessageListener
class KafkaInputListener(
private val applicationEventPublisher: ApplicationEventPublisher
) : MessageListener<String, MtbFile> {
override fun onMessage(data: ConsumerRecord<String, MtbFile>) {
TODO("Not yet implemented")
private val requestProcessor: RequestProcessor,
private val objectMapper: ObjectMapper
) : MessageListener<String, String> {
private val logger = LoggerFactory.getLogger(KafkaInputListener::class.java)
override fun onMessage(data: ConsumerRecord<String, String>) {
val mtbFile = objectMapper.readValue(data.value(), MtbFile::class.java)
if (mtbFile.consent.status == Consent.Status.ACTIVE) {
logger.debug("Accepted MTB File for processing")
requestProcessor.processMtbFile(mtbFile)
} else {
logger.debug("Accepted MTB File and process deletion")
requestProcessor.processDeletion(mtbFile.patient.id)
}
}
}

View File

@ -0,0 +1,79 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.input
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import de.ukw.ccc.bwhc.dto.Patient
import dev.dnpm.etl.processor.services.RequestProcessor
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
@ExtendWith(MockitoExtension::class)
class KafkaInputListenerTest {
private lateinit var requestProcessor: RequestProcessor
private lateinit var objectMapper: ObjectMapper
private lateinit var kafkaInputListener: KafkaInputListener
@BeforeEach
fun setup(
@Mock requestProcessor: RequestProcessor
) {
this.requestProcessor = requestProcessor
this.objectMapper = ObjectMapper()
this.kafkaInputListener = KafkaInputListener(requestProcessor, objectMapper)
}
@Test
fun shouldProcessMtbFileRequest() {
val mtbFile = MtbFile.builder()
.withPatient(Patient.builder().withId("DUMMY_12345678").build())
.withConsent(Consent.builder().withStatus(Consent.Status.ACTIVE).build())
.build()
kafkaInputListener.onMessage(ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)))
verify(requestProcessor, times(1)).processMtbFile(any())
}
@Test
fun shouldProcessDeleteRequest() {
val mtbFile = MtbFile.builder()
.withPatient(Patient.builder().withId("DUMMY_12345678").build())
.withConsent(Consent.builder().withStatus(Consent.Status.REJECTED).build())
.build()
kafkaInputListener.onMessage(ConsumerRecord("testtopic", 0, 0, "", this.objectMapper.writeValueAsString(mtbFile)))
verify(requestProcessor, times(1)).processDeletion(anyString())
}
}