1
0
mirror of https://github.com/pcvolkmer/mv64e-kafka-to-rest-gateway synced 2025-09-13 09:12:52 +00:00

refactor: replace loop with while let

This commit is contained in:
2025-09-06 12:37:07 +02:00
parent 36d4cc82a8
commit 1aeab5d4ef

View File

@@ -8,7 +8,7 @@ use rdkafka::message::Headers;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::{ClientConfig, Message};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use serde_json::{json, Value};
use std::error::Error;
use std::string::ToString;
use std::sync::LazyLock;
@@ -115,90 +115,92 @@ async fn main() -> Result<(), Box<dyn Error>> {
.create()
.expect("Producer creation error");
loop {
match consumer.recv().await {
Ok(msg) => match msg.payload_view::<str>() {
Some(Ok(payload)) => match msg.key_view::<str>() {
Some(Ok(key)) => {
let payload = if let Ok(payload) = serde_json::from_str::<Mtb>(&payload) {
payload
} else {
error!("Error deserializing payload");
continue;
};
while let Ok(msg) = consumer.recv().await {
match msg.payload_view::<str>() {
Some(Ok(payload)) => match msg.key_view::<str>() {
Some(Ok(key)) => {
let payload = if let Ok(payload) = serde_json::from_str::<Mtb>(&payload) {
payload
} else {
error!("Error deserializing payload");
continue;
};
let request_id = match msg.headers() {
None => None,
Some(headers) => {
if let Some(x) = headers
.iter()
.filter(|header| header.key == "requestId")
.next()
.unwrap()
.value
{
Some(str::from_utf8(x).unwrap().to_string())
} else {
None
}
}
}
.unwrap_or_default();
match handle_record(payload).await {
Err(err) => error!("{}", err),
Ok(response) => {
let response_payload = ResponsePayload {
request_id: request_id.to_string(),
status_code: response.status_code,
status_body: serde_json::from_str::<Value>(
&response.status_body,
)
.unwrap_or(json!({})),
};
let response_payload = serde_json::to_string(&response_payload)?;
let response_record = FutureRecord::to(&CONFIG.response_topic)
.key(key)
.payload(&response_payload);
match if let Some(headers) = msg.headers() {
producer
.send(
response_record.headers(headers.detach()),
Duration::from_secs(1),
)
.await
} else {
producer.send(response_record, Duration::from_secs(1)).await
} {
Ok(_) => {
info!("Response for '{request_id}' sent successfully");
}
Err((err, _)) => {
error!("Could not send response for '{request_id}': {err}");
}
}
if response.status_code == 200 || response.status_code == 201 || response.status_code == 400 || response.status_code == 422 {
consumer
.commit_message(&msg, CommitMode::Async)
.expect("Cound not commit message: {}");
} else {
warn!("Unexpected Status Code for Request '{}': HTTP {}", &request_id, response.status_code);
}
let request_id = match msg.headers() {
None => None,
Some(headers) => {
if let Some(x) = headers
.iter()
.filter(|header| header.key == "requestId")
.next()
.unwrap()
.value
{
Some(str::from_utf8(x).unwrap().to_string())
} else {
None
}
}
}
_ => error!("Error getting key"),
},
_ => error!("Error getting message"),
.unwrap_or_default();
match handle_record(payload).await {
Err(err) => error!("{}", err),
Ok(response) => {
let response_payload = ResponsePayload {
request_id: request_id.to_string(),
status_code: response.status_code,
status_body: serde_json::from_str::<Value>(&response.status_body)
.unwrap_or(json!({})),
};
let response_payload = serde_json::to_string(&response_payload)?;
let response_record = FutureRecord::to(&CONFIG.response_topic)
.key(key)
.payload(&response_payload);
match if let Some(headers) = msg.headers() {
producer
.send(
response_record.headers(headers.detach()),
Duration::from_secs(1),
)
.await
} else {
producer.send(response_record, Duration::from_secs(1)).await
} {
Ok(_) => {
info!("Response for '{request_id}' sent successfully");
}
Err((err, _)) => {
error!("Could not send response for '{request_id}': {err}");
}
}
if response.status_code == 200
|| response.status_code == 201
|| response.status_code == 400
|| response.status_code == 422
{
consumer
.commit_message(&msg, CommitMode::Async)
.expect("Cound not commit message: {}");
} else {
warn!(
"Unexpected Status Code for Request '{}': HTTP {}",
&request_id, response.status_code
);
}
}
}
}
_ => error!("Error getting key"),
},
Err(err) => {
error!("Error receiving message: {}", err);
}
_ => error!("Error getting payload"),
}
}
};
Ok(())
}
// Test Configuration