1
0
mirror of https://github.com/pcvolkmer/etl-processor.git synced 2025-04-19 17:26:51 +00:00

feat: push connection available state to client

This commit is contained in:
Paul-Christian Volkmer 2024-01-17 14:32:42 +01:00
parent fa89a64ddd
commit 531a8589db
7 changed files with 50 additions and 12 deletions

View File

@ -94,11 +94,6 @@ class AppConfiguration {
return ReportService(objectMapper)
}
@Bean
fun statisticsUpdateProducer(): Sinks.Many<Any> {
return Sinks.many().multicast().directBestEffort()
}
@Bean
fun transformationService(
objectMapper: ObjectMapper,
@ -119,5 +114,15 @@ class AppConfiguration {
.build()
}
@Bean
fun statisticsUpdateProducer(): Sinks.Many<Any> {
return Sinks.many().multicast().directBestEffort()
}
@Bean
fun configsUpdateProducer(): Sinks.Many<Boolean> {
return Sinks.many().multicast().directBestEffort()
}
}

View File

@ -38,6 +38,7 @@ import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.retry.support.RetryTemplate
import reactor.core.publisher.Sinks
@Configuration
@EnableConfigurationProperties(
@ -81,8 +82,8 @@ class AppKafkaConfiguration {
}
@Bean
fun connectionCheckService(consumerFactory: ConsumerFactory<String, String>): ConnectionCheckService {
return KafkaConnectionCheckService(consumerFactory.createConsumer())
fun connectionCheckService(consumerFactory: ConsumerFactory<String, String>, configsUpdateProducer: Sinks.Many<Boolean>): ConnectionCheckService {
return KafkaConnectionCheckService(consumerFactory.createConsumer(), configsUpdateProducer)
}
}

View File

@ -32,6 +32,7 @@ import org.springframework.context.annotation.Configuration
import org.springframework.core.annotation.Order
import org.springframework.retry.support.RetryTemplate
import org.springframework.web.client.RestTemplate
import reactor.core.publisher.Sinks
@Configuration
@EnableConfigurationProperties(
@ -64,9 +65,10 @@ class AppRestConfiguration {
@Bean
fun connectionCheckService(
restTemplate: RestTemplate,
restTargetProperties: RestTargetProperties
restTargetProperties: RestTargetProperties,
configsUpdateProducer: Sinks.Many<Boolean>
): ConnectionCheckService {
return RestConnectionCheckService(restTemplate, restTargetProperties)
return RestConnectionCheckService(restTemplate, restTargetProperties, configsUpdateProducer)
}
}

View File

@ -24,9 +24,11 @@ import dev.dnpm.etl.processor.config.RestTargetProperties
import jakarta.annotation.PostConstruct
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.errors.TimeoutException
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.http.HttpStatus
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.web.client.RestTemplate
import reactor.core.publisher.Sinks
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
@ -37,7 +39,9 @@ interface ConnectionCheckService {
}
class KafkaConnectionCheckService(
private val consumer: Consumer<String, String>
private val consumer: Consumer<String, String>,
@Qualifier("configsUpdateProducer")
private val configsUpdateProducer: Sinks.Many<Boolean>
) : ConnectionCheckService {
private var connectionAvailable: Boolean = false
@ -51,6 +55,7 @@ class KafkaConnectionCheckService(
} catch (e: TimeoutException) {
false
}
configsUpdateProducer.emitNext(connectionAvailable, Sinks.EmitFailureHandler.FAIL_FAST)
}
override fun connectionAvailable(): Boolean {
@ -61,7 +66,9 @@ class KafkaConnectionCheckService(
class RestConnectionCheckService(
private val restTemplate: RestTemplate,
private val restTargetProperties: RestTargetProperties
private val restTargetProperties: RestTargetProperties,
@Qualifier("configsUpdateProducer")
private val configsUpdateProducer: Sinks.Many<Boolean>
) : ConnectionCheckService {
private var connectionAvailable: Boolean = false
@ -77,6 +84,7 @@ class RestConnectionCheckService(
} catch (e: Exception) {
false
}
configsUpdateProducer.emitNext(connectionAvailable, Sinks.EmitFailureHandler.FAIL_FAST)
}
override fun connectionAvailable(): Boolean {

View File

@ -23,14 +23,21 @@ import dev.dnpm.etl.processor.monitoring.ConnectionCheckService
import dev.dnpm.etl.processor.output.MtbFileSender
import dev.dnpm.etl.processor.pseudonym.Generator
import dev.dnpm.etl.processor.services.TransformationService
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.http.MediaType
import org.springframework.http.codec.ServerSentEvent
import org.springframework.stereotype.Controller
import org.springframework.ui.Model
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
@Controller
@RequestMapping(path = ["configs"])
class ConfigController(
@Qualifier("configsUpdateProducer")
private val configsUpdateProducer: Sinks.Many<Boolean>,
private val transformationService: TransformationService,
private val pseudonymGenerator: Generator,
private val mtbFileSender: MtbFileSender,
@ -58,4 +65,13 @@ class ConfigController(
return "configs/connectionAvailable"
}
@GetMapping(path = ["events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun events(): Flux<ServerSentEvent<Any>> {
return configsUpdateProducer.asFlux().map {
ServerSentEvent.builder<Any>()
.event("connection-available").id("none").data("")
.build()
}
}
}

View File

@ -22,6 +22,7 @@ package dev.dnpm.etl.processor.web
import dev.dnpm.etl.processor.monitoring.RequestRepository
import dev.dnpm.etl.processor.monitoring.RequestStatus
import dev.dnpm.etl.processor.monitoring.RequestType
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.http.MediaType
import org.springframework.http.codec.ServerSentEvent
import org.springframework.web.bind.annotation.GetMapping
@ -38,6 +39,7 @@ import java.time.temporal.ChronoUnit
@RestController
@RequestMapping(path = ["/statistics"])
class StatisticsRestController(
@Qualifier("statisticsUpdateProducer")
private val statisticsUpdateProducer: Sinks.Many<Any>,
private val requestRepository: RequestRepository
) {

View File

@ -37,7 +37,10 @@
</table>
</section>
<section th:insert="~{configs/connectionAvailable.html}" th:hx-get="@{/configs?connectionAvailable}" hx-trigger="every 20s"></section>
<section hx-ext="sse" th:sse-connect="@{/configs/events}">
<div th:insert="~{configs/connectionAvailable.html}" th:hx-get="@{/configs?connectionAvailable}" hx-trigger="sse:connection-available">
</div>
</section>
<section>
<h2><span th:if="${not transformations.isEmpty()}"></span><span th:if="${transformations.isEmpty()}"></span> Transformationen</h2>
@ -84,5 +87,6 @@
</section>
</main>
<script th:src="@{/webjars/htmx.org/dist/htmx.min.js}"></script>
<script th:src="@{/webjars/htmx.org/dist/ext/sse.js}"></script>
</body>
</html>