From 0defbb05bac3f585e75d72c02c9168d1c2de1834 Mon Sep 17 00:00:00 2001 From: Jakub Lidke Date: Wed, 26 Jul 2023 12:28:03 +0200 Subject: [PATCH 1/9] fix: add message key value while producing messages to kafka topic --- .../dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 374c0af..9520caa 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -25,15 +25,16 @@ import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate class KafkaMtbFileSender( - private val kafkaTemplate: KafkaTemplate, - private val objectMapper: ObjectMapper + private val kafkaTemplate: KafkaTemplate, + private val objectMapper: ObjectMapper ) : MtbFileSender { private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java) override fun send(mtbFile: MtbFile): MtbFileSender.Response { return try { - kafkaTemplate.sendDefault(objectMapper.writeValueAsString(mtbFile)) + kafkaTemplate.sendDefault(String.format("{\"pid\": %s, \"eid\": %s}", mtbFile.patient.id, + mtbFile.episode.id), objectMapper.writeValueAsString(mtbFile)) logger.debug("Sent file via KafkaMtbFileSender") MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } catch (e: Exception) { From 4c0a444725f1107e44db41e509587b969a33ea4b Mon Sep 17 00:00:00 2001 From: Jakub Lidke Date: Thu, 27 Jul 2023 11:23:47 +0200 Subject: [PATCH 2/9] fix: fix kafka key values + add missing component attribute --- .../dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 9520caa..f83a2ab 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -23,7 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.MtbFile import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate +import org.springframework.stereotype.Component +@Component class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, private val objectMapper: ObjectMapper @@ -33,7 +35,7 @@ class KafkaMtbFileSender( override fun send(mtbFile: MtbFile): MtbFileSender.Response { return try { - kafkaTemplate.sendDefault(String.format("{\"pid\": %s, \"eid\": %s}", mtbFile.patient.id, + kafkaTemplate.sendDefault(String.format("{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, mtbFile.episode.id), objectMapper.writeValueAsString(mtbFile)) logger.debug("Sent file via KafkaMtbFileSender") MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) From e9e7139ca41571546b28e48d7254eff781eadf36 Mon Sep 17 00:00:00 2001 From: Jakub Lidke Date: Thu, 27 Jul 2023 11:25:32 +0200 Subject: [PATCH 3/9] test: add kafka test/dev environment docker-compose.dev.yml; add README_TEST_WITH_GPAS.md --- dev/README_TEST_WITH_GPAS.md | 11 +++++ dev/docker-compose.dev.yml | 90 ++++++++++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 dev/README_TEST_WITH_GPAS.md create mode 100644 dev/docker-compose.dev.yml diff --git a/dev/README_TEST_WITH_GPAS.md b/dev/README_TEST_WITH_GPAS.md new file mode 100644 index 0000000..ff9f62b --- /dev/null +++ b/dev/README_TEST_WITH_GPAS.md @@ -0,0 +1,11 @@ +# Test with gPAS +1. Download from [Latest Docker-compose version of gPAS](https://www.ths-greifswald.de/gpas/#_download "") +2. copy `./demo/demo_gpas.sql` into `./sqls` folder +3. if needed change port mapping +4. startup via `docker compose up -d` + +By default, PSN are created via `localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate` endpoint +You can review created PSN via gPAs web interface running at `http://localhost:8080/gpas-web/` + + + diff --git a/dev/docker-compose.dev.yml b/dev/docker-compose.dev.yml new file mode 100644 index 0000000..d7a436b --- /dev/null +++ b/dev/docker-compose.dev.yml @@ -0,0 +1,90 @@ +version: '3.7' + +services: + + zoo1: + image: zookeeper:3.8.0 + hostname: zoo1 + ports: + - "2181:2181" + environment: + ZOO_MY_ID: 1 + ZOO_PORT: 2181 + ZOO_SERVERS: server.1=zoo1:2888:3888;2181 + + kafka1: + image: confluentinc/cp-kafka:7.2.1 + hostname: kafka1 + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" + KAFKA_BROKER_ID: 1 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - zoo1 + + kafka-rest-proxy: + image: confluentinc/cp-kafka-rest:7.2.1 + hostname: kafka-rest-proxy + ports: + - "8082:8082" + environment: + # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181 + KAFKA_REST_LISTENERS: http://0.0.0.0:8082/ + KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/ + KAFKA_REST_HOST_NAME: kafka-rest-proxy + KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092 + depends_on: + - zoo1 + - kafka1 + + kafka-connect: + image: confluentinc/cp-kafka-connect:7.2.1 + hostname: kafka-connect + ports: + - "8083:8083" + environment: + CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092" + CONNECT_REST_PORT: 8083 + CONNECT_GROUP_ID: compose-connect-group + CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs + CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets + CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status + CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" + CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" + CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" + CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO" + CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars" + #volumes: + # - ./connectors:/etc/kafka-connect/jars/ + depends_on: + - zoo1 + - kafka1 + - kafka-rest-proxy + + akhq: + image: tchiotludo/akhq:0.21.0 + environment: + AKHQ_CONFIGURATION: | + akhq: + connections: + docker-kafka-server: + properties: + bootstrap.servers: "kafka1:19092" + connect: + - name: "kafka-connect" + url: "http://kafka-connect:8083" + ports: + - "8084:8080" + depends_on: + - kafka1 + - kafka-connect From cf8c5a86928da3a109e700a5221b5aa26cfe4aa7 Mon Sep 17 00:00:00 2001 From: Jakub Lidke Date: Thu, 27 Jul 2023 11:49:31 +0200 Subject: [PATCH 4/9] fix: wait for kafka to accept message and return success than --- .../processor/output/KafkaMtbFileSender.kt | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index f83a2ab..18faaf9 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -27,18 +27,27 @@ import org.springframework.stereotype.Component @Component class KafkaMtbFileSender( - private val kafkaTemplate: KafkaTemplate, - private val objectMapper: ObjectMapper + private val kafkaTemplate: KafkaTemplate, + private val objectMapper: ObjectMapper ) : MtbFileSender { private val logger = LoggerFactory.getLogger(KafkaMtbFileSender::class.java) override fun send(mtbFile: MtbFile): MtbFileSender.Response { return try { - kafkaTemplate.sendDefault(String.format("{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, - mtbFile.episode.id), objectMapper.writeValueAsString(mtbFile)) - logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + var result = kafkaTemplate.sendDefault( + String.format( + "{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, + mtbFile.episode.id + ), objectMapper.writeValueAsString(mtbFile) + ) + if (result.get() != null) { + logger.debug("Sent file via KafkaMtbFileSender"); + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS); + } else { + MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + } + } catch (e: Exception) { logger.error("An error occured sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) From 469c58fe266b9c6785ea8ad3ca127169293bbbf5 Mon Sep 17 00:00:00 2001 From: Jakub Lidke Date: Thu, 27 Jul 2023 11:50:49 +0200 Subject: [PATCH 5/9] chore: update .gitignore do not checkin gpas files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index c2065bc..4ae22a7 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ out/ ### VS Code ### .vscode/ +/dev/gpas* From 79d83ef04a31aa227a203aaa9345b6e12a4a8620 Mon Sep 17 00:00:00 2001 From: Jakub Lidke Date: Thu, 27 Jul 2023 12:18:17 +0200 Subject: [PATCH 6/9] fix: if no prefix is set, `_` is also not added as prefix to patient pseudonym. --- .../dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt index 364e296..21e0f9a 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt @@ -19,6 +19,7 @@ package dev.dnpm.etl.processor.pseudonym +import ca.uhn.fhir.util.StringUtil import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties @@ -28,7 +29,9 @@ class PseudonymizeService( ) { fun pseudonymize(mtbFile: MtbFile): MtbFile { - val patientPseudonym = "${configProperties.prefix}_${generator.generate(mtbFile.patient.id)}" + val patientPseudonym: String = if(configProperties.prefix.isNotEmpty()) + "${configProperties.prefix}_${generator.generate(mtbFile.patient.id)}" + else generator.generate(mtbFile.patient.id) mtbFile.episode.patient = patientPseudonym mtbFile.carePlans.forEach { it.patient = patientPseudonym } From 90c5b81c2b6412a7bf01fa872ca84b5bb13ccb6a Mon Sep 17 00:00:00 2001 From: Jakub Lidke Date: Thu, 27 Jul 2023 13:01:58 +0200 Subject: [PATCH 7/9] feat: patient pid may be replaced with gPAS pseudonym, now. --- README.md | 5 +- build.gradle.kts | 3 + .../pseudonym/GpasPseudonymGenerator.java | 246 +++++++++++++++++- .../pseudonym/PseudonymRequestFailed.java | 12 + .../processor/config/AppConfigProperties.kt | 6 +- .../etl/processor/config/AppConfiguration.kt | 10 +- .../processor/output/KafkaMtbFileSender.kt | 8 +- src/main/resources/application-dev.yml | 28 +- src/main/resources/application.yml | 14 +- 9 files changed, 302 insertions(+), 30 deletions(-) create mode 100644 src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java diff --git a/README.md b/README.md index d8b86ca..d49ef02 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,11 @@ Wenn eine URI zu einer gPAS-Instanz angegeben ist, wird diese verwendet. Ist diese nicht gesetzt. wird intern eine Anonymisierung der Patienten-ID vorgenommen. * `APP_PSEUDONYMIZE_PREFIX`: Standortbezogenes Prefix - `UNKNOWN`, wenn nicht gesetzt -* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz +* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz inklusive Endpoint (z.B. `http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate`) * `APP_PSEUDONYMIZE_GPAS_TARGET`: gPas Domänenname +* `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPas Basic-Auth Benutzername +* `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPas Basic-Auth Passwort +* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt werden muss. ## Mögliche Endpunkte diff --git a/build.gradle.kts b/build.gradle.kts index 3371bb4..eecd959 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -39,6 +39,9 @@ dependencies { implementation("commons-codec:commons-codec") implementation("io.projectreactor.kotlin:reactor-kotlin-extensions") implementation("de.ukw.ccc:bwhc-dto-java:0.2.0") + implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2") + implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2") + implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1") runtimeOnly("org.mariadb.jdbc:mariadb-java-client") runtimeOnly("org.postgresql:postgresql") developmentOnly("org.springframework.boot:spring-boot-devtools") diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java index 98f4ba6..8b93cdc 100644 --- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java @@ -19,23 +19,253 @@ package dev.dnpm.etl.processor.pseudonym; -import java.net.URI; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.parser.IParser; +import dev.dnpm.etl.processor.config.GPasConfigProperties; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.ConnectException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.Base64; +import java.util.HashMap; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; +import org.apache.hc.client5.http.socket.ConnectionSocketFactory; +import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; +import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; +import org.apache.hc.core5.http.config.Registry; +import org.apache.hc.core5.http.config.RegistryBuilder; +import org.hl7.fhir.r4.model.Identifier; +import org.hl7.fhir.r4.model.Parameters; +import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent; +import org.hl7.fhir.r4.model.StringType; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.RetryPolicy; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; +@Component +@ConditionalOnProperty(value = "app.pseudonymizer", havingValue = "GPAS") public class GpasPseudonymGenerator implements Generator { - private final URI uri; + private final String gPasUrl; + private final String psnTargetDomain; + private static FhirContext r4Context = FhirContext.forR4(); + private final HttpHeaders httpHeader; - private final String target; + private final RetryTemplate retryTemplate = defaultTemplate(); + + private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class); + + private SSLContext customSslContext; + private RestTemplate restTemplate; + + @Autowired + public GpasPseudonymGenerator(GPasConfigProperties gpasCfg) { + + this.gPasUrl = gpasCfg.getUri(); + this.psnTargetDomain = gpasCfg.getTarget(); + httpHeader = getHttpHeaders(gpasCfg.getUsername(), gpasCfg.getPassword()); + + try { + if (StringUtils.isNotBlank(gpasCfg.getSslCaLocation())) { + customSslContext = getSslContext(gpasCfg.getSslCaLocation()); + } + } catch (IOException | KeyManagementException | KeyStoreException | CertificateException | + NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } - public GpasPseudonymGenerator(URI uri, String target) { - this.uri = uri; - this.target = target; } @Override public String generate(String id) { - // TODO Implement this - return "?"; + var gPasRequestBody = getGpasRequestBody(id); + var responseEntity = getGpasPseudonym(gPasRequestBody); + var gPasPseudonymResult = (Parameters) r4Context.newJsonParser() + .parseResource(responseEntity.getBody()); + + return unwrapPseudonym(gPasPseudonymResult); } + @NotNull + public static String unwrapPseudonym(Parameters gPasPseudonymResult) { + Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst() + .get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst() + .orElseGet(ParametersParameterComponent::new).getValue(); + + // pseudonym + return pseudonym.getSystem() + "|" + pseudonym.getValue(); + } + + + @NotNull + protected ResponseEntity getGpasPseudonym(String gPasRequestBody) { + + HttpEntity requestEntity = new HttpEntity<>(gPasRequestBody, this.httpHeader); + ResponseEntity responseEntity; + var restTemplate = getRestTemplete(); + + try { + responseEntity = retryTemplate.execute( + ctx -> restTemplate.exchange(gPasUrl, HttpMethod.POST, requestEntity, + String.class)); + + if (responseEntity.getStatusCode().is2xxSuccessful()) { + log.debug("API request succeeded. Response: {}", responseEntity.getStatusCode()); + } else { + log.warn("API request unsuccessful. Response: {}", requestEntity.getBody()); + throw new PseudonymRequestFailed("API request unsuccessful gPas unsuccessful."); + } + + return responseEntity; + } catch (Exception unexpected) { + throw new PseudonymRequestFailed( + "API request due unexpected error unsuccessful gPas unsuccessful.", unexpected); + } + } + + protected String getGpasRequestBody(String id) { + var requestParameters = new Parameters(); + requestParameters.addParameter().setName("target") + .setValue(new StringType().setValue(psnTargetDomain)); + requestParameters.addParameter().setName("original") + .setValue(new StringType().setValue(id)); + final IParser iParser = r4Context.newJsonParser(); + return iParser.encodeResourceToString(requestParameters); + } + + @NotNull + protected HttpHeaders getHttpHeaders(String gPasUserName, String gPasPassword) { + var headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + if (StringUtils.isBlank(gPasUserName) || StringUtils.isBlank(gPasPassword)) { + return headers; + } + + String authHeader = gPasUserName + ":" + gPasPassword; + byte[] authHeaderBytes = authHeader.getBytes(); + byte[] encodedAuthHeaderBytes = Base64.getEncoder().encode(authHeaderBytes); + String encodedAuthHeader = new String(encodedAuthHeaderBytes); + + if (StringUtils.isNotBlank(gPasUserName) && StringUtils.isNotBlank(gPasPassword)) { + headers.set("Authorization", "Basic " + encodedAuthHeader); + } + + return headers; + } + + protected RetryTemplate defaultTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(1000); + backOffPolicy.setMultiplier(1.25); + retryTemplate.setBackOffPolicy(backOffPolicy); + HashMap, Boolean> retryableExceptions = new HashMap<>(); + retryableExceptions.put(RestClientException.class, true); + retryableExceptions.put(ConnectException.class, true); + RetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions); + retryTemplate.setRetryPolicy(retryPolicy); + + retryTemplate.registerListener(new RetryListener() { + @Override + public void onError(RetryContext context, + RetryCallback callback, Throwable throwable) { + log.warn("HTTP Error occurred: {}. Retrying {}", throwable.getMessage(), + context.getRetryCount()); + RetryListener.super.onError(context, callback, throwable); + } + }); + + return retryTemplate; + } + + /** + * Read SSL root certificate and return SSLContext + * + * @param certificateLocation file location to root certificate (PEM) + * @return initialized SSLContext + * @throws IOException file cannot be read + * @throws CertificateException in case we have an invalid certificate of type X.509 + * @throws KeyStoreException keystore cannot be initialized + * @throws NoSuchAlgorithmException missing trust manager algorithmus + * @throws KeyManagementException key management failed at init SSLContext + */ + @Nullable + protected SSLContext getSslContext(String certificateLocation) + throws IOException, CertificateException, KeyStoreException, KeyManagementException, NoSuchAlgorithmException { + + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + + FileInputStream fis = new FileInputStream(certificateLocation); + X509Certificate ca = (X509Certificate) CertificateFactory.getInstance("X.509") + .generateCertificate(new BufferedInputStream(fis)); + + ks.load(null, null); + ks.setCertificateEntry(Integer.toString(1), ca); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ks); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, tmf.getTrustManagers(), null); + + return sslContext; + } + + protected RestTemplate getRestTemplete() { + + if (restTemplate != null) { + return restTemplate; + } + + if (customSslContext == null) { + restTemplate = new RestTemplate(); + return restTemplate; + } + final var sslsf = new SSLConnectionSocketFactory(customSslContext); + final Registry socketFactoryRegistry = RegistryBuilder.create() + .register("https", sslsf).register("http", new PlainConnectionSocketFactory()).build(); + + final BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager( + socketFactoryRegistry); + final CloseableHttpClient httpClient = HttpClients.custom() + .setConnectionManager(connectionManager).build(); + + final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory( + httpClient); + restTemplate = new RestTemplate(requestFactory); + return restTemplate; + } } diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java new file mode 100644 index 0000000..79b4ba6 --- /dev/null +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java @@ -0,0 +1,12 @@ +package dev.dnpm.etl.processor.pseudonym; + +public class PseudonymRequestFailed extends RuntimeException { + + public PseudonymRequestFailed(String message) { + super(message); + } + + public PseudonymRequestFailed(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 1575c39..0f257e8 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -45,7 +45,11 @@ data class PseudonymizeConfigProperties( @ConfigurationProperties(GPasConfigProperties.NAME) data class GPasConfigProperties( val uri: String?, - val target: String = "etl-processor" + val target: String = "etl-processor", + val username: String?, + val password: String?, + val sslCaLocation: String?, + ) { companion object { const val NAME = "app.pseudonymize.gpas" diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index 5c3add2..50d848a 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -24,20 +24,14 @@ import dev.dnpm.etl.processor.monitoring.ReportService import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender -import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator -import dev.dnpm.etl.processor.pseudonym.Generator -import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator -import dev.dnpm.etl.processor.pseudonym.PseudonymizeService -import org.reactivestreams.Publisher +import dev.dnpm.etl.processor.pseudonym.* import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.KafkaTemplate -import reactor.core.publisher.Flux import reactor.core.publisher.Sinks import java.net.URI -import java.time.Duration @Configuration @EnableConfigurationProperties( @@ -54,7 +48,7 @@ class AppConfiguration { @ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "GPAS") @Bean fun gpasPseudonymGenerator(configProperties: GPasConfigProperties): Generator { - return GpasPseudonymGenerator(URI.create(configProperties.uri!!), configProperties.target) + return GpasPseudonymGenerator(configProperties) } @ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "BUILDIN", matchIfMissing = true) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 18faaf9..4bb0206 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -35,21 +35,21 @@ class KafkaMtbFileSender( override fun send(mtbFile: MtbFile): MtbFileSender.Response { return try { - var result = kafkaTemplate.sendDefault( + val result = kafkaTemplate.sendDefault( String.format( "{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, mtbFile.episode.id ), objectMapper.writeValueAsString(mtbFile) ) if (result.get() != null) { - logger.debug("Sent file via KafkaMtbFileSender"); - MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS); + logger.debug("Sent file via KafkaMtbFileSender") + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) } else { MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) } } catch (e: Exception) { - logger.error("An error occured sending to kafka", e) + logger.error("An error occurred sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 99e4bbf..d840055 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -2,14 +2,28 @@ spring: docker: compose: file: ./dev-compose.yml - + datasource: + hikari: + jdbc-url: "jdbc:mariadb://localhost:13306/dev" + driver-class-name: "org.mariadb.jdbc.Driver" + username: "dev" + password: "dev" + url: "jdbc:mariadb://localhost:13306/dev" app: - rest: - uri: http://localhost:9000/bwhc/etl/api/MTBFile - #kafka: - # topic: test - # servers: kafka:9092 + # rest: + # uri: "https://bwhc-test.diz.uni-marburg.de/bwhc/etl/api/MTBFile" + pseudonymizer: GPAS + pseudonymize: + prefix: "test" + gpas: + target: "demo.study.demo" + uri: "http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate" + username: "" + password: "" + sslCaLocation: "" + kafka: + servers: "localhost:9092" + topic: test-dev server: port: 8000 - diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 39acb37..5bdf6ca 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,4 +4,16 @@ spring: template: default-topic: ${app.kafka.topic} flyway: - locations: "classpath:db/migration/{vendor}" \ No newline at end of file + locations: "classpath:db/migration/{vendor}" + main: + allow-bean-definition-overriding: true + + +app: + rest: + #uri: "localhost:9000/bwhc/etl/api/MTBFile" + pseudonymizer: BUILDIN + + +server: + port: 8000 From aa5b35970af967e972edd6ed435d75184b36d4d5 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 27 Jul 2023 23:16:19 +0200 Subject: [PATCH 8/9] Code polish for PR * Do not use component creation using annotation on classes since components will be created within AppConfiguration config class * Patient PseudonymizeService intentionally uses prefix. If no prefix is configured, the default value "UNKNOWN" will be used to prevent conflicts with other organizations and to show the source organization. --- .../pseudonym/GpasPseudonymGenerator.java | 43 ++++++++----------- .../etl/processor/config/AppConfiguration.kt | 6 ++- .../processor/output/KafkaMtbFileSender.kt | 5 +-- .../pseudonym/PseudonymizeService.kt | 5 +-- 4 files changed, 24 insertions(+), 35 deletions(-) diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java index 8b93cdc..f13a034 100644 --- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java @@ -22,21 +22,6 @@ package dev.dnpm.etl.processor.pseudonym; import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.parser.IParser; import dev.dnpm.etl.processor.config.GPasConfigProperties; -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.ConnectException; -import java.security.KeyManagementException; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.security.cert.CertificateFactory; -import java.security.cert.X509Certificate; -import java.util.Base64; -import java.util.HashMap; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; import org.apache.commons.lang3.StringUtils; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClients; @@ -54,13 +39,7 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.http.HttpEntity; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpMethod; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; +import org.springframework.http.*; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; @@ -69,12 +48,25 @@ import org.springframework.retry.RetryPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; -import org.springframework.stereotype.Component; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; -@Component -@ConditionalOnProperty(value = "app.pseudonymizer", havingValue = "GPAS") +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.ConnectException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.Base64; +import java.util.HashMap; + public class GpasPseudonymGenerator implements Generator { private final String gPasUrl; @@ -89,7 +81,6 @@ public class GpasPseudonymGenerator implements Generator { private SSLContext customSslContext; private RestTemplate restTemplate; - @Autowired public GpasPseudonymGenerator(GPasConfigProperties gpasCfg) { this.gPasUrl = gpasCfg.getUri(); diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index 50d848a..c677f2b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -24,14 +24,16 @@ import dev.dnpm.etl.processor.monitoring.ReportService import dev.dnpm.etl.processor.output.KafkaMtbFileSender import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.output.RestMtbFileSender -import dev.dnpm.etl.processor.pseudonym.* +import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator +import dev.dnpm.etl.processor.pseudonym.Generator +import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator +import dev.dnpm.etl.processor.pseudonym.PseudonymizeService import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.KafkaTemplate import reactor.core.publisher.Sinks -import java.net.URI @Configuration @EnableConfigurationProperties( diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 4bb0206..9867deb 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -23,9 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import de.ukw.ccc.bwhc.dto.MtbFile import org.slf4j.LoggerFactory import org.springframework.kafka.core.KafkaTemplate -import org.springframework.stereotype.Component -@Component class KafkaMtbFileSender( private val kafkaTemplate: KafkaTemplate, private val objectMapper: ObjectMapper @@ -37,7 +35,8 @@ class KafkaMtbFileSender( return try { val result = kafkaTemplate.sendDefault( String.format( - "{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id, + "{\"pid\": \"%s\", \"eid\": \"%s\"}", + mtbFile.patient.id, mtbFile.episode.id ), objectMapper.writeValueAsString(mtbFile) ) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt index 21e0f9a..364e296 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/pseudonym/PseudonymizeService.kt @@ -19,7 +19,6 @@ package dev.dnpm.etl.processor.pseudonym -import ca.uhn.fhir.util.StringUtil import de.ukw.ccc.bwhc.dto.MtbFile import dev.dnpm.etl.processor.config.PseudonymizeConfigProperties @@ -29,9 +28,7 @@ class PseudonymizeService( ) { fun pseudonymize(mtbFile: MtbFile): MtbFile { - val patientPseudonym: String = if(configProperties.prefix.isNotEmpty()) - "${configProperties.prefix}_${generator.generate(mtbFile.patient.id)}" - else generator.generate(mtbFile.patient.id) + val patientPseudonym = "${configProperties.prefix}_${generator.generate(mtbFile.patient.id)}" mtbFile.episode.patient = patientPseudonym mtbFile.carePlans.forEach { it.patient = patientPseudonym } From 361bba5b65261bfba3dc160001a5036ece94d5d5 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Mon, 31 Jul 2023 09:05:51 +0200 Subject: [PATCH 9/9] Code polish for PR * Keep old config files --- src/main/resources/application-dev.yml | 30 +++++++------------------- src/main/resources/application.yml | 14 +----------- 2 files changed, 9 insertions(+), 35 deletions(-) diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index d840055..99e4bbf 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -2,28 +2,14 @@ spring: docker: compose: file: ./dev-compose.yml - datasource: - hikari: - jdbc-url: "jdbc:mariadb://localhost:13306/dev" - driver-class-name: "org.mariadb.jdbc.Driver" - username: "dev" - password: "dev" - url: "jdbc:mariadb://localhost:13306/dev" -app: - # rest: - # uri: "https://bwhc-test.diz.uni-marburg.de/bwhc/etl/api/MTBFile" - pseudonymizer: GPAS - pseudonymize: - prefix: "test" - gpas: - target: "demo.study.demo" - uri: "http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate" - username: "" - password: "" - sslCaLocation: "" - kafka: - servers: "localhost:9092" - topic: test-dev +app: + rest: + uri: http://localhost:9000/bwhc/etl/api/MTBFile + #kafka: + # topic: test + # servers: kafka:9092 + server: port: 8000 + diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 5bdf6ca..39acb37 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,16 +4,4 @@ spring: template: default-topic: ${app.kafka.topic} flyway: - locations: "classpath:db/migration/{vendor}" - main: - allow-bean-definition-overriding: true - - -app: - rest: - #uri: "localhost:9000/bwhc/etl/api/MTBFile" - pseudonymizer: BUILDIN - - -server: - port: 8000 + locations: "classpath:db/migration/{vendor}" \ No newline at end of file