1
0
mirror of https://github.com/CCC-MF/bwhc-kafka-rest-proxy.git synced 2025-04-19 19:16:51 +00:00

refactor: extract mtb file sender

This commit is contained in:
Paul-Christian Volkmer 2024-03-28 16:52:29 +01:00
parent 22f7a7c65d
commit c99ee8cdfb
2 changed files with 74 additions and 57 deletions

View File

@ -1,28 +1,24 @@
use std::time::Duration;
use axum::{Extension, Json, Router};
use axum::body::Body; use axum::body::Body;
use axum::extract::Path; use axum::extract::Path;
use axum::http::{Request, StatusCode};
use axum::http::header::AUTHORIZATION; use axum::http::header::AUTHORIZATION;
use axum::http::{Request, StatusCode};
use axum::middleware::{from_fn, Next}; use axum::middleware::{from_fn, Next};
use axum::response::Response; use axum::response::Response;
use axum::routing::{delete, post}; use axum::routing::{delete, post};
use axum::{Extension, Json, Router};
use bwhc_dto::MtbFile; use bwhc_dto::MtbFile;
use clap::Parser; use clap::Parser;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use rdkafka::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
use tower_http::trace::TraceLayer; use tower_http::trace::TraceLayer;
use uuid::Uuid;
use crate::cli::Cli; use crate::cli::Cli;
use crate::sender::MtbFileSender;
mod auth; mod auth;
mod cli; mod cli;
mod sender;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct RecordKey { struct RecordKey {
@ -43,16 +39,12 @@ async fn main() {
.init(); .init();
} }
let producer: FutureProducer = ClientConfig::new() let sender = MtbFileSender::new(&CONFIG.topic, &CONFIG.bootstrap_server);
.set("bootstrap.servers", CONFIG.bootstrap_server.as_str())
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
let app = Router::new() let app = Router::new()
.route("/mtbfile", post(handle_post)) .route("/mtbfile", post(handle_post))
.route("/mtbfile/:patient_id", delete(handle_delete)) .route("/mtbfile/:patient_id", delete(handle_delete))
.layer(Extension(producer)) .layer(Extension(sender))
.layer(from_fn(check_basic_auth)); .layer(from_fn(check_basic_auth));
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
@ -83,63 +75,26 @@ async fn check_basic_auth(request: Request<Body>, next: Next) -> Response {
async fn handle_delete( async fn handle_delete(
Path(patient_id): Path<String>, Path(patient_id): Path<String>,
Extension(producer): Extension<FutureProducer>, Extension(sender): Extension<MtbFileSender>,
) -> Response { ) -> Response {
let delete_mtb_file = MtbFile::new_with_consent_rejected(&patient_id); let delete_mtb_file = MtbFile::new_with_consent_rejected(&patient_id);
match send_mtb_file(producer, &CONFIG.topic, delete_mtb_file).await { match sender.send(delete_mtb_file).await {
Ok(request_id) => success_response(&request_id), Ok(request_id) => success_response(&request_id),
_ => error_response(), _ => error_response(),
} }
} }
async fn handle_post( async fn handle_post(
Extension(producer): Extension<FutureProducer>, Extension(sender): Extension<MtbFileSender>,
Json(mtb_file): Json<MtbFile>, Json(mtb_file): Json<MtbFile>,
) -> Response { ) -> Response {
match send_mtb_file(producer, &CONFIG.topic, mtb_file).await { match sender.send(mtb_file).await {
Ok(request_id) => success_response(&request_id), Ok(request_id) => success_response(&request_id),
_ => error_response(), _ => error_response(),
} }
} }
async fn send_mtb_file(
producer: FutureProducer,
topic: &str,
mtb_file: MtbFile,
) -> Result<String, ()> {
let request_id = Uuid::new_v4();
let record_key = RecordKey {
patient_id: mtb_file.patient.id.to_string(),
};
let record_headers = OwnedHeaders::default().insert(Header {
key: "requestId",
value: Some(&request_id.to_string()),
});
let record_key = serde_json::to_string(&record_key).map_err(|_| ())?;
match serde_json::to_string(&mtb_file) {
Ok(json) => {
producer
.send(
FutureRecord::to(topic)
.key(&record_key)
.headers(record_headers)
.payload(&json),
Duration::from_secs(1),
)
.await
.map_err(|_| ())
.map(|_| ())?;
Ok(request_id.to_string())
}
Err(_) => Err(()),
}
}
fn success_response(request_id: &str) -> Response { fn success_response(request_id: &str) -> Response {
Response::builder() Response::builder()
.status(StatusCode::ACCEPTED) .status(StatusCode::ACCEPTED)
@ -175,5 +130,4 @@ mod tests {
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(response.headers().contains_key("x-request-id"), false); assert_eq!(response.headers().contains_key("x-request-id"), false);
} }
} }

63
src/sender.rs Normal file
View File

@ -0,0 +1,63 @@
use std::time::Duration;
use bwhc_dto::MtbFile;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use uuid::Uuid;
use crate::RecordKey;
#[derive(Clone)]
pub struct MtbFileSender {
topic: String,
producer: FutureProducer,
}
impl MtbFileSender {
pub fn new(topic: &str, bootstrap_server: &str) -> Self {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", bootstrap_server)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
Self {
topic: topic.to_string(),
producer,
}
}
pub async fn send(&self, mtb_file: MtbFile) -> Result<String, ()> {
let request_id = Uuid::new_v4();
let record_key = RecordKey {
patient_id: mtb_file.patient.id.to_string(),
};
let record_headers = OwnedHeaders::default().insert(Header {
key: "requestId",
value: Some(&request_id.to_string()),
});
let record_key = serde_json::to_string(&record_key).map_err(|_| ())?;
match serde_json::to_string(&mtb_file) {
Ok(json) => {
self.producer
.send(
FutureRecord::to(&self.topic)
.key(&record_key)
.headers(record_headers)
.payload(&json),
Duration::from_secs(1),
)
.await
.map_err(|_| ())
.map(|_| ())?;
Ok(request_id.to_string())
}
Err(_) => Err(()),
}
}
}