1
0
mirror of https://github.com/pcvolkmer/mv64e-etl-processor synced 2025-09-14 01:12:51 +00:00

feat: add timestamp to connection check display

This commit is contained in:
2024-03-25 17:09:27 +01:00
parent 056a087065
commit 43af1aa103
3 changed files with 51 additions and 41 deletions

View File

@@ -26,22 +26,18 @@ import jakarta.annotation.PostConstruct
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.errors.TimeoutException
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.http.MediaType
import org.springframework.http.RequestEntity
import org.springframework.http.*
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.web.client.RestTemplate
import org.springframework.web.util.UriComponentsBuilder
import reactor.core.publisher.Sinks
import java.time.Instant
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toJavaDuration
interface ConnectionCheckService {
fun connectionAvailable(): Boolean
fun connectionAvailable(): ConnectionCheckResult
}
@@ -51,9 +47,11 @@ sealed class ConnectionCheckResult {
abstract val available: Boolean
data class KafkaConnectionCheckResult(override val available: Boolean) : ConnectionCheckResult()
data class RestConnectionCheckResult(override val available: Boolean) : ConnectionCheckResult()
data class GPasConnectionCheckResult(override val available: Boolean) : ConnectionCheckResult()
abstract val timestamp: Instant
data class KafkaConnectionCheckResult(override val available: Boolean, override val timestamp: Instant) : ConnectionCheckResult()
data class RestConnectionCheckResult(override val available: Boolean, override val timestamp: Instant) : ConnectionCheckResult()
data class GPasConnectionCheckResult(override val available: Boolean, override val timestamp: Instant) : ConnectionCheckResult()
}
class KafkaConnectionCheckService(
@@ -62,25 +60,26 @@ class KafkaConnectionCheckService(
private val connectionCheckUpdateProducer: Sinks.Many<ConnectionCheckResult>
) : OutputConnectionCheckService {
private var connectionAvailable: Boolean = false
private var result = ConnectionCheckResult.KafkaConnectionCheckResult(false, Instant.now())
@PostConstruct
@Scheduled(cron = "0 * * * * *")
fun check() {
connectionAvailable = try {
null != consumer.listTopics(5.seconds.toJavaDuration())
result = try {
val available = null != consumer.listTopics(5.seconds.toJavaDuration())
ConnectionCheckResult.KafkaConnectionCheckResult(available, Instant.now())
} catch (e: TimeoutException) {
false
ConnectionCheckResult.KafkaConnectionCheckResult(false, Instant.now())
}
connectionCheckUpdateProducer.emitNext(
ConnectionCheckResult.KafkaConnectionCheckResult(connectionAvailable),
result,
Sinks.EmitFailureHandler.FAIL_FAST
)
}
override fun connectionAvailable(): Boolean {
return this.connectionAvailable
override fun connectionAvailable(): ConnectionCheckResult.KafkaConnectionCheckResult {
return this.result
}
}
@@ -92,27 +91,29 @@ class RestConnectionCheckService(
private val connectionCheckUpdateProducer: Sinks.Many<ConnectionCheckResult>
) : OutputConnectionCheckService {
private var connectionAvailable: Boolean = false
private var result: ConnectionCheckResult.RestConnectionCheckResult = ConnectionCheckResult.RestConnectionCheckResult(false, Instant.now())
@PostConstruct
@Scheduled(cron = "0 * * * * *")
fun check() {
connectionAvailable = try {
restTemplate.getForEntity(
result = try {
val available = restTemplate.getForEntity(
restTargetProperties.uri?.replace("/etl/api", "").toString(),
String::class.java
).statusCode == HttpStatus.OK
ConnectionCheckResult.RestConnectionCheckResult(available, Instant.now())
} catch (e: Exception) {
false
ConnectionCheckResult.RestConnectionCheckResult(false, Instant.now())
}
connectionCheckUpdateProducer.emitNext(
ConnectionCheckResult.RestConnectionCheckResult(connectionAvailable),
result,
Sinks.EmitFailureHandler.FAIL_FAST
)
}
override fun connectionAvailable(): Boolean {
return this.connectionAvailable
override fun connectionAvailable(): ConnectionCheckResult.RestConnectionCheckResult {
return this.result
}
}
@@ -123,12 +124,12 @@ class GPasConnectionCheckService(
private val connectionCheckUpdateProducer: Sinks.Many<ConnectionCheckResult>
) : ConnectionCheckService {
private var connectionAvailable: Boolean = false
private var result: ConnectionCheckResult.GPasConnectionCheckResult = ConnectionCheckResult.GPasConnectionCheckResult(false, Instant.now())
@PostConstruct
@Scheduled(cron = "0 * * * * *")
fun check() {
connectionAvailable = try {
result = try {
val uri = UriComponentsBuilder.fromUriString(
gPasConfigProperties.uri?.replace("/\$pseudonymizeAllowCreate", "/\$pseudonymize").toString()
)
@@ -141,22 +142,25 @@ class GPasConnectionCheckService(
if (!gPasConfigProperties.username.isNullOrBlank() && !gPasConfigProperties.password.isNullOrBlank()) {
headers.setBasicAuth(gPasConfigProperties.username, gPasConfigProperties.password)
}
restTemplate.exchange(
val available = restTemplate.exchange(
uri,
HttpMethod.GET,
HttpEntity<Void>(headers),
Void::class.java
).statusCode == HttpStatus.OK
ConnectionCheckResult.GPasConnectionCheckResult(available, Instant.now())
} catch (e: Exception) {
false
ConnectionCheckResult.GPasConnectionCheckResult(false, Instant.now())
}
connectionCheckUpdateProducer.emitNext(
ConnectionCheckResult.GPasConnectionCheckResult(connectionAvailable),
result,
Sinks.EmitFailureHandler.FAIL_FAST
)
}
override fun connectionAvailable(): Boolean {
return this.connectionAvailable
override fun connectionAvailable(): ConnectionCheckResult.GPasConnectionCheckResult {
return this.result
}
}