1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-04-19 17:26:51 +00:00

Merge branch 'master' into add-docker-build

# Conflicts:
#	README.md
#	build.gradle.kts
This commit is contained in:
Jakub Lidke 2023-08-25 12:59:38 +02:00
commit da26b5a2c8
40 changed files with 2753 additions and 277 deletions

35
.github/workflows/deploy.yml vendored Normal file
View File

@ -0,0 +1,35 @@
name: "Run build and deploy"
on:
release:
types: [ 'published' ]
jobs:
docker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
- name: Setup Gradle
uses: gradle/gradle-build-action@v2.4.2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to Docker Hub
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Execute image build and push
run: |
./gradlew bootBuildImage
docker tag ghcr.io/ccc-mf/etl-processor ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }}
docker push ghcr.io/ccc-mf/etl-processor
docker push ghcr.io/ccc-mf/etl-processor:${{ github.ref_name }}

39
.github/workflows/test.yml vendored Normal file
View File

@ -0,0 +1,39 @@
name: 'Run Tests'
on:
push:
branches: [ 'master' ]
tags: [ '*' ]
pull_request:
branches: [ '*' ]
jobs:
tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
- name: Setup Gradle
uses: gradle/gradle-build-action@v2.4.2
- name: Execute tests
run: ./gradlew test
integrationTests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
- name: Setup Gradle
uses: gradle/gradle-build-action@v2.4.2
- name: Execute integration tests
run: ./gradlew integrationTest

128
README.md
View File

