1
0
mirror of https://github.com/pcvolkmer/mv64e-rest-to-kafka-gateway synced 2025-09-13 09:12:51 +00:00

Initial commit

This commit is contained in:
2025-07-11 23:15:18 +02:00
commit 523ad89783
13 changed files with 3851 additions and 0 deletions

68
src/auth.rs Normal file
View File

@@ -0,0 +1,68 @@
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
#[allow(clippy::module_name_repetitions)]
pub fn check_basic_auth(auth_header: &str, expected_token: &str) -> bool {
let split = auth_header.split(' ').collect::<Vec<_>>();
if split.len() == 2 && split.first().map(|first| first.to_lowercase()) == Some("basic".into()) {
if let Ok(auth) = BASE64_STANDARD.decode(split.last().unwrap_or(&"")) {
if let Ok(auth) = String::from_utf8(auth) {
let split = auth.split(':').collect::<Vec<_>>();
if split.len() == 2 && split.first() == Some(&"token") {
match split.last() {
None => {}
Some(&token) => {
if let Ok(true) = bcrypt::verify(token, expected_token) {
return true;
}
}
}
}
}
}
}
false
}
#[cfg(test)]
mod tests {
use crate::auth::check_basic_auth;
// plain text value 'very-secret'
const EXPECTED_TOKEN: &str = "$2y$05$LIIFF4Rbi3iRVA4UIqxzPeTJ0NOn/cV2hDnSKFftAMzbEZRa42xSG";
#[test]
fn should_reject_non_basic_header_content() {
assert!(!check_basic_auth("token 123456789", EXPECTED_TOKEN));
}
#[test]
fn should_reject_invalid_basic_auth() {
assert!(!check_basic_auth("Basic 123456789", EXPECTED_TOKEN));
}
#[test]
fn should_reject_basic_auth_without_token_username() {
assert!(!check_basic_auth(
"Basic dXNlcjoxMjM0NTY3ODk=",
EXPECTED_TOKEN
));
}
#[test]
fn should_reject_basic_auth_without_wrong_token() {
assert!(!check_basic_auth(
"Basic dG9rZW46MTIzNDU2Nzg5",
EXPECTED_TOKEN
));
}
#[test]
fn should_accept_basic_auth_without_correct_token() {
assert!(check_basic_auth(
"Basic dG9rZW46dmVyeS1zZWNyZXQ=",
EXPECTED_TOKEN
));
}
}

37
src/cli.rs Normal file
View File

@@ -0,0 +1,37 @@
use clap::Parser;
#[derive(Parser)]
#[command(author, version, about)]
#[command(arg_required_else_help(true))]
pub struct Cli {
#[arg(
long,
alias = "kafka-servers",
env = "APP_KAFKA_SERVERS",
default_value = "kafka:9094",
help = "Kafka Bootstrap Server"
)]
pub bootstrap_server: String,
#[arg(
long,
alias = "kafka-topic",
env = "APP_KAFKA_TOPIC",
default_value = "etl-processor_input",
help = "Kafka Topic"
)]
pub topic: String,
#[arg(
long,
alias = "security-token",
env = "APP_SECURITY_TOKEN",
help = "bcrypt hashed Security Token"
)]
pub token: String,
#[arg(
long,
env = "APP_LISTEN",
default_value = "[::]:3000",
help = "Address and port for HTTP requests"
)]
pub listen: String,
}

132
src/main.rs Normal file
View File

