diff --git a/src/main.rs b/src/main.rs
index 7a96447..974793b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,28 +1,24 @@
-use std::time::Duration;
-
-use axum::{Extension, Json, Router};
use axum::body::Body;
use axum::extract::Path;
-use axum::http::{Request, StatusCode};
use axum::http::header::AUTHORIZATION;
+use axum::http::{Request, StatusCode};
use axum::middleware::{from_fn, Next};
use axum::response::Response;
use axum::routing::{delete, post};
+use axum::{Extension, Json, Router};
use bwhc_dto::MtbFile;
use clap::Parser;
use lazy_static::lazy_static;
-use rdkafka::ClientConfig;
-use rdkafka::message::{Header, OwnedHeaders};
-use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::{Deserialize, Serialize};
#[cfg(debug_assertions)]
use tower_http::trace::TraceLayer;
-use uuid::Uuid;
use crate::cli::Cli;
+use crate::sender::MtbFileSender;
mod auth;
mod cli;
+mod sender;
#[derive(Serialize, Deserialize)]
struct RecordKey {
@@ -43,16 +39,12 @@ async fn main() {
.init();
}
- let producer: FutureProducer = ClientConfig::new()
- .set("bootstrap.servers", CONFIG.bootstrap_server.as_str())
- .set("message.timeout.ms", "5000")
- .create()
- .expect("Producer creation error");
+ let sender = MtbFileSender::new(&CONFIG.topic, &CONFIG.bootstrap_server);
let app = Router::new()
.route("/mtbfile", post(handle_post))
.route("/mtbfile/:patient_id", delete(handle_delete))
- .layer(Extension(producer))
+ .layer(Extension(sender))
.layer(from_fn(check_basic_auth));
#[cfg(debug_assertions)]
@@ -83,63 +75,26 @@ async fn check_basic_auth(request: Request
, next: Next) -> Response {
async fn handle_delete(
Path(patient_id): Path,
- Extension(producer): Extension,
+ Extension(sender): Extension,
) -> Response {
let delete_mtb_file = MtbFile::new_with_consent_rejected(&patient_id);
- match send_mtb_file(producer, &CONFIG.topic, delete_mtb_file).await {
+ match sender.send(delete_mtb_file).await {
Ok(request_id) => success_response(&request_id),
_ => error_response(),
}
}
async fn handle_post(
- Extension(producer): Extension,
+ Extension(sender): Extension,
Json(mtb_file): Json,
) -> Response {
- match send_mtb_file(producer, &CONFIG.topic, mtb_file).await {
+ match sender.send(mtb_file).await {
Ok(request_id) => success_response(&request_id),
_ => error_response(),
}
}
-async fn send_mtb_file(
- producer: FutureProducer,
- topic: &str,
- mtb_file: MtbFile,
-) -> Result {
- let request_id = Uuid::new_v4();
-
- let record_key = RecordKey {
- patient_id: mtb_file.patient.id.to_string(),
- };
-
- let record_headers = OwnedHeaders::default().insert(Header {
- key: "requestId",
- value: Some(&request_id.to_string()),
- });
-
- let record_key = serde_json::to_string(&record_key).map_err(|_| ())?;
-
- match serde_json::to_string(&mtb_file) {
- Ok(json) => {
- producer
- .send(
- FutureRecord::to(topic)
- .key(&record_key)
- .headers(record_headers)
- .payload(&json),
- Duration::from_secs(1),
- )
- .await
- .map_err(|_| ())
- .map(|_| ())?;
- Ok(request_id.to_string())
- }
- Err(_) => Err(()),
- }
-}
-
fn success_response(request_id: &str) -> Response {
Response::builder()
.status(StatusCode::ACCEPTED)
@@ -175,5 +130,4 @@ mod tests {
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(response.headers().contains_key("x-request-id"), false);
}
-
-}
\ No newline at end of file
+}
diff --git a/src/sender.rs b/src/sender.rs
new file mode 100644
index 0000000..10ec86a
--- /dev/null
+++ b/src/sender.rs
@@ -0,0 +1,63 @@
+use std::time::Duration;
+
+use bwhc_dto::MtbFile;
+use rdkafka::message::{Header, OwnedHeaders};
+use rdkafka::producer::{FutureProducer, FutureRecord};
+use rdkafka::ClientConfig;
+use uuid::Uuid;
+
+use crate::RecordKey;
+
+#[derive(Clone)]
+pub struct MtbFileSender {
+ topic: String,
+ producer: FutureProducer,
+}
+
+impl MtbFileSender {
+ pub fn new(topic: &str, bootstrap_server: &str) -> Self {
+ let producer: FutureProducer = ClientConfig::new()
+ .set("bootstrap.servers", bootstrap_server)
+ .set("message.timeout.ms", "5000")
+ .create()
+ .expect("Producer creation error");
+
+ Self {
+ topic: topic.to_string(),
+ producer,
+ }
+ }
+
+ pub async fn send(&self, mtb_file: MtbFile) -> Result {
+ let request_id = Uuid::new_v4();
+
+ let record_key = RecordKey {
+ patient_id: mtb_file.patient.id.to_string(),
+ };
+
+ let record_headers = OwnedHeaders::default().insert(Header {
+ key: "requestId",
+ value: Some(&request_id.to_string()),
+ });
+
+ let record_key = serde_json::to_string(&record_key).map_err(|_| ())?;
+
+ match serde_json::to_string(&mtb_file) {
+ Ok(json) => {
+ self.producer
+ .send(
+ FutureRecord::to(&self.topic)
+ .key(&record_key)
+ .headers(record_headers)
+ .payload(&json),
+ Duration::from_secs(1),
+ )
+ .await
+ .map_err(|_| ())
+ .map(|_| ())?;
+ Ok(request_id.to_string())
+ }
+ Err(_) => Err(()),
+ }
+ }
+}