From 1aeab5d4ef22ea8bf1de79ad9598f4a77b66a5b9 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 6 Sep 2025 12:37:07 +0200 Subject: [PATCH] refactor: replace loop with while let --- src/main.rs | 160 ++++++++++++++++++++++++++-------------------------- 1 file changed, 81 insertions(+), 79 deletions(-) diff --git a/src/main.rs b/src/main.rs index a25ab2d..02accdf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use rdkafka::message::Headers; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::{ClientConfig, Message}; use serde::{Deserialize, Serialize}; -use serde_json::{Value, json}; +use serde_json::{json, Value}; use std::error::Error; use std::string::ToString; use std::sync::LazyLock; @@ -115,90 +115,92 @@ async fn main() -> Result<(), Box> { .create() .expect("Producer creation error"); - loop { - match consumer.recv().await { - Ok(msg) => match msg.payload_view::() { - Some(Ok(payload)) => match msg.key_view::() { - Some(Ok(key)) => { - let payload = if let Ok(payload) = serde_json::from_str::(&payload) { - payload - } else { - error!("Error deserializing payload"); - continue; - }; + while let Ok(msg) = consumer.recv().await { + match msg.payload_view::() { + Some(Ok(payload)) => match msg.key_view::() { + Some(Ok(key)) => { + let payload = if let Ok(payload) = serde_json::from_str::(&payload) { + payload + } else { + error!("Error deserializing payload"); + continue; + }; - let request_id = match msg.headers() { - None => None, - Some(headers) => { - if let Some(x) = headers - .iter() - .filter(|header| header.key == "requestId") - .next() - .unwrap() - .value - { - Some(str::from_utf8(x).unwrap().to_string()) - } else { - None - } - } - } - .unwrap_or_default(); - - match handle_record(payload).await { - Err(err) => error!("{}", err), - Ok(response) => { - let response_payload = ResponsePayload { - request_id: request_id.to_string(), - status_code: response.status_code, - status_body: serde_json::from_str::( - &response.status_body, - ) - .unwrap_or(json!({})), - }; - let response_payload = serde_json::to_string(&response_payload)?; - - let response_record = FutureRecord::to(&CONFIG.response_topic) - .key(key) - .payload(&response_payload); - - match if let Some(headers) = msg.headers() { - producer - .send( - response_record.headers(headers.detach()), - Duration::from_secs(1), - ) - .await - } else { - producer.send(response_record, Duration::from_secs(1)).await - } { - Ok(_) => { - info!("Response for '{request_id}' sent successfully"); - } - Err((err, _)) => { - error!("Could not send response for '{request_id}': {err}"); - } - } - - if response.status_code == 200 || response.status_code == 201 || response.status_code == 400 || response.status_code == 422 { - consumer - .commit_message(&msg, CommitMode::Async) - .expect("Cound not commit message: {}"); - } else { - warn!("Unexpected Status Code for Request '{}': HTTP {}", &request_id, response.status_code); - } + let request_id = match msg.headers() { + None => None, + Some(headers) => { + if let Some(x) = headers + .iter() + .filter(|header| header.key == "requestId") + .next() + .unwrap() + .value + { + Some(str::from_utf8(x).unwrap().to_string()) + } else { + None } } } - _ => error!("Error getting key"), - }, - _ => error!("Error getting message"), + .unwrap_or_default(); + + match handle_record(payload).await { + Err(err) => error!("{}", err), + Ok(response) => { + let response_payload = ResponsePayload { + request_id: request_id.to_string(), + status_code: response.status_code, + status_body: serde_json::from_str::(&response.status_body) + .unwrap_or(json!({})), + }; + let response_payload = serde_json::to_string(&response_payload)?; + + let response_record = FutureRecord::to(&CONFIG.response_topic) + .key(key) + .payload(&response_payload); + + match if let Some(headers) = msg.headers() { + producer + .send( + response_record.headers(headers.detach()), + Duration::from_secs(1), + ) + .await + } else { + producer.send(response_record, Duration::from_secs(1)).await + } { + Ok(_) => { + info!("Response for '{request_id}' sent successfully"); + } + Err((err, _)) => { + error!("Could not send response for '{request_id}': {err}"); + } + } + + if response.status_code == 200 + || response.status_code == 201 + || response.status_code == 400 + || response.status_code == 422 + { + consumer + .commit_message(&msg, CommitMode::Async) + .expect("Cound not commit message: {}"); + } else { + warn!( + "Unexpected Status Code for Request '{}': HTTP {}", + &request_id, response.status_code + ); + } + } + } + } + _ => error!("Error getting key"), }, - Err(err) => { - error!("Error receiving message: {}", err); - } + _ => error!("Error getting payload"), } - } + }; + + Ok(()) } // Test Configuration