1
0
mirror of https://github.com/pcvolkmer/mv64e-etl-processor synced 2025-09-13 09:02:50 +00:00

10 Commits

Author SHA1 Message Date
3d9d84438d refactor: several changes related to code style and readability (#152)
* refactor: extract provision code extraction
* refactor: catch exceptions by type without later type check
* refactor: further code cleanup
* chore: log error with error level, not debug level
2025-09-04 12:47:56 +02:00
10b5bedac3 Merge branch '0.11.x'
# Conflicts:
#	build.gradle.kts
2025-09-03 22:03:52 +02:00
96f22a6744 feat: mark older request with unknown state (#150) 2025-09-03 21:30:36 +02:00
6dfec5c341 fix: add status badge for 'NO_CONSENT' (#149) 2025-09-03 21:18:28 +02:00
c38c0c6197 build: prepare for v0.12 development (#147) 2025-09-02 10:40:30 +02:00
4602032bcf chore: bump version 2025-09-01 13:33:29 +02:00
9cc9f130df chore: add custom banner file (#146) 2025-09-01 13:31:08 +02:00
b92fbae2c5 chore: update dependencies (#145) 2025-09-01 13:25:51 +02:00
5704282a1c docs: some additions to README.md (#143) 2025-08-28 19:37:57 +02:00
ba21d029d1 fix: add missing requestId to KafkaMtbFileSender (#142) 2025-08-27 15:07:43 +02:00
10 changed files with 114 additions and 65 deletions

View File

@@ -268,7 +268,7 @@ zur Nutzung des MTB-File-Endpunkts eine HTTP-Basic-Authentifizierung voraussetze
![Tokenverwaltung](docs/tokens.png) ![Tokenverwaltung](docs/tokens.png)
In diesem Fall kann der Endpunkt für das Onkostar-Plugin * In diesem Fall kann der Endpunkt für das Onkostar-Plugin *
*[onkostar-plugin-dnpmexport](https://github.com/CCC-MF/onkostar-plugin-dnpmexport)** wie folgt *[mv64e-onkostar-plugin-export](https://github.com/pcvolkmer/mv64e-onkostar-plugin-export)** wie folgt
konfiguriert werden: konfiguriert werden:
``` ```
@@ -427,9 +427,9 @@ Die PEM-Datei mit dem/den Root CA Zertifikat(en) muss dabei im vorbereiteten Ver
#### Integration zur Laufzeit #### Integration zur Laufzeit
Hier muss die Umgebungsvariable `SERVICE_BINDING_ROOT` z.B. auf den Wert `/bindings` gesetzt sein. Hier muss die Umgebungsvariable `SERVICE_BINDING_ROOT` z.B. auf den Wert `/bindings` gesetzt sein.
Zudem muss ein Verzeichnis `bindings/ca-certificates` - analog zum Verzeichnis [ Zudem muss ein Verzeichnis `bindings/ca-certificates` - analog zum Verzeichnis
`bindings/ca-certificates`](bindings/ca-certificates) mit einer PEM-Datei als Docker-Volume [`bindings/ca-certificates`](bindings/ca-certificates) mit einer PEM-Datei und der
eingebunden werden. Datei [`bindings/ca-certificates/type`](bindings/ca-certificates/type) als Docker-Volume eingebunden werden.
Beispiel für Docker-Compose: Beispiel für Docker-Compose:

View File

@@ -5,24 +5,24 @@ import org.springframework.boot.gradle.tasks.bundling.BootBuildImage
plugins { plugins {
war war
id("org.springframework.boot") version "3.5.3" id("org.springframework.boot") version "3.5.5"
id("io.spring.dependency-management") version "1.1.7" id("io.spring.dependency-management") version "1.1.7"
kotlin("jvm") version "1.9.25" kotlin("jvm") version "2.2.10"
kotlin("plugin.spring") version "1.9.25" kotlin("plugin.spring") version "2.2.10"
jacoco jacoco
} }
group = "dev.dnpm" group = "dev.dnpm"
version = "0.11.0-SNAPSHOT" version = "0.12.0-SNAPSHOT"
var versions = mapOf( var versions = mapOf(
"mtb-dto" to "0.1.0-SNAPSHOT", "mtb-dto" to "0.1.0-SNAPSHOT",
"hapi-fhir" to "7.6.1", "hapi-fhir" to "8.4.0",
"mockito-kotlin" to "5.4.0", "mockito-kotlin" to "6.0.0",
"archunit" to "1.3.0", "archunit" to "1.4.1",
// Webjars // Webjars
"webjars-locator" to "0.52", "webjars-locator" to "0.52",
"echarts" to "5.4.3", "echarts" to "6.0.0",
"htmx.org" to "1.9.12" "htmx.org" to "1.9.12"
) )
@@ -111,6 +111,8 @@ dependencies {
integrationTestImplementation("com.tngtech.archunit:archunit:${versions["archunit"]}") integrationTestImplementation("com.tngtech.archunit:archunit:${versions["archunit"]}")
integrationTestImplementation("org.htmlunit:htmlunit") integrationTestImplementation("org.htmlunit:htmlunit")
integrationTestImplementation("org.springframework:spring-webflux") integrationTestImplementation("org.springframework:spring-webflux")
// Fix for CVE-2024-25710
integrationTestImplementation("org.apache.commons:commons-compress:1.26.0")
} }
tasks.withType<KotlinCompile> { tasks.withType<KotlinCompile> {

View File

@@ -282,6 +282,30 @@ class HomeControllerTest {
assertThat(page.querySelectorAll("tbody tr")).isEmpty() assertThat(page.querySelectorAll("tbody tr")).isEmpty()
assertThat(page.querySelectorAll("div.notification.info")).hasSize(1) assertThat(page.querySelectorAll("div.notification.info")).hasSize(1)
} }
@Test
@WithMockUser(username = "admin", roles = ["ADMIN"])
fun testShouldShowNoConsentStatusBadge() {
whenever(requestService.findRequestByPatientId(anyValueClass(), any<Pageable>())).thenReturn(
PageImpl(
listOf(
Request(
1L,
randomRequestId(),
PatientPseudonym("PSEUDO1"),
PatientId("PATIENT1"),
Fingerprint("ashdkasdh"),
RequestType.MTB_FILE,
RequestStatus.NO_CONSENT
)
)
)
)
val page = webClient.getPage<HtmlPage>("http://localhost/patient/PSEUDO1")
assertThat(page.querySelectorAll("tbody tr")).hasSize(1)
assertThat(page.querySelectorAll("tbody tr > td > small").first().textContent).isEqualTo("NO_CONSENT")
}
} }
} }

View File

@@ -23,8 +23,6 @@ import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.parser.IParser; import ca.uhn.fhir.parser.IParser;
import dev.dnpm.etl.processor.config.AppFhirConfig; import dev.dnpm.etl.processor.config.AppFhirConfig;
import dev.dnpm.etl.processor.config.GPasConfigProperties; import dev.dnpm.etl.processor.config.GPasConfigProperties;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hc.core5.net.URIBuilder; import org.apache.hc.core5.net.URIBuilder;
@@ -39,9 +37,11 @@ import org.springframework.http.*;
import org.springframework.retry.support.RetryTemplate; import org.springframework.retry.support.RetryTemplate;
import org.springframework.web.client.HttpClientErrorException.BadRequest; import org.springframework.web.client.HttpClientErrorException.BadRequest;
import org.springframework.web.client.HttpClientErrorException.Unauthorized; import org.springframework.web.client.HttpClientErrorException.Unauthorized;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import java.net.URI;
import java.net.URISyntaxException;
public class GpasPseudonymGenerator implements Generator { public class GpasPseudonymGenerator implements Generator {
private final FhirContext r4Context; private final FhirContext r4Context;
@@ -52,10 +52,10 @@ public class GpasPseudonymGenerator implements Generator {
private final RestTemplate restTemplate; private final RestTemplate restTemplate;
private final @NotNull String genomDeTanDomain; private final @NotNull String genomDeTanDomain;
private final @NotNull String pidPsnDomain; private final @NotNull String pidPsnDomain;
protected final static String createOrGetPsn = "$pseudonymizeAllowCreate"; protected static final String CREATE_OR_GET_PSN = "$pseudonymizeAllowCreate";
protected final static String createMultiDomainPsn = "$pseudonymize-secondary"; protected static final String CREATE_MULTI_DOMAIN_PSN = "$pseudonymize-secondary";
private final static String SINGLE_PSN_PART_NAME = "pseudonym"; private static final String SINGLE_PSN_PART_NAME = "pseudonym";
private final static String MULTI_PSN_PART_NAME = "value"; private static final String MULTI_PSN_PART_NAME = "value";
public GpasPseudonymGenerator(GPasConfigProperties gpasCfg, RetryTemplate retryTemplate, public GpasPseudonymGenerator(GPasConfigProperties gpasCfg, RetryTemplate retryTemplate,
RestTemplate restTemplate, AppFhirConfig appFhirConfig) { RestTemplate restTemplate, AppFhirConfig appFhirConfig) {
@@ -85,7 +85,7 @@ public class GpasPseudonymGenerator implements Generator {
switch (domainType) { switch (domainType) {
case SINGLE_PSN_DOMAIN -> { case SINGLE_PSN_DOMAIN -> {
final var requestBody = createSinglePsnRequestBody(id, pidPsnDomain); final var requestBody = createSinglePsnRequestBody(id, pidPsnDomain);
final var responseEntity = getGpasPseudonym(requestBody, createOrGetPsn); final var responseEntity = getGpasPseudonym(requestBody, CREATE_OR_GET_PSN);
final var gPasPseudonymResult = (Parameters) r4Context.newJsonParser() final var gPasPseudonymResult = (Parameters) r4Context.newJsonParser()
.parseResource(responseEntity.getBody()); .parseResource(responseEntity.getBody());
@@ -93,7 +93,7 @@ public class GpasPseudonymGenerator implements Generator {
} }
case MULTI_PSN_DOMAIN -> { case MULTI_PSN_DOMAIN -> {
final var requestBody = createMultiPsnRequestBody(id, genomDeTanDomain); final var requestBody = createMultiPsnRequestBody(id, genomDeTanDomain);
final var responseEntity = getGpasPseudonym(requestBody, createMultiDomainPsn); final var responseEntity = getGpasPseudonym(requestBody, CREATE_MULTI_DOMAIN_PSN);
final var gPasPseudonymResult = (Parameters) r4Context.newJsonParser() final var gPasPseudonymResult = (Parameters) r4Context.newJsonParser()
.parseResource(responseEntity.getBody()); .parseResource(responseEntity.getBody());
@@ -150,23 +150,22 @@ public class GpasPseudonymGenerator implements Generator {
log.debug("API request succeeded. Response: {}", responseEntity.getStatusCode()); log.debug("API request succeeded. Response: {}", responseEntity.getStatusCode());
return responseEntity; return responseEntity;
} }
} catch (RestClientException rce) { } catch (BadRequest e) {
if (rce instanceof BadRequest) { String msg = "gPas or request configuration is incorrect. Please check both."
String msg = "gPas or request configuration is incorrect. Please check both." + e.getMessage();
+ rce.getMessage(); log.error(msg);
log.debug( throw new PseudonymRequestFailed(msg, e);
msg); } catch (Unauthorized e) {
throw new PseudonymRequestFailed(msg, rce); var msg = "gPas access credentials are invalid check your configuration. msg: '%s"
} .formatted(e.getMessage());
if (rce instanceof Unauthorized) { log.error(msg);
var msg = "gPas access credentials are invalid check your configuration. msg: '%s".formatted( throw new PseudonymRequestFailed(msg, e);
rce.getMessage()); }
log.error(msg); catch (Exception unexpected) {
throw new PseudonymRequestFailed(msg, rce);
}
} catch (Exception unexpected) {
throw new PseudonymRequestFailed( throw new PseudonymRequestFailed(
"API request due unexpected error unsuccessful gPas unsuccessful.", unexpected); "API request due unexpected error unsuccessful gPas unsuccessful.",
unexpected
);
} }
throw new PseudonymRequestFailed( throw new PseudonymRequestFailed(
"API request due unexpected error unsuccessful gPas unsuccessful."); "API request due unexpected error unsuccessful gPas unsuccessful.");

View File

@@ -30,6 +30,7 @@ import org.springframework.data.relational.core.mapping.Table
import org.springframework.data.repository.CrudRepository import org.springframework.data.repository.CrudRepository
import org.springframework.data.repository.PagingAndSortingRepository import org.springframework.data.repository.PagingAndSortingRepository
import java.time.Instant import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.* import java.util.*
@Table("request") @Table("request")
@@ -65,6 +66,12 @@ data class Request(
processedAt: Instant processedAt: Instant
) : ) :
this(null, uuid, patientPseudonym, pid, fingerprint, type, status, processedAt) this(null, uuid, patientPseudonym, pid, fingerprint, type, status, processedAt)
fun isPendingUnknown(): Boolean {
return this.status == RequestStatus.UNKNOWN && this.processedAt.isBefore(
Instant.now().minus(10, ChronoUnit.MINUTES)
)
}
} }
@JvmRecord @JvmRecord
@@ -90,19 +97,23 @@ interface RequestRepository : CrudRepository<Request, Long>, PagingAndSortingRep
@Query("SELECT count(*) AS count, status FROM request WHERE type = 'MTB_FILE' GROUP BY status ORDER BY status, count DESC;") @Query("SELECT count(*) AS count, status FROM request WHERE type = 'MTB_FILE' GROUP BY status ORDER BY status, count DESC;")
fun countStates(): List<CountedState> fun countStates(): List<CountedState>
@Query("SELECT count(*) AS count, status FROM (" + @Query(
"SELECT status, rank() OVER (PARTITION BY patient_pseudonym ORDER BY processed_at DESC) AS rank FROM request " + "SELECT count(*) AS count, status FROM (" +
"WHERE type = 'MTB_FILE' AND status NOT IN ('DUPLICATION') " + "SELECT status, rank() OVER (PARTITION BY patient_pseudonym ORDER BY processed_at DESC) AS rank FROM request " +
") rank WHERE rank = 1 GROUP BY status ORDER BY status, count DESC;") "WHERE type = 'MTB_FILE' AND status NOT IN ('DUPLICATION') " +
") rank WHERE rank = 1 GROUP BY status ORDER BY status, count DESC;"
)
fun findPatientUniqueStates(): List<CountedState> fun findPatientUniqueStates(): List<CountedState>
@Query("SELECT count(*) AS count, status FROM request WHERE type = 'DELETE' GROUP BY status ORDER BY status, count DESC;") @Query("SELECT count(*) AS count, status FROM request WHERE type = 'DELETE' GROUP BY status ORDER BY status, count DESC;")
fun countDeleteStates(): List<CountedState> fun countDeleteStates(): List<CountedState>
@Query("SELECT count(*) AS count, status FROM (" + @Query(
"SELECT status, rank() OVER (PARTITION BY patient_pseudonym ORDER BY processed_at DESC) AS rank FROM request " + "SELECT count(*) AS count, status FROM (" +
"WHERE type = 'DELETE'" + "SELECT status, rank() OVER (PARTITION BY patient_pseudonym ORDER BY processed_at DESC) AS rank FROM request " +
") rank WHERE rank = 1 GROUP BY status ORDER BY status, count DESC;") "WHERE type = 'DELETE'" +
") rank WHERE rank = 1 GROUP BY status ORDER BY status, count DESC;"
)
fun findPatientUniqueDeleteStates(): List<CountedState> fun findPatientUniqueDeleteStates(): List<CountedState>
} }

View File

@@ -27,7 +27,6 @@ import dev.pcvolkmer.mv64e.mtb.Mtb
import dev.pcvolkmer.mv64e.mtb.MvhMetadata import dev.pcvolkmer.mv64e.mtb.MvhMetadata
import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import org.springframework.http.MediaType
import org.springframework.kafka.core.KafkaTemplate import org.springframework.kafka.core.KafkaTemplate
import org.springframework.retry.support.RetryTemplate import org.springframework.retry.support.RetryTemplate
@@ -47,8 +46,9 @@ class KafkaMtbFileSender(
ProducerRecord( ProducerRecord(
kafkaProperties.outputTopic, kafkaProperties.outputTopic,
key(request), key(request),
objectMapper.writeValueAsString(request) objectMapper.writeValueAsString(request),
) )
record.headers().add("requestId", request.requestId.value.toByteArray())
when (request) { when (request) {
is DnpmV2MtbFileRequest -> record.headers() is DnpmV2MtbFileRequest -> record.headers()
.add( .add(
@@ -82,7 +82,6 @@ class KafkaMtbFileSender(
ProducerRecord( ProducerRecord(
kafkaProperties.outputTopic, kafkaProperties.outputTopic,
key(request), key(request),
// Always use old BwhcV1FileRequest with Consent REJECT
objectMapper.writeValueAsString( objectMapper.writeValueAsString(
DnpmV2MtbFileRequest( DnpmV2MtbFileRequest(
request.requestId, request.requestId,
@@ -90,7 +89,7 @@ class KafkaMtbFileSender(
) )
) )
) )
record.headers().add("requestId", request.requestId.value.toByteArray())
val result = kafkaTemplate.send(record) val result = kafkaTemplate.send(record)
if (result.get() != null) { if (result.get() != null) {
logger.debug("Sent deletion request via KafkaMtbFileSender") logger.debug("Sent deletion request via KafkaMtbFileSender")

View File

@@ -137,15 +137,7 @@ class ConsentProcessor(
} }
val provisionComponent: ProvisionComponent = provisions.first() val provisionComponent: ProvisionComponent = provisions.first()
val provisionCode = getProvisionCode(provisionComponent)
var provisionCode: String? = null
if (provisionComponent.code != null && provisionComponent.code.isNotEmpty()) {
val codableConcept: CodeableConcept = provisionComponent.code.first()
if (codableConcept.coding != null && codableConcept.coding.isNotEmpty()) {
provisionCode = codableConcept.coding.first().code
}
}
if (provisionCode != null) { if (provisionCode != null) {
try { try {
val modelProjectConsentPurpose = val modelProjectConsentPurpose =
@@ -177,6 +169,17 @@ class ConsentProcessor(
} }
} }
private fun getProvisionCode(provisionComponent: ProvisionComponent): String? {
var provisionCode: String? = null
if (provisionComponent.code != null && provisionComponent.code.isNotEmpty()) {
val codableConcept: CodeableConcept = provisionComponent.code.first()
if (codableConcept.coding != null && codableConcept.coding.isNotEmpty()) {
provisionCode = codableConcept.coding.first().code
}
}
return provisionCode
}
private fun setGenomDeSubmissionType(mtbFile: Mtb) { private fun setGenomDeSubmissionType(mtbFile: Mtb) {
if (appConfigProperties.genomDeTestSubmission) { if (appConfigProperties.genomDeTestSubmission) {
mtbFile.metadata.type = MvhSubmissionType.TEST mtbFile.metadata.type = MvhSubmissionType.TEST
@@ -238,9 +241,9 @@ class ConsentProcessor(
consent.provision.provision.filter { subProvision -> consent.provision.provision.filter { subProvision ->
isRequestDateInRange(requestDate, subProvision.period) isRequestDateInRange(requestDate, subProvision.period)
// search coding entries of current provision for code and system // search coding entries of current provision for code and system
subProvision.code.map { c -> c.coding }.flatten().firstOrNull { code -> subProvision.code.map { c -> c.coding }.flatten().any { code ->
targetCode.equals(code.code) && targetSystem.equals(code.system) targetCode.equals(code.code) && targetSystem.equals(code.system)
} != null }
}.map { subProvision -> }.map { subProvision ->
subProvision subProvision
} }
@@ -257,11 +260,11 @@ class ConsentProcessor(
researchAllowedPolicySystem: String?, researchAllowedPolicySystem: String?,
policyRules: Collection<Coding> policyRules: Collection<Coding>
): Boolean { ): Boolean {
return policyRules.find { code -> return policyRules.any { code ->
researchAllowedPolicySystem.equals(code.getSystem()) && (researchAllowedPolicyOid.equals( researchAllowedPolicySystem.equals(code.getSystem()) && (researchAllowedPolicyOid.equals(
code.getCode() code.getCode()
)) ))
} != null }
} }
fun isRequestDateInRange(requestDate: Date?, provPeriod: Period): Boolean { fun isRequestDateInRange(requestDate: Date?, provPeriod: Period): Boolean {

View File

@@ -0,0 +1,7 @@
__ _ _ _ _
_ __ _____ __/ /_ | || | ___ ___| |_| | _ __ _ __ ___ ___ ___ ___ ___ ___ _ __
| '_ ` _ \ \ / / '_ \| || |_ / _ \_____ / _ \ __| |_____| '_ \| '__/ _ \ / __/ _ \/ __/ __|/ _ \| '__|
| | | | | \ V /| (_) |__ _| __/_____| __/ |_| |_____| |_) | | | (_) | (_| __/\__ \__ \ (_) | |
|_| |_| |_|\_/ \___/ |_| \___| \___|\__|_| | .__/|_| \___/ \___\___||___/___/\___/|_|
|_|
:: mv64e-etl-processor :: ${application.formatted-version}

View File

@@ -52,8 +52,10 @@
<td th:if="${request.status.value.contains('success')}" class="bg-green"><small>[[ ${request.status} ]]</small></td> <td th:if="${request.status.value.contains('success')}" class="bg-green"><small>[[ ${request.status} ]]</small></td>
<td th:if="${request.status.value.contains('warning')}" class="bg-yellow"><small>[[ ${request.status} ]]</small></td> <td th:if="${request.status.value.contains('warning')}" class="bg-yellow"><small>[[ ${request.status} ]]</small></td>
<td th:if="${request.status.value.contains('error')}" class="bg-red"><small>[[ ${request.status} ]]</small></td> <td th:if="${request.status.value.contains('error')}" class="bg-red"><small>[[ ${request.status} ]]</small></td>
<td th:if="${request.status.value == 'unknown'}" class="bg-gray"><small>[[ ${request.status} ]]</small></td> <td th:if="${request.status.value == 'unknown' and not request.isPendingUnknown()}" class="bg-gray"><small>[[ ${request.status} ]]</small></td>
<td th:if="${request.status.value == 'unknown' and request.isPendingUnknown()}" class="bg-yellow"><small>⏰ [[ ${request.status} ]] ⏰</small></td>
<td th:if="${request.status.value == 'duplication'}" class="bg-gray"><small>[[ ${request.status} ]]</small></td> <td th:if="${request.status.value == 'duplication'}" class="bg-gray"><small>[[ ${request.status} ]]</small></td>
<td th:if="${request.status.value == 'no-consent'}" class="bg-blue"><small>[[ ${request.status} ]]</small></td>
<td th:style="${request.type.value == 'delete'} ? 'color: red;'"><small>[[ ${request.type} ]]</small></td> <td th:style="${request.type.value == 'delete'} ? 'color: red;'"><small>[[ ${request.type} ]]</small></td>
<td th:if="not ${request.report}">[[ ${request.uuid} ]]</td> <td th:if="not ${request.report}">[[ ${request.uuid} ]]</td>
<td th:if="${request.report}"> <td th:if="${request.report}">
@@ -100,4 +102,4 @@
}); });
</script> </script>
</body> </body>
</html> </html>

View File

@@ -163,6 +163,8 @@ class KafkaMtbFileSenderTest {
assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}") assertThat(captor.firstValue.key()).isEqualTo("{\"pid\": \"PID\"}")
assertThat(captor.firstValue.headers().headers("contentType")).isNotNull assertThat(captor.firstValue.headers().headers("contentType")).isNotNull
assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value()).isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray()) assertThat(captor.firstValue.headers().headers("contentType")?.firstOrNull()?.value()).isEqualTo(CustomMediaType.APPLICATION_VND_DNPM_V2_MTB_JSON_VALUE.toByteArray())
assertThat(captor.firstValue.headers().headers("requestId")).isNotNull
assertThat(captor.firstValue.headers().headers("requestId")?.firstOrNull()?.value()).isEqualTo(TEST_REQUEST_ID.value.toByteArray())
assertThat(captor.firstValue.value()).isNotNull assertThat(captor.firstValue.value()).isNotNull
assertThat(captor.firstValue.value()).isEqualTo(objectMapper.writeValueAsString(dnmpV2kafkaRecordData(TEST_REQUEST_ID))) assertThat(captor.firstValue.value()).isEqualTo(objectMapper.writeValueAsString(dnmpV2kafkaRecordData(TEST_REQUEST_ID)))
} }