diff --git a/src/main.rs b/src/main.rs index 64cad0e..e4eda3b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use rdkafka::message::Headers; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::{ClientConfig, Message}; use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use std::error::Error; use std::string::ToString; use std::sync::LazyLock; @@ -64,8 +64,35 @@ async fn main() -> Result<(), Box> { .init(); } - let consumer: StreamConsumer = ClientConfig::new() - .set("bootstrap.servers", &CONFIG.bootstrap_servers) + let mut client_config = ClientConfig::new(); + client_config.set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string()); + + let client_config = if CONFIG.ssl_cert_file.is_some() || CONFIG.ssl_key_file.is_some() { + client_config + .set("security.protocol", "ssl") + .set( + "ssl.ca.location", + CONFIG.ssl_ca_file.clone().unwrap_or_default(), + ) + .set( + "ssl.certificate.location", + CONFIG.ssl_cert_file.clone().unwrap_or_default(), + ) + .set( + "ssl.key.location", + CONFIG.ssl_key_file.clone().unwrap_or_default(), + ); + if let Some(ssl_key_password) = &CONFIG.ssl_key_password { + client_config.set("ssl.key.password", ssl_key_password); + } + client_config + } else { + client_config + }; + + let mut consumer_client_config = client_config.clone(); + + let consumer: StreamConsumer = consumer_client_config .set("group.id", &CONFIG.group_id) .set("enable.partition.eof", "false") .set("auto.offset.reset", "earliest") @@ -80,7 +107,9 @@ async fn main() -> Result<(), Box> { info!("Kafka topic '{}' subscribed", CONFIG.topic); - let producer: &FutureProducer = &ClientConfig::new() + let mut producer_client_config = client_config.clone(); + + let producer: &FutureProducer = &producer_client_config .set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string()) .set("message.timeout.ms", "5000") .create() @@ -120,7 +149,7 @@ async fn main() -> Result<(), Box> { Err(err) => error!("{}", err), Ok(response) => { let response_payload = ResponsePayload { - request_id, + request_id: request_id.to_string(), status_code: response.status_code, status_body: serde_json::from_str::( &response.status_body,