1
0
mirror of https://github.com/pcvolkmer/onco-analytics-monitor.git synced 2025-04-19 11:06:52 +00:00

feat: only emit latest statistics within time window using SSE

This will reduce browser activity and updates statistics only once in
500ms with the latest statistics update event within this time window
and event type.

If no updates are available, no SSE will be sent.
This commit is contained in:
Paul-Christian Volkmer 2024-08-09 21:21:02 +02:00
parent 5d35c31f3e
commit afdd52eb8b
2 changed files with 42 additions and 3 deletions

View File

@ -7,6 +7,8 @@ import org.springframework.http.codec.ServerSentEvent
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.toJavaDuration
@RestController
class EventStreamController(
@ -15,9 +17,13 @@ class EventStreamController(
@GetMapping(path = ["/events"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun eventStream(): Flux<ServerSentEvent<Statistics>> {
return statisticsEventProducer.asFlux().map {
ServerSentEvent.builder(it).event(it.name).build()
}.doOnComplete { println("X") }
return statisticsEventProducer.asFlux()
.window(500.milliseconds.toJavaDuration())
.flatMap { statistics -> statistics.groupBy { it.name } }
.flatMap { group -> group.last() }
.map {
ServerSentEvent.builder(it).event(it.name).build()
}
}
}

View File

@ -2,6 +2,7 @@ package dev.pcvolkmer.oncoanalytics.monitor.web
import dev.pcvolkmer.oncoanalytics.monitor.StatisticsSink
import dev.pcvolkmer.oncoanalytics.monitor.conditions.Statistics
import dev.pcvolkmer.oncoanalytics.monitor.conditions.StatisticsEntry
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
@ -10,6 +11,7 @@ import org.springframework.http.MediaType
import org.springframework.test.context.junit.jupiter.SpringExtension
import org.springframework.test.web.reactive.server.WebTestClient
import org.springframework.test.web.reactive.server.returnResult
import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import reactor.test.StepVerifier
import kotlin.time.Duration.Companion.seconds
@ -46,4 +48,35 @@ class EventStreamControllerTest {
.verify()
}
@Test
fun shouldSendOnlyLatestEventsWithinTimeWindow() {
val latestValue = 10
Flux.fromIterable(0..latestValue).subscribe { value ->
this.statisticsEventProducer.emitNext(
Statistics(
"test",
listOf(StatisticsEntry("test", value))
)
) { _, _ -> false }
}
val result = webClient
.get().uri("/events")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk
.returnResult<Statistics>()
StepVerifier.create(result.responseBody)
.expectNext(
Statistics(
"test",
listOf(StatisticsEntry("test", latestValue))
)
)
.expectTimeout(3.seconds.toJavaDuration())
.verify()
}
}