@ -1,7 +1,28 @@
# ETL-Processor for bwHC data
# ETL-Processor for bwHC data [![Run Tests](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml/badge.svg)](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml)
Diese Anwendung versendet ein bwHC-MTB-File an das bwHC-Backend und pseudonymisiert die
Patienten-ID.
Diese Anwendung versendet ein bwHC-MTB-File an das bwHC-Backend und pseudonymisiert die Patienten-ID.
### Einordnung innerhalb einer DNPM-ETL-Strecke
Diese Anwendung erlaubt das Entgegennehmen HTTP/REST-Anfragen aus dem Onkostar-Plugin **[onkostar-plugin-dnpmexport](https://github.com/CCC-MF/onkostar-plugin-dnpmexport)**.
Der Inhalt einer Anfrage, wenn ein bwHC-MTBFile, wird pseudonymisiert und auf Duplikate geprüft.
Duplikate werden verworfen, Änderungen werden weitergeleitet.
Löschanfragen werden immer als Löschanfrage an das bwHC-backend weitergeleitet.
![Modell DNPM-ETL-Strecke](docs/etl.png)
#### HTTP/REST-Konfiguration
Anfragen werden, wenn nicht als Duplikat behandelt, nach der Pseudonymisierung direkt an das bwHC-Backend gesendet.
#### Konfiguration für Apache Kafka
Anfragen werden, wenn nicht als Duplikat behandelt, nach der Pseudonymisierung an Apache Kafka übergeben.
Eine Antwort wird dabei ebenfalls mithilfe von Apache Kafka übermittelt und nach der Entgegennahme verarbeitet.
Siehe hierzu auch: https://github.com/CCC-MF/kafka-to-bwhc
## Pseudonymisierung der Patienten-ID
@ -13,10 +34,8 @@ Ist diese nicht gesetzt. wird intern eine Anonymisierung der Patienten-ID vorgen
### Eingebaute Pseudonymisierung
Wurde keine oder die Verwendung der eingebauten Pseudonymisierung konfiguriert, so wird für die
Patienten-ID der
entsprechende SHA-256-Hash gebildet und Base64-codiert - hier ohne endende "=" - zuzüglich des
konfigurierten Prefixes
Wurde keine oder die Verwendung der eingebauten Pseudonymisierung konfiguriert, so wird für die Patienten-ID der
entsprechende SHA-256-Hash gebildet und Base64-codiert - hier ohne endende "=" - zuzüglich des konfigurierten Prefixes
als Patienten-Pseudonym verwendet.
### Pseudonymisierung mit gPAS
@ -28,40 +47,87 @@ Wurde die Verwendung von gPAS konfiguriert, so sind weitere Angaben zu konfiguri
* `APP_PSEUDONYMIZE_GPAS_TARGET`: gPas Domänenname
* `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPas Basic-Auth Benutzername
* `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPas Basic-Auth Passwort
* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt
werden muss.
* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt werden muss.
## Mögliche Endpunkte
Für REST-Requests als auch (parallel) zur Nutzung von Kafka-Topics können Endpunkte konfiguriert
werden.
Für REST-Requests als auch zur Nutzung von Kafka-Topics können Endpunkte konfiguriert werden.
Es ist dabei nur die Konfiguration eines Endpunkts zulässig.
Werden sowohl REST als auch Kafka-Endpunkt konfiguriert, wird nur der REST-Endpunkt verwendet.
### REST
Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an das bwHC-Backend
gesendet wird:
Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an das bwHC-Backend gesendet wird:
* `APP_REST_URI`: URI der zu benutzenden API der bwHC-Backend-Instanz.
z.B.: `http://localhost:9000/bwhc/etl/api`
* `APP_REST_URI`: URI der zu benutzenden API der bwHC-Backend-Instanz. z.B.: `http://localhost:9000/bwhc/etl/api`
### Kafka-Topics
Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein Kafka-Topic
übermittelt wird:
Folgende Umgebungsvariablen müssen gesetzt sein, damit ein bwHC-MTB-File an ein Kafka-Topic übermittelt wird:
* `APP_KAFKA_TOPIC`: Zu verwendendes Topic
* `APP_KAFKA_TOPIC`: Zu verwendendes Topic zum Versenden von Anfragen
* `APP_KAFKA_RESPONSE_TOPIC`: Topic mit Antworten über den Erfolg des Versendens. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_response".
* `APP_KAFKA_GROUP_ID`: Kafka GroupID des Consumers. Standardwert: `APP_KAFKA_TOPIC` mit Anhang "_group".
* `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste
Wird keine Rückantwort über Apache Kafka empfangen und es gibt keine weitere Möglichkeit den Status festzustellen, verbleibt der Status auf `UNKNOWN`.
Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert werden.
### Docker Image
Lässt sich keine Verbindung zu dem bwHC-Backend aufbauen, wird eine Rückantwort mit Status-Code `900` erwartet, welchen es
für HTTP nicht gibt.
Bauen eines Docker Images kann wie folgt erzeugt werden:
#### Retention Time
Generell werden in Apache Kafka alle Records entsprechend der Konfiguration vorgehalten.
So wird ohne spezielle Konfiguration ein Record für 7 Tage in Apache Kafka gespeichert.
Es sind innerhalb dieses Zeitraums auch alte Informationen weiterhin enthalten, wenn der Consent später abgelehnt wurde.
Durch eine entsprechende Konfiguration des Topics kann dies verhindert werden.
Beispiel - auszuführen innerhalb des Kafka-Containers: Löschen alter Records nach einem Tag
```
kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=86400000
```
#### Key based Retention
Möchten Sie hingegen immer nur die letzte Meldung für einen Patienten und eine Erkrankung in Apache Kafka vorhalten,
so ist die nachfolgend genannte Konfiguration der Kafka-Topics hilfreich.
* `retention.ms`: Möglichst kurze Zeit in der alte Records noch erhalten bleiben, z.B. 10 Sekunden 10000
* `cleanup.policy`: Löschen alter Records und Beibehalten des letzten Records zu einem Key [delete,compact]
Beispiele für ein Topic `test`, hier bitte an die verwendeten Topics anpassen.
```
kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=10000
kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config cleanup.policy=[delete,compact]
```
Da als Key eines Records die (pseudonymisierte) Patienten-ID und die (anonymisierte) Erkrankungs-ID verwendet wird,
stehen mit obiger Konfiguration der Kafka-Topics nach 10 Sekunden nur noch der jeweils letzte Eintrag für den entsprechenden
Key zur Verfügung.
Da der Key sowohl für die Records in Richtung bwHC-Backend für die Rückantwort identisch aufgebaut ist, lassen sich so
auch im Falle eines Consent-Widerspruchs die enthaltenen Daten als auch die Offenlegung durch Verifikationsdaten in der
Antwort effektiv verhindern, da diese nach 10 Sekunden gelöscht werden.
Es steht dann nur noch die jeweils letzten Information zur Verfügung, dass für einen Patienten/eine Erkrankung
ein Consent-Widerspruch erfolgte.
## Docker-Images
Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor
### Images lokal bauen
```bash
docker build . -t "imageName"
```
## Deployment
*Ausführen als Docker Conatiner:*
Wenn gewünscht, Änderungen in der `env` vornehmen. Beachten, dass *MONITORING_HTTP_PORT* über
Host-Umgebung gesetzt werden muss (z.B. .env oder Parameter --env-file )
@ -69,4 +135,24 @@ Host-Umgebung gesetzt werden muss (z.B. .env oder Parameter --env-file )
```bash
cd ./deploy
docker compose up -d
```
```
## Entwicklungssetup
Zum Starten einer lokalen Entwicklungs- und Testumgebung kann die beiliegende Datei `dev-compose.yml` verwendet werden.
Diese kann zur Nutzung der Datenbanken **MariaDB** als auch **PostgreSQL** angepasst werden.
Zur Nutzung von Apache Kafka muss dazu ein Eintrag im hosts-File vorgenommen werden und der Hostname `kafka` auf die lokale
IP-Adresse verweisen. Ohne diese Einstellung ist eine Nutzung von Apache Kafka außerhalb der Docker-Umgebung nicht möglich.
Beim Start der Anwendung mit dem Profil `dev` wird die in `dev-compose.yml` definierte Umgebung beim Start der
Anwendung mit gestartet:
```
SPRING_PROFILES_ACTIVE=dev ./gradlew bootRun
```
Die Datei `application-dev.yml` enthält hierzu die Konfiguration für das Profil `dev`.
Beim Ausführen der Integrationstests wird eine Testdatenbank in einem Docker-Container gestartet.
Siehe hier auch die Klasse `AbstractTestcontainerTest` unter `src/integrationTest`.

View File

@ -1,4 +1,6 @@
import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import org.springframework.boot.gradle.tasks.bundling.BootBuildImage
plugins {
id("org.springframework.boot") version "3.1.1"
@ -8,12 +10,31 @@ plugins {
}
group = "de.ukw.ccc"
version = "0.1.0-SNAPSHOT"
version = "0.2.0-SNAPSHOT"
var versions = mapOf(
"bwhc-dto-java" to "0.2.0",
"hapi-fhir" to "6.6.2",
"httpclient5" to "5.2.1",
"mockito-kotlin" to "5.1.0"
)
java {
sourceCompatibility = JavaVersion.VERSION_17
}
sourceSets {
create("integrationTest") {
compileClasspath += sourceSets.main.get().output
runtimeClasspath += sourceSets.main.get().output
}
}
val integrationTestImplementation: Configuration by configurations.getting {
extendsFrom(configurations.testImplementation.get())
extendsFrom(configurations.runtimeOnly.get())
}
configurations {
compileOnly {
extendsFrom(configurations.annotationProcessor.get())
@ -41,10 +62,10 @@ dependencies {
implementation("org.flywaydb:flyway-mysql")
implementation("commons-codec:commons-codec")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2")
implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2")
implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1")
implementation("de.ukw.ccc:bwhc-dto-java:${versions["bwhc-dto-java"]}")
implementation("ca.uhn.hapi.fhir:hapi-fhir-base:${versions["hapi-fhir"]}")
implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:${versions["hapi-fhir"]}")
implementation("org.apache.httpcomponents.client5:httpclient5:${versions["httpclient5"]}")
runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
runtimeOnly("org.postgresql:postgresql")
developmentOnly("org.springframework.boot:spring-boot-devtools")
@ -52,6 +73,9 @@ dependencies {
annotationProcessor("org.springframework.boot:spring-boot-configuration-processor")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.projectreactor:reactor-test")
testImplementation("org.mockito.kotlin:mockito-kotlin:${versions["mockito-kotlin"]}")
integrationTestImplementation("org.testcontainers:junit-jupiter")
integrationTestImplementation("org.testcontainers:postgresql")
}
tasks.withType<KotlinCompile> {
@ -63,5 +87,26 @@ tasks.withType<KotlinCompile> {
tasks.withType<Test> {
useJUnitPlatform()
testLogging {
events(TestLogEvent.FAILED, TestLogEvent.PASSED, TestLogEvent.SKIPPED)
}
}
task<Test>("integrationTest") {
description = "Runs integration tests"
testClassesDirs = sourceSets["integrationTest"].output.classesDirs
classpath = sourceSets["integrationTest"].runtimeClasspath
shouldRunAfter("test")
}
tasks.named<BootBuildImage>("bootBuildImage") {
imageName.set("ghcr.io/ccc-mf/etl-processor")
environment.set(environment.get() + mapOf(
"BP_OCI_SOURCE" to "https://github.com/CCC-MF/etl-processor",
"BP_OCI_LICENSES" to "AGPLv3",
"BP_OCI_DESCRIPTION" to "ETL Processor for bwHC MTB files"
))
}

View File

@ -1,4 +1,7 @@
services:
# Note: Make sure, hostname "kafka" points to 127.0.0.1
# otherwise connection will not be available
kafka:
image: bitnami/kafka
hostname: kafka
@ -6,6 +9,12 @@ services:
- "9092:9092"
environment:
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_CFG_NODE_ID: "0"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
mariadb:
image: mariadb:10

BIN
docs/etl.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB

View File

@ -0,0 +1,51 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.testcontainers.containers.PostgreSQLContainer
import org.testcontainers.junit.jupiter.Container
abstract class AbstractTestcontainerTest {
companion object {
@Container
val dbContainer = CustomPostgreSQLContainer("postgres:10-alpine")
.withDatabaseName("test")
.withUsername("test")
.withPassword("test") ?: throw RuntimeException("Failed to create testcontainer!")
@DynamicPropertySource
@JvmStatic
fun registerDynamicProperties(registry: DynamicPropertyRegistry) {
registry.add("spring.datasource.url", dbContainer::getJdbcUrl)
registry.add("spring.datasource.username", dbContainer::getUsername)
registry.add("spring.datasource.password", dbContainer::getPassword)
}
}
}
class CustomPostgreSQLContainer(dockerImageName: String) : PostgreSQLContainer<CustomPostgreSQLContainer>(dockerImageName) {
override fun stop() {
// Keep Testcontainer alive until JVM destroys it
}
}

View File

@ -0,0 +1,45 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor
import dev.dnpm.etl.processor.output.MtbFileSender
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.ApplicationContext
import org.springframework.test.context.junit.jupiter.SpringExtension
import org.testcontainers.junit.jupiter.Testcontainers
@Testcontainers
@ExtendWith(SpringExtension::class)
@SpringBootTest
@MockBean(MtbFileSender::class)
class EtlProcessorApplicationTests : AbstractTestcontainerTest() {
@Test
fun contextLoadsIfMtbFileSenderConfigured(@Autowired context: ApplicationContext) {
// Simply check bean configuration
assertThat(context).isNotNull
}
}

View File

@ -0,0 +1,102 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.RestMtbFileSender
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.springframework.beans.factory.NoSuchBeanDefinitionException
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.boot.test.mock.mockito.MockBeans
import org.springframework.context.ApplicationContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
@SpringBootTest
@ContextConfiguration(classes = [KafkaAutoConfiguration::class, AppKafkaConfiguration::class, AppRestConfiguration::class])
class AppConfigurationTest {
@Nested
@TestPropertySource(
properties = [
"app.rest.uri=http://localhost:9000"
]
)
inner class AppConfigurationRestTest(private val context: ApplicationContext) {
@Test
fun shouldUseRestMtbFileSenderNotKafkaMtbFileSender() {
assertThat(context.getBean(RestMtbFileSender::class.java)).isNotNull
assertThrows<NoSuchBeanDefinitionException> { context.getBean(KafkaMtbFileSender::class.java) }
}
}
@Nested
@TestPropertySource(
properties = [
"app.kafka.servers=localhost:9092",
"app.kafka.topic=test",
"app.kafka.response-topic=test-response",
"app.kafka.group-id=test"
]
)
@MockBeans(value = [
MockBean(ObjectMapper::class),
MockBean(RequestRepository::class)
])
inner class AppConfigurationKafkaTest(private val context: ApplicationContext) {
@Test
fun shouldUseKafkaMtbFileSenderNotRestMtbFileSender() {
assertThrows<NoSuchBeanDefinitionException> { context.getBean(RestMtbFileSender::class.java) }
assertThat(context.getBean(KafkaMtbFileSender::class.java)).isNotNull
}
}
@Nested
@TestPropertySource(
properties = [
"app.rest.uri=http://localhost:9000",
"app.kafka.servers=localhost:9092",
"app.kafka.topic=test",
"app.kafka.response-topic=test-response",
"app.kafka.group-id=test"
]
)
inner class AppConfigurationRestInPrecedenceTest(private val context: ApplicationContext) {
@Test
fun shouldUseRestMtbFileSenderNotKafkaMtbFileSender() {
assertThat(context.getBean(RestMtbFileSender::class.java)).isNotNull
assertThrows<NoSuchBeanDefinitionException> { context.getBean(KafkaMtbFileSender::class.java) }
}
}
}

View File

@ -0,0 +1,134 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services
import dev.dnpm.etl.processor.AbstractTestcontainerTest
import dev.dnpm.etl.processor.monitoring.Request
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
import dev.dnpm.etl.processor.output.MtbFileSender
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.test.context.junit.jupiter.SpringExtension
import org.springframework.transaction.annotation.Transactional
import org.testcontainers.junit.jupiter.Testcontainers
import java.time.Instant
import java.util.*
@Testcontainers
@ExtendWith(SpringExtension::class)
@SpringBootTest
@Transactional
@MockBean(MtbFileSender::class)
class RequestServiceIntegrationTest : AbstractTestcontainerTest() {
private lateinit var requestRepository: RequestRepository
private lateinit var requestService: RequestService
@BeforeEach
fun setup(
@Autowired requestRepository: RequestRepository
) {
this.requestRepository = requestRepository
this.requestService = RequestService(requestRepository)
}
@Test
fun shouldResultInEmptyRequestList() {
val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901")
assertThat(actual).isEmpty()
}
private fun setupTestData() {
// Prepare DB
this.requestRepository.saveAll(
listOf(
Request(
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-07-07T02:00:00Z")
),
// Should be ignored - wrong patient ID -->
Request(
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678902",
pid = "P2",
fingerprint = "0123456789abcdef2",
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-08-08T00:00:00Z")
),
// <--
Request(
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P2",
fingerprint = "0123456789abcdee1",
type = RequestType.DELETE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-08-08T02:00:00Z")
)
)
)
}
@Test
fun shouldResultInSortedRequestList() {
setupTestData()
val actual = requestService.allRequestsByPatientPseudonym("TEST_12345678901")
assertThat(actual).hasSize(2)
assertThat(actual[0].fingerprint).isEqualTo("0123456789abcdee1")
assertThat(actual[1].fingerprint).isEqualTo("0123456789abcdef1")
}
@Test
fun shouldReturnDeleteRequestAsLastRequest() {
setupTestData()
val actual = requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901")
assertThat(actual).isTrue()
}
@Test
fun shouldReturnLastMtbFileRequest() {
setupTestData()
val actual = requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901")
assertThat(actual).isNotNull
assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef1")
}
}

View File

@ -69,13 +69,11 @@ import java.util.HashMap;
public class GpasPseudonymGenerator implements Generator {
private final static FhirContext r4Context = FhirContext.forR4();
private final String gPasUrl;
private final String psnTargetDomain;
private static FhirContext r4Context = FhirContext.forR4();
private final HttpHeaders httpHeader;
private final RetryTemplate retryTemplate = defaultTemplate();
private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
private SSLContext customSslContext;
@ -110,12 +108,19 @@ public class GpasPseudonymGenerator implements Generator {
@NotNull
public static String unwrapPseudonym(Parameters gPasPseudonymResult) {
Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst()
.get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst()
.orElseGet(ParametersParameterComponent::new).getValue();
final var parameters = gPasPseudonymResult.getParameter().stream().findFirst();
if (parameters.isEmpty()) {
throw new PseudonymRequestFailed("Empty HL7 parameters, cannot find first one");
}
final var identifier = (Identifier) parameters.get().getPart().stream()
.filter(a -> a.getName().equals("pseudonym"))
.findFirst()
.orElseGet(ParametersParameterComponent::new).getValue();
// pseudonym
return pseudonym.getSystem() + "|" + pseudonym.getValue();
return identifier.getSystem() + "|" + identifier.getValue();
}

View File

@ -28,4 +28,3 @@ class EtlProcessorApplication
fun main(args: Array<String>) {
runApplication<EtlProcessorApplication>(*args)
}

View File

@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(AppConfigProperties.NAME)
data class AppConfigProperties(
var bwhc_uri: String?,
var bwhcUri: String?,
var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN
) {
companion object {
@ -48,7 +48,7 @@ data class GPasConfigProperties(
val password: String?,
val sslCaLocation: String?,
) {
) {
companion object {
const val NAME = "app.pseudonymize.gpas"
}
@ -66,6 +66,8 @@ data class RestTargetProperties(
@ConfigurationProperties(KafkaTargetProperties.NAME)
data class KafkaTargetProperties(
val topic: String = "etl-processor",
val responseTopic: String = "${topic}_response",
val groupId: String = "${topic}_group",
val servers: String = ""
) {
companion object {

View File

@ -21,9 +21,6 @@ package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.ReportService
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
import dev.dnpm.etl.processor.pseudonym.Generator
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
@ -32,7 +29,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
import reactor.core.publisher.Sinks
@Configuration
@ -40,9 +36,7 @@ import reactor.core.publisher.Sinks
value = [
AppConfigProperties::class,
PseudonymizeConfigProperties::class,
GPasConfigProperties::class,
RestTargetProperties::class,
KafkaTargetProperties::class
GPasConfigProperties::class
]
)
class AppConfiguration {
@ -60,25 +54,13 @@ class AppConfiguration {
}
@Bean
fun pseudonymizeService(generator: Generator, pseudonymizeConfigProperties: PseudonymizeConfigProperties): PseudonymizeService {
fun pseudonymizeService(
generator: Generator,
pseudonymizeConfigProperties: PseudonymizeConfigProperties
): PseudonymizeService {
return PseudonymizeService(generator, pseudonymizeConfigProperties)
}
@ConditionalOnProperty(value = ["app.rest.uri"])
@Bean
fun restMtbFileSender(restTargetProperties: RestTargetProperties): MtbFileSender {
return RestMtbFileSender(restTargetProperties)
}
@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
objectMapper: ObjectMapper
): MtbFileSender {
return KafkaMtbFileSender(kafkaTemplate, objectMapper)
}
@Bean
fun reportService(objectMapper: ObjectMapper): ReportService {
return ReportService(objectMapper)

View File

@ -0,0 +1,79 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.config
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.services.kafka.KafkaResponseProcessor
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.annotation.Order
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
@Configuration
@EnableConfigurationProperties(
value = [KafkaTargetProperties::class]
)
@ConditionalOnProperty(value = ["app.kafka.topic", "app.kafka.servers"])
@ConditionalOnMissingBean(MtbFileSender::class)
@Order(-5)
class AppKafkaConfiguration {
private val logger = LoggerFactory.getLogger(AppKafkaConfiguration::class.java)
@Bean
fun kafkaMtbFileSender(
kafkaTemplate: KafkaTemplate<String, String>,
kafkaTargetProperties: KafkaTargetProperties,
objectMapper: ObjectMapper
): MtbFileSender {
logger.info("Selected 'KafkaMtbFileSender'")
return KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
}
@Bean
fun kafkaListenerContainer(
consumerFactory: ConsumerFactory<String, String>,
kafkaTargetProperties: KafkaTargetProperties,
kafkaResponseProcessor: KafkaResponseProcessor
): KafkaMessageListenerContainer<String, String> {
val containerProperties = ContainerProperties(kafkaTargetProperties.responseTopic)
containerProperties.messageListener = kafkaResponseProcessor
return KafkaMessageListenerContainer(consumerFactory, containerProperties)
}
@Bean
fun kafkaResponseProcessor(
applicationEventPublisher: ApplicationEventPublisher,
objectMapper: ObjectMapper
): KafkaResponseProcessor {
return KafkaResponseProcessor(applicationEventPublisher, objectMapper)
}
}

View File

@ -0,0 +1,58 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.config
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.output.RestMtbFileSender
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.annotation.Order
import org.springframework.web.client.RestTemplate
@Configuration
@EnableConfigurationProperties(
value = [
RestTargetProperties::class
]
)
@ConditionalOnProperty(value = ["app.rest.uri"])
@ConditionalOnMissingBean(MtbFileSender::class)
@Order(-10)
class AppRestConfiguration {
private val logger = LoggerFactory.getLogger(AppRestConfiguration::class.java)
@Bean
fun restTemplate(): RestTemplate {
return RestTemplate()
}
@Bean
fun restMtbFileSender(restTemplate: RestTemplate, restTargetProperties: RestTargetProperties): MtbFileSender {
logger.info("Selected 'RestMtbFileSender'")
return RestMtbFileSender(restTemplate, restTargetProperties)
}
}

View File

@ -19,7 +19,9 @@
package dev.dnpm.etl.processor.monitoring
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonValue
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonMappingException
import com.fasterxml.jackson.databind.ObjectMapper
@ -33,15 +35,22 @@ class ReportService(
}
return try {
objectMapper.readValue(dataQualityReport, DataQualityReport::class.java).issues
} catch (e: JsonMappingException) {
e.printStackTrace()
listOf()
} catch (e: Exception) {
val otherIssue =
Issue(Severity.ERROR, "Not parsable data quality report '$dataQualityReport'")
return when (e) {
is JsonMappingException -> listOf(otherIssue)
is JsonParseException -> listOf(otherIssue)
else -> throw e
}
}
}
@JsonIgnoreProperties(ignoreUnknown = true)
private data class DataQualityReport(val issues: List<Issue>)
@JsonIgnoreProperties(ignoreUnknown = true)
data class Issue(val severity: Severity, val message: String)
enum class Severity(@JsonValue val value: String) {

View File

@ -36,9 +36,9 @@ data class Request(
val patientId: String,
val pid: String,
val fingerprint: String,
val status: RequestStatus,
val type: RequestType,
val processedAt: Instant = Instant.now(),
var status: RequestStatus,
var processedAt: Instant = Instant.now(),
@Embedded.Nullable var report: Report? = null
)

View File

@ -20,11 +20,16 @@
package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.config.KafkaTargetProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
class KafkaMtbFileSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
private val kafkaTargetProperties: KafkaTargetProperties,
private val objectMapper: ObjectMapper
) : MtbFileSender {
@ -32,31 +37,60 @@ class KafkaMtbFileSender(
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
return try {
val result = kafkaTemplate.sendDefault(
header(request),
objectMapper.writeValueAsString(request.mtbFile)
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
MtbFileSender.Response(RequestStatus.UNKNOWN)
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
MtbFileSender.Response(RequestStatus.ERROR)
}
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
MtbFileSender.Response(RequestStatus.ERROR)
}
}
// TODO not yet implemented
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
return MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
val dummyMtbFile = MtbFile.builder()
.withConsent(
Consent.builder()
.withPatient(request.patientId)
.withStatus(Consent.Status.REJECTED)
.build()
)
.build()
return try {
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
)
if (result.get() != null) {
logger.debug("Sent deletion request via KafkaMtbFileSender")
MtbFileSender.Response(RequestStatus.UNKNOWN)
} else {
MtbFileSender.Response(RequestStatus.ERROR)
}
} catch (e: Exception) {
logger.error("An error occurred sending to kafka", e)
MtbFileSender.Response(RequestStatus.ERROR)
}
}
private fun header(request: MtbFileSender.MtbFileRequest): String {
private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
"\"eid\": \"${request.mtbFile.episode.id}\", " +
"\"requestId\": \"${request.requestId}\"}"
"\"eid\": \"${request.mtbFile.episode.id}\"}"
}
private fun key(request: MtbFileSender.DeleteRequest): String {
return "{\"pid\": \"${request.patientId}\"}"
}
data class Data(val requestId: String, val content: MtbFile)
}

View File

@ -20,22 +20,31 @@
package dev.dnpm.etl.processor.output
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.springframework.http.HttpStatusCode
interface MtbFileSender {
fun send(request: MtbFileRequest): Response
fun send(request: DeleteRequest): Response
data class Response(val status: ResponseStatus, val reason: String = "")
data class Response(val status: RequestStatus, val body: String = "")
data class MtbFileRequest(val requestId: String, val mtbFile: MtbFile)
data class DeleteRequest(val requestId: String, val patientId: String)
enum class ResponseStatus {
SUCCESS,
WARNING,
ERROR,
UNKNOWN
}
fun Int.asRequestStatus(): RequestStatus {
return when (this) {
200 -> RequestStatus.SUCCESS
201 -> RequestStatus.WARNING
in 400 .. 999 -> RequestStatus.ERROR
else -> RequestStatus.UNKNOWN
}
}
fun HttpStatusCode.asRequestStatus(): RequestStatus {
return this.value().asRequestStatus()
}

View File

@ -20,20 +20,21 @@
package dev.dnpm.etl.processor.output
import dev.dnpm.etl.processor.config.RestTargetProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.web.client.RestClientException
import org.springframework.web.client.RestTemplate
import org.springframework.web.util.UriComponentsBuilder
class RestMtbFileSender(private val restTargetProperties: RestTargetProperties) : MtbFileSender {
class RestMtbFileSender(
private val restTemplate: RestTemplate,
private val restTargetProperties: RestTargetProperties
) : MtbFileSender {
private val logger = LoggerFactory.getLogger(RestMtbFileSender::class.java)
private val restTemplate = RestTemplate()
override fun send(request: MtbFileSender.MtbFileRequest): MtbFileSender.Response {
try {
val headers = HttpHeaders()
@ -46,21 +47,17 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
)
if (!response.statusCode.is2xxSuccessful) {
logger.warn("Error sending to remote system: {}", response.body)
return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Status-Code: ${response.statusCode.value()}")
return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
}
logger.debug("Sent file via RestMtbFileSender")
return if (response.body?.contains("warning") == true) {
MtbFileSender.Response(MtbFileSender.ResponseStatus.WARNING, "${response.body}")
} else {
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
}
return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
override fun send(request: MtbFileSender.DeleteRequest): MtbFileSender.Response {
@ -74,14 +71,14 @@ class RestMtbFileSender(private val restTargetProperties: RestTargetProperties)
String::class.java
)
logger.debug("Sent file via RestMtbFileSender")
return MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
return MtbFileSender.Response(RequestStatus.SUCCESS)
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {
logger.info(restTargetProperties.uri!!.toString())
logger.error("Cannot send data to remote system", e)
}
return MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
return MtbFileSender.Response(RequestStatus.ERROR, "Sonstiger Fehler bei der Übertragung")
}
}

View File

@ -19,7 +19,6 @@
package dev.dnpm.etl.processor.pseudonym
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties
class PseudonymizeService(
@ -27,38 +26,11 @@ class PseudonymizeService(
private val configProperties: PseudonymizeConfigProperties
) {
fun pseudonymize(mtbFile: MtbFile): MtbFile {
val patientPseudonym = patientPseudonym(mtbFile.patient.id)
mtbFile.episode.patient = patientPseudonym
mtbFile.carePlans.forEach { it.patient = patientPseudonym }
mtbFile.patient.id = patientPseudonym
mtbFile.claims.forEach { it.patient = patientPseudonym }
mtbFile.consent.patient = patientPseudonym
mtbFile.claimResponses.forEach { it.patient = patientPseudonym }
mtbFile.diagnoses.forEach { it.patient = patientPseudonym }
mtbFile.ecogStatus.forEach { it.patient = patientPseudonym }
mtbFile.familyMemberDiagnoses.forEach { it.patient = patientPseudonym }
mtbFile.geneticCounsellingRequests.forEach { it.patient = patientPseudonym }
mtbFile.histologyReevaluationRequests.forEach { it.patient = patientPseudonym }
mtbFile.histologyReports.forEach { it.patient = patientPseudonym }
mtbFile.lastGuidelineTherapies.forEach { it.patient = patientPseudonym }
mtbFile.molecularPathologyFindings.forEach { it.patient = patientPseudonym }
mtbFile.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } }
mtbFile.ngsReports.forEach { it.patient = patientPseudonym }
mtbFile.previousGuidelineTherapies.forEach { it.patient = patientPseudonym }
mtbFile.rebiopsyRequests.forEach { it.patient = patientPseudonym }
mtbFile.recommendations.forEach { it.patient = patientPseudonym }
mtbFile.recommendations.forEach { it.patient = patientPseudonym }
mtbFile.responses.forEach { it.patient = patientPseudonym }
mtbFile.specimens.forEach { it.patient = patientPseudonym }
mtbFile.specimens.forEach { it.patient = patientPseudonym }
return mtbFile
}
fun patientPseudonym(patientId: String): String {
return "${configProperties.prefix}_${generator.generate(patientId)}"
return when (generator) {
is GpasPseudonymGenerator -> generator.generate(patientId)
else -> "${configProperties.prefix}_${generator.generate(patientId)}"
}
}
}

View File

@ -0,0 +1,50 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.pseudonym
import de.ukw.ccc.bwhc.dto.MtbFile
infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) {
val patientPseudonym = pseudonymizeService.patientPseudonym(this.patient.id)
this.episode.patient = patientPseudonym
this.carePlans.forEach { it.patient = patientPseudonym }
this.patient.id = patientPseudonym
this.claims.forEach { it.patient = patientPseudonym }
this.consent.patient = patientPseudonym
this.claimResponses.forEach { it.patient = patientPseudonym }
this.diagnoses.forEach { it.patient = patientPseudonym }
this.ecogStatus.forEach { it.patient = patientPseudonym }
this.familyMemberDiagnoses.forEach { it.patient = patientPseudonym }
this.geneticCounsellingRequests.forEach { it.patient = patientPseudonym }
this.histologyReevaluationRequests.forEach { it.patient = patientPseudonym }
this.histologyReports.forEach { it.patient = patientPseudonym }
this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym }
this.molecularPathologyFindings.forEach { it.patient = patientPseudonym }
this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } }
this.ngsReports.forEach { it.patient = patientPseudonym }
this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym }
this.rebiopsyRequests.forEach { it.patient = patientPseudonym }
this.recommendations.forEach { it.patient = patientPseudonym }
this.recommendations.forEach { it.patient = patientPseudonym }
this.responses.forEach { it.patient = patientPseudonym }
this.specimens.forEach { it.patient = patientPseudonym }
this.specimens.forEach { it.patient = patientPseudonym }
}

View File

@ -21,169 +21,117 @@ package dev.dnpm.etl.processor.services
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.monitoring.*
import dev.dnpm.etl.processor.monitoring.Report
import dev.dnpm.etl.processor.monitoring.Request
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith
import org.apache.commons.codec.binary.Base32
import org.apache.commons.codec.digest.DigestUtils
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
import reactor.core.publisher.Sinks
import java.time.Instant
import java.util.*
@Service
class RequestProcessor(
private val pseudonymizeService: PseudonymizeService,
private val senders: List<MtbFileSender>,
private val requestRepository: RequestRepository,
private val sender: MtbFileSender,
private val requestService: RequestService,
private val objectMapper: ObjectMapper,
private val statisticsUpdateProducer: Sinks.Many<Any>
private val applicationEventPublisher: ApplicationEventPublisher
) {
private val logger = LoggerFactory.getLogger(RequestProcessor::class.java)
fun processMtbFile(mtbFile: MtbFile): RequestStatus {
fun processMtbFile(mtbFile: MtbFile) {
val requestId = UUID.randomUUID().toString()
val pid = mtbFile.patient.id
val pseudonymized = pseudonymizeService.pseudonymize(mtbFile)
val lastRequestForPatient =
requestRepository.findAllByPatientIdOrderByProcessedAtDesc(pseudonymized.patient.id)
.firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING }
mtbFile pseudonymizeWith pseudonymizeService
if (null != lastRequestForPatient && lastRequestForPatient.fingerprint == fingerprint(mtbFile)) {
requestRepository.save(
Request(
patientId = pseudonymized.patient.id,
pid = pid,
fingerprint = fingerprint(mtbFile),
status = RequestStatus.DUPLICATION,
type = RequestType.MTB_FILE,
report = Report("Duplikat erkannt - keine Daten weitergeleitet")
)
)
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
return RequestStatus.DUPLICATION
}
val request = MtbFileSender.MtbFileRequest(requestId, mtbFile)
val request = MtbFileSender.MtbFileRequest(UUID.randomUUID().toString(), pseudonymized)
val responses = senders.map {
val responseStatus = it.send(request)
if (responseStatus.status == MtbFileSender.ResponseStatus.SUCCESS || responseStatus.status == MtbFileSender.ResponseStatus.WARNING) {
logger.info(
"Sent file for Patient '{}' using '{}'",
pseudonymized.patient.id,
it.javaClass.simpleName
)
} else {
logger.error(
"Error sending file for Patient '{}' using '{}'",
pseudonymized.patient.id,
it.javaClass.simpleName
)
}
responseStatus
}
val requestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) {
RequestStatus.ERROR
} else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.WARNING)) {
RequestStatus.WARNING
} else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) {
RequestStatus.SUCCESS
} else {
RequestStatus.UNKNOWN
}
requestRepository.save(
requestService.save(
Request(
uuid = request.requestId,
uuid = requestId,
patientId = request.mtbFile.patient.id,
pid = pid,
fingerprint = fingerprint(request.mtbFile),
status = requestStatus,
type = RequestType.MTB_FILE,
report = when (requestStatus) {
RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
RequestStatus.WARNING -> Report("Warnungen über mangelhafte Daten",
responses.joinToString("\n") { it.reason })
RequestStatus.UNKNOWN -> Report("Keine Informationen")
else -> null
}
status = RequestStatus.UNKNOWN,
type = RequestType.MTB_FILE
)
)
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
if (isDuplication(mtbFile)) {
applicationEventPublisher.publishEvent(
ResponseEvent(
requestId,
Instant.now(),
RequestStatus.DUPLICATION
)
)
return
}
return requestStatus
val responseStatus = sender.send(request)
applicationEventPublisher.publishEvent(
ResponseEvent(
requestId,
Instant.now(),
responseStatus.status,
when (responseStatus.status) {
RequestStatus.WARNING -> Optional.of(responseStatus.body)
else -> Optional.empty()
}
)
)
}
fun processDeletion(patientId: String): RequestStatus {
private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean {
val lastMtbFileRequestForPatient =
requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id)
val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id)
return null != lastMtbFileRequestForPatient
&& !isLastRequestDeletion
&& lastMtbFileRequestForPatient.fingerprint == fingerprint(pseudonymizedMtbFile)
}
fun processDeletion(patientId: String) {
val requestId = UUID.randomUUID().toString()
try {
val patientPseudonym = pseudonymizeService.patientPseudonym(patientId)
val responses = senders.map {
val responseStatus = it.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
when (responseStatus.status) {
MtbFileSender.ResponseStatus.SUCCESS -> {
logger.info(
"Sent delete for Patient '{}' using '{}'",
patientPseudonym,
it.javaClass.simpleName
)
}
MtbFileSender.ResponseStatus.ERROR -> {
logger.error(
"Error deleting data for Patient '{}' using '{}'",
patientPseudonym,
it.javaClass.simpleName
)
}
else -> {
logger.error(
"Unknown result on deleting data for Patient '{}' using '{}'",
patientPseudonym,
it.javaClass.simpleName
)
}
}
responseStatus
}
val overallRequestStatus = if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.ERROR)) {
RequestStatus.ERROR
} else if (responses.map { it.status }.contains(MtbFileSender.ResponseStatus.SUCCESS)) {
RequestStatus.SUCCESS
} else {
RequestStatus.UNKNOWN
}
requestRepository.save(
requestService.save(
Request(
uuid = requestId,
patientId = patientPseudonym,
pid = patientId,
fingerprint = fingerprint(patientPseudonym),
status = overallRequestStatus,
type = RequestType.DELETE,
report = when (overallRequestStatus) {
RequestStatus.ERROR -> Report("Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar")
RequestStatus.UNKNOWN -> Report("Keine Informationen")
else -> null
status = RequestStatus.UNKNOWN,
type = RequestType.DELETE
)
)
val responseStatus = sender.send(MtbFileSender.DeleteRequest(requestId, patientPseudonym))
applicationEventPublisher.publishEvent(
ResponseEvent(
requestId,
Instant.now(),
responseStatus.status,
when (responseStatus.status) {
RequestStatus.WARNING, RequestStatus.ERROR -> Optional.of(responseStatus.body)
else -> Optional.empty()
}
)
)
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
return overallRequestStatus
} catch (e: Exception) {
requestRepository.save(
requestService.save(
Request(
uuid = requestId,
patientId = "???",
@ -194,10 +142,6 @@ class RequestProcessor(
report = Report("Fehler bei der Pseudonymisierung")
)
)
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
return RequestStatus.ERROR
}
}

View File

@ -0,0 +1,57 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services
import dev.dnpm.etl.processor.monitoring.Request
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
import org.springframework.stereotype.Service
@Service
class RequestService(
private val requestRepository: RequestRepository
) {
fun save(request: Request) = requestRepository.save(request)
fun allRequestsByPatientPseudonym(patientPseudonym: String) = requestRepository
.findAllByPatientIdOrderByProcessedAtDesc(patientPseudonym)
fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) =
Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym))
fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) =
Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym))
companion object {
fun lastMtbFileRequestForPatientPseudonym(allRequests: List<Request>) = allRequests
.filter { it.type == RequestType.MTB_FILE }
.sortedByDescending { it.processedAt }
.firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING }
fun isLastRequestWithKnownStatusDeletion(allRequests: List<Request>) = allRequests
.filter { it.status != RequestStatus.UNKNOWN }
.maxByOrNull { it.processedAt }?.type == RequestType.DELETE
}
}

View File

@ -0,0 +1,94 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services
import dev.dnpm.etl.processor.monitoring.Report
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.slf4j.LoggerFactory
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
import reactor.core.publisher.Sinks
import java.time.Instant
import java.util.*
@Service
class ResponseProcessor(
private val requestRepository: RequestRepository,
private val statisticsUpdateProducer: Sinks.Many<Any>
) {
private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java)
@EventListener(classes = [ResponseEvent::class])
fun handleResponseEvent(event: ResponseEvent) {
requestRepository.findByUuidEquals(event.requestUuid).ifPresentOrElse({
it.processedAt = event.timestamp
it.status = event.status
when (event.status) {
RequestStatus.SUCCESS -> {
it.report = Report(
"Keine Probleme erkannt",
)
}
RequestStatus.WARNING -> {
it.report = Report(
"Warnungen über mangelhafte Daten",
event.body.orElse("")
)
}
RequestStatus.ERROR -> {
it.report = Report(
"Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
event.body.orElse("")
)
}
RequestStatus.DUPLICATION -> {
it.report = Report(
"Duplikat erkannt"
)
}
else -> {
logger.error("Cannot process response: Unknown response!")
return@ifPresentOrElse
}
}
requestRepository.save(it)
statisticsUpdateProducer.emitNext("", Sinks.EmitFailureHandler.FAIL_FAST)
}, {
logger.error("Response for unknown request '${event.requestUuid}'!")
})
}
}
data class ResponseEvent(
val requestUuid: String,
val timestamp: Instant,
val status: RequestStatus,
val body: Optional<String> = Optional.empty()
)

