mirror of
https://github.com/pcvolkmer/etl-processor.git
synced 2025-04-19 17:26:51 +00:00
feat: patient pid may be replaced with gPAS pseudonym, now.
This commit is contained in:
parent
79d83ef04a
commit
90c5b81c2b
@ -8,8 +8,11 @@ Wenn eine URI zu einer gPAS-Instanz angegeben ist, wird diese verwendet.
|
||||
Ist diese nicht gesetzt. wird intern eine Anonymisierung der Patienten-ID vorgenommen.
|
||||
|
||||
* `APP_PSEUDONYMIZE_PREFIX`: Standortbezogenes Prefix - `UNKNOWN`, wenn nicht gesetzt
|
||||
* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz
|
||||
* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz inklusive Endpoint (z.B. `http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate`)
|
||||
* `APP_PSEUDONYMIZE_GPAS_TARGET`: gPas Domänenname
|
||||
* `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPas Basic-Auth Benutzername
|
||||
* `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPas Basic-Auth Passwort
|
||||
* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt werden muss.
|
||||
|
||||
## Mögliche Endpunkte
|
||||
|
||||
|
@ -39,6 +39,9 @@ dependencies {
|
||||
implementation("commons-codec:commons-codec")
|
||||
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
|
||||
implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
|
||||
implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2")
|
||||
implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2")
|
||||
implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1")
|
||||
runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
|
||||
runtimeOnly("org.postgresql:postgresql")
|
||||
developmentOnly("org.springframework.boot:spring-boot-devtools")
|
||||
|
@ -19,23 +19,253 @@
|
||||
|
||||
package dev.dnpm.etl.processor.pseudonym;
|
||||
|
||||
import java.net.URI;
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import dev.dnpm.etl.processor.config.GPasConfigProperties;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.CertificateFactory;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||
import org.apache.hc.client5.http.impl.classic.HttpClients;
|
||||
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
|
||||
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
|
||||
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
|
||||
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
|
||||
import org.apache.hc.core5.http.config.Registry;
|
||||
import org.apache.hc.core5.http.config.RegistryBuilder;
|
||||
import org.hl7.fhir.r4.model.Identifier;
|
||||
import org.hl7.fhir.r4.model.Parameters;
|
||||
import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
|
||||
import org.hl7.fhir.r4.model.StringType;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryListener;
|
||||
import org.springframework.retry.RetryPolicy;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.client.RestClientException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
@Component
|
||||
@ConditionalOnProperty(value = "app.pseudonymizer", havingValue = "GPAS")
|
||||
public class GpasPseudonymGenerator implements Generator {
|
||||
|
||||
private final URI uri;
|
||||
private final String gPasUrl;
|
||||
private final String psnTargetDomain;
|
||||
private static FhirContext r4Context = FhirContext.forR4();
|
||||
private final HttpHeaders httpHeader;
|
||||
|
||||
private final String target;
|
||||
private final RetryTemplate retryTemplate = defaultTemplate();
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
|
||||
|
||||
private SSLContext customSslContext;
|
||||
private RestTemplate restTemplate;
|
||||
|
||||
@Autowired
|
||||
public GpasPseudonymGenerator(GPasConfigProperties gpasCfg) {
|
||||
|
||||
this.gPasUrl = gpasCfg.getUri();
|
||||
this.psnTargetDomain = gpasCfg.getTarget();
|
||||
httpHeader = getHttpHeaders(gpasCfg.getUsername(), gpasCfg.getPassword());
|
||||
|
||||
try {
|
||||
if (StringUtils.isNotBlank(gpasCfg.getSslCaLocation())) {
|
||||
customSslContext = getSslContext(gpasCfg.getSslCaLocation());
|
||||
}
|
||||
} catch (IOException | KeyManagementException | KeyStoreException | CertificateException |
|
||||
NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
public GpasPseudonymGenerator(URI uri, String target) {
|
||||
this.uri = uri;
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String generate(String id) {
|
||||
// TODO Implement this
|
||||
return "?";
|
||||
var gPasRequestBody = getGpasRequestBody(id);
|
||||
var responseEntity = getGpasPseudonym(gPasRequestBody);
|
||||
var gPasPseudonymResult = (Parameters) r4Context.newJsonParser()
|
||||
.parseResource(responseEntity.getBody());
|
||||
|
||||
return unwrapPseudonym(gPasPseudonymResult);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
public static String unwrapPseudonym(Parameters gPasPseudonymResult) {
|
||||
Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst()
|
||||
.get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst()
|
||||
.orElseGet(ParametersParameterComponent::new).getValue();
|
||||
|
||||
// pseudonym
|
||||
return pseudonym.getSystem() + "|" + pseudonym.getValue();
|
||||
}
|
||||
|
||||
|
||||
@NotNull
|
||||
protected ResponseEntity<String> getGpasPseudonym(String gPasRequestBody) {
|
||||
|
||||
HttpEntity<String> requestEntity = new HttpEntity<>(gPasRequestBody, this.httpHeader);
|
||||
ResponseEntity<String> responseEntity;
|
||||
var restTemplate = getRestTemplete();
|
||||
|
||||
try {
|
||||
responseEntity = retryTemplate.execute(
|
||||
ctx -> restTemplate.exchange(gPasUrl, HttpMethod.POST, requestEntity,
|
||||
String.class));
|
||||
|
||||
if (responseEntity.getStatusCode().is2xxSuccessful()) {
|
||||
log.debug("API request succeeded. Response: {}", responseEntity.getStatusCode());
|
||||
} else {
|
||||
log.warn("API request unsuccessful. Response: {}", requestEntity.getBody());
|
||||
throw new PseudonymRequestFailed("API request unsuccessful gPas unsuccessful.");
|
||||
}
|
||||
|
||||
return responseEntity;
|
||||
} catch (Exception unexpected) {
|
||||
throw new PseudonymRequestFailed(
|
||||
"API request due unexpected error unsuccessful gPas unsuccessful.", unexpected);
|
||||
}
|
||||
}
|
||||
|
||||
protected String getGpasRequestBody(String id) {
|
||||
var requestParameters = new Parameters();
|
||||
requestParameters.addParameter().setName("target")
|
||||
.setValue(new StringType().setValue(psnTargetDomain));
|
||||
requestParameters.addParameter().setName("original")
|
||||
.setValue(new StringType().setValue(id));
|
||||
final IParser iParser = r4Context.newJsonParser();
|
||||
return iParser.encodeResourceToString(requestParameters);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
protected HttpHeaders getHttpHeaders(String gPasUserName, String gPasPassword) {
|
||||
var headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
|
||||
if (StringUtils.isBlank(gPasUserName) || StringUtils.isBlank(gPasPassword)) {
|
||||
return headers;
|
||||
}
|
||||
|
||||
String authHeader = gPasUserName + ":" + gPasPassword;
|
||||
byte[] authHeaderBytes = authHeader.getBytes();
|
||||
byte[] encodedAuthHeaderBytes = Base64.getEncoder().encode(authHeaderBytes);
|
||||
String encodedAuthHeader = new String(encodedAuthHeaderBytes);
|
||||
|
||||
if (StringUtils.isNotBlank(gPasUserName) && StringUtils.isNotBlank(gPasPassword)) {
|
||||
headers.set("Authorization", "Basic " + encodedAuthHeader);
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
protected RetryTemplate defaultTemplate() {
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
|
||||
backOffPolicy.setInitialInterval(1000);
|
||||
backOffPolicy.setMultiplier(1.25);
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
HashMap<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
|
||||
retryableExceptions.put(RestClientException.class, true);
|
||||
retryableExceptions.put(ConnectException.class, true);
|
||||
RetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions);
|
||||
retryTemplate.setRetryPolicy(retryPolicy);
|
||||
|
||||
retryTemplate.registerListener(new RetryListener() {
|
||||
@Override
|
||||
public <T, E extends Throwable> void onError(RetryContext context,
|
||||
RetryCallback<T, E> callback, Throwable throwable) {
|
||||
log.warn("HTTP Error occurred: {}. Retrying {}", throwable.getMessage(),
|
||||
context.getRetryCount());
|
||||
RetryListener.super.onError(context, callback, throwable);
|
||||
}
|
||||
});
|
||||
|
||||
return retryTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read SSL root certificate and return SSLContext
|
||||
*
|
||||
* @param certificateLocation file location to root certificate (PEM)
|
||||
* @return initialized SSLContext
|
||||
* @throws IOException file cannot be read
|
||||
* @throws CertificateException in case we have an invalid certificate of type X.509
|
||||
* @throws KeyStoreException keystore cannot be initialized
|
||||
* @throws NoSuchAlgorithmException missing trust manager algorithmus
|
||||
* @throws KeyManagementException key management failed at init SSLContext
|
||||
*/
|
||||
@Nullable
|
||||
protected SSLContext getSslContext(String certificateLocation)
|
||||
throws IOException, CertificateException, KeyStoreException, KeyManagementException, NoSuchAlgorithmException {
|
||||
|
||||
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||
|
||||
FileInputStream fis = new FileInputStream(certificateLocation);
|
||||
X509Certificate ca = (X509Certificate) CertificateFactory.getInstance("X.509")
|
||||
.generateCertificate(new BufferedInputStream(fis));
|
||||
|
||||
ks.load(null, null);
|
||||
ks.setCertificateEntry(Integer.toString(1), ca);
|
||||
|
||||
TrustManagerFactory tmf = TrustManagerFactory.getInstance(
|
||||
TrustManagerFactory.getDefaultAlgorithm());
|
||||
tmf.init(ks);
|
||||
|
||||
SSLContext sslContext = SSLContext.getInstance("TLS");
|
||||
sslContext.init(null, tmf.getTrustManagers(), null);
|
||||
|
||||
return sslContext;
|
||||
}
|
||||
|
||||
protected RestTemplate getRestTemplete() {
|
||||
|
||||
if (restTemplate != null) {
|
||||
return restTemplate;
|
||||
}
|
||||
|
||||
if (customSslContext == null) {
|
||||
restTemplate = new RestTemplate();
|
||||
return restTemplate;
|
||||
}
|
||||
final var sslsf = new SSLConnectionSocketFactory(customSslContext);
|
||||
final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
|
||||
.register("https", sslsf).register("http", new PlainConnectionSocketFactory()).build();
|
||||
|
||||
final BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager(
|
||||
socketFactoryRegistry);
|
||||
final CloseableHttpClient httpClient = HttpClients.custom()
|
||||
.setConnectionManager(connectionManager).build();
|
||||
|
||||
final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
|
||||
httpClient);
|
||||
restTemplate = new RestTemplate(requestFactory);
|
||||
return restTemplate;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,12 @@
|
||||
package dev.dnpm.etl.processor.pseudonym;
|
||||
|
||||
public class PseudonymRequestFailed extends RuntimeException {
|
||||
|
||||
public PseudonymRequestFailed(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public PseudonymRequestFailed(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
@ -45,7 +45,11 @@ data class PseudonymizeConfigProperties(
|
||||
@ConfigurationProperties(GPasConfigProperties.NAME)
|
||||
data class GPasConfigProperties(
|
||||
val uri: String?,
|
||||
val target: String = "etl-processor"
|
||||
val target: String = "etl-processor",
|
||||
val username: String?,
|
||||
val password: String?,
|
||||
val sslCaLocation: String?,
|
||||
|
||||
) {
|
||||
companion object {
|
||||
const val NAME = "app.pseudonymize.gpas"
|
||||
|
@ -24,20 +24,14 @@ import dev.dnpm.etl.processor.monitoring.ReportService
|
||||
import dev.dnpm.etl.processor.output.KafkaMtbFileSender
|
||||
import dev.dnpm.etl.processor.output.MtbFileSender
|
||||
import dev.dnpm.etl.processor.output.RestMtbFileSender
|
||||
import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
|
||||
import dev.dnpm.etl.processor.pseudonym.Generator
|
||||
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
|
||||
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
|
||||
import org.reactivestreams.Publisher
|
||||
import dev.dnpm.etl.processor.pseudonym.*
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import reactor.core.publisher.Flux
|
||||
import reactor.core.publisher.Sinks
|
||||
import java.net.URI
|
||||
import java.time.Duration
|
||||
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(
|
||||
@ -54,7 +48,7 @@ class AppConfiguration {
|
||||
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "GPAS")
|
||||
@Bean
|
||||
fun gpasPseudonymGenerator(configProperties: GPasConfigProperties): Generator {
|
||||
return GpasPseudonymGenerator(URI.create(configProperties.uri!!), configProperties.target)
|
||||
return GpasPseudonymGenerator(configProperties)
|
||||
}
|
||||
|
||||
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "BUILDIN", matchIfMissing = true)
|
||||
|
@ -35,21 +35,21 @@ class KafkaMtbFileSender(
|
||||
|
||||
override fun send(mtbFile: MtbFile): MtbFileSender.Response {
|
||||
return try {
|
||||
var result = kafkaTemplate.sendDefault(
|
||||
val result = kafkaTemplate.sendDefault(
|
||||
String.format(
|
||||
"{\"pid\": \"%s\", \"eid\": \"%s\"}", mtbFile.patient.id,
|
||||
mtbFile.episode.id
|
||||
), objectMapper.writeValueAsString(mtbFile)
|
||||
)
|
||||
if (result.get() != null) {
|
||||
logger.debug("Sent file via KafkaMtbFileSender");
|
||||
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS);
|
||||
logger.debug("Sent file via KafkaMtbFileSender")
|
||||
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
|
||||
} else {
|
||||
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
|
||||
}
|
||||
|
||||
} catch (e: Exception) {
|
||||
logger.error("An error occured sending to kafka", e)
|
||||
logger.error("An error occurred sending to kafka", e)
|
||||
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
|
||||
}
|
||||
}
|
||||
|
@ -2,14 +2,28 @@ spring:
|
||||
docker:
|
||||
compose:
|
||||
file: ./dev-compose.yml
|
||||
|
||||
datasource:
|
||||
hikari:
|
||||
jdbc-url: "jdbc:mariadb://localhost:13306/dev"
|
||||
driver-class-name: "org.mariadb.jdbc.Driver"
|
||||
username: "dev"
|
||||
password: "dev"
|
||||
url: "jdbc:mariadb://localhost:13306/dev"
|
||||
app:
|
||||
rest:
|
||||
uri: http://localhost:9000/bwhc/etl/api/MTBFile
|
||||
#kafka:
|
||||
# topic: test
|
||||
# servers: kafka:9092
|
||||
# rest:
|
||||
# uri: "https://bwhc-test.diz.uni-marburg.de/bwhc/etl/api/MTBFile"
|
||||
pseudonymizer: GPAS
|
||||
pseudonymize:
|
||||
prefix: "test"
|
||||
gpas:
|
||||
target: "demo.study.demo"
|
||||
uri: "http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate"
|
||||
username: ""
|
||||
password: ""
|
||||
sslCaLocation: ""
|
||||
|
||||
kafka:
|
||||
servers: "localhost:9092"
|
||||
topic: test-dev
|
||||
server:
|
||||
port: 8000
|
||||
|
||||
|
@ -4,4 +4,16 @@ spring:
|
||||
template:
|
||||
default-topic: ${app.kafka.topic}
|
||||
flyway:
|
||||
locations: "classpath:db/migration/{vendor}"
|
||||
locations: "classpath:db/migration/{vendor}"
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
|
||||
app:
|
||||
rest:
|
||||
#uri: "localhost:9000/bwhc/etl/api/MTBFile"
|
||||
pseudonymizer: BUILDIN
|
||||
|
||||
|
||||
server:
|
||||
port: 8000
|
||||
|
Loading…
x
Reference in New Issue
Block a user