@@ -0,0 +1,132 @@
use axum::body::Body;
use axum::http::header::{AUTHORIZATION, CONTENT_TYPE};
use axum::http::{HeaderValue, Request, StatusCode};
use axum::middleware::{from_fn, Next};
use axum::response::{IntoResponse, Response};
use clap::Parser;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, LazyLock};
#[cfg(debug_assertions)]
use tower_http::trace::TraceLayer;
use crate::cli::Cli;
use crate::routes::routes;
use crate::sender::DefaultMtbFileSender;
use crate::AppResponse::{Accepted, Unauthorized, UnsupportedContentType};
mod auth;
mod cli;
mod routes;
mod sender;
#[derive(Serialize, Deserialize)]
struct RecordKey {
#[serde(rename = "pid")]
patient_id: String,
}
enum AppResponse<'a> {
Accepted(&'a str),
Unauthorized,
InternalServerError,
UnsupportedContentType,
}
#[allow(clippy::expect_used)]
impl IntoResponse for AppResponse<'_> {
fn into_response(self) -> Response {
match self {
UnsupportedContentType => (
StatusCode::UNSUPPORTED_MEDIA_TYPE,
"This application accepts DNPM data model version 2.1 with content type 'application/json'"
).into_response(),
_ => match self {
Accepted(request_id) => Response::builder()
.status(StatusCode::ACCEPTED)
.header("X-Request-Id", request_id),
Unauthorized => Response::builder().status(StatusCode::UNAUTHORIZED),
_ => Response::builder().status(StatusCode::INTERNAL_SERVER_ERROR),
}
.body(Body::empty()).expect("response built"),
}
}
}
static CONFIG: LazyLock<Cli> = LazyLock::new(Cli::parse);
#[tokio::main]
async fn main() -> Result<(), ()> {
#[cfg(debug_assertions)]
{
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.init();
}
let sender = Arc::new(DefaultMtbFileSender::new(
&CONFIG.topic,
&CONFIG.bootstrap_server,
)?);
let routes = routes(sender)
.layer(from_fn(check_content_type_header))
.layer(from_fn(check_basic_auth));
#[cfg(debug_assertions)]
let routes = routes.layer(TraceLayer::new_for_http());
match tokio::net::TcpListener::bind(&CONFIG.listen).await {
Ok(listener) => {
log::info!("Starting application listening on '{}'", CONFIG.listen);
if let Err(err) = axum::serve(listener, routes).await {
log::error!("Error starting application: {err}");
}
}
Err(err) => log::error!("Error listening on '{}': {}", CONFIG.listen, err),
}
Ok(())
}
async fn check_content_type_header(request: Request<Body>, next: Next) -> Response {
match request
.headers()
.get(CONTENT_TYPE)
.map(HeaderValue::as_bytes)
{
Some(b"application/json" | b"application/json; charset=utf-8") => next.run(request).await,
_ => UnsupportedContentType.into_response(),
}
}
async fn check_basic_auth(request: Request<Body>, next: Next) -> Response {
if let Some(Ok(auth_header)) = request.headers().get(AUTHORIZATION).map(|x| x.to_str()) {
if auth::check_basic_auth(auth_header, &CONFIG.token) {
return next.run(request).await;
}
}
Unauthorized.into_response()
}
#[cfg(test)]
mod tests {
use axum::http::StatusCode;
use axum::response::IntoResponse;
use uuid::Uuid;
use crate::AppResponse::{Accepted, InternalServerError};
#[test]
fn should_return_success_response() {
let response = Accepted(&Uuid::new_v4().to_string()).into_response();
assert_eq!(response.status(), StatusCode::ACCEPTED);
assert!(response.headers().contains_key("x-request-id"));
}
#[test]
fn should_return_error_response() {
let response = InternalServerError.into_response();
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
assert!(!response.headers().contains_key("x-request-id"));
}
}

107
src/routes.rs Normal file
View File

