Skip to content

Parallel packet process using Vertx#2234

Open
MuralitharanK wants to merge 3 commits into
mosip:developfrom
iiitb-pilot:dst-develop
Open

Parallel packet process using Vertx#2234
MuralitharanK wants to merge 3 commits into
mosip:developfrom
iiitb-pilot:dst-develop

Conversation

@MuralitharanK

@MuralitharanK MuralitharanK commented Nov 27, 2025

Copy link
Copy Markdown
Contributor

Summary by CodeRabbit

  • Bug Fixes

    • Improved error handling for batch reprocessing with clearer detection and reporting of batch-level failures.
  • Refactor

    • Reworked reprocessing flow to process items in parallel and aggregate results for more efficient throughput.
    • Consolidated batch-level logging and auditing for clearer, single-point audit events and summaries.

@coderabbitai

coderabbitai Bot commented Nov 27, 2025

Copy link
Copy Markdown

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

ReprocessorVerticle was refactored from synchronous per-item processing to asynchronous parallel batch processing using Vert.x executeBlocking and CompositeFuture. Per-item logic moved into a private processDTO(...). Batch-level success/failure now drives a single audit finalization and consolidated error handling.

Changes

Cohort / File(s) Summary
Vert.x Async Batch Processing Refactor
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java
Converted per-item synchronous processing to parallel async execution using executeBlocking + CompositeFuture. Added private processDTO(...) for item handling, aggregated per-batch outcome (isReprocessorSuccessful), consolidated audit events (batch-level RPR_402/RPR_405), collected registration IDs for batch logs, moved error handling to batch scope, and renamed intializeReprocessRestartTriggerMapping()initializeReprocessRestartTriggerMapping(). Imported AuditLogConstant.

Sequence Diagram

sequenceDiagram
    participant V as ReprocessorVerticle
    participant VX as Vert.x executeBlocking
    participant CF as CompositeFuture
    participant DB as Database
    participant Audit as Audit Logger

    rect rgba(200,220,255,0.5)
    Note over V: OLD: Synchronous Per-Item
    V->>DB: For each item: fetch/process/update
    V->>Audit: Log per-item event
    end

    rect rgba(220,255,200,0.5)
    Note over V: NEW: Asynchronous Batch Processing
    V->>V: Build futures list
    loop For each item
        V->>VX: executeBlocking(processDTO)
        VX->>DB: Fetch & process item
        VX->>DB: Update item status
        VX-->>CF: Return future result
    end
    V->>CF: Await all futures
    CF->>V: All items complete
    V->>V: Evaluate batch success
    alt Batch Success
        V->>Audit: Emit RPR_402 (batch success)
    else Batch Failure
        V->>Audit: Emit RPR_405 (batch failure)
    end
    V->>Audit: Finalize batch audit with registration IDs
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐇 I hopped through queues and futures bright,

Parallel paws in morning light.
Per-item chores now in a row,
One batch cheer — now watch us go! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 22.35% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Parallel packet process using Vertx' directly reflects the main change: refactoring per-item processing into batch-oriented parallel execution using Vert.x.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Tip

Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (2)

11-11: Consider using explicit imports instead of wildcard import.

Wildcard imports can obscure dependencies and introduce naming conflicts. Consider importing only the specific classes needed (e.g., Promise, Future, CompositeFuture, Vertx, AsyncResult).


361-372: Hardcoded success moduleId is used even for failed transactions.

When isTransactionSuccessful is false (reprocess count exceeded at lines 313-323), the moduleId on line 362 is still set to RPR_SENT_TO_REPROCESS_SUCCESS. This seems inconsistent—consider using description.getCode() which would contain the failure code (RPR_RE_PROCESS_FAILED).

 		/** Module-Id can be Both Success/Error code */
-		String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
+		String moduleId = isTransactionSuccessful 
+			? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode() 
+			: description.getCode();
 		String moduleName = ModuleName.RE_PROCESSOR.toString();
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 30dff2a and 6f1d870.

📒 Files selected for processing (1)
  • registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
  • ReprocessorConstants (3-37)

@MuralitharanK

Copy link
Copy Markdown
Contributor Author

@coderabbitai generate docstrings

@coderabbitai

coderabbitai Bot commented Nov 28, 2025

Copy link
Copy Markdown
✅ Actions performed

Initiated docstring generation; will generate only if new commits exist.

@coderabbitai

coderabbitai Bot commented Nov 28, 2025

Copy link
Copy Markdown

Note

Docstrings generation - SUCCESS
Generated docstrings for this pull request at #2235

coderabbitai Bot added a commit that referenced this pull request Nov 28, 2025
Docstrings generation was requested by @MuralitharanK.

* #2234 (comment)

