1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-07-01 06:02:54 +00:00

34 Commits

Author SHA1 Message Date
2f32834de0 Release 0.1.2 2023-08-30 13:45:06 +02:00
79709caa39 Merge remote-tracking branch 'origin/add-docker-build' 2023-08-30 13:29:06 +02:00
c52509054d chore: Add kafka-clients dependency with fixed version to mitigate CVEs
This will use version 3.5.1 of kafka-clients dependency to prevent issues due to
CVE-2023-34453, CVE-2023-34454 and CVE-2023-34455
2023-08-30 13:26:05 +02:00
8fd587c2a3 chore: remove unused HealthCheck.java 2023-08-30 11:54:44 +02:00
edafe30a4b chore: added log msg to GpasPseudonymGenerator 2023-08-30 11:51:08 +02:00
e24be0d325 chore: cleanup deployment docker-compose.yaml and env-sample.env. added 'DNPM' prefix for better integration into productive environment. 2023-08-30 11:50:24 +02:00
5e93e834ad Remove comment to use host alias 2023-08-30 10:24:48 +02:00
5e5bd579fb test: * added additional external host 'localhost', now we can connect without additional host alias. * added akhq to dev-compose.yml 2023-08-30 10:21:38 +02:00
a24f869c84 Update dependencies 2023-08-30 10:03:04 +02:00
635985bfd1 chore: remove previous build via Dockerfile. Fix security issue: CVE-2023-34453, CVE-2023-34454, CVE-2023-34455, CVE-2022-1471 2023-08-28 14:27:28 +02:00
25143745c4 chore: added deployment docker-compose.yaml and env-sample.env added. 2023-08-28 12:54:14 +02:00
532254593f test: * added additional external host 'localhost', now we can connect without additional host alias. * added akhq to dev-compose.yml 2023-08-28 12:47:09 +02:00
01ff53ab23 chore: deployment environment has maria db entries 2023-08-25 13:42:07 +02:00
9643c80cc5 build: locally build docker image has license entry,now 2023-08-25 13:39:42 +02:00
aa40da4995 chore: dev kafka is available via localhost, now. 2023-08-25 13:11:10 +02:00
da26b5a2c8 Merge branch 'master' into add-docker-build
# Conflicts:
#	README.md
#	build.gradle.kts
2023-08-25 12:59:38 +02:00
bbea48322f chore: added deployment port mapping for monitoring access 2023-08-25 12:50:29 +02:00
480f165c7b chore: add deployment docker-compose.yaml and fitting env-sample.env file 2023-08-24 13:48:46 +02:00
3d2c73ff8f doc: gPas Version requirement added 2023-08-24 13:01:29 +02:00
9921e1e684 Throw PseudonymRequestFailed exception with error message
This will throw an exception with error message describing what the error is instead of
having a more generic NoSuchElementException to be thrown if Optional.get() has no value
after calling findFirst() on an empty stream.
2023-08-19 11:45:21 +02:00
5bd26b894c Add information about key based retention config 2023-08-18 22:15:10 +02:00
8dc82225a4 Issue #7: Send and expect requestId in record body, not in record key (#8) 2023-08-16 15:25:46 +02:00
2eb5cc61b9 Change Kafka response body JSON alias 2023-08-15 10:58:17 +02:00
78b2287163 Add information about Kafka retention time 2023-08-15 08:51:40 +02:00
66dc96680d Update dev config and added related information into README.md 2023-08-15 01:09:22 +02:00
64b8636145 Update Apache Kafka service config for KRaft mode 2023-08-15 00:49:43 +02:00
2e7ef25a49 Update project version and versions in gradle file 2023-08-12 23:16:17 +02:00
7186a45f6c Add link to onkostar-plugin-dnpmexport 2023-08-12 22:27:20 +02:00
72295202ec Code cleanup 2023-08-12 22:19:29 +02:00
bc48a7217e Add more information about usage in an ETl process 2023-08-11 14:37:48 +02:00
0e1034d964 New version and add status badge to README.md 2023-08-11 09:47:20 +02:00
6ecb439007 Issue #3: Detect the request type of request with last known status (#5) 2023-08-11 09:22:54 +02:00
cb9c590472 Issue #2: Do not serialize JSON string as custom string (#4)
In addition to that, if REST request did not contain a response body, use empty
string as data quality report string.
2023-08-11 09:13:45 +02:00
a075f73162 feat: add Dockerfile for build within docker environment and run application within a container. 2023-08-02 15:19:38 +02:00
26 changed files with 495 additions and 168 deletions

1
.gitignore vendored
View File

@ -36,3 +36,4 @@ out/
### VS Code ###
.vscode/
/dev/gpas*
/deploy/.env

109
README.md
View File