View File

@ -0,0 +1,80 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services.kafka
import com.fasterxml.jackson.annotation.JsonAlias
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.output.asRequestStatus
import dev.dnpm.etl.processor.services.ResponseEvent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
import org.springframework.kafka.listener.MessageListener
import java.time.Instant
import java.util.*
class KafkaResponseProcessor(
private val eventPublisher: ApplicationEventPublisher,
private val objectMapper: ObjectMapper
) : MessageListener<String, String> {
private val logger = LoggerFactory.getLogger(KafkaResponseProcessor::class.java)
override fun onMessage(data: ConsumerRecord<String, String>) {
try {
Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java))
} catch (e: Exception) {
logger.error("Cannot process Kafka response", e)
Optional.empty()
}.ifPresentOrElse({ responseBody ->
val event = ResponseEvent(
responseBody.requestId,
Instant.ofEpochMilli(data.timestamp()),
responseBody.statusCode.asRequestStatus(),
when (responseBody.statusCode.asRequestStatus()) {
RequestStatus.SUCCESS -> {
Optional.empty()
}
RequestStatus.WARNING, RequestStatus.ERROR -> {
Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
}
else -> {
logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode)
Optional.empty()
}
}
)
eventPublisher.publishEvent(event)
}, {
logger.error("No requestId in Kafka response")
})
}
data class ResponseBody(
@JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
@JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
@JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
)
}

