mirror of
https://github.com/pcvolkmer/etl-processor.git
synced 2025-04-19 17:26:51 +00:00
commit
c8b2e50d47
1
.gitignore
vendored
1
.gitignore
vendored
@ -35,3 +35,4 @@ out/
|
||||
|
||||
### VS Code ###
|
||||
.vscode/
|
||||
/dev/gpas*
|
||||
|
@ -8,8 +8,11 @@ Wenn eine URI zu einer gPAS-Instanz angegeben ist, wird diese verwendet.
|
||||
Ist diese nicht gesetzt. wird intern eine Anonymisierung der Patienten-ID vorgenommen.
|
||||
|
||||
* `APP_PSEUDONYMIZE_PREFIX`: Standortbezogenes Prefix - `UNKNOWN`, wenn nicht gesetzt
|
||||
* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz
|
||||
* `APP_PSEUDONYMIZE_GPAS_URI`: URI der gPAS-Instanz inklusive Endpoint (z.B. `http://localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate`)
|
||||
* `APP_PSEUDONYMIZE_GPAS_TARGET`: gPas Domänenname
|
||||
* `APP_PSEUDONYMIZE_GPAS_USERNAME`: gPas Basic-Auth Benutzername
|
||||
* `APP_PSEUDONYMIZE_GPAS_PASSWORD`: gPas Basic-Auth Passwort
|
||||
* `APP_PSEUDONYMIZE_GPAS_SSLCALOCATION`: Root Zertifikat für gPas, falls es dediziert hinzugefügt werden muss.
|
||||
|
||||
## Mögliche Endpunkte
|
||||
|
||||
|
@ -39,6 +39,9 @@ dependencies {
|
||||
implementation("commons-codec:commons-codec")
|
||||
implementation("io.projectreactor.kotlin:reactor-kotlin-extensions")
|
||||
implementation("de.ukw.ccc:bwhc-dto-java:0.2.0")
|
||||
implementation("ca.uhn.hapi.fhir:hapi-fhir-base:6.6.2")
|
||||
implementation("ca.uhn.hapi.fhir:hapi-fhir-structures-r4:6.6.2")
|
||||
implementation("org.apache.httpcomponents.client5:httpclient5:5.2.1")
|
||||
runtimeOnly("org.mariadb.jdbc:mariadb-java-client")
|
||||
runtimeOnly("org.postgresql:postgresql")
|
||||
developmentOnly("org.springframework.boot:spring-boot-devtools")
|
||||
|
11
dev/README_TEST_WITH_GPAS.md
Normal file
11
dev/README_TEST_WITH_GPAS.md
Normal file
@ -0,0 +1,11 @@
|
||||
# Test with gPAS
|
||||
1. Download from [Latest Docker-compose version of gPAS](https://www.ths-greifswald.de/gpas/#_download "")
|
||||
2. copy `./demo/demo_gpas.sql` into `./sqls` folder
|
||||
3. if needed change port mapping
|
||||
4. startup via `docker compose up -d`
|
||||
|
||||
By default, PSN are created via `localhost:8080/ttp-fhir/fhir/gpas/$pseudonymizeAllowCreate` endpoint
|
||||
You can review created PSN via gPAs web interface running at `http://localhost:8080/gpas-web/`
|
||||
|
||||
|
||||
|
90
dev/docker-compose.dev.yml
Normal file
90
dev/docker-compose.dev.yml
Normal file
@ -0,0 +1,90 @@
|
||||
version: '3.7'
|
||||
|
||||
services:
|
||||
|
||||
zoo1:
|
||||
image: zookeeper:3.8.0
|
||||
hostname: zoo1
|
||||
ports:
|
||||
- "2181:2181"
|
||||
environment:
|
||||
ZOO_MY_ID: 1
|
||||
ZOO_PORT: 2181
|
||||
ZOO_SERVERS: server.1=zoo1:2888:3888;2181
|
||||
|
||||
kafka1:
|
||||
image: confluentinc/cp-kafka:7.2.1
|
||||
hostname: kafka1
|
||||
ports:
|
||||
- "9092:9092"
|
||||
environment:
|
||||
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
|
||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
|
||||
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
|
||||
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
depends_on:
|
||||
- zoo1
|
||||
|
||||
kafka-rest-proxy:
|
||||
image: confluentinc/cp-kafka-rest:7.2.1
|
||||
hostname: kafka-rest-proxy
|
||||
ports:
|
||||
- "8082:8082"
|
||||
environment:
|
||||
# KAFKA_REST_ZOOKEEPER_CONNECT: zoo1:2181
|
||||
KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
|
||||
KAFKA_REST_SCHEMA_REGISTRY_URL: http://kafka-schema-registry:8081/
|
||||
KAFKA_REST_HOST_NAME: kafka-rest-proxy
|
||||
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092
|
||||
depends_on:
|
||||
- zoo1
|
||||
- kafka1
|
||||
|
||||
kafka-connect:
|
||||
image: confluentinc/cp-kafka-connect:7.2.1
|
||||
hostname: kafka-connect
|
||||
ports:
|
||||
- "8083:8083"
|
||||
environment:
|
||||
CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
|
||||
CONNECT_REST_PORT: 8083
|
||||
CONNECT_GROUP_ID: compose-connect-group
|
||||
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
|
||||
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
|
||||
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
|
||||
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
|
||||
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
|
||||
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
|
||||
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
|
||||
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
|
||||
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
|
||||
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
|
||||
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
|
||||
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
|
||||
#volumes:
|
||||
# - ./connectors:/etc/kafka-connect/jars/
|
||||
depends_on:
|
||||
- zoo1
|
||||
- kafka1
|
||||
- kafka-rest-proxy
|
||||
|
||||
akhq:
|
||||
image: tchiotludo/akhq:0.21.0
|
||||
environment:
|
||||
AKHQ_CONFIGURATION: |
|
||||
akhq:
|
||||
connections:
|
||||
docker-kafka-server:
|
||||
properties:
|
||||
bootstrap.servers: "kafka1:19092"
|
||||
connect:
|
||||
- name: "kafka-connect"
|
||||
url: "http://kafka-connect:8083"
|
||||
ports:
|
||||
- "8084:8080"
|
||||
depends_on:
|
||||
- kafka1
|
||||
- kafka-connect
|
@ -19,23 +19,244 @@
|
||||
|
||||
package dev.dnpm.etl.processor.pseudonym;
|
||||
|
||||
import java.net.URI;
|
||||
import ca.uhn.fhir.context.FhirContext;
|
||||
import ca.uhn.fhir.parser.IParser;
|
||||
import dev.dnpm.etl.processor.config.GPasConfigProperties;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
|
||||
import org.apache.hc.client5.http.impl.classic.HttpClients;
|
||||
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
|
||||
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
|
||||
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
|
||||
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
|
||||
import org.apache.hc.core5.http.config.Registry;
|
||||
import org.apache.hc.core5.http.config.RegistryBuilder;
|
||||
import org.hl7.fhir.r4.model.Identifier;
|
||||
import org.hl7.fhir.r4.model.Parameters;
|
||||
import org.hl7.fhir.r4.model.Parameters.ParametersParameterComponent;
|
||||
import org.hl7.fhir.r4.model.StringType;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.*;
|
||||
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
|
||||
import org.springframework.retry.RetryCallback;
|
||||
import org.springframework.retry.RetryContext;
|
||||
import org.springframework.retry.RetryListener;
|
||||
import org.springframework.retry.RetryPolicy;
|
||||
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
|
||||
import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
import org.springframework.web.client.RestClientException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.CertificateFactory;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
|
||||
public class GpasPseudonymGenerator implements Generator {
|
||||
|
||||
private final URI uri;
|
||||
private final String gPasUrl;
|
||||
private final String psnTargetDomain;
|
||||
private static FhirContext r4Context = FhirContext.forR4();
|
||||
private final HttpHeaders httpHeader;
|
||||
|
||||
private final String target;
|
||||
private final RetryTemplate retryTemplate = defaultTemplate();
|
||||
|
||||
private final Logger log = LoggerFactory.getLogger(GpasPseudonymGenerator.class);
|
||||
|
||||
private SSLContext customSslContext;
|
||||
private RestTemplate restTemplate;
|
||||
|
||||
public GpasPseudonymGenerator(GPasConfigProperties gpasCfg) {
|
||||
|
||||
this.gPasUrl = gpasCfg.getUri();
|
||||
this.psnTargetDomain = gpasCfg.getTarget();
|
||||
httpHeader = getHttpHeaders(gpasCfg.getUsername(), gpasCfg.getPassword());
|
||||
|
||||
try {
|
||||
if (StringUtils.isNotBlank(gpasCfg.getSslCaLocation())) {
|
||||
customSslContext = getSslContext(gpasCfg.getSslCaLocation());
|
||||
}
|
||||
} catch (IOException | KeyManagementException | KeyStoreException | CertificateException |
|
||||
NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
public GpasPseudonymGenerator(URI uri, String target) {
|
||||
this.uri = uri;
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String generate(String id) {
|
||||
// TODO Implement this
|
||||
return "?";
|
||||
var gPasRequestBody = getGpasRequestBody(id);
|
||||
var responseEntity = getGpasPseudonym(gPasRequestBody);
|
||||
var gPasPseudonymResult = (Parameters) r4Context.newJsonParser()
|
||||
.parseResource(responseEntity.getBody());
|
||||
|
||||
return unwrapPseudonym(gPasPseudonymResult);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
public static String unwrapPseudonym(Parameters gPasPseudonymResult) {
|
||||
Identifier pseudonym = (Identifier) gPasPseudonymResult.getParameter().stream().findFirst()
|
||||
.get().getPart().stream().filter(a -> a.getName().equals("pseudonym")).findFirst()
|
||||
.orElseGet(ParametersParameterComponent::new).getValue();
|
||||
|
||||
// pseudonym
|
||||
return pseudonym.getSystem() + "|" + pseudonym.getValue();
|
||||
}
|
||||
|
||||
|
||||
@NotNull
|
||||
protected ResponseEntity<String> getGpasPseudonym(String gPasRequestBody) {
|
||||
|
||||
HttpEntity<String> requestEntity = new HttpEntity<>(gPasRequestBody, this.httpHeader);
|
||||
ResponseEntity<String> responseEntity;
|
||||
var restTemplate = getRestTemplete();
|
||||
|
||||
try {
|
||||
responseEntity = retryTemplate.execute(
|
||||
ctx -> restTemplate.exchange(gPasUrl, HttpMethod.POST, requestEntity,
|
||||
String.class));
|
||||
|
||||
if (responseEntity.getStatusCode().is2xxSuccessful()) {
|
||||
log.debug("API request succeeded. Response: {}", responseEntity.getStatusCode());
|
||||
} else {
|
||||
log.warn("API request unsuccessful. Response: {}", requestEntity.getBody());
|
||||
throw new PseudonymRequestFailed("API request unsuccessful gPas unsuccessful.");
|
||||
}
|
||||
|
||||
return responseEntity;
|
||||
} catch (Exception unexpected) {
|
||||
throw new PseudonymRequestFailed(
|
||||
"API request due unexpected error unsuccessful gPas unsuccessful.", unexpected);
|
||||
}
|
||||
}
|
||||
|
||||
protected String getGpasRequestBody(String id) {
|
||||
var requestParameters = new Parameters();
|
||||
requestParameters.addParameter().setName("target")
|
||||
.setValue(new StringType().setValue(psnTargetDomain));
|
||||
requestParameters.addParameter().setName("original")
|
||||
.setValue(new StringType().setValue(id));
|
||||
final IParser iParser = r4Context.newJsonParser();
|
||||
return iParser.encodeResourceToString(requestParameters);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
protected HttpHeaders getHttpHeaders(String gPasUserName, String gPasPassword) {
|
||||
var headers = new HttpHeaders();
|
||||
headers.setContentType(MediaType.APPLICATION_JSON);
|
||||
|
||||
if (StringUtils.isBlank(gPasUserName) || StringUtils.isBlank(gPasPassword)) {
|
||||
return headers;
|
||||
}
|
||||
|
||||
String authHeader = gPasUserName + ":" + gPasPassword;
|
||||
byte[] authHeaderBytes = authHeader.getBytes();
|
||||
byte[] encodedAuthHeaderBytes = Base64.getEncoder().encode(authHeaderBytes);
|
||||
String encodedAuthHeader = new String(encodedAuthHeaderBytes);
|
||||
|
||||
if (StringUtils.isNotBlank(gPasUserName) && StringUtils.isNotBlank(gPasPassword)) {
|
||||
headers.set("Authorization", "Basic " + encodedAuthHeader);
|
||||
}
|
||||
|
||||
return headers;
|
||||
}
|
||||
|
||||
protected RetryTemplate defaultTemplate() {
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
|
||||
backOffPolicy.setInitialInterval(1000);
|
||||
backOffPolicy.setMultiplier(1.25);
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
HashMap<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
|
||||
retryableExceptions.put(RestClientException.class, true);
|
||||
retryableExceptions.put(ConnectException.class, true);
|
||||
RetryPolicy retryPolicy = new SimpleRetryPolicy(3, retryableExceptions);
|
||||
retryTemplate.setRetryPolicy(retryPolicy);
|
||||
|
||||
retryTemplate.registerListener(new RetryListener() {
|
||||
@Override
|
||||
public <T, E extends Throwable> void onError(RetryContext context,
|
||||
RetryCallback<T, E> callback, Throwable throwable) {
|
||||
log.warn("HTTP Error occurred: {}. Retrying {}", throwable.getMessage(),
|
||||
context.getRetryCount());
|
||||
RetryListener.super.onError(context, callback, throwable);
|
||||
}
|
||||
});
|
||||
|
||||
return retryTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read SSL root certificate and return SSLContext
|
||||
*
|
||||
* @param certificateLocation file location to root certificate (PEM)
|
||||
* @return initialized SSLContext
|
||||
* @throws IOException file cannot be read
|
||||
* @throws CertificateException in case we have an invalid certificate of type X.509
|
||||
* @throws KeyStoreException keystore cannot be initialized
|
||||
* @throws NoSuchAlgorithmException missing trust manager algorithmus
|
||||
* @throws KeyManagementException key management failed at init SSLContext
|
||||
*/
|
||||
@Nullable
|
||||
protected SSLContext getSslContext(String certificateLocation)
|
||||
throws IOException, CertificateException, KeyStoreException, KeyManagementException, NoSuchAlgorithmException {
|
||||
|
||||
KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
|
||||
|
||||
FileInputStream fis = new FileInputStream(certificateLocation);
|
||||
X509Certificate ca = (X509Certificate) CertificateFactory.getInstance("X.509")
|
||||
.generateCertificate(new BufferedInputStream(fis));
|
||||
|
||||
ks.load(null, null);
|
||||
ks.setCertificateEntry(Integer.toString(1), ca);
|
||||
|
||||
TrustManagerFactory tmf = TrustManagerFactory.getInstance(
|
||||
TrustManagerFactory.getDefaultAlgorithm());
|
||||
tmf.init(ks);
|
||||
|
||||
SSLContext sslContext = SSLContext.getInstance("TLS");
|
||||
sslContext.init(null, tmf.getTrustManagers(), null);
|
||||
|
||||
return sslContext;
|
||||
}
|
||||
|
||||
protected RestTemplate getRestTemplete() {
|
||||
|
||||
if (restTemplate != null) {
|
||||
return restTemplate;
|
||||
}
|
||||
|
||||
if (customSslContext == null) {
|
||||
restTemplate = new RestTemplate();
|
||||
return restTemplate;
|
||||
}
|
||||
final var sslsf = new SSLConnectionSocketFactory(customSslContext);
|
||||
final Registry<ConnectionSocketFactory> socketFactoryRegistry = RegistryBuilder.<ConnectionSocketFactory>create()
|
||||
.register("https", sslsf).register("http", new PlainConnectionSocketFactory()).build();
|
||||
|
||||
final BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager(
|
||||
socketFactoryRegistry);
|
||||
final CloseableHttpClient httpClient = HttpClients.custom()
|
||||
.setConnectionManager(connectionManager).build();
|
||||
|
||||
final HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
|
||||
httpClient);
|
||||
restTemplate = new RestTemplate(requestFactory);
|
||||
return restTemplate;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,12 @@
|
||||
package dev.dnpm.etl.processor.pseudonym;
|
||||
|
||||
public class PseudonymRequestFailed extends RuntimeException {
|
||||
|
||||
public PseudonymRequestFailed(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public PseudonymRequestFailed(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
}
|
@ -45,7 +45,11 @@ data class PseudonymizeConfigProperties(
|
||||
@ConfigurationProperties(GPasConfigProperties.NAME)
|
||||
data class GPasConfigProperties(
|
||||
val uri: String?,
|
||||
val target: String = "etl-processor"
|
||||
val target: String = "etl-processor",
|
||||
val username: String?,
|
||||
val password: String?,
|
||||
val sslCaLocation: String?,
|
||||
|
||||
) {
|
||||
companion object {
|
||||
const val NAME = "app.pseudonymize.gpas"
|
||||
|
@ -28,16 +28,12 @@ import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator
|
||||
import dev.dnpm.etl.processor.pseudonym.Generator
|
||||
import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator
|
||||
import dev.dnpm.etl.processor.pseudonym.PseudonymizeService
|
||||
import org.reactivestreams.Publisher
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import reactor.core.publisher.Flux
|
||||
import reactor.core.publisher.Sinks
|
||||
import java.net.URI
|
||||
import java.time.Duration
|
||||
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(
|
||||
@ -54,7 +50,7 @@ class AppConfiguration {
|
||||
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "GPAS")
|
||||
@Bean
|
||||
fun gpasPseudonymGenerator(configProperties: GPasConfigProperties): Generator {
|
||||
return GpasPseudonymGenerator(URI.create(configProperties.uri!!), configProperties.target)
|
||||
return GpasPseudonymGenerator(configProperties)
|
||||
}
|
||||
|
||||
@ConditionalOnProperty(value = ["app.pseudonymizer"], havingValue = "BUILDIN", matchIfMissing = true)
|
||||
|
@ -33,11 +33,22 @@ class KafkaMtbFileSender(
|
||||
|
||||
override fun send(mtbFile: MtbFile): MtbFileSender.Response {
|
||||
return try {
|
||||
kafkaTemplate.sendDefault(objectMapper.writeValueAsString(mtbFile))
|
||||
logger.debug("Sent file via KafkaMtbFileSender")
|
||||
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
|
||||
val result = kafkaTemplate.sendDefault(
|
||||
String.format(
|
||||
"{\"pid\": \"%s\", \"eid\": \"%s\"}",
|
||||
mtbFile.patient.id,
|
||||
mtbFile.episode.id
|
||||
), objectMapper.writeValueAsString(mtbFile)
|
||||
)
|
||||
if (result.get() != null) {
|
||||
logger.debug("Sent file via KafkaMtbFileSender")
|
||||
MtbFileSender.Response(MtbFileSender.ResponseStatus.SUCCESS)
|
||||
} else {
|
||||
MtbFileSender.Response(MtbFileSender.ResponseStatus.ERROR)
|
||||
}
|
||||
|
||||
} catch (e: Exception) {
|
||||
logger.error("An error occured sending to kafka", e)
|
||||
logger.error("An error occurred sending to kafka", e)
|
||||
MtbFileSender.Response(MtbFileSender.ResponseStatus.UNKNOWN)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user