From 9057789b3842d23090f884588731fca7c9369beb Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Sat, 12 Jul 2025 11:18:09 +0200 Subject: [PATCH] feat: use `application/vnd.dnpm.v2.mtb+json` in record headers --- src/main.rs | 13 +++++++++---- src/sender.rs | 26 ++++++++++++-------------- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/main.rs b/src/main.rs index d64330e..41d9536 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,8 @@ use axum::http::{HeaderValue, Request, StatusCode}; use axum::middleware::{from_fn, Next}; use axum::response::{IntoResponse, Response}; use clap::Parser; +use rdkafka::producer::FutureProducer; +use rdkafka::ClientConfig; use serde::{Deserialize, Serialize}; use std::sync::{Arc, LazyLock}; #[cfg(debug_assertions)] @@ -63,10 +65,13 @@ async fn main() -> Result<(), ()> { .init(); } - let sender = Arc::new(DefaultMtbFileSender::new( - &CONFIG.topic, - &CONFIG.bootstrap_server, - )?); + let producer = ClientConfig::new() + .set("bootstrap.servers", &CONFIG.bootstrap_server) + .set("message.timeout.ms", "5000") + .create::() + .map_err(|_| ())?; + + let sender = Arc::new(DefaultMtbFileSender::new(&CONFIG.topic, producer)); let routes = routes(sender).layer(from_fn(check_basic_auth)); diff --git a/src/sender.rs b/src/sender.rs index b6f4484..1e7cb26 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -2,7 +2,6 @@ use async_trait::async_trait; use mv64e_mtb_dto::Mtb; use rdkafka::message::{Header, OwnedHeaders}; use rdkafka::producer::{FutureProducer, FutureRecord}; -use rdkafka::ClientConfig; use std::sync::Arc; use std::time::Duration; use uuid::Uuid; @@ -28,17 +27,11 @@ pub struct DefaultMtbFileSender { } impl DefaultMtbFileSender { - pub fn new(topic: &str, bootstrap_server: &str) -> Result { - let producer = ClientConfig::new() - .set("bootstrap.servers", bootstrap_server) - .set("message.timeout.ms", "5000") - .create::() - .map_err(|_| ())?; - - Ok(Self { + pub fn new(topic: &str, producer: FutureProducer) -> Self { + Self { topic: topic.to_string(), producer, - }) + } } } @@ -51,10 +44,15 @@ impl MtbFileSender for DefaultMtbFileSender { patient_id: mtb.patient.id.to_string(), }; - let record_headers = OwnedHeaders::default().insert(Header { - key: "requestId", - value: Some(&request_id.to_string()), - }); + let record_headers = OwnedHeaders::default() + .insert(Header { + key: "requestId", + value: Some(&request_id.to_string()), + }) + .insert(Header { + key: "contentType", + value: Some("application/vnd.dnpm.v2.mtb+json"), + }); let record_key = serde_json::to_string(&record_key).map_err(|_| ())?;