1
0
mirror of https://github.com/pcvolkmer/mv64e-kafka-to-rest-gateway synced 2025-09-13 09:12:52 +00:00

refactor: replace nested if..let with let..else

This commit is contained in:
2025-09-07 11:39:09 +02:00
parent d39c2dbb1a
commit ece5654c14

View File

@@ -136,63 +136,65 @@ async fn main() -> Result<(), Box<dyn Error>> {
.create()?; .create()?;
while let Ok(msg) = consumer.recv().await { while let Ok(msg) = consumer.recv().await {
if let Some(Ok(payload)) = msg.payload_view::<str>() { let Some(Ok(payload)) = msg.payload_view::<str>() else {
if let Some(Ok(key)) = msg.key_view::<str>() { error!("Error getting payload");
let Ok(payload) = serde_json::from_str::<Mtb>(payload) else { continue;
error!("Error deserializing payload"); };
continue;
let Ok(payload) = serde_json::from_str::<Mtb>(payload) else {
error!("Error deserializing payload");
continue;
};
let Some(Ok(key)) = msg.key_view::<str>() else {
error!("Error getting key");
continue;
};
let request_id = extract_request_id(&msg).unwrap_or_default();
match handle_record(payload).await {
Err(err) => error!("{}", err),
Ok(response) => {
let response_payload = ResponsePayload {
request_id: request_id.to_string(),
status_code: response.status_code,
status_body: serde_json::from_str::<Value>(&response.status_body)
.unwrap_or(json!({})),
}; };
let response_payload = serde_json::to_string(&response_payload)?;
let request_id = extract_request_id(&msg).unwrap_or_default(); let response_record = FutureRecord::to(&CONFIG.response_topic)
.key(key)
.payload(&response_payload);
match handle_record(payload).await { match if let Some(headers) = msg.headers() {
Err(err) => error!("{}", err), producer
Ok(response) => { .send(
let response_payload = ResponsePayload { response_record.headers(headers.detach()),
request_id: request_id.to_string(), Duration::from_secs(1),
status_code: response.status_code, )
status_body: serde_json::from_str::<Value>(&response.status_body) .await
.unwrap_or(json!({})), } else {
}; producer.send(response_record, Duration::from_secs(1)).await
let response_payload = serde_json::to_string(&response_payload)?; } {
Ok(_) => {
let response_record = FutureRecord::to(&CONFIG.response_topic) info!("Response for '{request_id}' sent successfully");
.key(key) }
.payload(&response_payload); Err((err, _)) => {
error!("Could not send response for '{request_id}': {err}");
match if let Some(headers) = msg.headers() {
producer
.send(
response_record.headers(headers.detach()),
Duration::from_secs(1),
)
.await
} else {
producer.send(response_record, Duration::from_secs(1)).await
} {
Ok(_) => {
info!("Response for '{request_id}' sent successfully");
}
Err((err, _)) => {
error!("Could not send response for '{request_id}': {err}");
}
}
if response.has_valid_response_code() {
consumer.commit_message(&msg, CommitMode::Async)?;
} else {
warn!(
"Unexpected Status Code for Request '{}': HTTP {}",
&request_id, response.status_code
);
}
} }
} }
} else {
error!("Error getting key"); if response.has_valid_response_code() {
consumer.commit_message(&msg, CommitMode::Async)?;
} else {
warn!(
"Unexpected Status Code for Request '{}': HTTP {}",
&request_id, response.status_code
);
}
} }
} else {
error!("Error getting payload");
} }
} }