View File

@ -19,40 +19,37 @@
package dev.dnpm.etl.processor.web
import de.ukw.ccc.bwhc.dto.Consent
import de.ukw.ccc.bwhc.dto.MtbFile
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.services.RequestProcessor
import org.slf4j.LoggerFactory
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
@RestController
class MtbFileController(
class MtbFileRestController(
private val requestProcessor: RequestProcessor,
) {
private val logger = LoggerFactory.getLogger(MtbFileController::class.java)
private val logger = LoggerFactory.getLogger(MtbFileRestController::class.java)
@PostMapping(path = ["/mtbfile"])
fun mtbFile(@RequestBody mtbFile: MtbFile): ResponseEntity<Void> {
val requestStatus = requestProcessor.processMtbFile(mtbFile)
return if (requestStatus == RequestStatus.ERROR) {
ResponseEntity.unprocessableEntity().build()
if (mtbFile.consent.status == Consent.Status.ACTIVE) {
logger.debug("Accepted MTB File for processing")
requestProcessor.processMtbFile(mtbFile)
} else {
ResponseEntity.noContent().build()
logger.debug("Accepted MTB File and process deletion")
requestProcessor.processDeletion(mtbFile.patient.id)
}
return ResponseEntity.accepted().build()
}
@DeleteMapping(path = ["/mtbfile/{patientId}"])
fun deleteData(@PathVariable patientId: String): ResponseEntity<Void> {
val requestStatus = requestProcessor.processDeletion(patientId)
return if (requestStatus == RequestStatus.ERROR) {
ResponseEntity.unprocessableEntity().build()
} else {
ResponseEntity.noContent().build()
}
logger.debug("Accepted patient ID to process deletion")
requestProcessor.processDeletion(patientId)
return ResponseEntity.accepted().build()
}
}

View File

@ -83,9 +83,9 @@ class StatisticsRestController(
.groupBy { formatter.format(it.processedAt) }
.map {
val requestList = it.value
.groupBy { it.status }
.map {
Pair(it.key, it.value.size)
.groupBy { request -> request.status }
.map { request ->
Pair(request.key, request.value.size)
}
.toMap()
Pair(

View File

@ -4,11 +4,15 @@ spring:
file: ./dev-compose.yml
app:
rest:
uri: http://localhost:9000/bwhc/etl/api/MTBFile
#kafka:
# topic: test
# servers: kafka:9092
#rest:
# uri: http://localhost:9000/bwhc/etl/api
# Note: Make sure, hostname "kafka" points to 127.0.0.1
# otherwise connection will not be available
kafka:
topic: test
response-topic: test_response
servers: kafka:9092
server:
port: 8000

View File

@ -1,7 +1,7 @@
spring:
kafka:
bootstrap-servers: ${app.kafka.servers}
template:
default-topic: ${app.kafka.topic}
consumer:
group-id: ${app.kafka.group-id}
flyway:
locations: "classpath:db/migration/{vendor}"

View File

@ -0,0 +1,173 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.output
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.*
import dev.dnpm.etl.processor.config.KafkaTargetProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.*
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.SendResult
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.ExecutionException
@ExtendWith(MockitoExtension::class)
class KafkaMtbFileSenderTest {
private lateinit var kafkaTemplate: KafkaTemplate<String, String>
private lateinit var kafkaMtbFileSender: KafkaMtbFileSender
private lateinit var objectMapper: ObjectMapper
@BeforeEach
fun setup(
@Mock kafkaTemplate: KafkaTemplate<String, String>
) {
val kafkaTargetProperties = KafkaTargetProperties("testtopic")
this.objectMapper = ObjectMapper()
this.kafkaTemplate = kafkaTemplate
this.kafkaMtbFileSender = KafkaMtbFileSender(kafkaTemplate, kafkaTargetProperties, objectMapper)
}
@ParameterizedTest
@MethodSource("requestWithResponseSource")
fun shouldSendMtbFileRequestAndReturnExpectedState(testData: TestData) {
doAnswer {
if (null != testData.exception) {
throw testData.exception
}
completedFuture(SendResult<String, String>(null, null))
}.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
val response = kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE)))
assertThat(response.status).isEqualTo(testData.requestStatus)
}
@ParameterizedTest
@MethodSource("requestWithResponseSource")
fun shouldSendDeleteRequestAndReturnExpectedState(testData: TestData) {
doAnswer {
if (null != testData.exception) {
throw testData.exception
}
completedFuture(SendResult<String, String>(null, null))
}.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
val response = kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
assertThat(response.status).isEqualTo(testData.requestStatus)
}
@Test
fun shouldSendMtbFileRequestWithCorrectKeyAndBody() {
doAnswer {
completedFuture(SendResult<String, String>(null, null))
}.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
kafkaMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile(Consent.Status.ACTIVE)))
val captor = argumentCaptor<String>()
verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
assertThat(captor.firstValue).isNotNull
assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}")
assertThat(captor.secondValue).isNotNull
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE)))
}
@Test
fun shouldSendDeleteRequestWithCorrectKeyAndBody() {
doAnswer {
completedFuture(SendResult<String, String>(null, null))
}.whenever(kafkaTemplate).send(anyString(), anyString(), anyString())
kafkaMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
val captor = argumentCaptor<String>()
verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
assertThat(captor.firstValue).isNotNull
assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}")
assertThat(captor.secondValue).isNotNull
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED)))
}
companion object {
fun mtbFile(consentStatus: Consent.Status): MtbFile {
return if (consentStatus == Consent.Status.ACTIVE) {
MtbFile.builder()
.withPatient(
Patient.builder()
.withId("PID")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(consentStatus)
.withPatient("PID")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("PID")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
} else {
MtbFile.builder()
.withConsent(
Consent.builder()
.withStatus(consentStatus)
.withPatient("PID")
.build()
)
}.build()
}
fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data {
return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus))
}
data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null)
@JvmStatic
fun requestWithResponseSource(): Set<TestData> {
return setOf(
TestData(RequestStatus.UNKNOWN),
TestData(RequestStatus.ERROR, InterruptedException("Test interrupted")),
TestData(RequestStatus.ERROR, ExecutionException(RuntimeException("Test execution aborted")))
)
}
}
}

