From afdd52eb8b3b7625fa5c7ec9113040ff6287db56 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Fri, 9 Aug 2024 21:21:02 +0200 Subject: [PATCH] feat: only emit latest statistics within time window using SSE This will reduce browser activity and updates statistics only once in 500ms with the latest statistics update event within this time window and event type. If no updates are available, no SSE will be sent. --- .../monitor/web/EventStreamController.kt | 12 +++++-- .../monitor/web/EventStreamControllerTest.kt | 33 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamController.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamController.kt index 8e6e0dd..f788c18 100644 --- a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamController.kt +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamController.kt @@ -7,6 +7,8 @@ import org.springframework.http.codec.ServerSentEvent import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.RestController import reactor.core.publisher.Flux +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.toJavaDuration @RestController class EventStreamController( @@ -15,9 +17,13 @@ class EventStreamController( @GetMapping(path = ["/events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) fun eventStream(): Flux> { - return statisticsEventProducer.asFlux().map { - ServerSentEvent.builder(it).event(it.name).build() - }.doOnComplete { println("X") } + return statisticsEventProducer.asFlux() + .window(500.milliseconds.toJavaDuration()) + .flatMap { statistics -> statistics.groupBy { it.name } } + .flatMap { group -> group.last() } + .map { + ServerSentEvent.builder(it).event(it.name).build() + } } } \ No newline at end of file diff --git a/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamControllerTest.kt b/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamControllerTest.kt index 37f4da0..b7c9f73 100644 --- a/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamControllerTest.kt +++ b/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamControllerTest.kt @@ -2,6 +2,7 @@ package dev.pcvolkmer.oncoanalytics.monitor.web import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics +import dev.pcvolkmer.oncoanalytics.monitor.conditions.StatisticsEntry import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith @@ -10,6 +11,7 @@ import org.springframework.http.MediaType import org.springframework.test.context.junit.jupiter.SpringExtension import org.springframework.test.web.reactive.server.WebTestClient import org.springframework.test.web.reactive.server.returnResult +import reactor.core.publisher.Flux import reactor.core.publisher.Sinks import reactor.test.StepVerifier import kotlin.time.Duration.Companion.seconds @@ -46,4 +48,35 @@ class EventStreamControllerTest { .verify() } + @Test + fun shouldSendOnlyLatestEventsWithinTimeWindow() { + val latestValue = 10 + + Flux.fromIterable(0..latestValue).subscribe { value -> + this.statisticsEventProducer.emitNext( + Statistics( + "test", + listOf(StatisticsEntry("test", value)) + ) + ) { _, _ -> false } + } + + val result = webClient + .get().uri("/events") + .accept(MediaType.TEXT_EVENT_STREAM) + .exchange() + .expectStatus().isOk + .returnResult() + + StepVerifier.create(result.responseBody) + .expectNext( + Statistics( + "test", + listOf(StatisticsEntry("test", latestValue)) + ) + ) + .expectTimeout(3.seconds.toJavaDuration()) + .verify() + } + } \ No newline at end of file