@@ -0,0 +1,107 @@
use crate::sender::DynMtbFileSender;
use crate::AppResponse::{Accepted, InternalServerError};
use axum::extract::Path;
use axum::response::{IntoResponse, Response};
use axum::routing::{delete, post};
use axum::{Extension, Json, Router};
use mv64e_mtb_dto::Mtb;
pub async fn handle_delete(
Path(patient_id): Path<String>,
Extension(sender): Extension<DynMtbFileSender>,
) -> Response {
let delete_mtb_file = Mtb::new_with_consent_rejected(&patient_id);
match sender.send(delete_mtb_file).await {
Ok(request_id) => Accepted(&request_id).into_response(),
_ => InternalServerError.into_response(),
}
}
pub async fn handle_post(
Extension(sender): Extension<DynMtbFileSender>,
Json(mtb_file): Json<Mtb>,
) -> Response {
match sender.send(mtb_file).await {
Ok(request_id) => Accepted(&request_id).into_response(),
_ => InternalServerError.into_response(),
}
}
pub fn routes(sender: DynMtbFileSender) -> Router {
Router::new()
.route("/mtb/etl/patient-record", post(handle_post))
.route(
"/mtb/etl/patient-record/{patient_id}",
delete(handle_delete),
)
.layer(Extension(sender))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sender::MockMtbFileSender;
use axum::body::Body;
use axum::http::header::CONTENT_TYPE;
use axum::http::{Method, Request, StatusCode};
use std::sync::Arc;
use tower::ServiceExt;
#[tokio::test]
#[allow(clippy::expect_used)]
async fn should_handle_post_request() {
let mut sender_mock = MockMtbFileSender::new();
sender_mock
.expect_send()
.withf(|mtb| mtb.patient.id.eq("fae56ea7-24a7-4556-82fb-2b5dde71bb4d"))
.return_once(move |_| Ok(String::new()));
let router = routes(Arc::new(sender_mock) as DynMtbFileSender);
let body = Body::from(include_str!("../test-files/mv64e-mtb-fake-patient.json"));
let response = router
.oneshot(
Request::builder()
.method(Method::POST)
.uri("/mtb/etl/patient-record")
.header(CONTENT_TYPE, "application/json")
.body(body)
.expect("request built"),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::ACCEPTED);
}
#[tokio::test]
#[allow(clippy::expect_used)]
async fn should_handle_delete_request() {
let mut sender_mock = MockMtbFileSender::new();
sender_mock
.expect_send()
// Expect patient id is set in Kafka record
.withf(|mtb| mtb.patient.id.eq("fae56ea7-24a7-4556-82fb-2b5dde71bb4d"))
// Expect no Metadata => no consent in kafka record
.withf(|mtb| mtb.metadata.is_none())
.return_once(move |_| Ok(String::new()));
let router = routes(Arc::new(sender_mock) as DynMtbFileSender);
let response = router
.oneshot(
Request::builder()
.method(Method::DELETE)
.uri("/mtb/etl/patient-record/fae56ea7-24a7-4556-82fb-2b5dde71bb4d")
.header(CONTENT_TYPE, "application/json")
.body(Body::empty())
.expect("request built"),
)
.await
.unwrap();
assert_eq!(response.status(), StatusCode::ACCEPTED);
}
}

79
src/sender.rs Normal file
View File

@@ -0,0 +1,79 @@
use async_trait::async_trait;
use mv64e_mtb_dto::Mtb;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
#[cfg(test)]
use mockall::automock;
use crate::RecordKey;
pub type DynMtbFileSender = Arc<dyn MtbFileSender + Send + Sync>;
#[cfg_attr(test, automock)]
#[async_trait]
pub trait MtbFileSender {
async fn send(&self, mtb: Mtb) -> Result<String, ()>;
}
#[allow(clippy::module_name_repetitions)]
#[derive(Clone)]
pub struct DefaultMtbFileSender {
topic: String,
producer: FutureProducer,
}
impl DefaultMtbFileSender {
pub fn new(topic: &str, bootstrap_server: &str) -> Result<Self, ()> {
let producer = ClientConfig::new()
.set("bootstrap.servers", bootstrap_server)
.set("message.timeout.ms", "5000")
.create::<FutureProducer>()
.map_err(|_| ())?;
Ok(Self {
topic: topic.to_string(),
producer,
})
}
}
#[async_trait]
impl MtbFileSender for DefaultMtbFileSender {
async fn send(&self, mtb: Mtb) -> Result<String, ()> {
let request_id = Uuid::new_v4();
let record_key = RecordKey {
patient_id: mtb.patient.id.to_string(),
};
let record_headers = OwnedHeaders::default().insert(Header {
key: "requestId",
value: Some(&request_id.to_string()),
});
let record_key = serde_json::to_string(&record_key).map_err(|_| ())?;
match serde_json::to_string(&mtb) {
Ok(json) => {
self.producer
.send(
FutureRecord::to(&self.topic)
.key(&record_key)
.headers(record_headers)
.payload(&json),
Duration::from_secs(1),
)
.await
.map_err(|_| ())
.map(|_| ())?;
Ok(request_id.to_string())
}
Err(_) => Err(()),
}
}
}