View File

@ -0,0 +1,195 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.output
import de.ukw.ccc.bwhc.dto.*
import dev.dnpm.etl.processor.config.RestTargetProperties
import dev.dnpm.etl.processor.monitoring.RequestStatus
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.test.web.client.MockRestServiceServer
import org.springframework.test.web.client.match.MockRestRequestMatchers.method
import org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo
import org.springframework.test.web.client.response.MockRestResponseCreators.withStatus
import org.springframework.web.client.RestTemplate
class RestMtbFileSenderTest {
private lateinit var mockRestServiceServer: MockRestServiceServer
private lateinit var restMtbFileSender: RestMtbFileSender
@BeforeEach
fun setup() {
val restTemplate = RestTemplate()
val restTargetProperties = RestTargetProperties("http://localhost:9000/mtbfile")
this.mockRestServiceServer = MockRestServiceServer.createServer(restTemplate)
this.restMtbFileSender = RestMtbFileSender(restTemplate, restTargetProperties)
}
@ParameterizedTest
@MethodSource("deleteRequestWithResponseSource")
fun shouldReturnExpectedResponseForDelete(requestWithResponse: RequestWithResponse) {
this.mockRestServiceServer.expect {
method(HttpMethod.DELETE)
requestTo("/mtbfile")
}.andRespond {
withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it)
}
val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
assertThat(response.status).isEqualTo(requestWithResponse.response.status)
assertThat(response.body).isEqualTo(requestWithResponse.response.body)
}
@ParameterizedTest
@MethodSource("mtbFileRequestWithResponseSource")
fun shouldReturnExpectedResponseForMtbFilePost(requestWithResponse: RequestWithResponse) {
this.mockRestServiceServer.expect {
method(HttpMethod.POST)
requestTo("/mtbfile")
}.andRespond {
withStatus(requestWithResponse.httpStatus).body(requestWithResponse.body).createResponse(it)
}
val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile))
assertThat(response.status).isEqualTo(requestWithResponse.response.status)
assertThat(response.body).isEqualTo(requestWithResponse.response.body)
}
companion object {
data class RequestWithResponse(
val httpStatus: HttpStatus,
val body: String,
val response: MtbFileSender.Response
)
private val warningBody = """
{
"patient_id": "PID",
"issues": [
{ "severity": "warning", "message": "Something is not right" }
]
}
""".trimIndent()
private val errorBody = """
{
"patient_id": "PID",
"issues": [
{ "severity": "error", "message": "Something is very bad" }
]
}
""".trimIndent()
val mtbFile: MtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("PID")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(Consent.Status.ACTIVE)
.withPatient("PID")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("PID")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
.build()
private const val ERROR_RESPONSE_BODY = "Sonstiger Fehler bei der Übertragung"
/**
* Synthetic http responses with related request status
* Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API
*/
@JvmStatic
fun mtbFileRequestWithResponseSource(): Set<RequestWithResponse> {
return setOf(
RequestWithResponse(HttpStatus.OK, "{}", MtbFileSender.Response(RequestStatus.SUCCESS, "{}")),
RequestWithResponse(
HttpStatus.CREATED,
warningBody,
MtbFileSender.Response(RequestStatus.WARNING, warningBody)
),
RequestWithResponse(
HttpStatus.BAD_REQUEST,
"??",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
),
RequestWithResponse(
HttpStatus.UNPROCESSABLE_ENTITY,
errorBody,
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
),
// Some more errors not mentioned in documentation
RequestWithResponse(
HttpStatus.NOT_FOUND,
"what????",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
),
RequestWithResponse(
HttpStatus.INTERNAL_SERVER_ERROR,
"what????",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
)
)
}
/**
* Synthetic http responses with related request status
* Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API
*/
@JvmStatic
fun deleteRequestWithResponseSource(): Set<RequestWithResponse> {
return setOf(
RequestWithResponse(HttpStatus.OK, "", MtbFileSender.Response(RequestStatus.SUCCESS)),
// Some more errors not mentioned in documentation
RequestWithResponse(
HttpStatus.NOT_FOUND,
"what????",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
),
RequestWithResponse(
HttpStatus.INTERNAL_SERVER_ERROR,
"what????",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
)
)
}
}
}

