diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index 8fb9e19..4ca48cd 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -20,7 +20,7 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper -import dev.dnpm.etl.processor.monitoring.ReportService +import dev.dnpm.etl.processor.monitoring.* import dev.dnpm.etl.processor.pseudonym.AnonymizingGenerator import dev.dnpm.etl.processor.pseudonym.Generator import dev.dnpm.etl.processor.pseudonym.GpasPseudonymGenerator @@ -44,6 +44,7 @@ import org.springframework.retry.support.RetryTemplateBuilder import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.security.crypto.password.PasswordEncoder import org.springframework.security.provisioning.InMemoryUserDetailsManager +import org.springframework.web.client.RestTemplate import reactor.core.publisher.Sinks import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @@ -142,8 +143,18 @@ class AppConfiguration { } @Bean - fun configsUpdateProducer(): Sinks.Many { - return Sinks.many().multicast().directBestEffort() + fun connectionCheckUpdateProducer(): Sinks.Many { + return Sinks.many().multicast().onBackpressureBuffer() + } + + @ConditionalOnProperty(value = ["app.pseudonymize.generator"], havingValue = "GPAS") + @Bean + fun gPasConnectionCheckService( + restTemplate: RestTemplate, + gPasConfigProperties: GPasConfigProperties, + connectionCheckUpdateProducer: Sinks.Many + ): ConnectionCheckService { + return GPasConnectionCheckService(restTemplate, gPasConfigProperties, connectionCheckUpdateProducer) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 3799762..80c66d2 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -21,6 +21,7 @@ package dev.dnpm.etl.processor.config import com.fasterxml.jackson.databind.ObjectMapper import dev.dnpm.etl.processor.input.KafkaInputListener +import dev.dnpm.etl.processor.monitoring.ConnectionCheckResult import dev.dnpm.etl.processor.monitoring.ConnectionCheckService import dev.dnpm.etl.processor.monitoring.KafkaConnectionCheckService import dev.dnpm.etl.processor.output.KafkaMtbFileSender @@ -105,8 +106,11 @@ class AppKafkaConfiguration { } @Bean - fun connectionCheckService(consumerFactory: ConsumerFactory, configsUpdateProducer: Sinks.Many): ConnectionCheckService { - return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer) + fun kafkaConnectionCheckService( + consumerFactory: ConsumerFactory, + connectionCheckUpdateProducer: Sinks.Many + ): ConnectionCheckService { + return KafkaConnectionCheckService(consumerFactory.createConsumer(), connectionCheckUpdateProducer) } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt index eea5724..582d530 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -19,6 +19,7 @@ package dev.dnpm.etl.processor.config +import dev.dnpm.etl.processor.monitoring.ConnectionCheckResult import dev.dnpm.etl.processor.monitoring.ConnectionCheckService import dev.dnpm.etl.processor.monitoring.RestConnectionCheckService import dev.dnpm.etl.processor.output.MtbFileSender @@ -63,12 +64,12 @@ class AppRestConfiguration { } @Bean - fun connectionCheckService( + fun restConnectionCheckService( restTemplate: RestTemplate, restTargetProperties: RestTargetProperties, - configsUpdateProducer: Sinks.Many + connectionCheckUpdateProducer: Sinks.Many ): ConnectionCheckService { - return RestConnectionCheckService(restTemplate, restTargetProperties, configsUpdateProducer) + return RestConnectionCheckService(restTemplate, restTargetProperties, connectionCheckUpdateProducer) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt index 54f25b3..81ad922 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt @@ -20,14 +20,21 @@ package dev.dnpm.etl.processor.monitoring +import dev.dnpm.etl.processor.config.GPasConfigProperties import dev.dnpm.etl.processor.config.RestTargetProperties import jakarta.annotation.PostConstruct import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.common.errors.TimeoutException import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.http.HttpEntity +import org.springframework.http.HttpHeaders +import org.springframework.http.HttpMethod import org.springframework.http.HttpStatus +import org.springframework.http.MediaType +import org.springframework.http.RequestEntity import org.springframework.scheduling.annotation.Scheduled import org.springframework.web.client.RestTemplate +import org.springframework.web.util.UriComponentsBuilder import reactor.core.publisher.Sinks import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @@ -38,11 +45,22 @@ interface ConnectionCheckService { } +interface OutputConnectionCheckService : ConnectionCheckService + +sealed class ConnectionCheckResult { + + abstract val available: Boolean + + data class KafkaConnectionCheckResult(override val available: Boolean) : ConnectionCheckResult() + data class RestConnectionCheckResult(override val available: Boolean) : ConnectionCheckResult() + data class GPasConnectionCheckResult(override val available: Boolean) : ConnectionCheckResult() +} + class KafkaConnectionCheckService( private val consumer: Consumer, - @Qualifier("configsUpdateProducer") - private val configsUpdateProducer: Sinks.Many -) : ConnectionCheckService { + @Qualifier("connectionCheckUpdateProducer") + private val connectionCheckUpdateProducer: Sinks.Many +) : OutputConnectionCheckService { private var connectionAvailable: Boolean = false @@ -55,7 +73,10 @@ class KafkaConnectionCheckService( } catch (e: TimeoutException) { false } - configsUpdateProducer.emitNext(connectionAvailable, Sinks.EmitFailureHandler.FAIL_FAST) + connectionCheckUpdateProducer.emitNext( + ConnectionCheckResult.KafkaConnectionCheckResult(connectionAvailable), + Sinks.EmitFailureHandler.FAIL_FAST + ) } override fun connectionAvailable(): Boolean { @@ -67,9 +88,9 @@ class KafkaConnectionCheckService( class RestConnectionCheckService( private val restTemplate: RestTemplate, private val restTargetProperties: RestTargetProperties, - @Qualifier("configsUpdateProducer") - private val configsUpdateProducer: Sinks.Many -) : ConnectionCheckService { + @Qualifier("connectionCheckUpdateProducer") + private val connectionCheckUpdateProducer: Sinks.Many +) : OutputConnectionCheckService { private var connectionAvailable: Boolean = false @@ -84,7 +105,55 @@ class RestConnectionCheckService( } catch (e: Exception) { false } - configsUpdateProducer.emitNext(connectionAvailable, Sinks.EmitFailureHandler.FAIL_FAST) + connectionCheckUpdateProducer.emitNext( + ConnectionCheckResult.RestConnectionCheckResult(connectionAvailable), + Sinks.EmitFailureHandler.FAIL_FAST + ) + } + + override fun connectionAvailable(): Boolean { + return this.connectionAvailable + } +} + +class GPasConnectionCheckService( + private val restTemplate: RestTemplate, + private val gPasConfigProperties: GPasConfigProperties, + @Qualifier("connectionCheckUpdateProducer") + private val connectionCheckUpdateProducer: Sinks.Many +) : ConnectionCheckService { + + private var connectionAvailable: Boolean = false + + @PostConstruct + @Scheduled(cron = "0 * * * * *") + fun check() { + connectionAvailable = try { + val uri = UriComponentsBuilder.fromUriString( + gPasConfigProperties.uri?.replace("/\$pseudonymizeAllowCreate", "/\$pseudonymize").toString() + ) + .queryParam("target", gPasConfigProperties.target) + .queryParam("original", "???") + .build().toUri() + + val headers = HttpHeaders() + headers.contentType = MediaType.APPLICATION_JSON + if (!gPasConfigProperties.username.isNullOrBlank() && !gPasConfigProperties.password.isNullOrBlank()) { + headers.setBasicAuth(gPasConfigProperties.username, gPasConfigProperties.password) + } + restTemplate.exchange( + uri, + HttpMethod.GET, + HttpEntity(headers), + Void::class.java + ).statusCode == HttpStatus.OK + } catch (e: Exception) { + false + } + connectionCheckUpdateProducer.emitNext( + ConnectionCheckResult.GPasConnectionCheckResult(connectionAvailable), + Sinks.EmitFailureHandler.FAIL_FAST + ) } override fun connectionAvailable(): Boolean { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt index 44ea400..eb9d541 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt @@ -19,7 +19,10 @@ package dev.dnpm.etl.processor.web +import dev.dnpm.etl.processor.monitoring.ConnectionCheckResult import dev.dnpm.etl.processor.monitoring.ConnectionCheckService +import dev.dnpm.etl.processor.monitoring.GPasConnectionCheckService +import dev.dnpm.etl.processor.monitoring.OutputConnectionCheckService import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.Generator import dev.dnpm.etl.processor.security.Role @@ -40,22 +43,29 @@ import reactor.core.publisher.Sinks @Controller @RequestMapping(path = ["configs"]) class ConfigController( - @Qualifier("configsUpdateProducer") - private val configsUpdateProducer: Sinks.Many, + @Qualifier("connectionCheckUpdateProducer") + private val connectionCheckUpdateProducer: Sinks.Many, private val transformationService: TransformationService, private val pseudonymGenerator: Generator, private val mtbFileSender: MtbFileSender, - private val connectionCheckService: ConnectionCheckService, + private val connectionCheckServices: List, private val tokenService: TokenService?, private val userRoleService: UserRoleService? ) { @GetMapping fun index(model: Model): String { + val outputConnectionAvailable = + connectionCheckServices.filterIsInstance().first().connectionAvailable() + + val gPasConnectionAvailable = + connectionCheckServices.filterIsInstance().firstOrNull()?.connectionAvailable() + model.addAttribute("pseudonymGenerator", pseudonymGenerator.javaClass.simpleName) model.addAttribute("mtbFileSender", mtbFileSender.javaClass.simpleName) model.addAttribute("mtbFileEndpoint", mtbFileSender.endpoint()) - model.addAttribute("connectionAvailable", connectionCheckService.connectionAvailable()) + model.addAttribute("outputConnectionAvailable", outputConnectionAvailable) + model.addAttribute("gPasConnectionAvailable", gPasConnectionAvailable) model.addAttribute("tokensEnabled", tokenService != null) if (tokenService != null) { model.addAttribute("tokens", tokenService.findAll()) @@ -73,11 +83,14 @@ class ConfigController( return "configs" } - @GetMapping(params = ["connectionAvailable"]) - fun connectionAvailable(model: Model): String { + @GetMapping(params = ["outputConnectionAvailable"]) + fun outputConnectionAvailable(model: Model): String { + val outputConnectionAvailable = + connectionCheckServices.filterIsInstance().first().connectionAvailable() + model.addAttribute("mtbFileSender", mtbFileSender.javaClass.simpleName) model.addAttribute("mtbFileEndpoint", mtbFileSender.endpoint()) - model.addAttribute("connectionAvailable", connectionCheckService.connectionAvailable()) + model.addAttribute("outputConnectionAvailable", outputConnectionAvailable) if (tokenService != null) { model.addAttribute("tokensEnabled", true) model.addAttribute("tokens", tokenService.findAll()) @@ -85,7 +98,25 @@ class ConfigController( model.addAttribute("tokens", listOf()) } - return "configs/connectionAvailable" + return "configs/outputConnectionAvailable" + } + + @GetMapping(params = ["gPasConnectionAvailable"]) + fun gPasConnectionAvailable(model: Model): String { + val gPasConnectionAvailable = + connectionCheckServices.filterIsInstance().firstOrNull()?.connectionAvailable() + + model.addAttribute("mtbFileSender", mtbFileSender.javaClass.simpleName) + model.addAttribute("mtbFileEndpoint", mtbFileSender.endpoint()) + model.addAttribute("gPasConnectionAvailable", gPasConnectionAvailable) + if (tokenService != null) { + model.addAttribute("tokensEnabled", true) + model.addAttribute("tokens", tokenService.findAll()) + } else { + model.addAttribute("tokens", listOf()) + } + + return "configs/gPasConnectionAvailable" } @PostMapping(path = ["tokens"]) @@ -152,9 +183,15 @@ class ConfigController( @GetMapping(path = ["events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) fun events(): Flux> { - return configsUpdateProducer.asFlux().map { + return connectionCheckUpdateProducer.asFlux().map { + val event = when (it) { + is ConnectionCheckResult.KafkaConnectionCheckResult -> "output-connection-check" + is ConnectionCheckResult.RestConnectionCheckResult -> "output-connection-check" + is ConnectionCheckResult.GPasConnectionCheckResult -> "gpas-connection-check" + } + ServerSentEvent.builder() - .event("connection-available").id("none").data("") + .event(event).id("none").data(it) .build() } } diff --git a/src/main/resources/templates/configs.html b/src/main/resources/templates/configs.html index 1ac4a26..d94deb6 100644 --- a/src/main/resources/templates/configs.html +++ b/src/main/resources/templates/configs.html @@ -45,7 +45,12 @@
-
+
+
+
+ +
+
diff --git a/src/main/resources/templates/configs/gPasConnectionAvailable.html b/src/main/resources/templates/configs/gPasConnectionAvailable.html new file mode 100644 index 0000000..6dccc60 --- /dev/null +++ b/src/main/resources/templates/configs/gPasConnectionAvailable.html @@ -0,0 +1,19 @@ + +

🟦 gPAS nicht konfiguriert - Patienten-IDs werden intern anonymisiert

+
+ +

✅⚡ Verbindung zu gPAS

+
+ Die Verbindung ist aktuell + verfügbar. + nicht verfügbar. +
+
+ ETL-Processor + + gPAS + ETL-Processor + + gPAS +
+
\ No newline at end of file diff --git a/src/main/resources/templates/configs/connectionAvailable.html b/src/main/resources/templates/configs/outputConnectionAvailable.html similarity index 57% rename from src/main/resources/templates/configs/connectionAvailable.html rename to src/main/resources/templates/configs/outputConnectionAvailable.html index 6d52d70..699c614 100644 --- a/src/main/resources/templates/configs/connectionAvailable.html +++ b/src/main/resources/templates/configs/outputConnectionAvailable.html @@ -1,12 +1,12 @@ -

✅⚡ Verbindung zum bwHC-Backend

+

✅⚡ Verbindung zum bwHC-Backend

Verbindung über [[ ${mtbFileSender} ]]. Die Verbindung ist aktuell - verfügbar. - nicht verfügbar. + verfügbar. + nicht verfügbar.
ETL-Processor - + bwHC-Backend Kafka-Broker ETL-Processor