diff --git a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamController.kt b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamController.kt index 8e6e0dd..f788c18 100644 --- a/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamController.kt +++ b/src/main/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamController.kt @@ -7,6 +7,8 @@ import org.springframework.http.codec.ServerSentEvent import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.RestController import reactor.core.publisher.Flux +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.toJavaDuration @RestController class EventStreamController( @@ -15,9 +17,13 @@ class EventStreamController( @GetMapping(path = ["/events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE]) fun eventStream(): Flux> { - return statisticsEventProducer.asFlux().map { - ServerSentEvent.builder(it).event(it.name).build() - }.doOnComplete { println("X") } + return statisticsEventProducer.asFlux() + .window(500.milliseconds.toJavaDuration()) + .flatMap { statistics -> statistics.groupBy { it.name } } + .flatMap { group -> group.last() } + .map { + ServerSentEvent.builder(it).event(it.name).build() + } } } \ No newline at end of file diff --git a/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamControllerTest.kt b/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamControllerTest.kt index 37f4da0..b7c9f73 100644 --- a/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamControllerTest.kt +++ b/src/test/kotlin/dev/pcvolkmer/oncoanalytics/monitor/web/EventStreamControllerTest.kt @@ -2,6 +2,7 @@ package dev.pcvolkmer.oncoanalytics.monitor.web import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics +import dev.pcvolkmer.oncoanalytics.monitor.conditions.StatisticsEntry import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith @@ -10,6 +11,7 @@ import org.springframework.http.MediaType import org.springframework.test.context.junit.jupiter.SpringExtension import org.springframework.test.web.reactive.server.WebTestClient import org.springframework.test.web.reactive.server.returnResult +import reactor.core.publisher.Flux import reactor.core.publisher.Sinks import reactor.test.StepVerifier import kotlin.time.Duration.Companion.seconds @@ -46,4 +48,35 @@ class EventStreamControllerTest { .verify() } + @Test + fun shouldSendOnlyLatestEventsWithinTimeWindow() { + val latestValue = 10 + + Flux.fromIterable(0..latestValue).subscribe { value -> + this.statisticsEventProducer.emitNext( + Statistics( + "test", + listOf(StatisticsEntry("test", value)) + ) + ) { _, _ -> false } + } + + val result = webClient + .get().uri("/events") + .accept(MediaType.TEXT_EVENT_STREAM) + .exchange() + .expectStatus().isOk + .returnResult() + + StepVerifier.create(result.responseBody) + .expectNext( + Statistics( + "test", + listOf(StatisticsEntry("test", latestValue)) + ) + ) + .expectTimeout(3.seconds.toJavaDuration()) + .verify() + } + } \ No newline at end of file