View File

@ -0,0 +1,86 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.pseudonym
import de.ukw.ccc.bwhc.dto.*
import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.doAnswer
import org.mockito.kotlin.whenever
@ExtendWith(MockitoExtension::class)
class PseudonymizeServiceTest {
private val mtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("123")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(Consent.Status.ACTIVE)
.withPatient("123")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("123")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
.build()
@Test
fun shouldNotUsePseudonymPrefixForGpas(@Mock generator: GpasPseudonymGenerator) {
doAnswer {
it.arguments[0]
}.whenever(generator).generate(anyString())
val pseudonymizeService = PseudonymizeService(generator, PseudonymizeConfigProperties())
mtbFile.pseudonymizeWith(pseudonymizeService)
assertThat(mtbFile.patient.id).isEqualTo("123")
}
@Test
fun shouldUsePseudonymPrefixForBuiltin(@Mock generator: AnonymizingGenerator) {
doAnswer {
it.arguments[0]
}.whenever(generator).generate(anyString())
val pseudonymizeService = PseudonymizeService(generator, PseudonymizeConfigProperties())
mtbFile.pseudonymizeWith(pseudonymizeService)
assertThat(mtbFile.patient.id).isEqualTo("UNKNOWN_123")
}
}

View File

@ -0,0 +1,70 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import dev.dnpm.etl.processor.monitoring.ReportService
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
class ReportServiceTest {
private lateinit var reportService: ReportService
@BeforeEach
fun setup() {
this.reportService = ReportService(ObjectMapper().registerModule(KotlinModule.Builder().build()))
}
@Test
fun shouldParseDataQualityReport() {
val json = """
{
"patient": "4711",
"issues": [
{ "severity": "warning", "message": "Warning Message" },
{ "severity": "error", "message": "Error Message" }
]
}
""".trimIndent()
val actual = this.reportService.deserialize(json)
assertThat(actual).hasSize(2)
assertThat(actual[0].severity).isEqualTo(ReportService.Severity.WARNING)
assertThat(actual[0].message).isEqualTo("Warning Message")
assertThat(actual[1].severity).isEqualTo(ReportService.Severity.ERROR)
assertThat(actual[1].message).isEqualTo("Error Message")
}
@Test
fun shouldReturnSyntheticDataQualityReportOnParserError() {
val invalidResponse = "Invalid Response Data"
val actual = this.reportService.deserialize(invalidResponse)
assertThat(actual).hasSize(1)
assertThat(actual[0].severity).isEqualTo(ReportService.Severity.ERROR)
assertThat(actual[0].message).isEqualTo("Not parsable data quality report '$invalidResponse'")
}
}

