diff --git a/src/main.rs b/src/main.rs index 02accdf..e8bf28e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use clap::Parser; use mv64e_mtb_dto::Mtb; use rdkafka::config::RDKafkaLogLevel; use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer}; -use rdkafka::message::Headers; +use rdkafka::message::{BorrowedMessage, Headers}; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::{ClientConfig, Message}; use serde::{Deserialize, Serialize}; @@ -48,26 +48,29 @@ async fn handle_record(payload: Mtb) -> Result { } } -#[tokio::main] -async fn main() -> Result<(), Box> { - #[cfg(debug_assertions)] - { - tracing_subscriber::fmt() - .with_max_level(tracing::Level::DEBUG) - .init(); - } - - #[cfg(not(debug_assertions))] - { - tracing_subscriber::fmt() - .with_max_level(tracing::Level::INFO) - .init(); +fn extract_request_id(msg: &BorrowedMessage) -> Option { + match msg.headers() { + None => None, + Some(headers) => { + if let Some(value) = headers + .iter().find(|header| header.key == "requestId")?.value + { + match str::from_utf8(value) { + Ok(value) => Some(value.to_string()), + Err(_) => None, + } + } else { + None + } + } } +} +fn client_config() -> ClientConfig { let mut client_config = ClientConfig::new(); client_config.set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string()); - let client_config = if CONFIG.ssl_cert_file.is_some() || CONFIG.ssl_key_file.is_some() { + if CONFIG.ssl_cert_file.is_some() || CONFIG.ssl_key_file.is_some() { client_config .set("security.protocol", "ssl") .set( @@ -88,9 +91,26 @@ async fn main() -> Result<(), Box> { client_config } else { client_config - }; + } +} - let mut consumer_client_config = client_config.clone(); +#[tokio::main] +async fn main() -> Result<(), Box> { + #[cfg(debug_assertions)] + { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .init(); + } + + #[cfg(not(debug_assertions))] + { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .init(); + } + + let mut consumer_client_config = client_config(); let consumer: StreamConsumer = consumer_client_config .set("group.id", &CONFIG.group_id) @@ -107,7 +127,7 @@ async fn main() -> Result<(), Box> { info!("Kafka topic '{}' subscribed", CONFIG.topic); - let mut producer_client_config = client_config.clone(); + let mut producer_client_config = client_config(); let producer: &FutureProducer = &producer_client_config .set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string()) @@ -126,23 +146,7 @@ async fn main() -> Result<(), Box> { continue; }; - let request_id = match msg.headers() { - None => None, - Some(headers) => { - if let Some(x) = headers - .iter() - .filter(|header| header.key == "requestId") - .next() - .unwrap() - .value - { - Some(str::from_utf8(x).unwrap().to_string()) - } else { - None - } - } - } - .unwrap_or_default(); + let request_id = extract_request_id(&msg).unwrap_or_default(); match handle_record(payload).await { Err(err) => error!("{}", err),