1
0
mirror of https://github.com/pcvolkmer/mv64e-kafka-to-rest-gateway synced 2025-09-13 09:12:52 +00:00

feat: enable ssl connection to kafka

This commit is contained in:
2025-08-28 13:16:08 +02:00
parent a5e4c908b6
commit 4f2eb56b07

View File

@@ -8,7 +8,7 @@ use rdkafka::message::Headers;
use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::{ClientConfig, Message}; use rdkafka::{ClientConfig, Message};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::{json, Value}; use serde_json::{Value, json};
use std::error::Error; use std::error::Error;
use std::string::ToString; use std::string::ToString;
use std::sync::LazyLock; use std::sync::LazyLock;
@@ -64,8 +64,35 @@ async fn main() -> Result<(), Box<dyn Error>> {
.init(); .init();
} }
let consumer: StreamConsumer = ClientConfig::new() let mut client_config = ClientConfig::new();
.set("bootstrap.servers", &CONFIG.bootstrap_servers) client_config.set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string());
let client_config = if CONFIG.ssl_cert_file.is_some() || CONFIG.ssl_key_file.is_some() {
client_config
.set("security.protocol", "ssl")
.set(
"ssl.ca.location",
CONFIG.ssl_ca_file.clone().unwrap_or_default(),
)
.set(
"ssl.certificate.location",
CONFIG.ssl_cert_file.clone().unwrap_or_default(),
)
.set(
"ssl.key.location",
CONFIG.ssl_key_file.clone().unwrap_or_default(),
);
if let Some(ssl_key_password) = &CONFIG.ssl_key_password {
client_config.set("ssl.key.password", ssl_key_password);
}
client_config
} else {
client_config
};
let mut consumer_client_config = client_config.clone();
let consumer: StreamConsumer = consumer_client_config
.set("group.id", &CONFIG.group_id) .set("group.id", &CONFIG.group_id)
.set("enable.partition.eof", "false") .set("enable.partition.eof", "false")
.set("auto.offset.reset", "earliest") .set("auto.offset.reset", "earliest")
@@ -80,7 +107,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
info!("Kafka topic '{}' subscribed", CONFIG.topic); info!("Kafka topic '{}' subscribed", CONFIG.topic);
let producer: &FutureProducer = &ClientConfig::new() let mut producer_client_config = client_config.clone();
let producer: &FutureProducer = &producer_client_config
.set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string()) .set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string())
.set("message.timeout.ms", "5000") .set("message.timeout.ms", "5000")
.create() .create()
@@ -120,7 +149,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
Err(err) => error!("{}", err), Err(err) => error!("{}", err),
Ok(response) => { Ok(response) => {
let response_payload = ResponsePayload { let response_payload = ResponsePayload {
request_id, request_id: request_id.to_string(),
status_code: response.status_code, status_code: response.status_code,
status_body: serde_json::from_str::<Value>( status_body: serde_json::from_str::<Value>(
&response.status_body, &response.status_body,