View File

@ -0,0 +1,372 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.*
import dev.dnpm.etl.processor.monitoring.Request
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.output.RestMtbFileSender
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mock
import org.mockito.Mockito.*
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
import org.springframework.context.ApplicationEventPublisher
import java.time.Instant
import java.util.*
@ExtendWith(MockitoExtension::class)
class RequestProcessorTest {
private lateinit var pseudonymizeService: PseudonymizeService
private lateinit var sender: MtbFileSender
private lateinit var requestService: RequestService
private lateinit var applicationEventPublisher: ApplicationEventPublisher
private lateinit var requestProcessor: RequestProcessor
@BeforeEach
fun setup(
@Mock pseudonymizeService: PseudonymizeService,
@Mock sender: RestMtbFileSender,
@Mock requestService: RequestService,
@Mock applicationEventPublisher: ApplicationEventPublisher
) {
this.pseudonymizeService = pseudonymizeService
this.sender = sender
this.requestService = requestService
this.applicationEventPublisher = applicationEventPublisher
val objectMapper = ObjectMapper()
requestProcessor = RequestProcessor(
pseudonymizeService,
sender,
requestService,
objectMapper,
applicationEventPublisher
)
}
@Test
fun testShouldSendMtbFileDuplicationAndSaveUnknownRequestStatusAtFirst() {
doAnswer {
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a",
type = RequestType.MTB_FILE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-08-08T02:00:00Z")
)
}.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString())
doAnswer {
false
}.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
doAnswer {
it.arguments[0] as String
}.`when`(pseudonymizeService).patientPseudonym(any())
val mtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("1")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(Consent.Status.ACTIVE)
.withPatient("123")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("1")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
.build()
this.requestProcessor.processMtbFile(mtbFile)
val requestCaptor = argumentCaptor<Request>()
verify(requestService, times(1)).save(requestCaptor.capture())
assertThat(requestCaptor.firstValue).isNotNull
assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN)
}
@Test
fun testShouldDetectMtbFileDuplicationAndSendDuplicationEvent() {
doAnswer {
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "xrysxpozhbs2lnrjgf3yq4fzj33kxr7xr5c2cbuskmelfdmckl3a",
type = RequestType.MTB_FILE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-08-08T02:00:00Z")
)
}.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString())
doAnswer {
false
}.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
doAnswer {
it.arguments[0] as String
}.`when`(pseudonymizeService).patientPseudonym(any())
val mtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("1")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(Consent.Status.ACTIVE)
.withPatient("123")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("1")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
.build()
this.requestProcessor.processMtbFile(mtbFile)
val eventCaptor = argumentCaptor<ResponseEvent>()
verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
assertThat(eventCaptor.firstValue).isNotNull
assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.DUPLICATION)
}
@Test
fun testShouldSendMtbFileAndSendSuccessEvent() {
doAnswer {
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "different",
type = RequestType.MTB_FILE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-08-08T02:00:00Z")
)
}.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString())
doAnswer {
false
}.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
doAnswer {
MtbFileSender.Response(status = RequestStatus.SUCCESS)
}.`when`(sender).send(any<MtbFileSender.MtbFileRequest>())
doAnswer {
it.arguments[0] as String
}.`when`(pseudonymizeService).patientPseudonym(any())
val mtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("1")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(Consent.Status.ACTIVE)
.withPatient("123")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("1")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
.build()
this.requestProcessor.processMtbFile(mtbFile)
val eventCaptor = argumentCaptor<ResponseEvent>()
verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
assertThat(eventCaptor.firstValue).isNotNull
assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS)
}
@Test
fun testShouldSendMtbFileAndSendErrorEvent() {
doAnswer {
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "different",
type = RequestType.MTB_FILE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-08-08T02:00:00Z")
)
}.`when`(requestService).lastMtbFileRequestForPatientPseudonym(anyString())
doAnswer {
false
}.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
doAnswer {
MtbFileSender.Response(status = RequestStatus.ERROR)
}.`when`(sender).send(any<MtbFileSender.MtbFileRequest>())
doAnswer {
it.arguments[0] as String
}.`when`(pseudonymizeService).patientPseudonym(any())
val mtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("1")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(Consent.Status.ACTIVE)
.withPatient("123")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("1")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
.build()
this.requestProcessor.processMtbFile(mtbFile)
val eventCaptor = argumentCaptor<ResponseEvent>()
verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
assertThat(eventCaptor.firstValue).isNotNull
assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR)
}
@Test
fun testShouldSendDeleteRequestAndSaveUnknownRequestStatusAtFirst() {
doAnswer {
"PSEUDONYM"
}.`when`(pseudonymizeService).patientPseudonym(anyString())
doAnswer {
MtbFileSender.Response(status = RequestStatus.UNKNOWN)
}.`when`(sender).send(any<MtbFileSender.DeleteRequest>())
this.requestProcessor.processDeletion("TEST_12345678901")
val requestCaptor = argumentCaptor<Request>()
verify(requestService, times(1)).save(requestCaptor.capture())
assertThat(requestCaptor.firstValue).isNotNull
assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.UNKNOWN)
}
@Test
fun testShouldSendDeleteRequestAndSendSuccessEvent() {
doAnswer {
"PSEUDONYM"
}.`when`(pseudonymizeService).patientPseudonym(anyString())
doAnswer {
MtbFileSender.Response(status = RequestStatus.SUCCESS)
}.`when`(sender).send(any<MtbFileSender.DeleteRequest>())
this.requestProcessor.processDeletion("TEST_12345678901")
val eventCaptor = argumentCaptor<ResponseEvent>()
verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
assertThat(eventCaptor.firstValue).isNotNull
assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.SUCCESS)
}
@Test
fun testShouldSendDeleteRequestAndSendErrorEvent() {
doAnswer {
"PSEUDONYM"
}.`when`(pseudonymizeService).patientPseudonym(anyString())
doAnswer {
MtbFileSender.Response(status = RequestStatus.ERROR)
}.`when`(sender).send(any<MtbFileSender.DeleteRequest>())
this.requestProcessor.processDeletion("TEST_12345678901")
val eventCaptor = argumentCaptor<ResponseEvent>()
verify(applicationEventPublisher, times(1)).publishEvent(eventCaptor.capture())
assertThat(eventCaptor.firstValue).isNotNull
assertThat(eventCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR)
}
@Test
fun testShouldSendDeleteRequestWithPseudonymErrorAndSaveErrorRequestStatus() {
doThrow(RuntimeException()).`when`(pseudonymizeService).patientPseudonym(anyString())
this.requestProcessor.processDeletion("TEST_12345678901")
val requestCaptor = argumentCaptor<Request>()
verify(requestService, times(1)).save(requestCaptor.capture())
assertThat(requestCaptor.firstValue).isNotNull
assertThat(requestCaptor.firstValue.status).isEqualTo(RequestStatus.ERROR)
}
}

View File

@ -0,0 +1,225 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services
import dev.dnpm.etl.processor.monitoring.Request
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mock
import org.mockito.Mockito.*
import org.mockito.junit.jupiter.MockitoExtension
import java.time.Instant
import java.util.*
@ExtendWith(MockitoExtension::class)
class RequestServiceTest {
private lateinit var requestRepository: RequestRepository
private lateinit var requestService: RequestService
private fun anyRequest() = any(Request::class.java) ?: Request(
id = 0L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_dummy",
pid = "PX",
fingerprint = "dummy",
type = RequestType.MTB_FILE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-08-08T02:00:00Z")
)
@BeforeEach
fun setup(
@Mock requestRepository: RequestRepository
) {
this.requestRepository = requestRepository
this.requestService = RequestService(requestRepository)
}
@Test
fun shouldIndicateLastRequestIsDeleteRequest() {
val requests = listOf(
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-07-07T00:00:00Z")
),
Request(
id = 2L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdefd",
type = RequestType.DELETE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-07-07T02:00:00Z")
),
Request(
id = 3L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.UNKNOWN,
processedAt = Instant.parse("2023-08-11T00:00:00Z")
)
)
val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests)
assertThat(actual).isTrue()
}
@Test
fun shouldIndicateLastRequestIsNotDeleteRequest() {
val requests = listOf(
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-07-07T00:00:00Z")
),
Request(
id = 2L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-07-07T02:00:00Z")
),
Request(
id = 3L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.UNKNOWN,
processedAt = Instant.parse("2023-08-11T00:00:00Z")
)
)
val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests)
assertThat(actual).isFalse()
}
@Test
fun shouldReturnPatientsLastRequest() {
val requests = listOf(
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.DELETE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-07-07T02:00:00Z")
),
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678902",
pid = "P2",
fingerprint = "0123456789abcdef2",
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-08-08T00:00:00Z")
)
)
val actual = RequestService.lastMtbFileRequestForPatientPseudonym(requests)
assertThat(actual).isInstanceOf(Request::class.java)
assertThat(actual?.fingerprint).isEqualTo("0123456789abcdef2")
}
@Test
fun shouldReturnNullIfNoRequests() {
val requests = listOf<Request>()
val actual = RequestService.lastMtbFileRequestForPatientPseudonym(requests)
assertThat(actual).isNull()
}
@Test
fun saveShouldSaveRequestUsingRepository() {
doAnswer {
val obj = it.arguments[0] as Request
obj.copy(id = 1L)
}.`when`(requestRepository).save(anyRequest())
val request = Request(
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.DELETE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-07-07T02:00:00Z")
)
requestService.save(request)
verify(requestRepository, times(1)).save(anyRequest())
}
@Test
fun allRequestsByPatientPseudonymShouldRequestAllRequestsForPatientPseudonym() {
requestService.allRequestsByPatientPseudonym("TEST_12345678901")
verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString())
}
@Test
fun lastMtbFileRequestForPatientPseudonymShouldRequestAllRequestsForPatientPseudonym() {
requestService.lastMtbFileRequestForPatientPseudonym("TEST_12345678901")
verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString())
}
@Test
fun isLastRequestDeletionShouldRequestAllRequestsForPatientPseudonym() {
requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901")
verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString())
}
}