@ -1,10 +1,32 @@
# ETL-Processor for bwHC data
# ETL-Processor for bwHC data [![Run Tests](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml/badge.svg)](https://github.com/CCC-MF/etl-processor/actions/workflows/test.yml)
Diese Anwendung versendet ein bwHC-MTB-File an das bwHC-Backend und pseudonymisiert die Patienten-ID.
### Einordnung innerhalb einer DNPM-ETL-Strecke
Diese Anwendung erlaubt das Entgegennehmen HTTP/REST-Anfragen aus dem Onkostar-Plugin **[onkostar-plugin-dnpmexport](https://github.com/CCC-MF/onkostar-plugin-dnpmexport)**.
Der Inhalt einer Anfrage, wenn ein bwHC-MTBFile, wird pseudonymisiert und auf Duplikate geprüft.
Duplikate werden verworfen, Änderungen werden weitergeleitet.
Löschanfragen werden immer als Löschanfrage an das bwHC-backend weitergeleitet.
![Modell DNPM-ETL-Strecke](docs/etl.png)
#### HTTP/REST-Konfiguration
Anfragen werden, wenn nicht als Duplikat behandelt, nach der Pseudonymisierung direkt an das bwHC-Backend gesendet.
#### Konfiguration für Apache Kafka
Anfragen werden, wenn nicht als Duplikat behandelt, nach der Pseudonymisierung an Apache Kafka übergeben.
Eine Antwort wird dabei ebenfalls mithilfe von Apache Kafka übermittelt und nach der Entgegennahme verarbeitet.
Siehe hierzu auch: https://github.com/CCC-MF/kafka-to-bwhc
## Pseudonymisierung der Patienten-ID
Wenn eine URI zu einer gPAS-Instanz angegeben ist, wird diese verwendet.
Wenn eine URI zu einer gPAS-Instanz (Version >= 2023.1.0) angegeben ist, wird diese verwendet.
Ist diese nicht gesetzt. wird intern eine Anonymisierung der Patienten-ID vorgenommen.
* `APP_PSEUDONYMIZE_PREFIX`: Standortbezogenes Prefix - `UNKNOWN`, wenn nicht gesetzt
@ -20,7 +42,8 @@ als Patienten-Pseudonym verwendet.
Wurde die Verwendung von gPAS konfiguriert, so sind weitere Angaben zu konfigurieren.
* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz inklusive Endpoint (z.B. `http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate`)
* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz inklusive Endpoint (
z.B. `http://localhost:8080/ttp-fhir/fhir/gpas/$$pseudonymizeAllowCreate`)
* `APP_PSEUDONYMIZE_GPAS_TARGET`: gPas Domänenname
* `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPas Basic-Auth Benutzername
* `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPas Basic-Auth Passwort
@ -55,6 +78,84 @@ Weitere Einstellungen können über die Parameter von Spring Kafka konfiguriert
Lässt sich keine Verbindung zu dem bwHC-Backend aufbauen, wird eine Rückantwort mit Status-Code `900` erwartet, welchen es
für HTTP nicht gibt.
#### Retention Time
Generell werden in Apache Kafka alle Records entsprechend der Konfiguration vorgehalten.
So wird ohne spezielle Konfiguration ein Record für 7 Tage in Apache Kafka gespeichert.
Es sind innerhalb dieses Zeitraums auch alte Informationen weiterhin enthalten, wenn der Consent später abgelehnt wurde.
Durch eine entsprechende Konfiguration des Topics kann dies verhindert werden.
Beispiel - auszuführen innerhalb des Kafka-Containers: Löschen alter Records nach einem Tag
```
kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=86400000
```
#### Key based Retention
Möchten Sie hingegen immer nur die letzte Meldung für einen Patienten und eine Erkrankung in Apache Kafka vorhalten,
so ist die nachfolgend genannte Konfiguration der Kafka-Topics hilfreich.
* `retention.ms`: Möglichst kurze Zeit in der alte Records noch erhalten bleiben, z.B. 10 Sekunden 10000
* `cleanup.policy`: Löschen alter Records und Beibehalten des letzten Records zu einem Key [delete,compact]
Beispiele für ein Topic `test`, hier bitte an die verwendeten Topics anpassen.
```
kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config retention.ms=10000
kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic test --add-config cleanup.policy=[delete,compact]
```
Da als Key eines Records die (pseudonymisierte) Patienten-ID und die (anonymisierte) Erkrankungs-ID verwendet wird,
stehen mit obiger Konfiguration der Kafka-Topics nach 10 Sekunden nur noch der jeweils letzte Eintrag für den entsprechenden
Key zur Verfügung.
Da der Key sowohl für die Records in Richtung bwHC-Backend für die Rückantwort identisch aufgebaut ist, lassen sich so
auch im Falle eines Consent-Widerspruchs die enthaltenen Daten als auch die Offenlegung durch Verifikationsdaten in der
Antwort effektiv verhindern, da diese nach 10 Sekunden gelöscht werden.
Es steht dann nur noch die jeweils letzten Information zur Verfügung, dass für einen Patienten/eine Erkrankung
ein Consent-Widerspruch erfolgte.
## Docker-Images
Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor
Diese Anwendung ist auch als Docker-Image verfügbar: https://github.com/CCC-MF/etl-processor/pkgs/container/etl-processor
### Images lokal bauen
```bash
./gradlew bootBuildImage
```
## Deployment
*Ausführen als Docker Conatiner:*
```bash
cd ./deploy
cp env-sample.env .env
```
Wenn gewünscht, Änderungen in der `.env` vornehmen.
```bash
docker compose up -d
```
## Entwicklungssetup
Zum Starten einer lokalen Entwicklungs- und Testumgebung kann die beiliegende Datei `dev-compose.yml` verwendet werden.
Diese kann zur Nutzung der Datenbanken **MariaDB** als auch **PostgreSQL** angepasst werden.
Zur Nutzung von Apache Kafka muss dazu ein Eintrag im hosts-File vorgenommen werden und der Hostname `kafka` auf die lokale
IP-Adresse verweisen. Ohne diese Einstellung ist eine Nutzung von Apache Kafka außerhalb der Docker-Umgebung nicht möglich.
Beim Start der Anwendung mit dem Profil `dev` wird die in `dev-compose.yml` definierte Umgebung beim Start der
Anwendung mit gestartet:
```
SPRING_PROFILES_ACTIVE=dev ./gradlew bootRun
```
Die Datei `application-dev.yml` enthält hierzu die Konfiguration für das Profil `dev`.
Beim Ausführen der Integrationstests wird eine Testdatenbank in einem Docker-Container gestartet.
Siehe hier auch die Klasse `AbstractTestcontainerTest` unter `src/integrationTest`.

View File

@ -4,14 +4,25 @@ import org.springframework.boot.gradle.tasks.bundling.BootBuildImage
plugins {
war
id("org.springframework.boot") version "3.1.2"
id("io.spring.dependency-management") version "1.1.0"
kotlin("jvm") version "1.9.0"
kotlin("plugin.spring") version "1.9.0"
id("org.springframework.boot") version "3.1.3"
id("io.spring.dependency-management") version "1.1.3"
kotlin("jvm") version "1.9.10"
kotlin("plugin.spring") version "1.9.10"
}
group = "de.ukw.ccc"
version = "0.1.0"
version = "0.1.2"
var versions = mapOf(
"bwhc-dto-java" to "0.2.0",
"hapi-fhir" to "6.6.2",
"httpclient5" to "5.2.1",
"mockito-kotlin" to "5.1.0"
)
// Override Apache Kafka to be used
// Fixes: CVE-2023-34455, CVE-2023-34454, CVE-2023-34453
extra["kafka.version"] = "3.5.1"
java {
sourceCompatibility = JavaVersion.VERSION_17
@ -49,13 +60,15 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-jdbc")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.springframework.kafka:spring-kafka")
// fix CVE-2022-1471
implementation("org.yaml:snakeyaml:2.1")
implementation("org.flywaydb:flyway-mysql")
implementation("commons-codec:commons-codec")
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2")
implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2")
implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1")
implementation("de.ukw.ccc:bwhc-dto-java:${versions["bwhc-dto-java"]}")
implementation("ca.uhn.hapi.fhir:hapi-fhir-base:${versions["hapi-fhir"]}")
implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:${versions["hapi-fhir"]}")
implementation("org.apache.httpcomponents.client5:httpclient5:${versions["httpclient5"]}")
runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
runtimeOnly("org.postgresql:postgresql")
developmentOnly("org.springframework.boot:spring-boot-devtools")
@ -64,7 +77,7 @@ dependencies {
providedRuntime("org.springframework.boot:spring-boot-starter-tomcat")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.projectreactor:reactor-test")
testImplementation("org.mockito.kotlin:mockito-kotlin:5.0.0")
testImplementation("org.mockito.kotlin:mockito-kotlin:${versions["mockito-kotlin"]}")
integrationTestImplementation("org.testcontainers:junit-jupiter")
integrationTestImplementation("org.testcontainers:postgresql")
}

View File

@ -0,0 +1,55 @@
services:
dnpm-etl-processor:
image: ghcr.io/ccc-mf/etl-processor:latest
environment:
LOGGING_LEVEL_DEV: ${DNPM_LOG_LEVEL:-INFO}
SPRING_KAFKA_SECURITY_PROTOCOL: ${DNPM_KAFKA_SECURITY_PROTOCOL:-SSL}
SPRING_KAFKA_SSL_TRUST-STORE-TYPE: PKCS12
SPRING_KAFKA_SSL_TRUST-STORE-LOCATION: /opt/dnpm-processor/ssl/truststore.jks
SPRING_KAFKA_SSL_TRUST-STORE-PASSWORD: ${KAFKA_TRUST_STORE_PASSWORD}
SPRING_KAFKA_SSL_KEY-STORE-TYPE: PKCS12
SPRING_KAFKA_SSL_KEY-STORE-LOCATION: /opt/dnpm-processor/ssl/keystore.jks
SPRING_KAFKA_SSL_KEY-STORE-PASSWORD: ${DNPM_PROCESSOR_KEY_STORE_PASSWORD}
SPRING_KAFKA_PRODUCER_COMPRESSION-TYPE: gzip
APP_KAFKA_TOPIC: ${DNPM_KAFKA_TOPIC}
APP_KAFKA_SERVERS: ${KAFKA_BROKERS}
APP_KAFKA_GROUP_ID: ${DNPM_KAFKA_GROUP_ID}
APP_KAFKA_RESPONSE_TOPIC: ${DNPM_KAFKA_RESPONSE_TOPIC}
APP_REST_URI: ${DNPM_BWHC_REST_URI}
SPRING_DATASOURCE_URL: ${DNPM_DATASOURCE_URL}
SPRING_DATASOURCE_PASSWORD: ${DNPM_MARIADB_USER_PW}
SPRING_DATASOURCE_USERNAME: ${DNPM_MARIADB_DB}
APP_PSEUDONYMIZE_GPAS_SSLCALOCATION: /workspace/opt/dnpm-processor/ssl/mosaic.crt
APP_PSEUDONYMIZE_GPAS_PASSWORD: ${DNPM_PSEUDONYMIZE_GPAS_PASSWORD}
APP_PSEUDONYMIZE_GPAS_USERNAME: ${DNPM_PSEUDONYMIZE_GPAS_USERNAME}
APP_PSEUDONYMIZE_GPAS_TARGET: ${DNPM_PSEUDONYMIZE_GPAS_TARGET}
APP_PSEUDONYMIZE_GPAS_URI: ${DNPM_PSEUDONYMIZE_GPAS_URI}
APP_PSEUDONYMIZE_PREFIX: ${DNPM_APP_PSEUDONYMIZE_PREFIX}
APP_PSEUDONYMIZER: ${DNPM_PSEUDONYMIZE_GENERATOR}
volumes:
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
#- ${DNPM_TO_SSL_KEYSTORE_LOCATION}:/workspace/opt/dnpm-processor/ssl/keystore.jks:ro
#- ${KAFKA_TRUST_STORE_LOCATION}:/workspace/opt/dnpm-processor/ssl/truststore.jks:ro
#- ${DNPM_PSEUDONYMIZE_GPAS_SSLCALOCATION}:/workspace/opt/dnpm-processor/ssl/mosaic.crt
depends_on:
- dnpm-monitor-db
ports:
- "${DNPM_MONITORING_HTTP_PORT:-8080}:8080"
# todo add volume
dnpm-monitor-db:
image: mariadb:10
environment:
MARIADB_DATABASE: ${DNPM_MARIADB_DB}
MARIADB_USER: ${DNPM_MARIADB_USER}
MARIADB_PASSWORD: ${DNPM_MARIADB_USER_PW}
MARIADB_ROOT_PASSWORD: ${DNPM_MARIADB_ROOT_PW}
expose:
- "3306"

40
deploy/env-sample.env Normal file
View File

@ -0,0 +1,40 @@
# monitoring access port
DNPM_MONITORING_HTTP_PORT=8088
DNPM_LOG_LEVEL=INFO
# GPAS or BUILDIN
DNPM_PSEUDONYMIZE_GENERATOR=BUILDIN
DNPM_APP_PSEUDONYMIZE_PREFIX=ANONYM
DNPM_PSEUDONYMIZE_GPAS_URI=
DNPM_PSEUDONYMIZE_GPAS_TARGET=
DNPM_PSEUDONYMIZE_GPAS_USERNAME=
DNPM_PSEUDONYMIZE_GPAS_PASSWORD=
# path to ca root cert if needed
DNPM_PSEUDONYMIZE_GPAS_SSLCALOCATION=
DNPM_MARIADB_DB=dnpm_monitoring
DNPM_MARIADB_USER=$DNPM_MARIADB_DB
DNPM_MARIADB_USER_PW=MySuperSecurePassword111
DNPM_MARIADB_ROOT_PW=MySuperDuperSecurePassword111
# monitoring data db
DNPM_DATASOURCE_URL=jdbc:mariadb://dnpm-monitor-db:3306/$DNPM_MARIADB_DB
## TARGET SYSTEMS CONFIG
# in case of direct access to bwhc enter endpoint url here
DNPM_BWHC_REST_URI=
# produce mtb files to this topic - values 'false' disabling kafka processing
DNPM_KAFKA_TOPIC=false
KAFKA_BROKERS=false
DNPM_KAFKA_SECURITY_PROTOCOL=PLAINTEXT
# here we receive responses from bwhc
DNPM_KAFKA_RESPONSE_TOPIC=dnpm-response
DNPM_KAFKA_GROUP_ID=dnpm
# SSL or PLAINTEXT
DNPM_PROCESSOR_KEY_STORE_PASSWORD=
DNPM_TO_SSL_KEYSTORE_LOCATION=

View File

@ -1,11 +1,38 @@
services:
# Note: Make sure, hostname "kafka" points to 127.0.0.1
# otherwise connection will not be available
kafka:
image: bitnami/kafka
hostname: kafka
ports:
- "9092:9092"
- "9094:9094"
environment:
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_CFG_NODE_ID: "0"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
akhq:
image: tchiotludo/akhq:0.21.0
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
docker-kafka-server:
properties:
bootstrap.servers: "kafka:9092"
connect:
- name: "kafka-connect"
url: "http://kafka-connect:8083"
ports:
- "8084:8080"
mariadb:
image: mariadb:10
@ -16,6 +43,7 @@ services:
MARIADB_USER: dev
MARIADB_PASSWORD: dev
MARIADB_ROOT_PASSWORD: dev
# postgres:
# image: postgres:alpine
# ports:
@ -23,4 +51,4 @@ services:
# environment:
# POSTGRES_DB: dev
# POSTGRES_USER: dev
# POSTGRES_PASSWORD: dev
# POSTGRES_PASSWORD: dev

BIN
docs/etl.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB

View File

@ -20,10 +20,13 @@
package dev.dnpm.etl.processor
import dev.dnpm.etl.processor.output.MtbFileSender
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.ApplicationContext
import org.springframework.test.context.junit.jupiter.SpringExtension
import org.testcontainers.junit.jupiter.Testcontainers
@ -34,7 +37,9 @@ import org.testcontainers.junit.jupiter.Testcontainers
class EtlProcessorApplicationTests : AbstractTestcontainerTest() {
@Test
fun contextLoadsIfMtbFileSenderConfigured() {
fun contextLoadsIfMtbFileSenderConfigured(@Autowired context: ApplicationContext) {
// Simply check bean configuration
assertThat(context).isNotNull
}
}

View File

@ -116,7 +116,7 @@ class RequestServiceIntegrationTest : AbstractTestcontainerTest() {
fun shouldReturnDeleteRequestAsLastRequest() {
setupTestData()
val actual = requestService.isLastRequestDeletion("TEST_12345678901")
val actual = requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901")
assertThat(actual).isTrue()
}

View File

@ -22,6 +22,21 @@ package dev.dnpm.etl.processor.pseudonym;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser;
import dev.dnpm.etl.processor.config.GPasConfigProperties;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Base64;
import java.util.HashMap;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
@ -39,7 +54,11 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.*;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
@ -51,31 +70,13 @@ import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Base64;
import java.util.HashMap;
public class GpasPseudonymGenerator implements Generator {
private final static FhirContext r4Context = FhirContext.forR4();
private final String gPasUrl;
private final String psnTargetDomain;
private static FhirContext r4Context = FhirContext.forR4();
private final HttpHeaders httpHeader;
private final RetryTemplate retryTemplate = defaultTemplate();
private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
private SSLContext customSslContext;
@ -90,12 +91,16 @@ public class GpasPseudonymGenerator implements Generator {
try {
if (StringUtils.isNotBlank(gpasCfg.getSslCaLocation())) {
customSslContext = getSslContext(gpasCfg.getSslCaLocation());
log.debug(String.format("%s has been initialized with SSL certificate %s",
this.getClass().getName(), gpasCfg.getSslCaLocation()));
}
} catch (IOException | KeyManagementException | KeyStoreException | CertificateException |
NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
log.debug(String.format("%s has been initialized", this.getClass().getName()));
}
@Override
@ -110,12 +115,19 @@ public class GpasPseudonymGenerator implements Generator {
@NotNull
public static String unwrapPseudonym(Parameters gPasPseudonymResult) {
Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst()
.get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst()
final var parameters = gPasPseudonymResult.getParameter().stream().findFirst();
if (parameters.isEmpty()) {
throw new PseudonymRequestFailed("Empty HL7 parameters, cannot find first one");
}
final var identifier = (Identifier) parameters.get().getPart().stream()
.filter(a -> a.getName().equals("pseudonym"))
.findFirst()
.orElseGet(ParametersParameterComponent::new).getValue();
// pseudonym
return pseudonym.getSystem() + "|" + pseudonym.getValue();
return identifier.getSystem() + "|" + identifier.getValue();
}

View File

@ -23,7 +23,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(AppConfigProperties.NAME)
data class AppConfigProperties(
var bwhc_uri: String?,
var bwhcUri: String?,
var generator: PseudonymGenerator = PseudonymGenerator.BUILDIN
) {
companion object {

View File

@ -40,7 +40,7 @@ class KafkaMtbFileSender(
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
objectMapper.writeValueAsString(request.mtbFile)
objectMapper.writeValueAsString(Data(request.requestId, request.mtbFile))
)
if (result.get() != null) {
logger.debug("Sent file via KafkaMtbFileSender")
@ -68,7 +68,7 @@ class KafkaMtbFileSender(
val result = kafkaTemplate.send(
kafkaTargetProperties.topic,
key(request),
objectMapper.writeValueAsString(dummyMtbFile)
objectMapper.writeValueAsString(Data(request.requestId, dummyMtbFile))
)
if (result.get() != null) {
@ -85,12 +85,12 @@ class KafkaMtbFileSender(
private fun key(request: MtbFileSender.MtbFileRequest): String {
return "{\"pid\": \"${request.mtbFile.patient.id}\", " +
"\"eid\": \"${request.mtbFile.episode.id}\", " +
"\"requestId\": \"${request.requestId}\"}"
"\"eid\": \"${request.mtbFile.episode.id}\"}"
}
private fun key(request: MtbFileSender.DeleteRequest): String {
return "{\"pid\": \"${request.patientId}\", " +
"\"requestId\": \"${request.requestId}\"}"
return "{\"pid\": \"${request.patientId}\"}"
}
data class Data(val requestId: String, val content: MtbFile)
}

View File

@ -50,7 +50,7 @@ class RestMtbFileSender(
return MtbFileSender.Response(response.statusCode.asRequestStatus(), "Status-Code: ${response.statusCode.value()}")
}
logger.debug("Sent file via RestMtbFileSender")
return MtbFileSender.Response(response.statusCode.asRequestStatus())
return MtbFileSender.Response(response.statusCode.asRequestStatus(), response.body.orEmpty())
} catch (e: IllegalArgumentException) {
logger.error("Not a valid URI to export to: '{}'", restTargetProperties.uri!!)
} catch (e: RestClientException) {

View File

@ -38,7 +38,7 @@ infix fun MtbFile.pseudonymizeWith(pseudonymizeService: PseudonymizeService) {
this.histologyReports.forEach { it.patient = patientPseudonym }
this.lastGuidelineTherapies.forEach { it.patient = patientPseudonym }
this.molecularPathologyFindings.forEach { it.patient = patientPseudonym }
this.molecularTherapies.forEach { it.history.forEach { it.patient = patientPseudonym } }
this.molecularTherapies.forEach { molecularTherapy -> molecularTherapy.history.forEach { it.patient = patientPseudonym } }
this.ngsReports.forEach { it.patient = patientPseudonym }
this.previousGuidelineTherapies.forEach { it.patient = patientPseudonym }
this.rebiopsyRequests.forEach { it.patient = patientPseudonym }

View File

@ -30,7 +30,6 @@ import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
import dev.dnpm.etl.processor.pseudonym.pseudonymizeWith
import org.apache.commons.codec.binary.Base32
import org.apache.commons.codec.digest.DigestUtils
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Service
import java.time.Instant
@ -45,8 +44,6 @@ class RequestProcessor(
private val applicationEventPublisher: ApplicationEventPublisher
) {
private val logger = LoggerFactory.getLogger(RequestProcessor::class.java)
fun processMtbFile(mtbFile: MtbFile) {
val requestId = UUID.randomUUID().toString()
val pid = mtbFile.patient.id
@ -95,7 +92,7 @@ class RequestProcessor(
private fun isDuplication(pseudonymizedMtbFile: MtbFile): Boolean {
val lastMtbFileRequestForPatient =
requestService.lastMtbFileRequestForPatientPseudonym(pseudonymizedMtbFile.patient.id)
val isLastRequestDeletion = requestService.isLastRequestDeletion(pseudonymizedMtbFile.patient.id)
val isLastRequestDeletion = requestService.isLastRequestWithKnownStatusDeletion(pseudonymizedMtbFile.patient.id)
return null != lastMtbFileRequestForPatient
&& !isLastRequestDeletion

View File

@ -38,8 +38,8 @@ class RequestService(
fun lastMtbFileRequestForPatientPseudonym(patientPseudonym: String) =
Companion.lastMtbFileRequestForPatientPseudonym(allRequestsByPatientPseudonym(patientPseudonym))
fun isLastRequestDeletion(patientPseudonym: String) =
Companion.isLastRequestDeletion(allRequestsByPatientPseudonym(patientPseudonym))
fun isLastRequestWithKnownStatusDeletion(patientPseudonym: String) =
Companion.isLastRequestWithKnownStatusDeletion(allRequestsByPatientPseudonym(patientPseudonym))
companion object {
@ -48,7 +48,8 @@ class RequestService(
.sortedByDescending { it.processedAt }
.firstOrNull { it.status == RequestStatus.SUCCESS || it.status == RequestStatus.WARNING }
fun isLastRequestDeletion(allRequests: List<Request>) = allRequests
fun isLastRequestWithKnownStatusDeletion(allRequests: List<Request>) = allRequests
.filter { it.status != RequestStatus.UNKNOWN }
.maxByOrNull { it.processedAt }?.type == RequestType.DELETE
}

View File

@ -19,7 +19,6 @@
package dev.dnpm.etl.processor.services
import com.fasterxml.jackson.databind.ObjectMapper
import dev.dnpm.etl.processor.monitoring.Report
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
@ -33,8 +32,7 @@ import java.util.*
@Service
class ResponseProcessor(
private val requestRepository: RequestRepository,
private val statisticsUpdateProducer: Sinks.Many<Any>,
private val objectMapper: ObjectMapper
private val statisticsUpdateProducer: Sinks.Many<Any>
) {
private val logger = LoggerFactory.getLogger(ResponseProcessor::class.java)
@ -55,14 +53,14 @@ class ResponseProcessor(
RequestStatus.WARNING -> {
it.report = Report(
"Warnungen über mangelhafte Daten",
objectMapper.writeValueAsString(event.body)
event.body.orElse("")
)
}
RequestStatus.ERROR -> {
it.report = Report(
"Fehler bei der Datenübertragung oder Inhalt nicht verarbeitbar",
objectMapper.writeValueAsString(event.body)
event.body.orElse("")
)
}
@ -73,7 +71,7 @@ class ResponseProcessor(
}
else -> {
logger.error("Cannot process response: Unknown response code!")
logger.error("Cannot process response: Unknown response!")
return@ifPresentOrElse
}
}

View File

@ -41,50 +41,40 @@ class KafkaResponseProcessor(
override fun onMessage(data: ConsumerRecord<String, String>) {
try {
Optional.of(objectMapper.readValue(data.key(), ResponseKey::class.java))
Optional.of(objectMapper.readValue(data.value(), ResponseBody::class.java))
} catch (e: Exception) {
logger.error("Cannot process Kafka response", e)
Optional.empty()
}.ifPresentOrElse({ responseKey ->
val event = try {
val responseBody = objectMapper.readValue(data.value(), ResponseBody::class.java)
ResponseEvent(
responseKey.requestId,
Instant.ofEpochMilli(data.timestamp()),
responseBody.statusCode.asRequestStatus(),
when (responseBody.statusCode.asRequestStatus()) {
RequestStatus.SUCCESS -> {
Optional.empty()
}
RequestStatus.WARNING, RequestStatus.ERROR -> {
Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
}
else -> {
logger.error("Kafka response: Unknown response code!")
Optional.empty()
}
}.ifPresentOrElse({ responseBody ->
val event = ResponseEvent(
responseBody.requestId,
Instant.ofEpochMilli(data.timestamp()),
responseBody.statusCode.asRequestStatus(),
when (responseBody.statusCode.asRequestStatus()) {
RequestStatus.SUCCESS -> {
Optional.empty()
}
)
} catch (e: Exception) {
logger.error("Cannot process Kafka response", e)
ResponseEvent(
responseKey.requestId,
Instant.ofEpochMilli(data.timestamp()),
RequestStatus.ERROR,
Optional.of("Cannot process Kafka response")
)
}
RequestStatus.WARNING, RequestStatus.ERROR -> {
Optional.of(objectMapper.writeValueAsString(responseBody.statusBody))
}
else -> {
logger.error("Kafka response: Unknown response code '{}'!", responseBody.statusCode)
Optional.empty()
}
}
)
eventPublisher.publishEvent(event)
}, {
logger.error("No response key in Kafka response")
logger.error("No requestId in Kafka response")
})
}
data class ResponseKey(val requestId: String)
data class ResponseBody(
@JsonProperty("status_code") @JsonAlias("status code") val statusCode: Int,
@JsonProperty("status_body") val statusBody: Map<String, Any>
@JsonProperty("request_id") @JsonAlias("requestId") val requestId: String,
@JsonProperty("status_code") @JsonAlias("statusCode") val statusCode: Int,
@JsonProperty("status_body") @JsonAlias("statusBody") val statusBody: Map<String, Any>
)
}

View File

@ -83,9 +83,9 @@ class StatisticsRestController(
.groupBy { formatter.format(it.processedAt) }
.map {
val requestList = it.value
.groupBy { it.status }
.map {
Pair(it.key, it.value.size)
.groupBy { request -> request.status }
.map { request ->
Pair(request.key, request.value.size)
}
.toMap()
Pair(

View File

@ -4,12 +4,12 @@ spring:
file: ./dev-compose.yml
app:
rest:
uri: http://localhost:9000/bwhc/etl/api
#rest:
# uri: http://localhost:9000/bwhc/etl/api
kafka:
topic: test
response-topic: test-response
servers: kafka:9092
response-topic: test_response
servers: localhost:9094
server:
port: 8000

View File

@ -97,9 +97,9 @@ class KafkaMtbFileSenderTest {
val captor = argumentCaptor<String>()
verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
assertThat(captor.firstValue).isNotNull
assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\", \"requestId\": \"TestID\"}")
assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"eid\": \"1\"}")
assertThat(captor.secondValue).isNotNull
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.ACTIVE)))
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.ACTIVE)))
}
@Test
@ -113,9 +113,9 @@ class KafkaMtbFileSenderTest {
val captor = argumentCaptor<String>()
verify(kafkaTemplate, times(1)).send(anyString(), captor.capture(), captor.capture())
assertThat(captor.firstValue).isNotNull
assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\", \"requestId\": \"TestID\"}")
assertThat(captor.firstValue).isEqualTo("{\"pid\": \"PID\"}")
assertThat(captor.secondValue).isNotNull
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(mtbFile(Consent.Status.REJECTED)))
assertThat(captor.secondValue).isEqualTo(objectMapper.writeValueAsString(kafkaRecordData("TestID", Consent.Status.REJECTED)))
}
companion object {
@ -154,6 +154,10 @@ class KafkaMtbFileSenderTest {
}.build()
}
fun kafkaRecordData(requestId: String, consentStatus: Consent.Status): KafkaMtbFileSender.Data {
return KafkaMtbFileSender.Data(requestId, mtbFile(consentStatus))
}
data class TestData(val requestStatus: RequestStatus, val exception: Throwable? = null)
@JvmStatic

View File

@ -61,7 +61,8 @@ class RestMtbFileSenderTest {
}
val response = restMtbFileSender.send(MtbFileSender.DeleteRequest("TestID", "PID"))
assertThat(response.status).isEqualTo(requestWithResponse.requestStatus)
assertThat(response.status).isEqualTo(requestWithResponse.response.status)
assertThat(response.body).isEqualTo(requestWithResponse.response.body)
}
@ParameterizedTest
@ -75,11 +76,16 @@ class RestMtbFileSenderTest {
}
val response = restMtbFileSender.send(MtbFileSender.MtbFileRequest("TestID", mtbFile))
assertThat(response.status).isEqualTo(requestWithResponse.requestStatus)
assertThat(response.status).isEqualTo(requestWithResponse.response.status)
assertThat(response.body).isEqualTo(requestWithResponse.response.body)
}
companion object {
data class RequestWithResponse(val httpStatus: HttpStatus, val body: String, val requestStatus: RequestStatus)
data class RequestWithResponse(
val httpStatus: HttpStatus,
val body: String,
val response: MtbFileSender.Response
)
private val warningBody = """
{
@ -99,7 +105,7 @@ class RestMtbFileSenderTest {
}
""".trimIndent()
val mtbFile = MtbFile.builder()
val mtbFile: MtbFile = MtbFile.builder()
.withPatient(
Patient.builder()
.withId("PID")
@ -123,6 +129,8 @@ class RestMtbFileSenderTest {
)
.build()
private const val ERROR_RESPONSE_BODY = "Sonstiger Fehler bei der Übertragung"
/**
* Synthetic http responses with related request status
* Also see: https://ibmi-intra.cs.uni-tuebingen.de/display/ZPM/bwHC+REST+API
@ -130,13 +138,33 @@ class RestMtbFileSenderTest {
@JvmStatic
fun mtbFileRequestWithResponseSource(): Set<RequestWithResponse> {
return setOf(
RequestWithResponse(HttpStatus.OK, "{}", RequestStatus.SUCCESS),
RequestWithResponse(HttpStatus.CREATED, warningBody, RequestStatus.WARNING),
RequestWithResponse(HttpStatus.BAD_REQUEST, "??", RequestStatus.ERROR),
RequestWithResponse(HttpStatus.UNPROCESSABLE_ENTITY, errorBody, RequestStatus.ERROR),
RequestWithResponse(HttpStatus.OK, "{}", MtbFileSender.Response(RequestStatus.SUCCESS, "{}")),
RequestWithResponse(
HttpStatus.CREATED,
warningBody,
MtbFileSender.Response(RequestStatus.WARNING, warningBody)
),
RequestWithResponse(
HttpStatus.BAD_REQUEST,
"??",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
),
RequestWithResponse(
HttpStatus.UNPROCESSABLE_ENTITY,
errorBody,
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
),
// Some more errors not mentioned in documentation
RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR),
RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR)
RequestWithResponse(
HttpStatus.NOT_FOUND,
"what????",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
),
RequestWithResponse(
HttpStatus.INTERNAL_SERVER_ERROR,
"what????",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
)
)
}
@ -147,10 +175,18 @@ class RestMtbFileSenderTest {
@JvmStatic
fun deleteRequestWithResponseSource(): Set<RequestWithResponse> {
return setOf(
RequestWithResponse(HttpStatus.OK, "", RequestStatus.SUCCESS),
RequestWithResponse(HttpStatus.OK, "", MtbFileSender.Response(RequestStatus.SUCCESS)),
// Some more errors not mentioned in documentation
RequestWithResponse(HttpStatus.NOT_FOUND, "what????", RequestStatus.ERROR),
RequestWithResponse(HttpStatus.INTERNAL_SERVER_ERROR, "what????", RequestStatus.ERROR)
RequestWithResponse(
HttpStatus.NOT_FOUND,
"what????",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
),
RequestWithResponse(
HttpStatus.INTERNAL_SERVER_ERROR,
"what????",
MtbFileSender.Response(RequestStatus.ERROR, ERROR_RESPONSE_BODY)
)
)
}
}

View File

@ -92,7 +92,7 @@ class RequestProcessorTest {
doAnswer {
false
}.`when`(requestService).isLastRequestDeletion(anyString())
}.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
doAnswer {
it.arguments[0] as String
@ -147,7 +147,7 @@ class RequestProcessorTest {
doAnswer {
false
}.`when`(requestService).isLastRequestDeletion(anyString())
}.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
doAnswer {
it.arguments[0] as String
@ -202,7 +202,7 @@ class RequestProcessorTest {
doAnswer {
false
}.`when`(requestService).isLastRequestDeletion(anyString())
}.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
doAnswer {
MtbFileSender.Response(status = RequestStatus.SUCCESS)
@ -261,7 +261,7 @@ class RequestProcessorTest {
doAnswer {
false
}.`when`(requestService).isLastRequestDeletion(anyString())
}.`when`(requestService).isLastRequestWithKnownStatusDeletion(anyString())
doAnswer {
MtbFileSender.Response(status = RequestStatus.ERROR)

View File

@ -68,23 +68,33 @@ class RequestServiceTest {
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.DELETE,
status = RequestStatus.SUCCESS,
processedAt = Instant.parse("2023-08-08T02:00:00Z")
),
Request(
id = 1L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678902",
pid = "P2",
fingerprint = "0123456789abcdef2",
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-08-08T00:00:00Z")
processedAt = Instant.parse("2023-07-07T00:00:00Z")
),
Request(
id = 2L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdefd",
type = RequestType.DELETE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-07-07T02:00:00Z")
),
Request(
id = 3L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.UNKNOWN,
processedAt = Instant.parse("2023-08-11T00:00:00Z")
)
)
val actual = RequestService.isLastRequestDeletion(requests)
val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests)
assertThat(actual).isTrue()
}
@ -98,23 +108,33 @@ class RequestServiceTest {
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.DELETE,
status = RequestStatus.SUCCESS,
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-07-07T00:00:00Z")
),
Request(
id = 2L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-07-07T02:00:00Z")
),
Request(
id = 1L,
id = 3L,
uuid = UUID.randomUUID().toString(),
patientId = "TEST_12345678902",
pid = "P2",
fingerprint = "0123456789abcdef2",
patientId = "TEST_12345678901",
pid = "P1",
fingerprint = "0123456789abcdef1",
type = RequestType.MTB_FILE,
status = RequestStatus.WARNING,
processedAt = Instant.parse("2023-08-08T00:00:00Z")
status = RequestStatus.UNKNOWN,
processedAt = Instant.parse("2023-08-11T00:00:00Z")
)
)
val actual = RequestService.isLastRequestDeletion(requests)
val actual = RequestService.isLastRequestWithKnownStatusDeletion(requests)
assertThat(actual).isFalse()
}
@ -197,7 +217,7 @@ class RequestServiceTest {
@Test
fun isLastRequestDeletionShouldRequestAllRequestsForPatientPseudonym() {
requestService.isLastRequestDeletion("TEST_12345678901")
requestService.isLastRequestWithKnownStatusDeletion("TEST_12345678901")
verify(requestRepository, times(1)).findAllByPatientIdOrderByProcessedAtDesc(anyString())
}

View File

@ -19,8 +19,6 @@
package dev.dnpm.etl.processor.services
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import dev.dnpm.etl.processor.monitoring.Request
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
@ -62,12 +60,10 @@ class ResponseProcessorTest {
@Mock requestRepository: RequestRepository,
@Mock statisticsUpdateProducer: Sinks.Many<Any>
) {
val objectMapper = ObjectMapper().registerModule(KotlinModule.Builder().build())
this.requestRepository = requestRepository
this.statisticsUpdateProducer = statisticsUpdateProducer
this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer, objectMapper)
this.responseProcessor = ResponseProcessor(requestRepository, statisticsUpdateProducer)
}
@Test

View File

@ -45,8 +45,8 @@ class KafkaResponseProcessorTest {
private lateinit var kafkaResponseProcessor: KafkaResponseProcessor
private fun createkafkaRecord(
requestId: String? = null,
private fun createKafkaRecord(
requestId: String,
statusCode: Int = 200,
statusBody: Map<String, Any>? = mapOf()
): ConsumerRecord<String, String> {
@ -54,15 +54,11 @@ class KafkaResponseProcessorTest {
"test-topic",
0,
0,
if (requestId == null) {
null
} else {
this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseKey(requestId))
},
null,
if (statusBody == null) {
""
} else {
this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(statusCode, statusBody))
this.objectMapper.writeValueAsString(KafkaResponseProcessor.ResponseBody(requestId, statusCode, statusBody))
}
)
}
@ -78,23 +74,57 @@ class KafkaResponseProcessorTest {
}
@Test
fun shouldNotProcessRecordsWithoutValidKey() {
this.kafkaResponseProcessor.onMessage(createkafkaRecord(null, 200))
fun shouldNotProcessRecordsWithoutRequestIdInBody() {
val record = ConsumerRecord<String, String>(
"test-topic",
0,
0,
null,
"""
{
"statusCode": 200,
"statusBody": {}
}
""".trimIndent()
)
verify(eventPublisher, never()).publishEvent(any())
this.kafkaResponseProcessor.onMessage(record)
verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
}
@Test
fun shouldNotProcessRecordsWithoutValidBody() {
this.kafkaResponseProcessor.onMessage(createkafkaRecord(requestId = "TestID1234", statusBody = null))
fun shouldProcessRecordsWithAliasNames() {
val record = ConsumerRecord<String, String>(
"test-topic",
0,
0,
null,
"""
{
"request_id": "test0123456789",
"status_code": 200,
"status_body": {}
}
""".trimIndent()
)
verify(eventPublisher, never()).publishEvent(any())
this.kafkaResponseProcessor.onMessage(record)
verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
}
@Test
fun shouldNotProcessRecordsWithoutValidStatusBody() {
this.kafkaResponseProcessor.onMessage(createKafkaRecord(requestId = "TestID1234", statusBody = null))
verify(eventPublisher, never()).publishEvent(any<ResponseEvent>())
}
@ParameterizedTest
@MethodSource("statusCodeSource")
fun shouldProcessValidRecordsWithStatusCode(statusCode: Int) {
this.kafkaResponseProcessor.onMessage(createkafkaRecord("TestID1234", statusCode))
this.kafkaResponseProcessor.onMessage(createKafkaRecord("TestID1234", statusCode))
verify(eventPublisher, times(1)).publishEvent(any<ResponseEvent>())
}