1
0
mirror of https://github.com/pcvolkmer/mv64e-kafka-to-rest-gateway synced 2025-09-13 09:12:52 +00:00

refactor: extract basic client config

This commit is contained in:
2025-09-06 12:55:40 +02:00
parent 1aeab5d4ef
commit d27c8276fe

View File

@@ -4,7 +4,7 @@ use clap::Parser;
use mv64e_mtb_dto::Mtb;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::Headers;
use rdkafka::message::{BorrowedMessage, Headers};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::{ClientConfig, Message};
use serde::{Deserialize, Serialize};
@@ -48,26 +48,29 @@ async fn handle_record(payload: Mtb) -> Result<HttpResponse, HttpClientError> {
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
#[cfg(debug_assertions)]
{
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
}
#[cfg(not(debug_assertions))]
{
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
fn extract_request_id(msg: &BorrowedMessage) -> Option<String> {
match msg.headers() {
None => None,
Some(headers) => {
if let Some(value) = headers
.iter().find(|header| header.key == "requestId")?.value
{
match str::from_utf8(value) {
Ok(value) => Some(value.to_string()),
Err(_) => None,
}
} else {
None
}
}
}
}
fn client_config() -> ClientConfig {
let mut client_config = ClientConfig::new();
client_config.set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string());
let client_config = if CONFIG.ssl_cert_file.is_some() || CONFIG.ssl_key_file.is_some() {
if CONFIG.ssl_cert_file.is_some() || CONFIG.ssl_key_file.is_some() {
client_config
.set("security.protocol", "ssl")
.set(
@@ -88,9 +91,26 @@ async fn main() -> Result<(), Box<dyn Error>> {
client_config
} else {
client_config
};
}
}
let mut consumer_client_config = client_config.clone();
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
#[cfg(debug_assertions)]
{
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
}
#[cfg(not(debug_assertions))]
{
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
}
let mut consumer_client_config = client_config();
let consumer: StreamConsumer = consumer_client_config
.set("group.id", &CONFIG.group_id)
@@ -107,7 +127,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
info!("Kafka topic '{}' subscribed", CONFIG.topic);
let mut producer_client_config = client_config.clone();
let mut producer_client_config = client_config();
let producer: &FutureProducer = &producer_client_config
.set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string())
@@ -126,23 +146,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
continue;
};
let request_id = match msg.headers() {
None => None,
Some(headers) => {
if let Some(x) = headers
.iter()
.filter(|header| header.key == "requestId")
.next()
.unwrap()
.value
{
Some(str::from_utf8(x).unwrap().to_string())
} else {
None
}
}
}
.unwrap_or_default();
let request_id = extract_request_id(&msg).unwrap_or_default();
match handle_record(payload).await {
Err(err) => error!("{}", err),