The following files were modified:

* `registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (3)

100-101: Instance field isBatchSuccessful remains incorrectly scoped.

This issue was previously flagged. The field is shared across concurrent batch executions and should be a local variable within the process method to avoid interference between concurrent scheduler triggers.


252-270: Race condition persists: finally block executes before async processing completes.

This issue was previously flagged. CompositeFuture.all(...).onComplete(...) is non-blocking, so the finally block (lines 291-307) executes immediately while async work is still in progress. This causes:

  1. Audit logging always uses the initial isBatchSuccessful = true state
  2. Async failures logged at line 268 never update isBatchSuccessful

The fix requires moving audit/cleanup logic inside the onComplete callback or blocking until completion.

 				CompositeFuture.all(futures).onComplete(ar -> {
 					if (ar.succeeded()) {
 						regProcLogger.info("Successfully processed count - {}", futures.size());
 					} else {
+						isBatchSuccessful = false;
 						regProcLogger.error("Failed to process some DTOs", ar.cause());
 					}
+					// Move audit logging here to ensure it runs after async completion
+					performAuditLogging(isBatchSuccessful, description);
 				});

Note: The above is a partial fix illustration. A complete solution requires restructuring to either block until completion or move all post-processing logic into the callback.


249-249: Use parameterized type instead of raw Future.

Raw types bypass generic type checking and produce compiler warnings.

-				List<Future> futures = new ArrayList<>();
+				List<Future<Void>> futures = new ArrayList<>();
🧹 Nitpick comments (3)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (3)

11-11: Prefer explicit imports over wildcard import.

Using io.vertx.core.*; imports the entire package. Consider importing only the specific classes used: Vertx, Promise, Future, CompositeFuture, AsyncResult.

-import io.vertx.core.*;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;

311-383: Add exception handling in processDTO for better observability.

If any call within processDTO fails (e.g., registrationStatusService.updateRegistrationStatusForWorkflowEngine), the exception propagates silently to the executeBlocking handler without logging which specific registration failed. Consider wrapping the method body in try-catch to log the registrationId on failure.

 private void processDTO(Map<String, Set<String>> reprocessRestartTriggerMap, InternalRegistrationStatusDto dto) {
-
 	boolean isTransactionSuccessful = false;
 	LogDescription description = new LogDescription();
-
 	String registrationId = dto.getRegistrationId();
+	try {
 		MessageDTO messageDTO = new MessageDTO();
 		messageDTO.setRid(registrationId);
 		// ... rest of the method body ...
+	} catch (Exception e) {
+		regProcLogger.error(LoggerFileConstant.SESSIONID.toString(),
+			LoggerFileConstant.REGISTRATIONID.toString(), registrationId,
+			"Failed to process DTO: " + e.getMessage());
+		throw e; // Re-throw to fail the future
+	}
 }

398-403: Inconsistent indentation in the else block.

The closing braces have inconsistent indentation which affects readability.

 			} else {
 				latestTransactionStatusCodeSet = new HashSet<String>();
 				setReprocessRestartTriggerMap(reprocessRestartTriggerMap, stageName, latestTransactionStatusCode,
-					latestTransactionStatusCodeSet);
-		}
-	}
+						latestTransactionStatusCodeSet);
+			}
+		}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6f1d870 and e8a80ab.

📒 Files selected for processing (1)
  • registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
  • ReprocessorConstants (3-37)

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (5)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (5)

100-101: Instance field isBatchSuccessful is not thread-safe.

This field is shared across all invocations of process(). If the scheduler triggers concurrent batch executions, different batches will interfere with each other's state. This should be a local variable (or AtomicBoolean if mutation from lambdas is needed) within the process method.


264-307: Critical race condition: finally block executes before async processing completes.

CompositeFuture.all(...).onComplete(...) is non-blocking. The finally block (lines 291-307) executes immediately after this code, before any async work completes. This means:

  1. isBatchSuccessful remains true (initial value) when audit log is written
  2. The audit log will report success even for in-progress or failed batches

Move the audit logging inside the onComplete callback to ensure it executes after all processing completes.


371-382: moduleId doesn't reflect failure when isTransactionSuccessful is false.

When reprocess count is exhausted (line 323), isTransactionSuccessful remains false, but moduleId is always set to RPR_SENT_TO_REPROCESS_SUCCESS.getCode(). This creates inconsistency with the failure state in description.

-		String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
+		String moduleId = isTransactionSuccessful 
+			? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()
+			: description.getCode();

249-249: Use parameterized Future type.

List<Future> is a raw type. Use List<Future<Void>> for type safety and to avoid compiler warnings.

-				List<Future> futures = new ArrayList<>();
+				List<Future<Void>> futures = new ArrayList<>();

251-263: Missing exception handling in executeBlocking - failures won't set isBatchSuccessful to false.

If processDTO throws an exception, the blocking code fails, but isBatchSuccessful is never set to false. This means the finally block will incorrectly report success even when individual items fail.

 				reprocessorDtoList.forEach(dto -> {
 					Promise<Void> promise = Promise.promise();
 					vertx.executeBlocking(p -> {
-						processDTO(reprocessRestartTriggerMap, dto);
-						p.complete();
+						try {
+							processDTO(reprocessRestartTriggerMap, dto);
+							p.complete();
+						} catch (Exception e) {
+							regProcLogger.error("Error processing DTO: {}", dto.getRegistrationId(), e);
+							p.fail(e);
+						}
 					}, false, res -> {
 						if (res.succeeded()) {
 							promise.complete();
 						} else {
+							isBatchSuccessful = false;
 							promise.fail(res.cause());
 						}
 					});
 					futures.add(promise.future());
 				});

Note: Even with this fix, there's still a race condition with the finally block (see separate comment). Consider moving audit logging into the onComplete callback.

🧹 Nitpick comments (6)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)

246-270: Parallel processing logic looks good, but ensure proper synchronization.

The parallel processing implementation using vertx.executeBlocking with ordered=false enables concurrent processing. However, note that:

  1. The isBatchSuccessful field being modified from multiple threads (if the result handler is called concurrently) requires synchronization or should be an AtomicBoolean.
  2. Consider adding a count of successful vs failed items in the onComplete callback for better observability.
+			AtomicBoolean batchSuccess = new AtomicBoolean(true);
+			AtomicInteger failedCount = new AtomicInteger(0);
 			if (!CollectionUtils.isEmpty(reprocessorDtoList)) {
-				isBatchSuccessful = true;
 				regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size());
-				List<Future> futures = new ArrayList<>();
+				List<Future<Void>> futures = new ArrayList<>();
 				reprocessorDtoList.forEach(dto -> {
 					Promise<Void> promise = Promise.promise();
 					vertx.executeBlocking(p -> {
 						try {
 							processDTO(reprocessRestartTriggerMap, dto);
 							p.complete();
 						} catch (Exception e) {
 							p.fail(e);
 						}
 					}, false, res -> {
 						if (res.succeeded()) {
 							promise.complete();
 						} else {
+							batchSuccess.set(false);
+							failedCount.incrementAndGet();
 							promise.fail(res.cause());
 						}
 					});
 					futures.add(promise.future());
 				});
 				CompositeFuture.all(futures).onComplete(ar -> {
 					if (ar.succeeded()) {
 						regProcLogger.info("Successfully processed count - {}", futures.size());
 					} else {
-						regProcLogger.error("Failed to process some DTOs", ar.cause());
+						regProcLogger.error("Failed to process {} DTOs", failedCount.get(), ar.cause());
 					}
+					// Move audit logging here to execute after all processing completes
 				});
 			}
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (3)

40-41: Redundant static import.

Line 41 imports parseUTCToLocalDateTime specifically, but line 40 already imports all static members from DateUtils via the wildcard import. Remove the redundant line.

 import static io.mosip.kernel.core.util.DateUtils.*;
-import static io.mosip.kernel.core.util.DateUtils.parseUTCToLocalDateTime;

696-701: Simplify HashMap initialization.

The forEach with lambda for copying entries is unnecessarily verbose. Use the copy constructor instead.

 			// Copy "others" metadata if present
 			if(birs.getOthers() != null) {
-				HashMap<String, String> others = new HashMap<>();
-				birs.getOthers().entrySet().forEach(e -> {
-					others.put(e.getKey(), e.getValue());
-				});
+				HashMap<String, String> others = new HashMap<>(birs.getOthers());
 				biometricRecord.setOthers(others);
 			}

803-815: Consider narrowing exception handling.

Catching Exception hides the distinction between expected conditions (e.g., UIN not found) and unexpected errors (e.g., network failures). Consider catching specific exception types or at least preserving the original exception in the thrown BiometricClassificationException.

 	public boolean allBiometricHaveException(String rid, String registrationType, ProviderStageName stageName) throws BiometricClassificationException {
 		try {
 			String uin = getUIn(rid, registrationType, stageName);
 			BiometricRecord bm = getBiometricRecordfromIdrepo(uin, rid);
 			return allBiometricHaveException(bm.getSegments(), rid);
-		} catch (Exception e) {
+		} catch (IOException | ApisResourceAccessException | PacketManagerException | JsonProcessingException e) {
 			regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid,
 					"utility::allBiometricHaveException():: Error while classifying biometric exceptions", e);
 			throw new BiometricClassificationException(
 					PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getCode(),
-					PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getMessage());
+					PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getMessage(), e);
 		}
 	}
registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.java (1)

381-381: PacketCreatedOn tests correctly cover NEW/UPDATE/LOST scenarios; consider tightening expectations

The new stubbing of utility.retrieveCreatedDateFromPacket(...) and the added tests around updatePacketCreatedOnInDemographicIdentity nicely exercise:

  • null mapped-field key,
  • null created-date value,
  • and skipping for LOST registrations, matching the intended scope for packetCreatedOn.

If you want the tests to be more precise, you could:

  • Stub retrieveCreatedDateFromPacket with test-specific values instead of a single global constant, or
  • Assert that retrieveCreatedDateFromPacket is invoked with the expected registrationId and registrationType for NEW vs UPDATE.

This would better align the “metaInfo present” test names with what’s actually being asserted, but is not strictly required for correctness.

Also applies to: 2673-2674, 2685-2707, 2710-2732, 2735-2754, 2757-2776

registration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.java (1)

63-69: Instance‑based Utility tests look good; consider safer stubbing for the spy

Switching this suite to exercise the concrete Utility instance gives you strong coverage over the new helper methods and their edge cases (date parsing, age limits/buffers, packetCreated derivation, biometric exception classification, etc.), which is valuable.

One small robustness nit: several tests stub methods on the @Spy utility using the when(utility.someMethod(...)).thenReturn(...) form. With spies this pattern can invoke the real implementation during stubbing, which can be fragile if the method gains more dependencies later.

Where you are stubbing utility’s own methods (e.g., getUIn, getIdVidMetadata, etc.), you may want to switch to the doReturn(...).when(utility).someMethod(...) style:

- when(utility.getUIn(any(), any(), any())).thenReturn("123456789012");
+ doReturn("123456789012").when(utility).getUIn(any(), any(), any());

Same idea applies to other internal utility methods you mock out. This is optional but can make the tests more resilient to future refactors.

Also applies to: 82-85, 100-105, 188-221, 239-266, 300-315, 318-335, 345-365, 367-396, 398-427, 429-455, 485-579, 595-615, 619-622, 625-636, 650-659, 661-667, 670-695, 701-772

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e8a80ab and 353c3a7.

📒 Files selected for processing (8)
  • registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.java (1 hunks)
  • registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.java (5 hunks)
  • registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java (2 hunks)
  • registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.java (6 hunks)
  • registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java (3 hunks)
  • registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (4 hunks)
  • registration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.java (34 hunks)
  • registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (6 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-18T09:19:25.334Z
Learnt from: ashok-ksharma
Repo: mosip/registration PR: 2209
File: registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java:1168-1177
Timestamp: 2025-11-18T09:19:25.334Z
Learning: In the MOSIP registration processor, the `updatePacketCreatedOnInDemographicIdentity` method in `UinGeneratorStage.java` is intentionally limited to `RegistrationType.NEW` and `RegistrationType.UPDATE` only, because these are the registration types that update biometrics. `RES_UPDATE` and Mapped `UPDATE` do not update biometrics, so they should not trigger the `packetCreatedOn` update logic.

Applied to files:

  • registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.java
  • registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.java
  • registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.java
  • registration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.java
  • registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java
  • registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java
  • registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
🧬 Code graph analysis (2)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
  • ReprocessorConstants (3-37)
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (3)
registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/util/JsonUtil.java (1)
  • JsonUtil (34-317)
registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/constant/MappingJsonConstants.java (1)
  • MappingJsonConstants (3-50)
registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/constant/JsonConstant.java (1)
  • JsonConstant (8-270)
🔇 Additional comments (9)
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java (1)

5-8: LGTM - Formatting changes only.

The changes in this file are limited to import reorganization and minor whitespace adjustments in method signatures. No functional changes.

Also applies to: 541-541, 726-726

registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (5)

58-112: Well-documented configuration properties.

The new dependencies and configuration properties are properly injected with clear Javadoc explaining their purpose and default values.


259-297: LGTM - Clear implementation with proper null handling.

The method properly validates inputs and throws descriptive exceptions when required data is unavailable. The fallback strategy in resolveLastPacketProcessedDate provides multiple ways to determine the packet date.


618-648: LGTM - Fallback pattern with graceful degradation.

The method is used as part of a multi-strategy date resolution. Returning null on parse errors allows the caller to proceed to the next fallback strategy. Exception details are logged for debugging.


411-417: LGTM - Helper methods are well-implemented.

These utility methods have proper null checks, clear logging, and follow consistent patterns for date extraction and validation.

Also applies to: 427-455, 465-480, 505-532, 596-609, 829-847


491-497: LGTM.

Simple delegation to IdRepoService. Callers appropriately handle the nullable return value.

registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.java (1)

741-754: Updated stubbing against Utility aligns with new BioDedupeProcessor behavior

The switch from Utilities to the injected Utility in these tests is consistent with the production changes, and the scenarios (MANUAL_VERIFICATION vs REJECTED, all-biometric-exception, infant, and BiometricClassificationException) remain well covered. No issues from a test-behavior standpoint.

Also applies to: 782-795, 820-833, 858-867, 940-947

registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.java (1)

120-122: Utility injection/use in post‑ABIS logic looks consistent and safe

The introduction of the autowired Utility and its use in postAbisIdentification (wasInfantWhenLastPacketProcessed / allBiometricHaveException) preserves the existing decision flow for update packets with no ABIS match, while routing the calls through the shared helper. Exceptions from these methods are already handled at the process() level, so there’s no new uncaught path.

Also applies to: 469-507

registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java (1)

190-197: Verify logger call signature and confirm placeholder substitution behavior

The registration-type guard in updatePacketCreatedOnInDemographicIdentity is correct—it intentionally limits updates to NEW and UPDATE only, excluding RES_UPDATE and mapped UPDATE types that do not update biometrics.

However, the regProcLogger.info(...) call at line 1174 uses a {} placeholder with five arguments:

regProcLogger.info(LoggerFileConstant.SESSIONID.toString(),
        LoggerFileConstant.REGISTRATIONID.toString(),
        registrationId,
        "Skipping update of packetCreatedOn. registrationType: {}", 
        object.getReg_type());

MOSIP's kernel logger uses a custom wrapper for structured logging (sessionId, registrationId). Verify whether this logger implementation supports SLF4J-style {} placeholder substitution with varargs. If not, use string concatenation instead:

regProcLogger.info(LoggerFileConstant.SESSIONID.toString(),
        LoggerFileConstant.REGISTRATIONID.toString(),
        registrationId,
        "Skipping update of packetCreatedOn. registrationType: " + object.getReg_type());

Comment on lines +574 to +580

// Check if date is older than 100 years
LocalDateTime hundredYearsAgo = now.minusYears(200);
if (before(ldt, hundredYearsAgo)) {
regProcLogger.error("Date is older than 200 years : {}", ldt);
return null;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Inconsistent comment: says 100 years but code checks 200 years.

The comment states "Check if date is older than 100 years" but the code actually checks for 200 years. Update the comment to match the implementation.

-				// Check if date is older than 100 years
+				// Check if date is older than 200 years
 				LocalDateTime hundredYearsAgo = now.minusYears(200);

Also consider renaming hundredYearsAgo to twoHundredYearsAgo for consistency.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Check if date is older than 100 years
LocalDateTime hundredYearsAgo = now.minusYears(200);
if (before(ldt, hundredYearsAgo)) {
regProcLogger.error("Date is older than 200 years : {}", ldt);
return null;
}
// Check if date is older than 200 years
LocalDateTime hundredYearsAgo = now.minusYears(200);
if (before(ldt, hundredYearsAgo)) {
regProcLogger.error("Date is older than 200 years : {}", ldt);
return null;
}
🤖 Prompt for AI Agents
In
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
around lines 574 to 580, the inline comment incorrectly says "100 years" while
the code checks for 200 years; change the comment to state "Check if date is
older than 200 years" and rename the local variable hundredYearsAgo to
twoHundredYearsAgo (and update all its references) so the identifier matches the
check and maintain consistency with the existing log message.

Comment on lines +746 to +763
for (BIR bir : birs) {
BiometricType type = bir.getBdbInfo().getType().get(0);
boolean isFaceOrExceptionPhoto = type == BiometricType.FACE || type == BiometricType.EXCEPTION_PHOTO;

regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid,
"utility::allBiometricHaveException():: Checking biometric type: {} ",type);

if (hasOthers) {
if (!isFaceOrExceptionPhoto) {
String exceptionValue = bir.getOthers().get(EXCEPTION);
regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid,
"utility::allBiometricHaveException():: Biometric type: {}, exceptionValue: {}", type, exceptionValue);

if (exceptionValue == null || !exceptionValue.equalsIgnoreCase(TRUE)) {
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid,
"utility::allBiometricHaveException():: Biometric type: {} does not have exception", type);
return false;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential NullPointerException risks in biometric processing.

Two NPE concerns:

  1. Line 747: bir.getBdbInfo().getType().get(0) - No null checks. If getBdbInfo() is null, getType() is null, or the list is empty, this will throw.

  2. Line 755: bir.getOthers().get(EXCEPTION) - hasOthers checks if any BIR in the list has others, but this specific bir might have null for getOthers().

 		for (BIR bir : birs) {
-			BiometricType type = bir.getBdbInfo().getType().get(0);
+			if (bir.getBdbInfo() == null || bir.getBdbInfo().getType() == null || bir.getBdbInfo().getType().isEmpty()) {
+				regProcLogger.warn(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid,
+						"utility::allBiometricHaveException():: Skipping BIR with missing BdbInfo or Type");
+				continue;
+			}
+			BiometricType type = bir.getBdbInfo().getType().get(0);
 			boolean isFaceOrExceptionPhoto = type == BiometricType.FACE || type == BiometricType.EXCEPTION_PHOTO;
 
 			// ... later in the code
 			if (hasOthers) {
 				if (!isFaceOrExceptionPhoto) {
-					String exceptionValue = bir.getOthers().get(EXCEPTION);
+					Map<String, String> birOthers = bir.getOthers();
+					String exceptionValue = (birOthers != null) ? birOthers.get(EXCEPTION) : null;
🤖 Prompt for AI Agents
In
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
around lines 746 to 763, the code can throw NPEs when bir.getBdbInfo(),
bir.getBdbInfo().getType() is null or empty, or when bir.getOthers() is null for
the current bir; add defensive null/empty checks before dereferencing: first
verify bir!=null && bir.getBdbInfo()!=null && bir.getBdbInfo().getType()!=null
&& !bir.getBdbInfo().getType().isEmpty() before accessing get(0) and skip or
treat missing type as not matching FACE/EXCEPTION_PHOTO; second, ensure
bir.getOthers()!=null and bir.getOthers().containsKey(EXCEPTION) before calling
get(EXCEPTION) and treat absent exception entries as not TRUE (log and return
false per current behavior). Update debug/info logs to reflect when values are
missing to aid tracing.

Signed-off-by: Muralitharan K <muralitharan@mosip.io>
Signed-off-by: Muralitharan K <muralitharan@mosip.io>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (3)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (3)

100-101: Instance field isBatchSuccessful is not thread-safe and scoped incorrectly.

This issue was raised in a previous review but remains unresolved. The field is shared across all invocations of process(). If the scheduler triggers concurrent batch executions, different batches will interfere with each other's state. This should be a local variable within the process method.

Additionally, this field is read in the finally block (lines 294-306) which executes immediately after the async CompositeFuture.all(...) call, before the async work completes. See the related race condition comment below.


371-382: moduleId and audit event codes don't reflect failure state.

These issues were raised in previous reviews but remain unresolved:

  1. Line 372: moduleId is always set to RPR_SENT_TO_REPROCESS_SUCCESS.getCode() even when isTransactionSuccessful is false (reprocess count exhausted at line 323).

  2. Lines 375-377: When isTransactionSuccessful is false, the audit is logged with success event codes (RPR_402, UPDATE, BUSINESS) instead of failure codes.

🔎 Proposed fix
 		/** Module-Id can be Both Success/Error code */
-		String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
+		String moduleId = isTransactionSuccessful 
+			? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()
+			: description.getCode();
 		String moduleName = ModuleName.RE_PROCESSOR.toString();
 		registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName);
-		String eventId = EventId.RPR_402.toString();
-		String eventName = EventName.UPDATE.toString();
-		String eventType = EventType.BUSINESS.toString();
+		String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString();
+		String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString();
+		String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString();
 
 		if (!isTransactionSuccessful) {
 			auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName,
 					eventType, moduleId, moduleName, registrationId);
 		}

246-270: Critical race condition: finally block executes before async processing completes.

The CompositeFuture.all(...).onComplete(...) at line 264 is non-blocking. The finally block (lines 291-307) executes immediately after, before any async work completes. This means:

  1. isBatchSuccessful will be true (set at line 247) when audit is logged, regardless of actual outcomes
  2. Individual processDTO failures won't update batch status since processing happens asynchronously

Additionally, if processDTO throws an exception at line 253, p.complete() at line 254 is never called, causing the promise to hang indefinitely.

🔎 Recommended fix

Move audit logging inside the onComplete callback and add exception handling:

 			if (!CollectionUtils.isEmpty(reprocessorDtoList)) {
-				isBatchSuccessful = true;
 				regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size());
-				List<Future> futures = new ArrayList<>();
+				List<Future<Void>> futures = new ArrayList<>();
+				boolean[] batchSuccess = {true};
 				reprocessorDtoList.forEach(dto -> {
 					Promise<Void> promise = Promise.promise();
 					vertx.executeBlocking(p -> {
-						processDTO(reprocessRestartTriggerMap, dto);
-						p.complete();
+						try {
+							processDTO(reprocessRestartTriggerMap, dto);
+							p.complete();
+						} catch (Exception e) {
+							p.fail(e);
+						}
 					}, false, res -> {
 						if (res.succeeded()) {
 							promise.complete();
 						} else {
+							batchSuccess[0] = false;
 							promise.fail(res.cause());
 						}
 					});
 					futures.add(promise.future());
 				});
 				CompositeFuture.all(futures).onComplete(ar -> {
-					if (ar.succeeded()) {
-						regProcLogger.info("Successfully processed count - {}", futures.size());
-					} else {
-						regProcLogger.error("Failed to process some DTOs", ar.cause());
-					}
+					// Move audit logging here
+					LogDescription desc = new LogDescription();
+					if (ar.succeeded() && batchSuccess[0]) {
+						desc.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage());
+						// Log success audit
+					} else {
+						// Log failure audit
+					}
 				});
 			}
🧹 Nitpick comments (2)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (2)

11-11: Consider explicit imports over wildcard import.

Wildcard imports (io.vertx.core.*;) can reduce readability and may cause naming conflicts. Consider importing only the specific classes used: Future, Promise, CompositeFuture, Vertx, AsyncResult.


249-249: Use parameterized type instead of raw Future.

List<Future> uses a raw type. Use List<Future<Void>> for type safety and to avoid compiler warnings.

-				List<Future> futures = new ArrayList<>();
+				List<Future<Void>> futures = new ArrayList<>();
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 353c3a7 and 7bfef95.

📒 Files selected for processing (1)
  • registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java
🔇 Additional comments (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)

385-407: Typo fix in method name is correct.

The rename from intializeReprocessRestartTriggerMapping to initializeReprocessRestartTriggerMapping corrects the spelling. The initialization logic for the restart trigger mapping appears sound.

Signed-off-by: Muralitharan K <muralitharan@mosip.io>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)

221-221: Prefer StringBuilder over StringBuffer.

ridSb is populated in a sequential forEach before any async work begins and is only read (not written) inside the onComplete lambda. The synchronization overhead of StringBuffer is unnecessary here.

♻️ Proposed fix
-		StringBuffer ridSb=new StringBuffer();
+		StringBuilder ridSb = new StringBuilder();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
at line 221, The code uses a synchronized StringBuffer for ridSb in
ReprocessorVerticle which is unnecessary; replace the StringBuffer instantiation
with a non-synchronized StringBuilder (i.e., change the
declaration/initialization of ridSb to use StringBuilder) and ensure all
subsequent uses (appends and final read in the onComplete lambda) work with
StringBuilder to remove unnecessary synchronization overhead.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`:
- Around line 282-286: Remove the stray extra semicolon after getCode() when
assigning moduleId, avoid the unnecessary enum toString() call by using
ModuleName.RE_PROCESSOR.name() (or otherwise assign the enum's String name) for
moduleName, and replace ridSb.toString().length() with ridSb.length() to avoid
creating a temporary String; these changes should be applied around the
moduleId/moduleName assignment and the call to
auditLogRequestBuilder.createAuditRequestBuilder(...) in ReprocessorVerticle.
- Around line 265-288: The batch currently uses CompositeFuture.all(futures)
which fails fast and triggers the audit before all per-DTO executeBlocking tasks
finish; change CompositeFuture.all(futures) to CompositeFuture.join(futures) so
the handler runs only after every future completes (success or failure), keep
the isBatchSuccessful = ar.succeeded() check, and leave the existing
logging/audit call sequence (regProcLogger.error/ regProcLogger.info and
auditLogRequestBuilder.createAuditRequestBuilder) intact so the batch audit is
emitted only after all worker futures have finished.

---

Duplicate comments:
In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`:
- Around line 249-264: Change the raw List<Future> to a typed List<Future<Void>>
and make the blocking handler explicitly propagate exceptions: inside the
vertx.executeBlocking handler that calls processDTO(dto) wrap the call in
try-catch and call p.fail(e) on exception (and p.complete() on success) so
failures are delivered to the Promise and then to promise.fail(res.cause()) as
already wired; update the list declaration and usages to List<Future<Void>> and
keep Promise<Void> promise = Promise.promise() for each dto.
- Around line 391-401: The code always uses success constants for moduleId and
audit event fields even when isTransactionSuccessful is false; update the logic
in ReprocessorVerticle so moduleId, eventId, eventName and eventType are chosen
based on isTransactionSuccessful (use the appropriate failure constants when
false) before calling
registrationStatusService.updateRegistrationStatusForWorkflowEngine(...) and
before calling auditLogRequestBuilder.createAuditRequestBuilder(...), ensuring
the updateRegistrationStatusForWorkflowEngine and audit call receive the correct
failure codes (refer to variables moduleId, eventId, eventName, eventType,
method registrationStatusService.updateRegistrationStatusForWorkflowEngine and
auditLogRequestBuilder.createAuditRequestBuilder, and the
isTransactionSuccessful flag).

---

Nitpick comments:
In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`:
- Line 221: The code uses a synchronized StringBuffer for ridSb in
ReprocessorVerticle which is unnecessary; replace the StringBuffer instantiation
with a non-synchronized StringBuilder (i.e., change the
declaration/initialization of ridSb to use StringBuilder) and ensure all
subsequent uses (appends and final read in the onComplete lambda) work with
StringBuilder to remove unnecessary synchronization overhead.

Comment on lines +265 to 288
CompositeFuture.all(futures).onComplete(ar -> {
boolean isBatchSuccessful = ar.succeeded();
try {
if (!isBatchSuccessful) {
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
null, PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage()
+ ExceptionUtils.getStackTrace(ar.cause()));
}
} finally {
String message = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage() :
PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage();
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
null, message);
String eventId = isBatchSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString();
String eventName = isBatchSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString();
String eventType = isBatchSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString();
/** Module-Id can be Both Success/Error code */
String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() :
PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();;
String moduleName = ModuleName.RE_PROCESSOR.toString();
auditLogRequestBuilder.createAuditRequestBuilder(message, eventId, eventName, eventType,
moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):""));
}
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage());

