use crate::cli::Cli; use crate::http_client::{HttpClient, HttpClientError, HttpResponse}; use clap::Parser; use mv64e_mtb_dto::Mtb; use rdkafka::config::RDKafkaLogLevel; use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer}; use rdkafka::message::Headers; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::{ClientConfig, Message}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; use std::error::Error; use std::string::ToString; use std::sync::LazyLock; use std::time::Duration; use tracing::{error, info}; mod cli; mod http_client; #[derive(Serialize, Deserialize)] struct ResponsePayload { request_id: String, status_code: u16, status_body: Value, } #[cfg(not(test))] static CONFIG: LazyLock = LazyLock::new(Cli::parse); async fn handle_record(payload: Mtb) -> Result { let client = HttpClient::new( &CONFIG.dnpm_dip_uri, CONFIG.dnpm_dip_username.clone(), CONFIG.dnpm_dip_password.clone(), ); if let Some(metadata) = &payload.metadata { if !metadata.model_project_consent.provisions.is_empty() || metadata.research_consents.is_some() { client.send_mtb_request(&payload).await } else { client.send_delete_request(&payload.patient.id).await } } else { client.send_mtb_request(&payload).await } } #[tokio::main] async fn main() -> Result<(), Box> { #[cfg(debug_assertions)] { tracing_subscriber::fmt() .with_max_level(tracing::Level::DEBUG) .init(); } #[cfg(not(debug_assertions))] { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .init(); } let consumer: StreamConsumer = ClientConfig::new() .set("bootstrap.servers", &CONFIG.bootstrap_servers) .set("group.id", &CONFIG.group_id) .set("enable.partition.eof", "false") .set("auto.offset.reset", "earliest") .set("enable.auto.commit", "false") .set_log_level(RDKafkaLogLevel::Debug) .create() .expect("Kafka consumer created"); let topic: &str = &CONFIG.topic.clone(); consumer.subscribe(&[topic])?; info!("Kafka topic '{}' subscribed", CONFIG.topic); let producer: &FutureProducer = &ClientConfig::new() .set("bootstrap.servers", &CONFIG.bootstrap_servers.to_string()) .set("message.timeout.ms", "5000") .create() .expect("Producer creation error"); loop { match consumer.recv().await { Ok(msg) => match msg.payload_view::() { Some(Ok(payload)) => match msg.key_view::() { Some(Ok(key)) => { let payload = if let Ok(payload) = serde_json::from_str::(&payload) { payload } else { error!("Error deserializing payload"); continue; }; let request_id = match msg.headers() { None => None, Some(headers) => { if let Some(x) = headers .iter() .filter(|header| header.key == "requestId") .next() .unwrap() .value { Some(str::from_utf8(x).unwrap().to_string()) } else { None } } } .unwrap_or_default(); match handle_record(payload).await { Err(err) => error!("{}", err), Ok(response) => { let response_payload = ResponsePayload { request_id, status_code: response.status_code, status_body: serde_json::from_str::( &response.status_body, ) .unwrap_or(json!({})), }; let payload = serde_json::to_string(&response_payload)?; let response_record = FutureRecord::to(&CONFIG.response_topic) .key(key) .payload(&payload); let _ = if let Some(headers) = msg.headers() { producer .send( response_record.headers(headers.detach()), Duration::from_secs(1), ) .await } else { producer.send(response_record, Duration::from_secs(1)).await }; consumer .commit_message(&msg, CommitMode::Async) .expect("Cound not commit message: {}"); } } } _ => error!("Error getting key"), }, _ => error!("Error getting message"), }, Err(err) => { error!("Error receiving message: {}", err); } } } } // Test Configuration #[cfg(test)] static CONFIG: LazyLock = LazyLock::new(|| Cli { bootstrap_servers: "localhost:9094".to_string(), topic: "test-topic".to_string(), response_topic: "test-response-topic".to_string(), group_id: "test-group-id".to_string(), dnpm_dip_uri: "http://localhost:8000/api".to_string(), dnpm_dip_username: None, dnpm_dip_password: None, ssl_ca_file: None, ssl_cert_file: None, ssl_key_file: None, ssl_key_password: None, });