diff --git a/.gitignore b/.gitignore index c2065bc..4ae22a7 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ out/ ### VS Code ### .vscode/ +/dev/gpas* diff --git a/README.md b/README.md index d8b86ca..d49ef02 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,11 @@ Wenn eine URI zu einer gPAS-Instanz angegeben ist, wird diese verwendet. Ist diese nicht gesetzt. wird intern eine Anonymisierung der Patienten-ID vorgenommen. * `APP_PSEUDONYMIZE_PREFIX`: Standortbezogenes Prefix - `UNKNOWN`, wenn nicht gesetzt -* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz +* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz inklusive Endpoint (z.B. `http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate`) * `APP_PSEUDONYMIZE_GPAS_TARGET`: gPas Domänenname +* `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPas Basic-Auth Benutzername +* `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPas Basic-Auth Passwort +* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt werden muss. ## Mögliche Endpunkte diff --git a/build.gradle.kts b/build.gradle.kts index 3371bb4..eecd959 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -39,6 +39,9 @@ dependencies { implementation("commons-codec:commons-codec") implementation("io.projectreactor.kotlin:reactor-kotlin-extensions") implementation("de.ukw.ccc:bwhc-dto-java:0.2.0") + implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2") + implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2") + implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1") runtimeOnly("org.mariadb.jdbc:mariadb-java-client") runtimeOnly("org.postgresql:postgresql") developmentOnly("org.springframework.boot:spring-boot-devtools") diff --git a/dev/README_TEST_WITH_GPAS.md b/dev/README_TEST_WITH_GPAS.md new file mode 100644 index 0000000..ff9f62b --- /dev/null +++ b/dev/README_TEST_WITH_GPAS.md @@ -0,0 +1,11 @@ +# Test with gPAS +1. Download from [Latest Docker-compose version of gPAS](https://www.ths-greifswald.de/gpas/#_download "") +2. copy `./demo/demo_gpas.sql` into `./sqls` folder +3. if needed change port mapping +4. startup via `docker compose up -d` + +By default, PSN are created via `localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate` endpoint +You can review created PSN via gPAs web interface running at `http://localhost:8080/gpas-web/` + + + diff --git a/dev/docker-compose.dev.yml b/dev/docker-compose.dev.yml new file mode 100644 index 0000000..d7a436b --- /dev/null +++ b/dev/docker-compose.dev.yml @@ -0,0 +1,90 @@ +version: '3.7' + +services: + + zoo1: + image: zookeeper:3.8.0 + hostname: zoo1 + ports: + - "2181:2181" + environment: + ZOO_MY_ID: 1 + ZOO_PORT: 2181 + ZOO_SERVERS: server.1=zoo1:2888:3888;2181 + + kafka1: + image: confluentinc/cp-kafka:7.2.1 + hostname: kafka1 + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL + KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" + KAFKA_BROKER_ID: 1 + KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + depends_on: + - zoo1 + + kafka-rest-proxy: + image: confluentinc/cp-kafka-rest:7.2.1 + hostname: kafka-rest-proxy + ports: + - "8082:8082" + environment: + # KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181 + KAFKA_REST_LISTENERS: http://0.0.0.0:8082/ + KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/ + KAFKA_REST_HOST_NAME: kafka-rest-proxy + KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092 + depends_on: + - zoo1 + - kafka1 + + kafka-connect: + image: confluentinc/cp-kafka-connect:7.2.1 + hostname: kafka-connect + ports: + - "8083:8083" + environment: + CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092" + CONNECT_REST_PORT: 8083 + CONNECT_GROUP_ID: compose-connect-group + CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs + CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets + CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status + CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" + CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" + CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect" + CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO" + CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR" + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1" + CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars" + #volumes: + # - ./connectors:/etc/kafka-connect/jars/ + depends_on: + - zoo1 + - kafka1 + - kafka-rest-proxy + + akhq: + image: tchiotludo/akhq:0.21.0 + environment: + AKHQ_CONFIGURATION: | + akhq: + connections: + docker-kafka-server: + properties: + bootstrap.servers: "kafka1:19092" + connect: + - name: "kafka-connect" + url: "http://kafka-connect:8083" + ports: + - "8084:8080" + depends_on: + - kafka1 + - kafka-connect diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java index 98f4ba6..f13a034 100644 --- a/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/GpasPseudonymGenerator.java @@ -19,23 +19,244 @@ package dev.dnpm.etl.processor.pseudonym; -import java.net.URI; +import ca.uhn.fhir.context.FhirContext; +import ca.uhn.fhir.parser.IParser; +import dev.dnpm.etl.processor.config.GPasConfigProperties; +import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; +import org.apache.hc.client5.http.socket.ConnectionSocketFactory; +import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; +import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; +import org.apache.hc.core5.http.config.Registry; +import org.apache.hc.core5.http.config.RegistryBuilder; +import org.hl7.fhir.r4.model.Identifier; +import org.hl7.fhir.r4.model.Parameters; +import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent; +import org.hl7.fhir.r4.model.StringType; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.*; +import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; +import org.springframework.retry.RetryCallback; +import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryListener; +import org.springframework.retry.RetryPolicy; +import org.springframework.retry.backoff.ExponentialBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.web.client.RestClientException; +import org.springframework.web.client.RestTemplate; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.ConnectException; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; +import java.util.Base64; +import java.util.HashMap; public class GpasPseudonymGenerator implements Generator { - private final URI uri; + private final String gPasUrl; + private final String psnTargetDomain; + private static FhirContext r4Context = FhirContext.forR4(); + private final HttpHeaders httpHeader; - private final String target; + private final RetryTemplate retryTemplate = defaultTemplate(); + + private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class); + + private SSLContext customSslContext; + private RestTemplate restTemplate; + + public GpasPseudonymGenerator(GPasConfigProperties gpasCfg) { + + this.gPasUrl = gpasCfg.getUri(); + this.psnTargetDomain = gpasCfg.getTarget(); + httpHeader = getHttpHeaders(gpasCfg.getUsername(), gpasCfg.getPassword()); + + try { + if (StringUtils.isNotBlank(gpasCfg.getSslCaLocation())) { + customSslContext = getSslContext(gpasCfg.getSslCaLocation()); + } + } catch (IOException | KeyManagementException | KeyStoreException | CertificateException | + NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } - public GpasPseudonymGenerator(URI uri, String target) { - this.uri = uri; - this.target = target; } @Override public String generate(String id) { - // TODO Implement this - return "?"; + var gPasRequestBody = getGpasRequestBody(id); + var responseEntity = getGpasPseudonym(gPasRequestBody); + var gPasPseudonymResult = (Parameters) r4Context.newJsonParser() + .parseResource(responseEntity.getBody()); + + return unwrapPseudonym(gPasPseudonymResult); } + @NotNull + public static String unwrapPseudonym(Parameters gPasPseudonymResult) { + Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst() + .get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst() + .orElseGet(ParametersParameterComponent::new).getValue(); + + // pseudonym + return pseudonym.getSystem() + "|" + pseudonym.getValue(); + } + + + @NotNull + protected ResponseEntity getGpasPseudonym(String gPasRequestBody) { + + HttpEntity requestEntity = new HttpEntity<>(gPasRequestBody, this.httpHeader); + ResponseEntity responseEntity; + var restTemplate = getRestTemplete(); + + try { + responseEntity = retryTemplate.execute( + ctx -> restTemplate.exchange(gPasUrl, HttpMethod.POST, requestEntity, + String.class)); + + if (responseEntity.getStatusCode().is2xxSuccessful()) { + log.debug("API request succeeded. Response: {}", responseEntity.getStatusCode()); + } else { + log.warn("API request unsuccessful. Response: {}", requestEntity.getBody()); + throw new PseudonymRequestFailed("API request unsuccessful gPas unsuccessful."); + } + + return responseEntity; + } catch (Exception unexpected) { + throw new PseudonymRequestFailed( + "API request due unexpected error unsuccessful gPas unsuccessful.", unexpected); + } + } + + protected String getGpasRequestBody(String id) { + var requestParameters = new Parameters(); + requestParameters.addParameter().setName("target") + .setValue(new StringType().setValue(psnTargetDomain)); + requestParameters.addParameter().setName("original") + .setValue(new StringType().setValue(id)); + final IParser iParser = r4Context.newJsonParser(); + return iParser.encodeResourceToString(requestParameters); + } + + @NotNull + protected HttpHeaders getHttpHeaders(String gPasUserName, String gPasPassword) { + var headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + + if (StringUtils.isBlank(gPasUserName) || StringUtils.isBlank(gPasPassword)) { + return headers; + } + + String authHeader = gPasUserName + ":" + gPasPassword; + byte[] authHeaderBytes = authHeader.getBytes(); + byte[] encodedAuthHeaderBytes = Base64.getEncoder().encode(authHeaderBytes); + String encodedAuthHeader = new String(encodedAuthHeaderBytes); + + if (StringUtils.isNotBlank(gPasUserName) && StringUtils.isNotBlank(gPasPassword)) { + headers.set("Authorization", "Basic " + encodedAuthHeader); + } + + return headers; + } + + protected RetryTemplate defaultTemplate() { + RetryTemplate retryTemplate = new RetryTemplate(); + ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); + backOffPolicy.setInitialInterval(1000); + backOffPolicy.setMultiplier(1.25); + retryTemplate.setBackOffPolicy(backOffPolicy); + HashMap, Boolean> retryableExceptions = new HashMap<>(); + retryableExceptions.put(RestClientException.class, true); + retryableExceptions.put(ConnectException.class, true); + RetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions); + retryTemplate.setRetryPolicy(retryPolicy); + + retryTemplate.registerListener(new RetryListener() { + @Override + public void onError(RetryContext context, + RetryCallback callback, Throwable throwable) { + log.warn("HTTP Error occurred: {}. Retrying {}", throwable.getMessage(), + context.getRetryCount()); + RetryListener.super.onError(context, callback, throwable); + } + }); + + return retryTemplate; + } + + /** + * Read SSL root certificate and return SSLContext + * + * @param certificateLocation file location to root certificate (PEM) + * @return initialized SSLContext + * @throws IOException file cannot be read + * @throws CertificateException in case we have an invalid certificate of type X.509 + * @throws KeyStoreException keystore cannot be initialized + * @throws NoSuchAlgorithmException missing trust manager algorithmus + * @throws KeyManagementException key management failed at init SSLContext + */ + @Nullable + protected SSLContext getSslContext(String certificateLocation) + throws IOException, CertificateException, KeyStoreException, KeyManagementException, NoSuchAlgorithmException { + + KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); + + FileInputStream fis = new FileInputStream(certificateLocation); + X509Certificate ca = (X509Certificate) CertificateFactory.getInstance("X.509") + .generateCertificate(new BufferedInputStream(fis)); + + ks.load(null, null); + ks.setCertificateEntry(Integer.toString(1), ca); + + TrustManagerFactory tmf = TrustManagerFactory.getInstance( + TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ks); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, tmf.getTrustManagers(), null); + + return sslContext; + } + + protected RestTemplate getRestTemplete() { + + if (restTemplate != null) { + return restTemplate; + } + + if (customSslContext == null) { + restTemplate = new RestTemplate(); + return restTemplate; + } + final var sslsf = new SSLConnectionSocketFactory(customSslContext); + final Registry socketFactoryRegistry = RegistryBuilder.create() + .register("https", sslsf).register("http", new PlainConnectionSocketFactory()).build(); + + final BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager( + socketFactoryRegistry); + final CloseableHttpClient httpClient = HttpClients.custom() + .setConnectionManager(connectionManager).build(); + + final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory( + httpClient); + restTemplate = new RestTemplate(requestFactory); + return restTemplate; + } } diff --git a/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java b/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java new file mode 100644 index 0000000..79b4ba6 --- /dev/null +++ b/src/main/java/dev/dnpm/etl/processor/pseudonym/PseudonymRequestFailed.java @@ -0,0 +1,12 @@ +package dev.dnpm.etl.processor.pseudonym; + +public class PseudonymRequestFailed extends RuntimeException { + + public PseudonymRequestFailed(String message) { + super(message); + } + + public PseudonymRequestFailed(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt index 1575c39..0f257e8 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfigProperties.kt @@ -45,7 +45,11 @@ data class PseudonymizeConfigProperties( @ConfigurationProperties(GPasConfigProperties.NAME) data class GPasConfigProperties( val uri: String?, - val target: String = "etl-processor" + val target: String = "etl-processor", + val username: String?, + val password: String?, + val sslCaLocation: String?, + ) { companion object { const val NAME = "app.pseudonymize.gpas" diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index 5c3add2..c677f2b 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -28,16 +28,12 @@ import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator import dev.dnpm.etl.processor.pseudonym.Generator import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator import dev.dnpm.etl.processor.pseudonym.PseudonymizeService -import org.reactivestreams.Publisher import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.kafka.core.KafkaTemplate -import reactor.core.publisher.Flux import reactor.core.publisher.Sinks -import java.net.URI -import java.time.Duration @Configuration @EnableConfigurationProperties( @@ -54,7 +50,7 @@ class AppConfiguration { @ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "GPAS") @Bean fun gpasPseudonymGenerator(configProperties: GPasConfigProperties): Generator { - return GpasPseudonymGenerator(URI.create(configProperties.uri!!), configProperties.target) + return GpasPseudonymGenerator(configProperties) } @ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "BUILDIN", matchIfMissing = true) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt index 374c0af..9867deb 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/output/KafkaMtbFileSender.kt @@ -33,11 +33,22 @@ class KafkaMtbFileSender( override fun send(mtbFile: MtbFile): MtbFileSender.Response { return try { - kafkaTemplate.sendDefault(objectMapper.writeValueAsString(mtbFile)) - logger.debug("Sent file via KafkaMtbFileSender") - MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) + val result = kafkaTemplate.sendDefault( + String.format( + "{\"pid\": \"%s\", \"eid\": \"%s\"}", + mtbFile.patient.id, + mtbFile.episode.id + ), objectMapper.writeValueAsString(mtbFile) + ) + if (result.get() != null) { + logger.debug("Sent file via KafkaMtbFileSender") + MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS) + } else { + MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR) + } + } catch (e: Exception) { - logger.error("An error occured sending to kafka", e) + logger.error("An error occurred sending to kafka", e) MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN) } }