From c99ee8cdfb26874283ab49f05308122a74e79eec Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Thu, 28 Mar 2024 16:52:29 +0100 Subject: [PATCH] refactor: extract mtb file sender --- src/main.rs | 68 +++++++++------------------------------------------ src/sender.rs | 63 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 57 deletions(-) create mode 100644 src/sender.rs diff --git a/src/main.rs b/src/main.rs index 7a96447..974793b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,24 @@ -use std::time::Duration; - -use axum::{Extension, Json, Router}; use axum::body::Body; use axum::extract::Path; -use axum::http::{Request, StatusCode}; use axum::http::header::AUTHORIZATION; +use axum::http::{Request, StatusCode}; use axum::middleware::{from_fn, Next}; use axum::response::Response; use axum::routing::{delete, post}; +use axum::{Extension, Json, Router}; use bwhc_dto::MtbFile; use clap::Parser; use lazy_static::lazy_static; -use rdkafka::ClientConfig; -use rdkafka::message::{Header, OwnedHeaders}; -use rdkafka::producer::{FutureProducer, FutureRecord}; use serde::{Deserialize, Serialize}; #[cfg(debug_assertions)] use tower_http::trace::TraceLayer; -use uuid::Uuid; use crate::cli::Cli; +use crate::sender::MtbFileSender; mod auth; mod cli; +mod sender; #[derive(Serialize, Deserialize)] struct RecordKey { @@ -43,16 +39,12 @@ async fn main() { .init(); } - let producer: FutureProducer = ClientConfig::new() - .set("bootstrap.servers", CONFIG.bootstrap_server.as_str()) - .set("message.timeout.ms", "5000") - .create() - .expect("Producer creation error"); + let sender = MtbFileSender::new(&CONFIG.topic, &CONFIG.bootstrap_server); let app = Router::new() .route("/mtbfile", post(handle_post)) .route("/mtbfile/:patient_id", delete(handle_delete)) - .layer(Extension(producer)) + .layer(Extension(sender)) .layer(from_fn(check_basic_auth)); #[cfg(debug_assertions)] @@ -83,63 +75,26 @@ async fn check_basic_auth(request: Request, next: Next) -> Response { async fn handle_delete( Path(patient_id): Path, - Extension(producer): Extension, + Extension(sender): Extension, ) -> Response { let delete_mtb_file = MtbFile::new_with_consent_rejected(&patient_id); - match send_mtb_file(producer, &CONFIG.topic, delete_mtb_file).await { + match sender.send(delete_mtb_file).await { Ok(request_id) => success_response(&request_id), _ => error_response(), } } async fn handle_post( - Extension(producer): Extension, + Extension(sender): Extension, Json(mtb_file): Json, ) -> Response { - match send_mtb_file(producer, &CONFIG.topic, mtb_file).await { + match sender.send(mtb_file).await { Ok(request_id) => success_response(&request_id), _ => error_response(), } } -async fn send_mtb_file( - producer: FutureProducer, - topic: &str, - mtb_file: MtbFile, -) -> Result { - let request_id = Uuid::new_v4(); - - let record_key = RecordKey { - patient_id: mtb_file.patient.id.to_string(), - }; - - let record_headers = OwnedHeaders::default().insert(Header { - key: "requestId", - value: Some(&request_id.to_string()), - }); - - let record_key = serde_json::to_string(&record_key).map_err(|_| ())?; - - match serde_json::to_string(&mtb_file) { - Ok(json) => { - producer - .send( - FutureRecord::to(topic) - .key(&record_key) - .headers(record_headers) - .payload(&json), - Duration::from_secs(1), - ) - .await - .map_err(|_| ()) - .map(|_| ())?; - Ok(request_id.to_string()) - } - Err(_) => Err(()), - } -} - fn success_response(request_id: &str) -> Response { Response::builder() .status(StatusCode::ACCEPTED) @@ -175,5 +130,4 @@ mod tests { assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(response.headers().contains_key("x-request-id"), false); } - -} \ No newline at end of file +} diff --git a/src/sender.rs b/src/sender.rs new file mode 100644 index 0000000..10ec86a --- /dev/null +++ b/src/sender.rs @@ -0,0 +1,63 @@ +use std::time::Duration; + +use bwhc_dto::MtbFile; +use rdkafka::message::{Header, OwnedHeaders}; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::ClientConfig; +use uuid::Uuid; + +use crate::RecordKey; + +#[derive(Clone)] +pub struct MtbFileSender { + topic: String, + producer: FutureProducer, +} + +impl MtbFileSender { + pub fn new(topic: &str, bootstrap_server: &str) -> Self { + let producer: FutureProducer = ClientConfig::new() + .set("bootstrap.servers", bootstrap_server) + .set("message.timeout.ms", "5000") + .create() + .expect("Producer creation error"); + + Self { + topic: topic.to_string(), + producer, + } + } + + pub async fn send(&self, mtb_file: MtbFile) -> Result { + let request_id = Uuid::new_v4(); + + let record_key = RecordKey { + patient_id: mtb_file.patient.id.to_string(), + }; + + let record_headers = OwnedHeaders::default().insert(Header { + key: "requestId", + value: Some(&request_id.to_string()), + }); + + let record_key = serde_json::to_string(&record_key).map_err(|_| ())?; + + match serde_json::to_string(&mtb_file) { + Ok(json) => { + self.producer + .send( + FutureRecord::to(&self.topic) + .key(&record_key) + .headers(record_headers) + .payload(&json), + Duration::from_secs(1), + ) + .await + .map_err(|_| ()) + .map(|_| ())?; + Ok(request_id.to_string()) + } + Err(_) => Err(()), + } + } +}