1
0
mirror of https://github.com/pcvolkmer/mv64e-rest-to-kafka-gateway synced 2025-09-13 17:22:52 +00:00

feat: use application/vnd.dnpm.v2.mtb+json in record headers

This commit is contained in:
2025-07-12 11:18:09 +02:00
parent ea72454a79
commit 9057789b38
2 changed files with 21 additions and 18 deletions

View File

@@ -4,6 +4,8 @@ use axum::http::{HeaderValue, Request, StatusCode};
use axum::middleware::{from_fn, Next};
use axum::response::{IntoResponse, Response};
use clap::Parser;
use rdkafka::producer::FutureProducer;
use rdkafka::ClientConfig;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, LazyLock};
#[cfg(debug_assertions)]
@@ -63,10 +65,13 @@ async fn main() -> Result<(), ()> {
.init();
}
let sender = Arc::new(DefaultMtbFileSender::new(
&CONFIG.topic,
&CONFIG.bootstrap_server,
)?);
let producer = ClientConfig::new()
.set("bootstrap.servers", &CONFIG.bootstrap_server)
.set("message.timeout.ms", "5000")
.create::<FutureProducer>()
.map_err(|_| ())?;
let sender = Arc::new(DefaultMtbFileSender::new(&CONFIG.topic, producer));
let routes = routes(sender).layer(from_fn(check_basic_auth));

View File

@@ -2,7 +2,6 @@ use async_trait::async_trait;
use mv64e_mtb_dto::Mtb;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::ClientConfig;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
@@ -28,17 +27,11 @@ pub struct DefaultMtbFileSender {
}
impl DefaultMtbFileSender {
pub fn new(topic: &str, bootstrap_server: &str) -> Result<Self, ()> {
let producer = ClientConfig::new()
.set("bootstrap.servers", bootstrap_server)
.set("message.timeout.ms", "5000")
.create::<FutureProducer>()
.map_err(|_| ())?;
Ok(Self {
pub fn new(topic: &str, producer: FutureProducer) -> Self {
Self {
topic: topic.to_string(),
producer,
})
}
}
}
@@ -51,10 +44,15 @@ impl MtbFileSender for DefaultMtbFileSender {
patient_id: mtb.patient.id.to_string(),
};
let record_headers = OwnedHeaders::default().insert(Header {
key: "requestId",
value: Some(&request_id.to_string()),
});
let record_headers = OwnedHeaders::default()
.insert(Header {
key: "requestId",
value: Some(&request_id.to_string()),
})
.insert(Header {
key: "contentType",
value: Some("application/vnd.dnpm.v2.mtb+json"),
});
let record_key = serde_json::to_string(&record_key).map_err(|_| ())?;