/** Module-Id can be Both Success/Error code */
String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
String moduleName = ModuleName.RE_PROCESSOR.toString();
registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName);
String eventId = EventId.RPR_402.toString();
String eventName = EventName.UPDATE.toString();
String eventType = EventType.BUSINESS.toString();

if (!isTransactionSuccessful)
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName,
eventType, moduleId, moduleName, registrationId);
});

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

CompositeFuture.all() fails fast — batch audit fires before all DTOs complete on partial failure.

CompositeFuture.all() fails immediately when any future fails, while CompositeFuture.join() continues to run all remaining futures even after one fails. With the current all() semantics, if the first DTO's executeBlocking fails, onComplete fires immediately with isBatchSuccessful = false, logging the batch audit while the remaining N-1 worker threads are still executing processDTO. Those threads will still call registrationStatusService.updateRegistrationStatusForWorkflowEngine() concurrently after the batch audit has already been emitted.

For batch-processing semantics (process all, then audit), Future.join() (or CompositeFuture.join()) is more appropriate:

♻️ Proposed fix
-			Future.all(futures).onComplete(ar -> {
+			Future.join(futures).onComplete(ar -> {
 				boolean isBatchSuccessful = ar.succeeded();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
around lines 265 - 288, The batch currently uses CompositeFuture.all(futures)
which fails fast and triggers the audit before all per-DTO executeBlocking tasks
finish; change CompositeFuture.all(futures) to CompositeFuture.join(futures) so
the handler runs only after every future completes (success or failure), keep
the isBatchSuccessful = ar.succeeded() check, and leave the existing
logging/audit call sequence (regProcLogger.error/ regProcLogger.info and
auditLogRequestBuilder.createAuditRequestBuilder) intact so the batch audit is
emitted only after all worker futures have finished.

Comment on lines +282 to +286
String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() :
PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();;
String moduleName = ModuleName.RE_PROCESSOR.toString();
auditLogRequestBuilder.createAuditRequestBuilder(message, eventId, eventName, eventType,
moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):""));

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Double semicolon on line 283 and unnecessary toString() call on line 286.

Line 283 has a stray extra semicolon (getCode();;). Line 286 calls ridSb.toString().length(), which allocates a temporary String object just to read the length; ridSb.length() is equivalent and allocation-free.

♻️ Proposed fixes
 					String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() :
-							PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();;
+							PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();
 					String moduleName = ModuleName.RE_PROCESSOR.toString();
 					auditLogRequestBuilder.createAuditRequestBuilder(message, eventId, eventName, eventType,
-							moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):""));
+							moduleId, moduleName, (ridSb.length() > 1 ? ridSb.substring(0, ridSb.length()-1) : ""));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
around lines 282 - 286, Remove the stray extra semicolon after getCode() when
assigning moduleId, avoid the unnecessary enum toString() call by using
ModuleName.RE_PROCESSOR.name() (or otherwise assign the enum's String name) for
moduleName, and replace ridSb.toString().length() with ridSb.length() to avoid
creating a temporary String; these changes should be applied around the
moduleId/moduleName assignment and the call to
auditLogRequestBuilder.createAuditRequestBuilder(...) in ReprocessorVerticle.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants