mirror of
https://github.com/CCC-MF/bwhc-kafka-rest-proxy.git
synced 2025-04-19 19:16:51 +00:00
parent
d5bc8779a0
commit
aafd7f314d
@ -8,6 +8,13 @@ description = "bwHC MTB-File REST Proxy für Kafka"
|
|||||||
|
|
||||||
# Dependencies
|
# Dependencies
|
||||||
|
|
||||||
|
[dependencies.clap]
|
||||||
|
version = "4.5"
|
||||||
|
features = ["derive", "env"]
|
||||||
|
|
||||||
|
[dependencies.lazy_static]
|
||||||
|
version = "1.4"
|
||||||
|
|
||||||
[dependencies.log]
|
[dependencies.log]
|
||||||
version = "0.4"
|
version = "0.4"
|
||||||
|
|
||||||
|
20
README.md
20
README.md
@ -13,7 +13,21 @@ Verwendung im Zusammenspiel mit https://github.com/CCC-MF/etl-processor
|
|||||||
|
|
||||||
## Konfiguration
|
## Konfiguration
|
||||||
|
|
||||||
Die Anwendung lässt sich mit Umgebungsvariablen konfigurieren.
|
Beim Start der Anwendung können Parameter angegeben werden.
|
||||||
|
|
||||||
|
```
|
||||||
|
Usage: bwhc-kafka-rest-proxy [OPTIONS] --token <TOKEN>
|
||||||
|
|
||||||
|
Options:
|
||||||
|
--bootstrap-server <BOOTSTRAP_SERVER>
|
||||||
|
Kafka Bootstrap-Server(s) [env: KAFKA_BOOTSTRAP_SERVERS=] [default: kafka:9094]
|
||||||
|
--topic <TOPIC>
|
||||||
|
Kafka Topic [env: APP_KAFKA_TOPIC=] [default: etl-processor_input]
|
||||||
|
--token <TOKEN>
|
||||||
|
bcrypt hashed Security Token [env: APP_SECURITY_TOKEN=]
|
||||||
|
```
|
||||||
|
|
||||||
|
Die Anwendung lässt sich auch mit Umgebungsvariablen konfigurieren.
|
||||||
|
|
||||||
* `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste
|
* `APP_KAFKA_SERVERS`: Zu verwendende Kafka-Bootstrap-Server als kommagetrennte Liste
|
||||||
* `APP_KAFKA_TOPIC`: Zu verwendendes Topic zum Warten auf neue Anfragen. Standardwert: `etl-processor_input`
|
* `APP_KAFKA_TOPIC`: Zu verwendendes Topic zum Warten auf neue Anfragen. Standardwert: `etl-processor_input`
|
||||||
@ -81,10 +95,10 @@ Resultierender Kafka-Record:
|
|||||||
|
|
||||||
#### Löschen von Patienten
|
#### Löschen von Patienten
|
||||||
|
|
||||||
Anfrage auch hier mit **curl**:
|
Anfrage auch hier mit *curl*:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
curl -v -u token:very-secret \
|
curl -u token:very-secret \
|
||||||
-H "Content-Type: application/json" \
|
-H "Content-Type: application/json" \
|
||||||
-X DELETE \
|
-X DELETE \
|
||||||
http://localhost:3000/mtbfile/P1
|
http://localhost:3000/mtbfile/P1
|
||||||
|
30
src/cli.rs
Normal file
30
src/cli.rs
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
use clap::Parser;
|
||||||
|
|
||||||
|
#[derive(Parser)]
|
||||||
|
#[command(author, version, about)]
|
||||||
|
#[command(arg_required_else_help(true))]
|
||||||
|
pub struct Cli {
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
alias = "kafka-servers",
|
||||||
|
env = "APP_KAFKA_SERVERS",
|
||||||
|
default_value = "kafka:9094",
|
||||||
|
help = "Kafka Bootstrap Server"
|
||||||
|
)]
|
||||||
|
pub bootstrap_server: String,
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
alias = "kafka-topic",
|
||||||
|
env = "APP_KAFKA_TOPIC",
|
||||||
|
default_value = "etl-processor_input",
|
||||||
|
help = "Kafka Topic"
|
||||||
|
)]
|
||||||
|
pub topic: String,
|
||||||
|
#[arg(
|
||||||
|
long,
|
||||||
|
alias = "security-token",
|
||||||
|
env = "APP_SECURITY_TOKEN",
|
||||||
|
help = "bcrypt hashed Security Token"
|
||||||
|
)]
|
||||||
|
pub token: String,
|
||||||
|
}
|
38
src/main.rs
38
src/main.rs
@ -1,24 +1,28 @@
|
|||||||
use std::env;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use axum::{Extension, Json, Router};
|
||||||
use axum::body::Body;
|
use axum::body::Body;
|
||||||
use axum::extract::Path;
|
use axum::extract::Path;
|
||||||
use axum::http::header::AUTHORIZATION;
|
|
||||||
use axum::http::{Request, StatusCode};
|
use axum::http::{Request, StatusCode};
|
||||||
|
use axum::http::header::AUTHORIZATION;
|
||||||
use axum::middleware::{from_fn, Next};
|
use axum::middleware::{from_fn, Next};
|
||||||
use axum::response::Response;
|
use axum::response::Response;
|
||||||
use axum::routing::{delete, post};
|
use axum::routing::{delete, post};
|
||||||
use axum::{Extension, Json, Router};
|
|
||||||
use bwhc_dto::MtbFile;
|
use bwhc_dto::MtbFile;
|
||||||
|
use clap::Parser;
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use rdkafka::ClientConfig;
|
||||||
use rdkafka::message::{Header, OwnedHeaders};
|
use rdkafka::message::{Header, OwnedHeaders};
|
||||||
use rdkafka::producer::{FutureProducer, FutureRecord};
|
use rdkafka::producer::{FutureProducer, FutureRecord};
|
||||||
use rdkafka::ClientConfig;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
use tower_http::trace::TraceLayer;
|
use tower_http::trace::TraceLayer;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::cli::Cli;
|
||||||
|
|
||||||
mod auth;
|
mod auth;
|
||||||
|
mod cli;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct RecordKey {
|
struct RecordKey {
|
||||||
@ -26,10 +30,12 @@ struct RecordKey {
|
|||||||
patient_id: String,
|
patient_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref CONFIG: Cli = Cli::parse();
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let _ = bcrypt_hashed_token();
|
|
||||||
|
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
{
|
{
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
@ -37,10 +43,8 @@ async fn main() {
|
|||||||
.init();
|
.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
let boostrap_servers = env::var("KAFKA_BOOTSTRAP_SERVERS").unwrap_or("kafka:9094".into());
|
|
||||||
|
|
||||||
let producer: FutureProducer = ClientConfig::new()
|
let producer: FutureProducer = ClientConfig::new()
|
||||||
.set("bootstrap.servers", boostrap_servers.as_str())
|
.set("bootstrap.servers", CONFIG.bootstrap_server.as_str())
|
||||||
.set("message.timeout.ms", "5000")
|
.set("message.timeout.ms", "5000")
|
||||||
.create()
|
.create()
|
||||||
.expect("Producer creation error");
|
.expect("Producer creation error");
|
||||||
@ -60,7 +64,7 @@ async fn main() {
|
|||||||
|
|
||||||
async fn check_basic_auth(request: Request<Body>, next: Next) -> Response {
|
async fn check_basic_auth(request: Request<Body>, next: Next) -> Response {
|
||||||
if let Some(Ok(auth_header)) = request.headers().get(AUTHORIZATION).map(|x| x.to_str()) {
|
if let Some(Ok(auth_header)) = request.headers().get(AUTHORIZATION).map(|x| x.to_str()) {
|
||||||
if auth::check_basic_auth(auth_header, &bcrypt_hashed_token()) {
|
if auth::check_basic_auth(auth_header, &CONFIG.token) {
|
||||||
return next.run(request).await;
|
return next.run(request).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -76,7 +80,7 @@ async fn handle_delete(
|
|||||||
) -> Response {
|
) -> Response {
|
||||||
let delete_mtb_file = MtbFile::new_with_consent_rejected(&patient_id);
|
let delete_mtb_file = MtbFile::new_with_consent_rejected(&patient_id);
|
||||||
|
|
||||||
match send_mtb_file(producer, &dst_topic(), delete_mtb_file).await {
|
match send_mtb_file(producer, &CONFIG.topic, delete_mtb_file).await {
|
||||||
Ok(request_id) => success_response(&request_id),
|
Ok(request_id) => success_response(&request_id),
|
||||||
_ => error_response(),
|
_ => error_response(),
|
||||||
}
|
}
|
||||||
@ -86,7 +90,7 @@ async fn handle_post(
|
|||||||
Extension(producer): Extension<FutureProducer>,
|
Extension(producer): Extension<FutureProducer>,
|
||||||
Json(mtb_file): Json<MtbFile>,
|
Json(mtb_file): Json<MtbFile>,
|
||||||
) -> Response {
|
) -> Response {
|
||||||
match send_mtb_file(producer, &dst_topic(), mtb_file).await {
|
match send_mtb_file(producer, &CONFIG.topic, mtb_file).await {
|
||||||
Ok(request_id) => success_response(&request_id),
|
Ok(request_id) => success_response(&request_id),
|
||||||
_ => error_response(),
|
_ => error_response(),
|
||||||
}
|
}
|
||||||
@ -143,13 +147,3 @@ fn error_response() -> Response {
|
|||||||
.body(Body::empty())
|
.body(Body::empty())
|
||||||
.expect("response built")
|
.expect("response built")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dst_topic() -> String {
|
|
||||||
env::var("APP_KAFKA_TOPIC").unwrap_or("etl-processor_input".into())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn bcrypt_hashed_token() -> String {
|
|
||||||
env::var("APP_SECURITY_TOKEN").unwrap_or_else(|_| {
|
|
||||||
panic!("Missing configuration 'APP_SECURITY_TOKEN'. Provide bcrypt hashed token value.")
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user