View File

@ -0,0 +1,138 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services
import dev.dnpm.etl.processor.monitoring.Request
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.ArgumentMatchers.anyString
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.*
import reactor.core.publisher.Sinks
import java.time.Instant
import java.util.*
@ExtendWith(MockitoExtension::class)
class ResponseProcessorTest {
private lateinit var requestRepository: RequestRepository
private lateinit var statisticsUpdateProducer: Sinks.Many<Any>
private lateinit var responseProcessor: ResponseProcessor
private val testRequest = Request(
1L,
"TestID1234",
"PSEUDONYM-A",
"1",
"dummyfingerprint",
RequestType.MTB_FILE,
RequestStatus.UNKNOWN
)
@BeforeEach
fun setup(
@Mock requestRepository: RequestRepository,
@Mock statisticsUpdateProducer: Sinks.Many<Any>
) {
this.requestRepository = requestRepository
this.statisticsUpdateProducer = statisticsUpdateProducer
this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer)
}
@Test
fun shouldNotSaveStatusForUnknownRequest() {
doAnswer {
Optional.empty<Request>()
}.whenever(requestRepository).findByUuidEquals(anyString())
val event = ResponseEvent(
"TestID1234",
Instant.parse("2023-09-09T00:00:00Z"),
RequestStatus.SUCCESS
)
this.responseProcessor.handleResponseEvent(event)
verify(requestRepository, never()).save(any())
}
@Test
fun shouldNotSaveStatusWithUnknownState() {
doAnswer {
Optional.of(testRequest)
}.whenever(requestRepository).findByUuidEquals(anyString())
val event = ResponseEvent(
"TestID1234",
Instant.parse("2023-09-09T00:00:00Z"),
RequestStatus.UNKNOWN
)
this.responseProcessor.handleResponseEvent(event)
verify(requestRepository, never()).save(any())
}
@ParameterizedTest
@MethodSource("requestStatusSource")
fun shouldSaveStatusForKnownRequest(requestStatus: RequestStatus) {
doAnswer {
Optional.of(testRequest)
}.whenever(requestRepository).findByUuidEquals(anyString())
val event = ResponseEvent(
"TestID1234",
Instant.parse("2023-09-09T00:00:00Z"),
requestStatus
)
this.responseProcessor.handleResponseEvent(event)
val captor = argumentCaptor<Request>()
verify(requestRepository, times(1)).save(captor.capture())
assertThat(captor.firstValue).isNotNull
assertThat(captor.firstValue.status).isEqualTo(requestStatus)
}
companion object {
@JvmStatic
fun requestStatusSource(): Set<RequestStatus> {
return setOf(
RequestStatus.SUCCESS,
RequestStatus.WARNING,
RequestStatus.ERROR,
RequestStatus.DUPLICATION
)
}
}
}

View File

@ -0,0 +1,149 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.services.kafka
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import dev.dnpm.etl.processor.services.ResponseEvent
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.never
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.springframework.context.ApplicationEventPublisher
import org.springframework.http.HttpStatus
@ExtendWith(MockitoExtension::class)
class KafkaResponseProcessorTest {
private lateinit var eventPublisher: ApplicationEventPublisher
private lateinit var objectMapper: ObjectMapper
private lateinit var kafkaResponseProcessor: KafkaResponseProcessor
private fun createKafkaRecord(
requestId: String,
statusCode: Int = 200,
statusBody: Map<String, Any>? = mapOf()
): ConsumerRecord<String, String> {
return ConsumerRecord<String, String>(
"test-topic",
0,
0,
null,
if (statusBody == null) {
""
} else {
this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody))
}
)
}
@BeforeEach
fun setup(
@Mock eventPublisher: ApplicationEventPublisher
) {
this.eventPublisher = eventPublisher
this.objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build())
this.kafkaResponseProcessor = KafkaResponseProcessor(eventPublisher, objectMapper)
}
@Test
fun shouldNotProcessRecordsWithoutRequestIdInBody() {
val record = ConsumerRecord<String, String>(
"test-topic",
0,
0,
null,
"""
{
"statusCode": 200,
"statusBody": {}
}
""".trimIndent()
)
this.kafkaResponseProcessor.onMessage(record)
verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
}
@Test
fun shouldProcessRecordsWithAliasNames() {
val record = ConsumerRecord<String, String>(
"test-topic",
0,
0,
null,
"""
{
"request_id": "test0123456789",
"status_code": 200,
"status_body": {}
}
""".trimIndent()
)
this.kafkaResponseProcessor.onMessage(record)
verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
}
@Test
fun shouldNotProcessRecordsWithoutValidStatusBody() {
this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null))
verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
}
@ParameterizedTest
@MethodSource("statusCodeSource")
fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) {
this.kafkaResponseProcessor.onMessage(createKafkaRecord("TestID1234", statusCode))
verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
}
companion object {
@JvmStatic
fun statusCodeSource(): Set<Int> {
return setOf(
HttpStatus.OK,
HttpStatus.CREATED,
HttpStatus.BAD_REQUEST,
HttpStatus.NOT_FOUND,
HttpStatus.UNPROCESSABLE_ENTITY,
HttpStatus.INTERNAL_SERVER_ERROR
)
.map { it.value() }
.toSet()
}
}
}

View File

@ -0,0 +1,150 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken, Datenintegrationszentrum Philipps-Universität Marburg and Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package dev.dnpm.etl.processor.web
import com.fasterxml.jackson.databind.ObjectMapper
import de.ukw.ccc.bwhc.dto.*
import dev.dnpm.etl.processor.services.RequestProcessor
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.Mock
import org.mockito.Mockito.times
import org.mockito.Mockito.verify
import org.mockito.junit.jupiter.MockitoExtension
import org.mockito.kotlin.any
import org.mockito.kotlin.argumentCaptor
import org.springframework.http.MediaType
import org.springframework.test.web.servlet.MockMvc
import org.springframework.test.web.servlet.delete
import org.springframework.test.web.servlet.post
import org.springframework.test.web.servlet.setup.MockMvcBuilders
@ExtendWith(MockitoExtension::class)
class MtbFileRestControllerTest {
private lateinit var mockMvc: MockMvc
private lateinit var requestProcessor: RequestProcessor
private val objectMapper = ObjectMapper()
@BeforeEach
fun setup(
@Mock requestProcessor: RequestProcessor
) {
this.requestProcessor = requestProcessor
val controller = MtbFileRestController(requestProcessor)
this.mockMvc = MockMvcBuilders.standaloneSetup(controller).build()
}
@Test
fun shouldProcessMtbFilePostRequest() {
val mtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("TEST_12345678")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(Consent.Status.ACTIVE)
.withPatient("TEST_12345678")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("TEST_12345678")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
.build()
mockMvc.post("/mtbfile") {
content = objectMapper.writeValueAsString(mtbFile)
contentType = MediaType.APPLICATION_JSON
}.andExpect {
status {
isAccepted()
}
}
verify(requestProcessor, times(1)).processMtbFile(any())
}
@Test
fun shouldProcessMtbFilePostRequestWithRejectedConsent() {
val mtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("TEST_12345678")
.withBirthDate("2000-08-08")
.withGender(Patient.Gender.MALE)
.build()
)
.withConsent(
Consent.builder()
.withId("1")
.withStatus(Consent.Status.REJECTED)
.withPatient("TEST_12345678")
.build()
)
.withEpisode(
Episode.builder()
.withId("1")
.withPatient("TEST_12345678")
.withPeriod(PeriodStart("2023-08-08"))
.build()
)
.build()
mockMvc.post("/mtbfile") {
content = objectMapper.writeValueAsString(mtbFile)
contentType = MediaType.APPLICATION_JSON
}.andExpect {
status {
isAccepted()
}
}
val captor = argumentCaptor<String>()
verify(requestProcessor, times(1)).processDeletion(captor.capture())
assertThat(captor.firstValue).isEqualTo("TEST_12345678")
}
@Test
fun shouldProcessMtbFileDeleteRequest() {
mockMvc.delete("/mtbfile/TEST_12345678").andExpect {
status {
isAccepted()
}
}
val captor = argumentCaptor<String>()
verify(requestProcessor, times(1)).processDeletion(captor.capture())
assertThat(captor.firstValue).isEqualTo("TEST_12345678")
}
}