Mirror of https://github.com/pcvolkmer/etl-processor.git, synced 2025-07-01 06:02:54 +00:00

Compare commits: 34 commits
Commits in this compare (SHA1):

2f32834de0, 79709caa39, c52509054d, 8fd587c2a3, edafe30a4b, e24be0d325, 5e93e834ad, 5e5bd579fb, a24f869c84, 635985bfd1, 25143745c4, 532254593f, 01ff53ab23, 9643c80cc5, aa40da4995, da26b5a2c8, bbea48322f, 480f165c7b, 3d2c73ff8f, 9921e1e684, 5bd26b894c, 8dc82225a4, 2eb5cc61b9, 78b2287163, 66dc96680d, 64b8636145, 2e7ef25a49, 7186a45f6c, 72295202ec, bc48a7217e, 0e1034d964, 6ecb439007, cb9c590472, a075f73162
.gitignore (vendored): 1 addition

```diff
@@ -36,3 +36,4 @@ out/
 ### VS Code ###
 .vscode/
 /dev/gpas*
+/deploy/.env
```
README.md: 109 changes

```diff
@@ -1,10 +1,32 @@
-# ETL-Processor for bwHC data
+# ETL-Processor for bwHC data [](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml)
 
 This application sends a bwHC MTB file to the bwHC backend and pseudonymizes the patient ID.
 
+### Position within a DNPM ETL pipeline
+
+This application accepts HTTP/REST requests from the Onkostar plugin **[onkostar-plugin-dnpmexport](https://github.com/CCC-MF/onkostar-plugin-dnpmexport)**.
+
+If the content of a request is a bwHC MTB file, it is pseudonymized and checked for duplicates.
+Duplicates are discarded; changes are forwarded.
+
+Deletion requests are always forwarded to the bwHC backend as deletion requests.
+
+![ETL process](docs/etl.png)
+
+#### HTTP/REST configuration
+
+Requests that are not treated as duplicates are sent directly to the bwHC backend after pseudonymization.
+
+#### Configuration for Apache Kafka
+
+Requests that are not treated as duplicates are handed over to Apache Kafka after pseudonymization.
+A response is likewise delivered via Apache Kafka and processed after it has been received.
+
+See also: https://github.com/CCC-MF/kafka-to-bwhc
+
 ## Pseudonymization of the patient ID
 
-If a URI to a gPAS instance is configured, it is used.
+If a URI to a gPAS instance (version >= 2023.1.0) is configured, it is used.
 If it is not set, the patient ID is anonymized internally instead.
 
 * `APP_PSEUDONYMIZE_PREFIX`: site-specific prefix; `UNKNOWN` if not set
```
```diff
@@ -20,7 +42,8 @@ used as the patient pseudonym.
 
 If use of gPAS has been configured, further settings are required.
 
-* `APP_PSEUDONYMIZE_GPAS_URI`: URI of the gPAS instance including endpoint (e.g. `http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate`)
+* `APP_PSEUDONYMIZE_GPAS_URI`: URI of the gPAS instance including endpoint (
+  e.g. `http://localhost:8080/ttp-fhir/fhir/gpas/$$pseudonymizeAllowCreate`)
 * `APP_PSEUDONYMIZE_GPAS_TARGET`: gPAS domain name
 * `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPAS basic-auth username
 * `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPAS basic-auth password
```
````diff
@@ -55,6 +78,84 @@ Further settings can be configured via Spring Kafka's parameters.
 If no connection to the bwHC backend can be established, a response with status code `900` is expected, a code that
 does not exist for HTTP.
 
+#### Retention time
+
+In general, Apache Kafka retains all records according to its configuration.
+Without special configuration, a record is kept in Apache Kafka for 7 days.
+Within this period, outdated information therefore remains available even if consent was revoked later on.
+
+This can be prevented by configuring the topic accordingly.
+
+Example, to be executed inside the Kafka container: delete old records after one day
+```
+kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=86400000
+```
+
+#### Key-based retention
+
+If instead you want to keep only the latest message per patient and disease in Apache Kafka,
+the following Kafka topic configuration is helpful.
+
+* `retention.ms`: as short a period as possible during which old records are still kept, e.g. 10 seconds: 10000
+* `cleanup.policy`: delete old records and keep the latest record per key: [delete,compact]
+
+Examples for a topic `test`; adjust to the topics actually in use.
+
+```
+kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=10000
+kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config cleanup.policy=[delete,compact]
+```
+
+Since the (pseudonymized) patient ID and the (anonymized) disease ID are used as a record's key,
+with the topic configuration above only the latest entry for a given key remains available after 10 seconds.
+
+Since the key is built identically for records sent towards the bwHC backend and for the response, in the event of a
+consent revocation this effectively suppresses both the contained data and their disclosure through verification data
+in the response, as both are deleted after 10 seconds.
+Only the latest piece of information then remains available: that a consent revocation occurred for a patient/disease.
````
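The record key described here matches what `KafkaMtbFileSender.key(...)` builds later in this changeset: a small JSON object with `pid` and `eid`, and deliberately without the request ID. A minimal Kotlin sketch of why compaction then keeps only the latest message per patient/disease; the map is a rough stand-in for a compacted topic and the IDs are made up:

```kotlin
// Sketch: with cleanup.policy=[delete,compact], Kafka eventually keeps only the
// newest value per key. Because requestId is NOT part of the key, a later
// message for the same patient/disease replaces the earlier one.
fun recordKey(pid: String, eid: String) = """{"pid": "$pid", "eid": "$eid"}"""

fun main() {
    val compactedTopic = linkedMapOf<String, String>() // stand-in for a compacted topic
    compactedTopic[recordKey("ANON-1", "42")] = "full MTB file"
    compactedTopic[recordKey("ANON-1", "42")] = "consent revoked" // same key, replaces prior value
    println(compactedTopic.size) // 1 -> only the latest record per key survives
}
```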
````diff
 ## Docker images
 
-This application is also available as a Docker image: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor
+This application is also available as a Docker image: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor
+
+### Building the image locally
+
+```bash
+./gradlew bootBuildImage
+```
+
+## Deployment
+
+*Running as a Docker container:*
+
+```bash
+cd ./deploy
+cp env-sample.env .env
+```
+
+If desired, make changes in `.env`.
+
+```bash
+docker compose up -d
+```
+
+## Development setup
+
+The bundled file `dev-compose.yml` can be used to start a local development and test environment.
+It can be adapted to use either **MariaDB** or **PostgreSQL** as the database.
+
+To use Apache Kafka, a hosts-file entry is required so that the hostname `kafka` points to the local
+IP address. Without this setting, Apache Kafka cannot be used from outside the Docker environment.
+
+When the application is started with the `dev` profile, the environment defined in `dev-compose.yml` is started
+along with it:
+
+```
+SPRING_PROFILES_ACTIVE=dev ./gradlew bootRun
+```
+
+The file `application-dev.yml` contains the corresponding configuration for the `dev` profile.
+
+When the integration tests are run, a test database is started in a Docker container.
+See also the class `AbstractTestcontainerTest` under `src/integrationTest`.
````
build.gradle.kts

```diff
@@ -4,14 +4,25 @@ import org.springframework.boot.gradle.tasks.bundling.BootBuildImage
 
 plugins {
     war
-    id("org.springframework.boot") version "3.1.2"
-    id("io.spring.dependency-management") version "1.1.0"
-    kotlin("jvm") version "1.9.0"
-    kotlin("plugin.spring") version "1.9.0"
+    id("org.springframework.boot") version "3.1.3"
+    id("io.spring.dependency-management") version "1.1.3"
+    kotlin("jvm") version "1.9.10"
+    kotlin("plugin.spring") version "1.9.10"
 }
 
 group = "de.ukw.ccc"
-version = "0.1.0"
+version = "0.1.2"
+
+var versions = mapOf(
+    "bwhc-dto-java" to "0.2.0",
+    "hapi-fhir" to "6.6.2",
+    "httpclient5" to "5.2.1",
+    "mockito-kotlin" to "5.1.0"
+)
+
+// Override Apache Kafka to be used
+// Fixes: CVE-2023-34455, CVE-2023-34454, CVE-2023-34453
+extra["kafka.version"] = "3.5.1"
 
 java {
     sourceCompatibility = JavaVersion.VERSION_17
@@ -49,13 +60,15 @@ dependencies {
     implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
     implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
     implementation("org.springframework.kafka:spring-kafka")
+    // fix CVE-2022-1471
+    implementation("org.yaml:snakeyaml:2.1")
     implementation("org.flywaydb:flyway-mysql")
     implementation("commons-codec:commons-codec")
     implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
-    implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
-    implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2")
-    implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2")
-    implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1")
+    implementation("de.ukw.ccc:bwhc-dto-java:${versions["bwhc-dto-java"]}")
+    implementation("ca.uhn.hapi.fhir:hapi-fhir-base:${versions["hapi-fhir"]}")
+    implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:${versions["hapi-fhir"]}")
+    implementation("org.apache.httpcomponents.client5:httpclient5:${versions["httpclient5"]}")
     runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
     runtimeOnly("org.postgresql:postgresql")
     developmentOnly("org.springframework.boot:spring-boot-devtools")
@@ -64,7 +77,7 @@ dependencies {
     providedRuntime("org.springframework.boot:spring-boot-starter-tomcat")
     testImplementation("org.springframework.boot:spring-boot-starter-test")
     testImplementation("io.projectreactor:reactor-test")
-    testImplementation("org.mockito.kotlin:mockito-kotlin:5.0.0")
+    testImplementation("org.mockito.kotlin:mockito-kotlin:${versions["mockito-kotlin"]}")
     integrationTestImplementation("org.testcontainers:junit-jupiter")
     integrationTestImplementation("org.testcontainers:postgresql")
 }
```
deploy/docker-compose.yaml: new file, 55 lines

```yaml
services:
  dnpm-etl-processor:
    image: ghcr.io/ccc-mf/etl-processor:latest
    environment:
      LOGGING_LEVEL_DEV: ${DNPM_LOG_LEVEL:-INFO}
      SPRING_KAFKA_SECURITY_PROTOCOL: ${DNPM_KAFKA_SECURITY_PROTOCOL:-SSL}
      SPRING_KAFKA_SSL_TRUST-STORE-TYPE: PKCS12
      SPRING_KAFKA_SSL_TRUST-STORE-LOCATION: /opt/dnpm-processor/ssl/truststore.jks
      SPRING_KAFKA_SSL_TRUST-STORE-PASSWORD: ${KAFKA_TRUST_STORE_PASSWORD}
      SPRING_KAFKA_SSL_KEY-STORE-TYPE: PKCS12
      SPRING_KAFKA_SSL_KEY-STORE-LOCATION: /opt/dnpm-processor/ssl/keystore.jks
      SPRING_KAFKA_SSL_KEY-STORE-PASSWORD: ${DNPM_PROCESSOR_KEY_STORE_PASSWORD}
      SPRING_KAFKA_PRODUCER_COMPRESSION-TYPE: gzip
      APP_KAFKA_TOPIC: ${DNPM_KAFKA_TOPIC}
      APP_KAFKA_SERVERS: ${KAFKA_BROKERS}
      APP_KAFKA_GROUP_ID: ${DNPM_KAFKA_GROUP_ID}
      APP_KAFKA_RESPONSE_TOPIC: ${DNPM_KAFKA_RESPONSE_TOPIC}
      APP_REST_URI: ${DNPM_BWHC_REST_URI}
      SPRING_DATASOURCE_URL: ${DNPM_DATASOURCE_URL}
      SPRING_DATASOURCE_PASSWORD: ${DNPM_MARIADB_USER_PW}
      SPRING_DATASOURCE_USERNAME: ${DNPM_MARIADB_DB}
      APP_PSEUDONYMIZE_GPAS_SSLCALOCATION: /workspace/opt/dnpm-processor/ssl/mosaic.crt
      APP_PSEUDONYMIZE_GPAS_PASSWORD: ${DNPM_PSEUDONYMIZE_GPAS_PASSWORD}
      APP_PSEUDONYMIZE_GPAS_USERNAME: ${DNPM_PSEUDONYMIZE_GPAS_USERNAME}
      APP_PSEUDONYMIZE_GPAS_TARGET: ${DNPM_PSEUDONYMIZE_GPAS_TARGET}
      APP_PSEUDONYMIZE_GPAS_URI: ${DNPM_PSEUDONYMIZE_GPAS_URI}
      APP_PSEUDONYMIZE_PREFIX: ${DNPM_APP_PSEUDONYMIZE_PREFIX}
      APP_PSEUDONYMIZER: ${DNPM_PSEUDONYMIZE_GENERATOR}
    volumes:
      - /etc/localtime:/etc/localtime:ro
      - /etc/timezone:/etc/timezone:ro
      #- ${DNPM_TO_SSL_KEYSTORE_LOCATION}:/workspace/opt/dnpm-processor/ssl/keystore.jks:ro
      #- ${KAFKA_TRUST_STORE_LOCATION}:/workspace/opt/dnpm-processor/ssl/truststore.jks:ro
      #- ${DNPM_PSEUDONYMIZE_GPAS_SSLCALOCATION}:/workspace/opt/dnpm-processor/ssl/mosaic.crt
    depends_on:
      - dnpm-monitor-db
    ports:
      - "${DNPM_MONITORING_HTTP_PORT:-8080}:8080"

  # todo add volume
  dnpm-monitor-db:
    image: mariadb:10
    environment:
      MARIADB_DATABASE: ${DNPM_MARIADB_DB}
      MARIADB_USER: ${DNPM_MARIADB_USER}
      MARIADB_PASSWORD: ${DNPM_MARIADB_USER_PW}
      MARIADB_ROOT_PASSWORD: ${DNPM_MARIADB_ROOT_PW}
    expose:
      - "3306"
```
deploy/env-sample.env: new file, 40 lines

```
# monitoring access port
DNPM_MONITORING_HTTP_PORT=8088
DNPM_LOG_LEVEL=INFO

# GPAS or BUILDIN
DNPM_PSEUDONYMIZE_GENERATOR=BUILDIN
DNPM_APP_PSEUDONYMIZE_PREFIX=ANONYM
DNPM_PSEUDONYMIZE_GPAS_URI=
DNPM_PSEUDONYMIZE_GPAS_TARGET=
DNPM_PSEUDONYMIZE_GPAS_USERNAME=
DNPM_PSEUDONYMIZE_GPAS_PASSWORD=

# path to CA root cert if needed
DNPM_PSEUDONYMIZE_GPAS_SSLCALOCATION=

DNPM_MARIADB_DB=dnpm_monitoring
DNPM_MARIADB_USER=$DNPM_MARIADB_DB
DNPM_MARIADB_USER_PW=MySuperSecurePassword111
DNPM_MARIADB_ROOT_PW=MySuperDuperSecurePassword111

# monitoring data db
DNPM_DATASOURCE_URL=jdbc:mariadb://dnpm-monitor-db:3306/$DNPM_MARIADB_DB

## TARGET SYSTEMS CONFIG
# in case of direct access to bwHC, enter the endpoint URL here
DNPM_BWHC_REST_URI=

# produce MTB files to this topic - the value 'false' disables Kafka processing
DNPM_KAFKA_TOPIC=false
KAFKA_BROKERS=false
DNPM_KAFKA_SECURITY_PROTOCOL=PLAINTEXT

# here we receive responses from bwHC
DNPM_KAFKA_RESPONSE_TOPIC=dnpm-response
DNPM_KAFKA_GROUP_ID=dnpm

# SSL or PLAINTEXT
DNPM_PROCESSOR_KEY_STORE_PASSWORD=
DNPM_TO_SSL_KEYSTORE_LOCATION=
```
dev-compose.yml

```diff
@@ -1,11 +1,38 @@
 services:
+  # Note: Make sure, hostname "kafka" points to 127.0.0.1
+  # otherwise connection will not be available
+  kafka:
+    image: bitnami/kafka
+    hostname: kafka
+    ports:
+      - "9092:9092"
+      - "9094:9094"
+    environment:
+      ALLOW_PLAINTEXT_LISTENER: "yes"
+      KAFKA_CFG_NODE_ID: "0"
+      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
+      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
+      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
+      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
+      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
+      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
+      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+
+  akhq:
+    image: tchiotludo/akhq:0.21.0
+    environment:
+      AKHQ_CONFIGURATION: |
+        akhq:
+          connections:
+            docker-kafka-server:
+              properties:
+                bootstrap.servers: "kafka:9092"
+              connect:
+                - name: "kafka-connect"
+                  url: "http://kafka-connect:8083"
+    ports:
+      - "8084:8080"
 
   mariadb:
     image: mariadb:10
@@ -16,6 +43,7 @@ services:
       MARIADB_USER: dev
       MARIADB_PASSWORD: dev
       MARIADB_ROOT_PASSWORD: dev
+
 #  postgres:
 #    image: postgres:alpine
 #    ports:
@@ -23,4 +51,4 @@ services:
 #    environment:
 #      POSTGRES_DB: dev
 #      POSTGRES_USER: dev
-#      POSTGRES_PASSWORD: dev
+#      POSTGRES_PASSWORD: dev
```
docs/etl.png: new binary file, 75 KiB (not shown)
EtlProcessorApplicationTests

```diff
@@ -20,10 +20,13 @@
 package dev.dnpm.etl.processor
 
 import dev.dnpm.etl.processor.output.MtbFileSender
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
+import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.boot.test.mock.mockito.MockBean
+import org.springframework.context.ApplicationContext
 import org.springframework.test.context.junit.jupiter.SpringExtension
 import org.testcontainers.junit.jupiter.Testcontainers
 
@@ -34,7 +37,9 @@ import org.testcontainers.junit.jupiter.Testcontainers
 class EtlProcessorApplicationTests : AbstractTestcontainerTest() {
 
     @Test
-    fun contextLoadsIfMtbFileSenderConfigured() {
+    fun contextLoadsIfMtbFileSenderConfigured(@Autowired context: ApplicationContext) {
         // Simply check bean configuration
         assertThat(context).isNotNull
     }
 
 }
```
RequestServiceIntegrationTest

```diff
@@ -116,7 +116,7 @@ class RequestServiceIntegrationTest : AbstractTestcontainerTest() {
     fun shouldReturnDeleteRequestAsLastRequest() {
         setupTestData()
 
-        val actual = requestService.isLastRequestDeletion("TEST_12345678901")
+        val actual = requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901")
 
         assertThat(actual).isTrue()
     }
```
GpasPseudonymGenerator.java

```diff
@@ -22,6 +22,21 @@ package dev.dnpm.etl.processor.pseudonym;
 import ca.uhn.fhir.context.FhirContext;
 import ca.uhn.fhir.parser.IParser;
 import dev.dnpm.etl.processor.config.GPasConfigProperties;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+import java.util.Base64;
+import java.util.HashMap;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
 import org.apache.hc.client5.http.impl.classic.HttpClients;
@@ -39,7 +54,11 @@ import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.http.*;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
 import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
 import org.springframework.retry.RetryCallback;
 import org.springframework.retry.RetryContext;
@@ -51,31 +70,13 @@ import org.springframework.retry.support.RetryTemplate;
 import org.springframework.web.client.RestClientException;
 import org.springframework.web.client.RestTemplate;
 
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.security.cert.CertificateFactory;
-import java.security.cert.X509Certificate;
-import java.util.Base64;
-import java.util.HashMap;
-
 public class GpasPseudonymGenerator implements Generator {
 
-    private final static FhirContext r4Context = FhirContext.forR4();
+    private static FhirContext r4Context = FhirContext.forR4();
     private final String gPasUrl;
     private final String psnTargetDomain;
     private final HttpHeaders httpHeader;
     private final RetryTemplate retryTemplate = defaultTemplate();
     private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
     private SSLContext customSslContext;
@@ -90,12 +91,16 @@ public class GpasPseudonymGenerator implements Generator {
         try {
             if (StringUtils.isNotBlank(gpasCfg.getSslCaLocation())) {
                 customSslContext = getSslContext(gpasCfg.getSslCaLocation());
+                log.debug(String.format("%s has been initialized with SSL certificate %s",
+                    this.getClass().getName(), gpasCfg.getSslCaLocation()));
             }
         } catch (IOException | KeyManagementException | KeyStoreException | CertificateException |
                  NoSuchAlgorithmException e) {
             throw new RuntimeException(e);
         }
+
+        log.debug(String.format("%s has been initialized", this.getClass().getName()));
     }
 
     @Override
@@ -110,12 +115,19 @@ public class GpasPseudonymGenerator implements Generator {
 
     @NotNull
     public static String unwrapPseudonym(Parameters gPasPseudonymResult) {
-        Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst()
-            .get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst()
-            .orElseGet(ParametersParameterComponent::new).getValue();
+        final var parameters = gPasPseudonymResult.getParameter().stream().findFirst();
+
+        if (parameters.isEmpty()) {
+            throw new PseudonymRequestFailed("Empty HL7 parameters, cannot find first one");
+        }
+
+        final var identifier = (Identifier) parameters.get().getPart().stream()
+            .filter(a -> a.getName().equals("pseudonym"))
+            .findFirst()
+            .orElseGet(ParametersParameterComponent::new).getValue();
 
-        // pseudonym
-        return pseudonym.getSystem() + "|" + pseudonym.getValue();
+        return identifier.getSystem() + "|" + identifier.getValue();
     }
```
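For illustration, a small Kotlin sketch of what the reworked `unwrapPseudonym` consumes and returns. The `Parameters` shape below, one parameter whose `pseudonym` part carries an `Identifier`, is an assumption modeled on a typical gPAS `$pseudonymizeAllowCreate` response; the system URL and value are made up:

```kotlin
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
import org.hl7.fhir.r4.model.Identifier
import org.hl7.fhir.r4.model.Parameters

fun main() {
    // Build a response-like Parameters resource: one parameter whose
    // "pseudonym" part carries an Identifier (assumed gPAS response shape).
    val gPasResult = Parameters()
    gPasResult.addParameter()
        .addPart()
        .setName("pseudonym")
        .setValue(Identifier().setSystem("https://ths.example.org/gpas").setValue("XYZ123"))

    // unwrapPseudonym joins the identifier's system and value with "|".
    // An empty Parameters resource now fails with PseudonymRequestFailed
    // instead of a NoSuchElementException from Optional.get().
    println(GpasPseudonymGenerator.unwrapPseudonym(gPasResult))
    // -> https://ths.example.org/gpas|XYZ123
}
```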
AppConfigProperties

```diff
@@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties
 
 @ConfigurationProperties(AppConfigProperties.NAME)
 data class AppConfigProperties(
-    var bwhc_uri: String?,
+    var bwhcUri: String?,
     var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN
 ) {
     companion object {
```
KafkaMtbFileSender

```diff
@@ -40,7 +40,7 @@ class KafkaMtbFileSender(
         val result = kafkaTemplate.send(
             kafkaTargetProperties.topic,
             key(request),
-            objectMapper.writeValueAsString(request.mtbFile)
+            objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
         )
         if (result.get() != null) {
             logger.debug("Sent file via KafkaMtbFileSender")
@@ -68,7 +68,7 @@ class KafkaMtbFileSender(
         val result = kafkaTemplate.send(
             kafkaTargetProperties.topic,
             key(request),
-            objectMapper.writeValueAsString(dummyMtbFile)
+            objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
         )
 
         if (result.get() != null) {
@@ -85,12 +85,12 @@ class KafkaMtbFileSender(
 
     private fun key(request: MtbFileSender.MtbFileRequest): String {
         return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
-                "\"eid\": \"${request.mtbFile.episode.id}\", " +
-                "\"requestId\": \"${request.requestId}\"}"
+                "\"eid\": \"${request.mtbFile.episode.id}\"}"
     }
 
     private fun key(request: MtbFileSender.DeleteRequest): String {
-        return "{\"pid\": \"${request.patientId}\", " +
-                "\"requestId\": \"${request.requestId}\"}"
+        return "{\"pid\": \"${request.patientId}\"}"
     }
 
+    data class Data(val requestId: String, val content: MtbFile)
 }
```
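With this change the `requestId` no longer travels in the record key, keeping the key stable per patient/disease (see the key-based retention notes in the README above); instead it is wrapped together with the file into the new `Data` value. A hedged sketch of the resulting serialization, using a plain `String` payload as a stand-in for the real `MtbFile`:

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper

// Stand-in for KafkaMtbFileSender.Data; the real class wraps an MtbFile.
data class Data(val requestId: String, val content: String)

fun main() {
    val objectMapper = ObjectMapper()
    val key = """{"pid": "ANON-1234", "eid": "42"}"""                // stable per patient/disease
    val value = objectMapper.writeValueAsString(Data("req-1", "...")) // requestId travels in the value
    println("$key -> $value")
    // {"pid": "ANON-1234", "eid": "42"} -> {"requestId":"req-1","content":"..."}
}
```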
RestMtbFileSender

```diff
@@ -50,7 +50,7 @@ class RestMtbFileSender(
             return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
         }
         logger.debug("Sent file via RestMtbFileSender")
-        return MtbFileSender.Response(response.statusCode.asRequestStatus())
+        return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
     } catch (e: IllegalArgumentException) {
         logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
     } catch (e: RestClientException) {
```
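The REST sender now propagates the backend's response body instead of dropping it, so warnings and errors can later be stored as reports (see `ResponseProcessor` below). A minimal sketch with a simplified stand-in for `MtbFileSender.Response`; the default empty body is assumed from the tests, where `Response(RequestStatus.SUCCESS)` is built without one:

```kotlin
enum class RequestStatus { SUCCESS, WARNING, ERROR }

// Simplified stand-in for MtbFileSender.Response
data class Response(val status: RequestStatus, val body: String = "")

fun main() {
    // An HTTP response body may be null; orEmpty() keeps the field non-null
    val httpBody: String? = null
    println(Response(RequestStatus.SUCCESS, httpBody.orEmpty())) // Response(status=SUCCESS, body=)
}
```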
MtbFile.pseudonymizeWith extension

```diff
@@ -38,7 +38,7 @@ infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) {
     this.histologyReports.forEach { it.patient = patientPseudonym }
     this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym }
     this.molecularPathologyFindings.forEach { it.patient = patientPseudonym }
-    this.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } }
+    this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } }
     this.ngsReports.forEach { it.patient = patientPseudonym }
     this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym }
     this.rebiopsyRequests.forEach { it.patient = patientPseudonym }
```
RequestProcessor

```diff
@@ -30,7 +30,6 @@ import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
 import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith
 import org.apache.commons.codec.binary.Base32
 import org.apache.commons.codec.digest.DigestUtils
-import org.slf4j.LoggerFactory
 import org.springframework.context.ApplicationEventPublisher
 import org.springframework.stereotype.Service
 import java.time.Instant
@@ -45,8 +44,6 @@ class RequestProcessor(
     private val applicationEventPublisher: ApplicationEventPublisher
 ) {
 
-    private val logger = LoggerFactory.getLogger(RequestProcessor::class.java)
-
     fun processMtbFile(mtbFile: MtbFile) {
         val requestId = UUID.randomUUID().toString()
         val pid = mtbFile.patient.id
@@ -95,7 +92,7 @@ class RequestProcessor(
     private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean {
         val lastMtbFileRequestForPatient =
             requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id)
-        val isLastRequestDeletion = requestService.isLastRequestDeletion(pseudonymizedMtbFile.patient.id)
+        val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id)
 
         return null != lastMtbFileRequestForPatient
                 && !isLastRequestDeletion
```
RequestService

```diff
@@ -38,8 +38,8 @@ class RequestService(
     fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) =
         Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym))
 
-    fun isLastRequestDeletion(patientPseudonym: String) =
-        Companion.isLastRequestDeletion(allRequestsByPatientPseudonym(patientPseudonym))
+    fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) =
+        Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym))
 
     companion object {
 
@@ -48,7 +48,8 @@ class RequestService(
             .sortedByDescending { it.processedAt }
             .firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING }
 
-        fun isLastRequestDeletion(allRequests: List<Request>) = allRequests
+        fun isLastRequestWithKnownStatusDeletion(allRequests: List<Request>) = allRequests
+            .filter { it.status != RequestStatus.UNKNOWN }
             .maxByOrNull { it.processedAt }?.type == RequestType.DELETE
 
     }
```
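The renamed check ignores requests whose status is still `UNKNOWN` (for example, Kafka requests for which no response has arrived yet) when deciding whether the most recent request was a deletion. A self-contained sketch with simplified stand-ins for the repository's types; the timestamps mirror the test data further below:

```kotlin
import java.time.Instant

enum class RequestStatus { SUCCESS, WARNING, UNKNOWN }
enum class RequestType { MTB_FILE, DELETE }
data class Request(val type: RequestType, val status: RequestStatus, val processedAt: Instant)

fun isLastRequestWithKnownStatusDeletion(allRequests: List<Request>) = allRequests
    .filter { it.status != RequestStatus.UNKNOWN }   // ignore requests without a known outcome
    .maxByOrNull { it.processedAt }?.type == RequestType.DELETE

fun main() {
    val requests = listOf(
        Request(RequestType.DELETE, RequestStatus.SUCCESS, Instant.parse("2023-08-08T02:00:00Z")),
        // Newer, but status UNKNOWN (e.g. awaiting a Kafka response) - skipped:
        Request(RequestType.MTB_FILE, RequestStatus.UNKNOWN, Instant.parse("2023-08-11T00:00:00Z"))
    )
    println(isLastRequestWithKnownStatusDeletion(requests)) // true
}
```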
ResponseProcessor

```diff
@@ -19,7 +19,6 @@
 
 package dev.dnpm.etl.processor.services
 
-import com.fasterxml.jackson.databind.ObjectMapper
 import dev.dnpm.etl.processor.monitoring.Report
 import dev.dnpm.etl.processor.monitoring.RequestRepository
 import dev.dnpm.etl.processor.monitoring.RequestStatus
@@ -33,8 +32,7 @@ import java.util.*
 @Service
 class ResponseProcessor(
     private val requestRepository: RequestRepository,
-    private val statisticsUpdateProducer: Sinks.Many<Any>,
-    private val objectMapper: ObjectMapper
+    private val statisticsUpdateProducer: Sinks.Many<Any>
 ) {
 
     private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java)
@@ -55,14 +53,14 @@ class ResponseProcessor(
             RequestStatus.WARNING -> {
                 it.report = Report(
                     "Warnungen über mangelhafte Daten",
-                    objectMapper.writeValueAsString(event.body)
+                    event.body.orElse("")
                 )
             }
 
             RequestStatus.ERROR -> {
                 it.report = Report(
                     "Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
-                    objectMapper.writeValueAsString(event.body)
+                    event.body.orElse("")
                 )
             }
 
@@ -73,7 +71,7 @@ class ResponseProcessor(
         }
 
         else -> {
-            logger.error("Cannot process response: Unknown response code!")
+            logger.error("Cannot process response: Unknown response!")
             return@ifPresentOrElse
         }
     }
```
KafkaResponseProcessor

```diff
@@ -41,50 +41,40 @@ class KafkaResponseProcessor(
 
     override fun onMessage(data: ConsumerRecord<String, String>) {
         try {
-            Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java))
+            Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java))
         } catch (e: Exception) {
             logger.error("Cannot process Kafka response", e)
             Optional.empty()
-        }.ifPresentOrElse({ responseKey ->
-            val event = try {
-                val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
-                ResponseEvent(
-                    responseKey.requestId,
-                    Instant.ofEpochMilli(data.timestamp()),
-                    responseBody.statusCode.asRequestStatus(),
-                    when (responseBody.statusCode.asRequestStatus()) {
-                        RequestStatus.SUCCESS -> {
-                            Optional.empty()
-                        }
-
-                        RequestStatus.WARNING, RequestStatus.ERROR -> {
-                            Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
-                        }
-
-                        else -> {
-                            logger.error("Kafka response: Unknown response code!")
-                            Optional.empty()
-                        }
-                    }
-                )
-            } catch (e: Exception) {
-                logger.error("Cannot process Kafka response", e)
-                ResponseEvent(
-                    responseKey.requestId,
-                    Instant.ofEpochMilli(data.timestamp()),
-                    RequestStatus.ERROR,
-                    Optional.of("Cannot process Kafka response")
-                )
-            }
+        }.ifPresentOrElse({ responseBody ->
+            val event = ResponseEvent(
+                responseBody.requestId,
+                Instant.ofEpochMilli(data.timestamp()),
+                responseBody.statusCode.asRequestStatus(),
+                when (responseBody.statusCode.asRequestStatus()) {
+                    RequestStatus.SUCCESS -> {
+                        Optional.empty()
+                    }
+
+                    RequestStatus.WARNING, RequestStatus.ERROR -> {
+                        Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
+                    }
+
+                    else -> {
+                        logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode)
+                        Optional.empty()
+                    }
+                }
+            )
             eventPublisher.publishEvent(event)
         }, {
-            logger.error("No response key in Kafka response")
+            logger.error("No requestId in Kafka response")
         })
     }
 
-    data class ResponseKey(val requestId: String)
-
     data class ResponseBody(
-        @JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int,
-        @JsonProperty("status_body") val statusBody: Map<String, Any>
+        @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
+        @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
+        @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
     )
 
 }
```
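Since the request ID now lives in the record value, `ResponseBody` accepts both snake_case and camelCase field names via `@JsonAlias`. A sketch using a standalone copy of the data class (the real one is nested in `KafkaResponseProcessor`); it assumes `jackson-module-kotlin` on the classpath, which the build already declares:

```kotlin
import com.fasterxml.jackson.annotation.JsonAlias
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue

// Standalone copy of KafkaResponseProcessor.ResponseBody for illustration
data class ResponseBody(
    @JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
    @JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
    @JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
)

fun main() {
    val mapper = jacksonObjectMapper()
    // Both spellings deserialize to the same object:
    val snake = mapper.readValue<ResponseBody>("""{"request_id": "id1", "status_code": 200, "status_body": {}}""")
    val camel = mapper.readValue<ResponseBody>("""{"requestId": "id1", "statusCode": 200, "statusBody": {}}""")
    println(snake == camel) // true
}
```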
StatisticsRestController

```diff
@@ -83,9 +83,9 @@ class StatisticsRestController(
             .groupBy { formatter.format(it.processedAt) }
             .map {
                 val requestList = it.value
-                    .groupBy { it.status }
-                    .map {
-                        Pair(it.key, it.value.size)
+                    .groupBy { request -> request.status }
+                    .map { request ->
+                        Pair(request.key, request.value.size)
                     }
                     .toMap()
                 Pair(
```
application-dev.yml

```diff
@@ -4,12 +4,12 @@ spring:
       file: ./dev-compose.yml
 
 app:
-  rest:
-    uri: http://localhost:9000/bwhc/etl/api
+  #rest:
+  #  uri: http://localhost:9000/bwhc/etl/api
   kafka:
     topic: test
-    response-topic: test-response
-    servers: kafka:9092
+    response-topic: test_response
+    servers: localhost:9094
 
 server:
   port: 8000
```
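The dev profile now points at `localhost:9094`, the `EXTERNAL` listener advertised by the Kafka service in `dev-compose.yml`. A minimal producer sketch against that listener; the topic name `test` is taken from the config above, and the message content is made up:

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

fun main() {
    // Assumes the dev-compose Kafka broker is running with its EXTERNAL
    // listener on localhost:9094, as configured in dev-compose.yml.
    val props = Properties().apply {
        put("bootstrap.servers", "localhost:9094")
        put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    }
    KafkaProducer<String, String>(props).use { producer ->
        producer.send(ProducerRecord("test", """{"pid": "ANON-1", "eid": "1"}""", "{}")).get()
    }
}
```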
KafkaMtbFileSenderTest

```diff
@@ -97,9 +97,9 @@ class KafkaMtbFileSenderTest {
         val captor = argumentCaptor<String>()
         verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
         assertThat(captor.firstValue).isNotNull
-        assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}")
+        assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}")
         assertThat(captor.secondValue).isNotNull
-        assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE)))
+        assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE)))
     }
 
     @Test
@@ -113,9 +113,9 @@ class KafkaMtbFileSenderTest {
         val captor = argumentCaptor<String>()
         verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
         assertThat(captor.firstValue).isNotNull
-        assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}")
+        assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}")
         assertThat(captor.secondValue).isNotNull
-        assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED)))
+        assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED)))
     }
 
     companion object {
@@ -154,6 +154,10 @@ class KafkaMtbFileSenderTest {
             }.build()
         }
 
+        fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data {
+            return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus))
+        }
+
         data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null)
 
         @JvmStatic
```
RestMtbFileSenderTest

```diff
@@ -61,7 +61,8 @@ class RestMtbFileSenderTest {
         }
 
         val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
-        assertThat(response.status).isEqualTo(requestWithResponse.requestStatus)
+        assertThat(response.status).isEqualTo(requestWithResponse.response.status)
+        assertThat(response.body).isEqualTo(requestWithResponse.response.body)
     }
 
     @ParameterizedTest
@@ -75,11 +76,16 @@ class RestMtbFileSenderTest {
         }
 
         val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile))
-        assertThat(response.status).isEqualTo(requestWithResponse.requestStatus)
+        assertThat(response.status).isEqualTo(requestWithResponse.response.status)
+        assertThat(response.body).isEqualTo(requestWithResponse.response.body)
     }
 
     companion object {
-        data class RequestWithResponse(val httpStatus: HttpStatus, val body: String, val requestStatus: RequestStatus)
+        data class RequestWithResponse(
+            val httpStatus: HttpStatus,
+            val body: String,
+            val response: MtbFileSender.Response
+        )
 
         private val warningBody = """
             {
@@ -99,7 +105,7 @@ class RestMtbFileSenderTest {
         }
         """.trimIndent()
 
-        val mtbFile = MtbFile.builder()
+        val mtbFile: MtbFile = MtbFile.builder()
             .withPatient(
                 Patient.builder()
                     .withId("PID")
@@ -123,6 +129,8 @@ class RestMtbFileSenderTest {
             )
             .build()
 
+        private const val ERROR_RESPONSE_BODY = "Sonstiger Fehler bei der Übertragung"
+
         /**
          * Synthetic http responses with related request status
         * Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API
@@ -130,13 +138,33 @@ class RestMtbFileSenderTest {
         @JvmStatic
         fun mtbFileRequestWithResponseSource(): Set<RequestWithResponse> {
             return setOf(
-                RequestWithResponse(HttpStatus.OK, "{}", RequestStatus.SUCCESS),
-                RequestWithResponse(HttpStatus.CREATED, warningBody, RequestStatus.WARNING),
-                RequestWithResponse(HttpStatus.BAD_REQUEST, "??", RequestStatus.ERROR),
-                RequestWithResponse(HttpStatus.UNPROCESSABLE_ENTITY, errorBody, RequestStatus.ERROR),
+                RequestWithResponse(HttpStatus.OK, "{}", MtbFileSender.Response(RequestStatus.SUCCESS, "{}")),
+                RequestWithResponse(
+                    HttpStatus.CREATED,
+                    warningBody,
+                    MtbFileSender.Response(RequestStatus.WARNING, warningBody)
+                ),
+                RequestWithResponse(
+                    HttpStatus.BAD_REQUEST,
+                    "??",
+                    MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+                ),
+                RequestWithResponse(
+                    HttpStatus.UNPROCESSABLE_ENTITY,
+                    errorBody,
+                    MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+                ),
                 // Some more errors not mentioned in documentation
-                RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR),
-                RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR)
+                RequestWithResponse(
+                    HttpStatus.NOT_FOUND,
+                    "what????",
+                    MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+                ),
+                RequestWithResponse(
+                    HttpStatus.INTERNAL_SERVER_ERROR,
+                    "what????",
+                    MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+                )
             )
         }
 
@@ -147,10 +175,18 @@ class RestMtbFileSenderTest {
         @JvmStatic
         fun deleteRequestWithResponseSource(): Set<RequestWithResponse> {
             return setOf(
-                RequestWithResponse(HttpStatus.OK, "", RequestStatus.SUCCESS),
+                RequestWithResponse(HttpStatus.OK, "", MtbFileSender.Response(RequestStatus.SUCCESS)),
                 // Some more errors not mentioned in documentation
-                RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR),
-                RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR)
+                RequestWithResponse(
+                    HttpStatus.NOT_FOUND,
+                    "what????",
+                    MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+                ),
+                RequestWithResponse(
+                    HttpStatus.INTERNAL_SERVER_ERROR,
+                    "what????",
+                    MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
+                )
             )
         }
     }
```
RequestProcessorTest

```diff
@@ -92,7 +92,7 @@ class RequestProcessorTest {
 
         doAnswer {
             false
-        }.`when`(requestService).isLastRequestDeletion(anyString())
+        }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
 
         doAnswer {
             it.arguments[0] as String
@@ -147,7 +147,7 @@ class RequestProcessorTest {
 
         doAnswer {
             false
-        }.`when`(requestService).isLastRequestDeletion(anyString())
+        }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
 
         doAnswer {
             it.arguments[0] as String
@@ -202,7 +202,7 @@ class RequestProcessorTest {
 
         doAnswer {
             false
-        }.`when`(requestService).isLastRequestDeletion(anyString())
+        }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
 
         doAnswer {
             MtbFileSender.Response(status = RequestStatus.SUCCESS)
@@ -261,7 +261,7 @@ class RequestProcessorTest {
 
         doAnswer {
             false
-        }.`when`(requestService).isLastRequestDeletion(anyString())
+        }.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
 
         doAnswer {
             MtbFileSender.Response(status = RequestStatus.ERROR)
```
RequestServiceTest

```diff
@@ -68,23 +68,33 @@ class RequestServiceTest {
                 patientId = "TEST_12345678901",
                 pid = "P1",
                 fingerprint = "0123456789abcdef1",
                 type = RequestType.DELETE,
                 status = RequestStatus.SUCCESS,
                 processedAt = Instant.parse("2023-08-08T02:00:00Z")
             ),
             Request(
                 id = 1L,
                 uuid = UUID.randomUUID().toString(),
                 patientId = "TEST_12345678902",
                 pid = "P2",
                 fingerprint = "0123456789abcdef2",
                 type = RequestType.MTB_FILE,
                 status = RequestStatus.WARNING,
-                processedAt = Instant.parse("2023-08-08T00:00:00Z")
+                processedAt = Instant.parse("2023-07-07T00:00:00Z")
+            ),
+            Request(
+                id = 2L,
+                uuid = UUID.randomUUID().toString(),
+                patientId = "TEST_12345678901",
+                pid = "P1",
+                fingerprint = "0123456789abcdefd",
+                type = RequestType.DELETE,
+                status = RequestStatus.WARNING,
+                processedAt = Instant.parse("2023-07-07T02:00:00Z")
+            ),
+            Request(
+                id = 3L,
+                uuid = UUID.randomUUID().toString(),
+                patientId = "TEST_12345678901",
+                pid = "P1",
+                fingerprint = "0123456789abcdef1",
+                type = RequestType.MTB_FILE,
+                status = RequestStatus.UNKNOWN,
+                processedAt = Instant.parse("2023-08-11T00:00:00Z")
             )
         )
 
-        val actual = RequestService.isLastRequestDeletion(requests)
+        val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests)
 
         assertThat(actual).isTrue()
     }
@@ -98,23 +108,33 @@ class RequestServiceTest {
                 patientId = "TEST_12345678901",
                 pid = "P1",
                 fingerprint = "0123456789abcdef1",
-                type = RequestType.DELETE,
-                status = RequestStatus.SUCCESS,
+                type = RequestType.MTB_FILE,
+                status = RequestStatus.WARNING,
                 processedAt = Instant.parse("2023-07-07T00:00:00Z")
             ),
+            Request(
+                id = 2L,
+                uuid = UUID.randomUUID().toString(),
+                patientId = "TEST_12345678901",
+                pid = "P1",
+                fingerprint = "0123456789abcdef1",
+                type = RequestType.MTB_FILE,
+                status = RequestStatus.WARNING,
+                processedAt = Instant.parse("2023-07-07T02:00:00Z")
+            ),
             Request(
-                id = 1L,
+                id = 3L,
                 uuid = UUID.randomUUID().toString(),
-                patientId = "TEST_12345678902",
-                pid = "P2",
-                fingerprint = "0123456789abcdef2",
+                patientId = "TEST_12345678901",
+                pid = "P1",
+                fingerprint = "0123456789abcdef1",
                 type = RequestType.MTB_FILE,
-                status = RequestStatus.WARNING,
-                processedAt = Instant.parse("2023-08-08T00:00:00Z")
+                status = RequestStatus.UNKNOWN,
+                processedAt = Instant.parse("2023-08-11T00:00:00Z")
             )
         )
 
-        val actual = RequestService.isLastRequestDeletion(requests)
+        val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests)
 
         assertThat(actual).isFalse()
     }
@@ -197,7 +217,7 @@ class RequestServiceTest {
 
     @Test
     fun isLastRequestDeletionShouldRequestAllRequestsForPatientPseudonym() {
-        requestService.isLastRequestDeletion("TEST_12345678901")
+        requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901")
 
         verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString())
     }
```
ResponseProcessorTest

```diff
@@ -19,8 +19,6 @@
 
 package dev.dnpm.etl.processor.services
 
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.kotlin.KotlinModule
 import dev.dnpm.etl.processor.monitoring.Request
 import dev.dnpm.etl.processor.monitoring.RequestRepository
 import dev.dnpm.etl.processor.monitoring.RequestStatus
@@ -62,12 +60,10 @@ class ResponseProcessorTest {
         @Mock requestRepository: RequestRepository,
         @Mock statisticsUpdateProducer: Sinks.Many<Any>
     ) {
-        val objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build())
-
         this.requestRepository = requestRepository
         this.statisticsUpdateProducer = statisticsUpdateProducer
 
-        this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer, objectMapper)
+        this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer)
     }
 
     @Test
```
KafkaResponseProcessorTest

```diff
@@ -45,8 +45,8 @@ class KafkaResponseProcessorTest {
 
     private lateinit var kafkaResponseProcessor: KafkaResponseProcessor
 
-    private fun createkafkaRecord(
-        requestId: String? = null,
+    private fun createKafkaRecord(
+        requestId: String,
         statusCode: Int = 200,
         statusBody: Map<String, Any>? = mapOf()
     ): ConsumerRecord<String, String> {
@@ -54,15 +54,11 @@ class KafkaResponseProcessorTest {
             "test-topic",
             0,
             0,
-            if (requestId == null) {
-                null
-            } else {
-                this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId))
-            },
+            null,
             if (statusBody == null) {
                 ""
             } else {
-                this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody))
+                this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody))
             }
         )
     }
@@ -78,23 +74,57 @@ class KafkaResponseProcessorTest {
     }
 
     @Test
-    fun shouldNotProcessRecordsWithoutValidKey() {
-        this.kafkaResponseProcessor.onMessage(createkafkaRecord(null, 200))
+    fun shouldNotProcessRecordsWithoutRequestIdInBody() {
+        val record = ConsumerRecord<String, String>(
+            "test-topic",
+            0,
+            0,
+            null,
+            """
+                {
+                    "statusCode": 200,
+                    "statusBody": {}
+                }
+            """.trimIndent()
+        )
 
-        verify(eventPublisher, never()).publishEvent(any())
+        this.kafkaResponseProcessor.onMessage(record)
+
+        verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
     }
 
     @Test
-    fun shouldNotProcessRecordsWithoutValidBody() {
-        this.kafkaResponseProcessor.onMessage(createkafkaRecord(requestId = "TestID1234", statusBody = null))
+    fun shouldProcessRecordsWithAliasNames() {
+        val record = ConsumerRecord<String, String>(
+            "test-topic",
+            0,
+            0,
+            null,
+            """
+                {
+                    "request_id": "test0123456789",
+                    "status_code": 200,
+                    "status_body": {}
+                }
+            """.trimIndent()
+        )
 
-        verify(eventPublisher, never()).publishEvent(any())
+        this.kafkaResponseProcessor.onMessage(record)
+
+        verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
+    }
+
+    @Test
+    fun shouldNotProcessRecordsWithoutValidStatusBody() {
+        this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null))
+
+        verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
     }
 
     @ParameterizedTest
     @MethodSource("statusCodeSource")
     fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) {
-        this.kafkaResponseProcessor.onMessage(createkafkaRecord("TestID1234", statusCode))
+        this.kafkaResponseProcessor.onMessage(createKafkaRecord("TestID1234", statusCode))
         verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
     }
```