From 531a8589db2bf170e6272602ccb4a3c4457186d8 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 17 Jan 2024 14:32:42 +0100 Subject: [PATCH] feat: push connection available state to client --- .../etl/processor/config/AppConfiguration.kt | 15 ++++++++++----- .../processor/config/AppKafkaConfiguration.kt | 5 +++-- .../etl/processor/config/AppRestConfiguration.kt | 6 ++++-- .../monitoring/ConnectionCheckService.kt | 12 ++++++++++-- .../dnpm/etl/processor/web/ConfigController.kt | 16 ++++++++++++++++ .../processor/web/StatisticsRestController.kt | 2 ++ src/main/resources/templates/configs.html | 6 +++++- 7 files changed, 50 insertions(+), 12 deletions(-) diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt index 8d71b62..83cc568 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppConfiguration.kt @@ -94,11 +94,6 @@ class AppConfiguration { return ReportService(objectMapper) } - @Bean - fun statisticsUpdateProducer(): Sinks.Many { - return Sinks.many().multicast().directBestEffort() - } - @Bean fun transformationService( objectMapper: ObjectMapper, @@ -119,5 +114,15 @@ class AppConfiguration { .build() } + @Bean + fun statisticsUpdateProducer(): Sinks.Many { + return Sinks.many().multicast().directBestEffort() + } + + @Bean + fun configsUpdateProducer(): Sinks.Many { + return Sinks.many().multicast().directBestEffort() + } + } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt index 15ed798..68b86b2 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppKafkaConfiguration.kt @@ -38,6 +38,7 @@ import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.listener.ContainerProperties import org.springframework.kafka.listener.KafkaMessageListenerContainer import org.springframework.retry.support.RetryTemplate +import reactor.core.publisher.Sinks @Configuration @EnableConfigurationProperties( @@ -81,8 +82,8 @@ class AppKafkaConfiguration { } @Bean - fun connectionCheckService(consumerFactory: ConsumerFactory): ConnectionCheckService { - return KafkaConnectionCheckService(consumerFactory.createConsumer()) + fun connectionCheckService(consumerFactory: ConsumerFactory, configsUpdateProducer: Sinks.Many): ConnectionCheckService { + return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer) } } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt index 64e91e7..eea5724 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/config/AppRestConfiguration.kt @@ -32,6 +32,7 @@ import org.springframework.context.annotation.Configuration import org.springframework.core.annotation.Order import org.springframework.retry.support.RetryTemplate import org.springframework.web.client.RestTemplate +import reactor.core.publisher.Sinks @Configuration @EnableConfigurationProperties( @@ -64,9 +65,10 @@ class AppRestConfiguration { @Bean fun connectionCheckService( restTemplate: RestTemplate, - restTargetProperties: RestTargetProperties + restTargetProperties: RestTargetProperties, + configsUpdateProducer: Sinks.Many ): ConnectionCheckService { - return RestConnectionCheckService(restTemplate, restTargetProperties) + return RestConnectionCheckService(restTemplate, restTargetProperties, configsUpdateProducer) } } diff --git a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt index d109326..54f25b3 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/monitoring/ConnectionCheckService.kt @@ -24,9 +24,11 @@ import dev.dnpm.etl.processor.config.RestTargetProperties import jakarta.annotation.PostConstruct import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.common.errors.TimeoutException +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.http.HttpStatus import org.springframework.scheduling.annotation.Scheduled import org.springframework.web.client.RestTemplate +import reactor.core.publisher.Sinks import kotlin.time.Duration.Companion.seconds import kotlin.time.toJavaDuration @@ -37,7 +39,9 @@ interface ConnectionCheckService { } class KafkaConnectionCheckService( - private val consumer: Consumer + private val consumer: Consumer, + @Qualifier("configsUpdateProducer") + private val configsUpdateProducer: Sinks.Many ) : ConnectionCheckService { private var connectionAvailable: Boolean = false @@ -51,6 +55,7 @@ class KafkaConnectionCheckService( } catch (e: TimeoutException) { false } + configsUpdateProducer.emitNext(connectionAvailable, Sinks.EmitFailureHandler.FAIL_FAST) } override fun connectionAvailable(): Boolean { @@ -61,7 +66,9 @@ class KafkaConnectionCheckService( class RestConnectionCheckService( private val restTemplate: RestTemplate, - private val restTargetProperties: RestTargetProperties + private val restTargetProperties: RestTargetProperties, + @Qualifier("configsUpdateProducer") + private val configsUpdateProducer: Sinks.Many ) : ConnectionCheckService { private var connectionAvailable: Boolean = false @@ -77,6 +84,7 @@ class RestConnectionCheckService( } catch (e: Exception) { false } + configsUpdateProducer.emitNext(connectionAvailable, Sinks.EmitFailureHandler.FAIL_FAST) } override fun connectionAvailable(): Boolean { diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt index 18bf8d7..be291ea 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/ConfigController.kt @@ -23,14 +23,21 @@ import dev.dnpm.etl.processor.monitoring.ConnectionCheckService import dev.dnpm.etl.processor.output.MtbFileSender import dev.dnpm.etl.processor.pseudonym.Generator import dev.dnpm.etl.processor.services.TransformationService +import org.springframework.beans.factory.annotation.Qualifier +import org.springframework.http.MediaType +import org.springframework.http.codec.ServerSentEvent import org.springframework.stereotype.Controller import org.springframework.ui.Model import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.RequestMapping +import reactor.core.publisher.Flux +import reactor.core.publisher.Sinks @Controller @RequestMapping(path = ["configs"]) class ConfigController( + @Qualifier("configsUpdateProducer") + private val configsUpdateProducer: Sinks.Many, private val transformationService: TransformationService, private val pseudonymGenerator: Generator, private val mtbFileSender: MtbFileSender, @@ -58,4 +65,13 @@ class ConfigController( return "configs/connectionAvailable" } + @GetMapping(path = ["events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) + fun events(): Flux> { + return configsUpdateProducer.asFlux().map { + ServerSentEvent.builder() + .event("connection-available").id("none").data("") + .build() + } + } + } \ No newline at end of file diff --git a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt index 546909e..74ae238 100644 --- a/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt +++ b/src/main/kotlin/dev/dnpm/etl/processor/web/StatisticsRestController.kt @@ -22,6 +22,7 @@ package dev.dnpm.etl.processor.web import dev.dnpm.etl.processor.monitoring.RequestRepository import dev.dnpm.etl.processor.monitoring.RequestStatus import dev.dnpm.etl.processor.monitoring.RequestType +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.http.MediaType import org.springframework.http.codec.ServerSentEvent import org.springframework.web.bind.annotation.GetMapping @@ -38,6 +39,7 @@ import java.time.temporal.ChronoUnit @RestController @RequestMapping(path = ["/statistics"]) class StatisticsRestController( + @Qualifier("statisticsUpdateProducer") private val statisticsUpdateProducer: Sinks.Many, private val requestRepository: RequestRepository ) { diff --git a/src/main/resources/templates/configs.html b/src/main/resources/templates/configs.html index 1d6dd20..3c3d744 100644 --- a/src/main/resources/templates/configs.html +++ b/src/main/resources/templates/configs.html @@ -37,7 +37,10 @@ -
+
+
+
+

Transformationen

@@ -84,5 +87,6 @@
+ \ No newline at end of file