Parallel packet process using Vertx#2234
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughReprocessorVerticle was refactored from synchronous per-item processing to asynchronous parallel batch processing using Vert.x Changes
Sequence DiagramsequenceDiagram
participant V as ReprocessorVerticle
participant VX as Vert.x executeBlocking
participant CF as CompositeFuture
participant DB as Database
participant Audit as Audit Logger
rect rgba(200,220,255,0.5)
Note over V: OLD: Synchronous Per-Item
V->>DB: For each item: fetch/process/update
V->>Audit: Log per-item event
end
rect rgba(220,255,200,0.5)
Note over V: NEW: Asynchronous Batch Processing
V->>V: Build futures list
loop For each item
V->>VX: executeBlocking(processDTO)
VX->>DB: Fetch & process item
VX->>DB: Update item status
VX-->>CF: Return future result
end
V->>CF: Await all futures
CF->>V: All items complete
V->>V: Evaluate batch success
alt Batch Success
V->>Audit: Emit RPR_402 (batch success)
else Batch Failure
V->>Audit: Emit RPR_405 (batch failure)
end
V->>Audit: Finalize batch audit with registration IDs
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Tip Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (2)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (2)
11-11: Consider using explicit imports instead of wildcard import.Wildcard imports can obscure dependencies and introduce naming conflicts. Consider importing only the specific classes needed (e.g.,
Promise,Future,CompositeFuture,Vertx,AsyncResult).
361-372: Hardcoded successmoduleIdis used even for failed transactions.When
isTransactionSuccessfulisfalse(reprocess count exceeded at lines 313-323), themoduleIdon line 362 is still set toRPR_SENT_TO_REPROCESS_SUCCESS. This seems inconsistent—consider usingdescription.getCode()which would contain the failure code (RPR_RE_PROCESS_FAILED)./** Module-Id can be Both Success/Error code */ - String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); + String moduleId = isTransactionSuccessful + ? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode() + : description.getCode(); String moduleName = ModuleName.RE_PROCESSOR.toString();
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
ReprocessorConstants(3-37)
|
@coderabbitai generate docstrings |
✅ Actions performedInitiated docstring generation; will generate only if new commits exist. |
|
Note Docstrings generation - SUCCESS |
Docstrings generation was requested by @MuralitharanK. * #2234 (comment) The following files were modified: * `registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
e8a80ab to
353c3a7
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (3)
100-101: Instance fieldisBatchSuccessfulremains incorrectly scoped.This issue was previously flagged. The field is shared across concurrent batch executions and should be a local variable within the
processmethod to avoid interference between concurrent scheduler triggers.
252-270: Race condition persists:finallyblock executes before async processing completes.This issue was previously flagged.
CompositeFuture.all(...).onComplete(...)is non-blocking, so thefinallyblock (lines 291-307) executes immediately while async work is still in progress. This causes:
- Audit logging always uses the initial
isBatchSuccessful = truestate- Async failures logged at line 268 never update
isBatchSuccessfulThe fix requires moving audit/cleanup logic inside the
onCompletecallback or blocking until completion.CompositeFuture.all(futures).onComplete(ar -> { if (ar.succeeded()) { regProcLogger.info("Successfully processed count - {}", futures.size()); } else { + isBatchSuccessful = false; regProcLogger.error("Failed to process some DTOs", ar.cause()); } + // Move audit logging here to ensure it runs after async completion + performAuditLogging(isBatchSuccessful, description); });Note: The above is a partial fix illustration. A complete solution requires restructuring to either block until completion or move all post-processing logic into the callback.
249-249: Use parameterized type instead of rawFuture.Raw types bypass generic type checking and produce compiler warnings.
- List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>();
🧹 Nitpick comments (3)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (3)
11-11: Prefer explicit imports over wildcard import.Using
io.vertx.core.*;imports the entire package. Consider importing only the specific classes used:Vertx,Promise,Future,CompositeFuture,AsyncResult.-import io.vertx.core.*; +import io.vertx.core.AsyncResult; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx;
311-383: Add exception handling inprocessDTOfor better observability.If any call within
processDTOfails (e.g.,registrationStatusService.updateRegistrationStatusForWorkflowEngine), the exception propagates silently to the executeBlocking handler without logging which specific registration failed. Consider wrapping the method body in try-catch to log theregistrationIdon failure.private void processDTO(Map<String, Set<String>> reprocessRestartTriggerMap, InternalRegistrationStatusDto dto) { - boolean isTransactionSuccessful = false; LogDescription description = new LogDescription(); - String registrationId = dto.getRegistrationId(); + try { MessageDTO messageDTO = new MessageDTO(); messageDTO.setRid(registrationId); // ... rest of the method body ... + } catch (Exception e) { + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), + LoggerFileConstant.REGISTRATIONID.toString(), registrationId, + "Failed to process DTO: " + e.getMessage()); + throw e; // Re-throw to fail the future + } }
398-403: Inconsistent indentation in the else block.The closing braces have inconsistent indentation which affects readability.
} else { latestTransactionStatusCodeSet = new HashSet<String>(); setReprocessRestartTriggerMap(reprocessRestartTriggerMap, stageName, latestTransactionStatusCode, - latestTransactionStatusCodeSet); - } - } + latestTransactionStatusCodeSet); + } + }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
ReprocessorConstants(3-37)
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (5)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (5)
100-101: Instance fieldisBatchSuccessfulis not thread-safe.This field is shared across all invocations of
process(). If the scheduler triggers concurrent batch executions, different batches will interfere with each other's state. This should be a local variable (orAtomicBooleanif mutation from lambdas is needed) within theprocessmethod.
264-307: Critical race condition:finallyblock executes before async processing completes.
CompositeFuture.all(...).onComplete(...)is non-blocking. Thefinallyblock (lines 291-307) executes immediately after this code, before any async work completes. This means:
isBatchSuccessfulremainstrue(initial value) when audit log is written- The audit log will report success even for in-progress or failed batches
Move the audit logging inside the
onCompletecallback to ensure it executes after all processing completes.
371-382:moduleIddoesn't reflect failure whenisTransactionSuccessfulis false.When reprocess count is exhausted (line 323),
isTransactionSuccessfulremainsfalse, butmoduleIdis always set toRPR_SENT_TO_REPROCESS_SUCCESS.getCode(). This creates inconsistency with the failure state indescription.- String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); + String moduleId = isTransactionSuccessful + ? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode() + : description.getCode();
249-249: Use parameterizedFuturetype.
List<Future>is a raw type. UseList<Future<Void>>for type safety and to avoid compiler warnings.- List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>();
251-263: Missing exception handling inexecuteBlocking- failures won't setisBatchSuccessfulto false.If
processDTOthrows an exception, the blocking code fails, butisBatchSuccessfulis never set tofalse. This means the finally block will incorrectly report success even when individual items fail.reprocessorDtoList.forEach(dto -> { Promise<Void> promise = Promise.promise(); vertx.executeBlocking(p -> { - processDTO(reprocessRestartTriggerMap, dto); - p.complete(); + try { + processDTO(reprocessRestartTriggerMap, dto); + p.complete(); + } catch (Exception e) { + regProcLogger.error("Error processing DTO: {}", dto.getRegistrationId(), e); + p.fail(e); + } }, false, res -> { if (res.succeeded()) { promise.complete(); } else { + isBatchSuccessful = false; promise.fail(res.cause()); } }); futures.add(promise.future()); });Note: Even with this fix, there's still a race condition with the
finallyblock (see separate comment). Consider moving audit logging into theonCompletecallback.
🧹 Nitpick comments (6)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
246-270: Parallel processing logic looks good, but ensure proper synchronization.The parallel processing implementation using
vertx.executeBlockingwithordered=falseenables concurrent processing. However, note that:
- The
isBatchSuccessfulfield being modified from multiple threads (if the result handler is called concurrently) requires synchronization or should be anAtomicBoolean.- Consider adding a count of successful vs failed items in the
onCompletecallback for better observability.+ AtomicBoolean batchSuccess = new AtomicBoolean(true); + AtomicInteger failedCount = new AtomicInteger(0); if (!CollectionUtils.isEmpty(reprocessorDtoList)) { - isBatchSuccessful = true; regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size()); - List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>(); reprocessorDtoList.forEach(dto -> { Promise<Void> promise = Promise.promise(); vertx.executeBlocking(p -> { try { processDTO(reprocessRestartTriggerMap, dto); p.complete(); } catch (Exception e) { p.fail(e); } }, false, res -> { if (res.succeeded()) { promise.complete(); } else { + batchSuccess.set(false); + failedCount.incrementAndGet(); promise.fail(res.cause()); } }); futures.add(promise.future()); }); CompositeFuture.all(futures).onComplete(ar -> { if (ar.succeeded()) { regProcLogger.info("Successfully processed count - {}", futures.size()); } else { - regProcLogger.error("Failed to process some DTOs", ar.cause()); + regProcLogger.error("Failed to process {} DTOs", failedCount.get(), ar.cause()); } + // Move audit logging here to execute after all processing completes }); }registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (3)
40-41: Redundant static import.Line 41 imports
parseUTCToLocalDateTimespecifically, but line 40 already imports all static members fromDateUtilsvia the wildcard import. Remove the redundant line.import static io.mosip.kernel.core.util.DateUtils.*; -import static io.mosip.kernel.core.util.DateUtils.parseUTCToLocalDateTime;
696-701: Simplify HashMap initialization.The
forEachwith lambda for copying entries is unnecessarily verbose. Use the copy constructor instead.// Copy "others" metadata if present if(birs.getOthers() != null) { - HashMap<String, String> others = new HashMap<>(); - birs.getOthers().entrySet().forEach(e -> { - others.put(e.getKey(), e.getValue()); - }); + HashMap<String, String> others = new HashMap<>(birs.getOthers()); biometricRecord.setOthers(others); }
803-815: Consider narrowing exception handling.Catching
Exceptionhides the distinction between expected conditions (e.g., UIN not found) and unexpected errors (e.g., network failures). Consider catching specific exception types or at least preserving the original exception in the thrownBiometricClassificationException.public boolean allBiometricHaveException(String rid, String registrationType, ProviderStageName stageName) throws BiometricClassificationException { try { String uin = getUIn(rid, registrationType, stageName); BiometricRecord bm = getBiometricRecordfromIdrepo(uin, rid); return allBiometricHaveException(bm.getSegments(), rid); - } catch (Exception e) { + } catch (IOException | ApisResourceAccessException | PacketManagerException | JsonProcessingException e) { regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid, "utility::allBiometricHaveException():: Error while classifying biometric exceptions", e); throw new BiometricClassificationException( PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getCode(), - PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getMessage()); + PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getMessage(), e); } }registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.java (1)
381-381: PacketCreatedOn tests correctly cover NEW/UPDATE/LOST scenarios; consider tightening expectationsThe new stubbing of
utility.retrieveCreatedDateFromPacket(...)and the added tests aroundupdatePacketCreatedOnInDemographicIdentitynicely exercise:
- null mapped-field key,
- null created-date value,
- and skipping for
LOSTregistrations, matching the intended scope forpacketCreatedOn.If you want the tests to be more precise, you could:
- Stub
retrieveCreatedDateFromPacketwith test-specific values instead of a single global constant, or- Assert that
retrieveCreatedDateFromPacketis invoked with the expectedregistrationIdandregistrationTypefor NEW vs UPDATE.This would better align the “metaInfo present” test names with what’s actually being asserted, but is not strictly required for correctness.
Also applies to: 2673-2674, 2685-2707, 2710-2732, 2735-2754, 2757-2776
registration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.java (1)
63-69: Instance‑based Utility tests look good; consider safer stubbing for the spySwitching this suite to exercise the concrete
Utilityinstance gives you strong coverage over the new helper methods and their edge cases (date parsing, age limits/buffers, packetCreated derivation, biometric exception classification, etc.), which is valuable.One small robustness nit: several tests stub methods on the
@Spyutilityusing thewhen(utility.someMethod(...)).thenReturn(...)form. With spies this pattern can invoke the real implementation during stubbing, which can be fragile if the method gains more dependencies later.Where you are stubbing
utility’s own methods (e.g.,getUIn,getIdVidMetadata, etc.), you may want to switch to thedoReturn(...).when(utility).someMethod(...)style:- when(utility.getUIn(any(), any(), any())).thenReturn("123456789012"); + doReturn("123456789012").when(utility).getUIn(any(), any(), any());Same idea applies to other internal
utilitymethods you mock out. This is optional but can make the tests more resilient to future refactors.Also applies to: 82-85, 100-105, 188-221, 239-266, 300-315, 318-335, 345-365, 367-396, 398-427, 429-455, 485-579, 595-615, 619-622, 625-636, 650-659, 661-667, 670-695, 701-772
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.java(1 hunks)registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.java(5 hunks)registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java(2 hunks)registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.java(6 hunks)registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java(3 hunks)registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java(4 hunks)registration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.java(34 hunks)registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java(6 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-18T09:19:25.334Z
Learnt from: ashok-ksharma
Repo: mosip/registration PR: 2209
File: registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java:1168-1177
Timestamp: 2025-11-18T09:19:25.334Z
Learning: In the MOSIP registration processor, the `updatePacketCreatedOnInDemographicIdentity` method in `UinGeneratorStage.java` is intentionally limited to `RegistrationType.NEW` and `RegistrationType.UPDATE` only, because these are the registration types that update biometrics. `RES_UPDATE` and Mapped `UPDATE` do not update biometrics, so they should not trigger the `packetCreatedOn` update logic.
Applied to files:
registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.javaregistration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.javaregistration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.javaregistration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.javaregistration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.javaregistration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.javaregistration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
🧬 Code graph analysis (2)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
ReprocessorConstants(3-37)
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (3)
registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/util/JsonUtil.java (1)
JsonUtil(34-317)registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/constant/MappingJsonConstants.java (1)
MappingJsonConstants(3-50)registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/constant/JsonConstant.java (1)
JsonConstant(8-270)
🔇 Additional comments (9)
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java (1)
5-8: LGTM - Formatting changes only.The changes in this file are limited to import reorganization and minor whitespace adjustments in method signatures. No functional changes.
Also applies to: 541-541, 726-726
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (5)
58-112: Well-documented configuration properties.The new dependencies and configuration properties are properly injected with clear Javadoc explaining their purpose and default values.
259-297: LGTM - Clear implementation with proper null handling.The method properly validates inputs and throws descriptive exceptions when required data is unavailable. The fallback strategy in
resolveLastPacketProcessedDateprovides multiple ways to determine the packet date.
618-648: LGTM - Fallback pattern with graceful degradation.The method is used as part of a multi-strategy date resolution. Returning
nullon parse errors allows the caller to proceed to the next fallback strategy. Exception details are logged for debugging.
411-417: LGTM - Helper methods are well-implemented.These utility methods have proper null checks, clear logging, and follow consistent patterns for date extraction and validation.
Also applies to: 427-455, 465-480, 505-532, 596-609, 829-847
491-497: LGTM.Simple delegation to
IdRepoService. Callers appropriately handle the nullable return value.registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.java (1)
741-754: Updated stubbing againstUtilityaligns with new BioDedupeProcessor behaviorThe switch from
Utilitiesto the injectedUtilityin these tests is consistent with the production changes, and the scenarios (MANUAL_VERIFICATION vs REJECTED, all-biometric-exception, infant, andBiometricClassificationException) remain well covered. No issues from a test-behavior standpoint.Also applies to: 782-795, 820-833, 858-867, 940-947
registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.java (1)
120-122: Utility injection/use in post‑ABIS logic looks consistent and safeThe introduction of the autowired
Utilityand its use inpostAbisIdentification(wasInfantWhenLastPacketProcessed/allBiometricHaveException) preserves the existing decision flow for update packets with no ABIS match, while routing the calls through the shared helper. Exceptions from these methods are already handled at theprocess()level, so there’s no new uncaught path.Also applies to: 469-507
registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java (1)
190-197: Verify logger call signature and confirm placeholder substitution behaviorThe registration-type guard in
updatePacketCreatedOnInDemographicIdentityis correct—it intentionally limits updates toNEWandUPDATEonly, excludingRES_UPDATEand mapped UPDATE types that do not update biometrics.However, the
regProcLogger.info(...)call at line 1174 uses a{}placeholder with five arguments:regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), registrationId, "Skipping update of packetCreatedOn. registrationType: {}", object.getReg_type());MOSIP's kernel logger uses a custom wrapper for structured logging (sessionId, registrationId). Verify whether this logger implementation supports SLF4J-style
{}placeholder substitution with varargs. If not, use string concatenation instead:regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), registrationId, "Skipping update of packetCreatedOn. registrationType: " + object.getReg_type());
|
|
||
| // Check if date is older than 100 years | ||
| LocalDateTime hundredYearsAgo = now.minusYears(200); | ||
| if (before(ldt, hundredYearsAgo)) { | ||
| regProcLogger.error("Date is older than 200 years : {}", ldt); | ||
| return null; | ||
| } |
There was a problem hiding this comment.
Inconsistent comment: says 100 years but code checks 200 years.
The comment states "Check if date is older than 100 years" but the code actually checks for 200 years. Update the comment to match the implementation.
- // Check if date is older than 100 years
+ // Check if date is older than 200 years
LocalDateTime hundredYearsAgo = now.minusYears(200);Also consider renaming hundredYearsAgo to twoHundredYearsAgo for consistency.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Check if date is older than 100 years | |
| LocalDateTime hundredYearsAgo = now.minusYears(200); | |
| if (before(ldt, hundredYearsAgo)) { | |
| regProcLogger.error("Date is older than 200 years : {}", ldt); | |
| return null; | |
| } | |
| // Check if date is older than 200 years | |
| LocalDateTime hundredYearsAgo = now.minusYears(200); | |
| if (before(ldt, hundredYearsAgo)) { | |
| regProcLogger.error("Date is older than 200 years : {}", ldt); | |
| return null; | |
| } |
🤖 Prompt for AI Agents
In
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
around lines 574 to 580, the inline comment incorrectly says "100 years" while
the code checks for 200 years; change the comment to state "Check if date is
older than 200 years" and rename the local variable hundredYearsAgo to
twoHundredYearsAgo (and update all its references) so the identifier matches the
check and maintain consistency with the existing log message.
| for (BIR bir : birs) { | ||
| BiometricType type = bir.getBdbInfo().getType().get(0); | ||
| boolean isFaceOrExceptionPhoto = type == BiometricType.FACE || type == BiometricType.EXCEPTION_PHOTO; | ||
|
|
||
| regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid, | ||
| "utility::allBiometricHaveException():: Checking biometric type: {} ",type); | ||
|
|
||
| if (hasOthers) { | ||
| if (!isFaceOrExceptionPhoto) { | ||
| String exceptionValue = bir.getOthers().get(EXCEPTION); | ||
| regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid, | ||
| "utility::allBiometricHaveException():: Biometric type: {}, exceptionValue: {}", type, exceptionValue); | ||
|
|
||
| if (exceptionValue == null || !exceptionValue.equalsIgnoreCase(TRUE)) { | ||
| regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid, | ||
| "utility::allBiometricHaveException():: Biometric type: {} does not have exception", type); | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Potential NullPointerException risks in biometric processing.
Two NPE concerns:
-
Line 747:
bir.getBdbInfo().getType().get(0)- No null checks. IfgetBdbInfo()is null,getType()is null, or the list is empty, this will throw. -
Line 755:
bir.getOthers().get(EXCEPTION)-hasOtherschecks if any BIR in the list has others, but this specificbirmight havenullforgetOthers().
for (BIR bir : birs) {
- BiometricType type = bir.getBdbInfo().getType().get(0);
+ if (bir.getBdbInfo() == null || bir.getBdbInfo().getType() == null || bir.getBdbInfo().getType().isEmpty()) {
+ regProcLogger.warn(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid,
+ "utility::allBiometricHaveException():: Skipping BIR with missing BdbInfo or Type");
+ continue;
+ }
+ BiometricType type = bir.getBdbInfo().getType().get(0);
boolean isFaceOrExceptionPhoto = type == BiometricType.FACE || type == BiometricType.EXCEPTION_PHOTO;
// ... later in the code
if (hasOthers) {
if (!isFaceOrExceptionPhoto) {
- String exceptionValue = bir.getOthers().get(EXCEPTION);
+ Map<String, String> birOthers = bir.getOthers();
+ String exceptionValue = (birOthers != null) ? birOthers.get(EXCEPTION) : null;🤖 Prompt for AI Agents
In
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
around lines 746 to 763, the code can throw NPEs when bir.getBdbInfo(),
bir.getBdbInfo().getType() is null or empty, or when bir.getOthers() is null for
the current bir; add defensive null/empty checks before dereferencing: first
verify bir!=null && bir.getBdbInfo()!=null && bir.getBdbInfo().getType()!=null
&& !bir.getBdbInfo().getType().isEmpty() before accessing get(0) and skip or
treat missing type as not matching FACE/EXCEPTION_PHOTO; second, ensure
bir.getOthers()!=null and bir.getOthers().containsKey(EXCEPTION) before calling
get(EXCEPTION) and treat absent exception entries as not TRUE (log and return
false per current behavior). Update debug/info logs to reflect when values are
missing to aid tracing.
Signed-off-by: Muralitharan K <muralitharan@mosip.io>
Signed-off-by: Muralitharan K <muralitharan@mosip.io>
353c3a7 to
7bfef95
Compare
There was a problem hiding this comment.
Actionable comments posted: 0
♻️ Duplicate comments (3)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (3)
100-101: Instance fieldisBatchSuccessfulis not thread-safe and scoped incorrectly.This issue was raised in a previous review but remains unresolved. The field is shared across all invocations of
process(). If the scheduler triggers concurrent batch executions, different batches will interfere with each other's state. This should be a local variable within theprocessmethod.Additionally, this field is read in the
finallyblock (lines 294-306) which executes immediately after the asyncCompositeFuture.all(...)call, before the async work completes. See the related race condition comment below.
371-382:moduleIdand audit event codes don't reflect failure state.These issues were raised in previous reviews but remain unresolved:
Line 372:
moduleIdis always set toRPR_SENT_TO_REPROCESS_SUCCESS.getCode()even whenisTransactionSuccessfulisfalse(reprocess count exhausted at line 323).Lines 375-377: When
isTransactionSuccessfulisfalse, the audit is logged with success event codes (RPR_402,UPDATE,BUSINESS) instead of failure codes.🔎 Proposed fix
/** Module-Id can be Both Success/Error code */ - String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); + String moduleId = isTransactionSuccessful + ? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode() + : description.getCode(); String moduleName = ModuleName.RE_PROCESSOR.toString(); registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); - String eventId = EventId.RPR_402.toString(); - String eventName = EventName.UPDATE.toString(); - String eventType = EventType.BUSINESS.toString(); + String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString(); + String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString(); + String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString(); if (!isTransactionSuccessful) { auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType, moduleId, moduleName, registrationId); }
246-270: Critical race condition:finallyblock executes before async processing completes.The
CompositeFuture.all(...).onComplete(...)at line 264 is non-blocking. Thefinallyblock (lines 291-307) executes immediately after, before any async work completes. This means:
isBatchSuccessfulwill betrue(set at line 247) when audit is logged, regardless of actual outcomes- Individual
processDTOfailures won't update batch status since processing happens asynchronouslyAdditionally, if
processDTOthrows an exception at line 253,p.complete()at line 254 is never called, causing the promise to hang indefinitely.🔎 Recommended fix
Move audit logging inside the
onCompletecallback and add exception handling:if (!CollectionUtils.isEmpty(reprocessorDtoList)) { - isBatchSuccessful = true; regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size()); - List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>(); + boolean[] batchSuccess = {true}; reprocessorDtoList.forEach(dto -> { Promise<Void> promise = Promise.promise(); vertx.executeBlocking(p -> { - processDTO(reprocessRestartTriggerMap, dto); - p.complete(); + try { + processDTO(reprocessRestartTriggerMap, dto); + p.complete(); + } catch (Exception e) { + p.fail(e); + } }, false, res -> { if (res.succeeded()) { promise.complete(); } else { + batchSuccess[0] = false; promise.fail(res.cause()); } }); futures.add(promise.future()); }); CompositeFuture.all(futures).onComplete(ar -> { - if (ar.succeeded()) { - regProcLogger.info("Successfully processed count - {}", futures.size()); - } else { - regProcLogger.error("Failed to process some DTOs", ar.cause()); - } + // Move audit logging here + LogDescription desc = new LogDescription(); + if (ar.succeeded() && batchSuccess[0]) { + desc.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage()); + // Log success audit + } else { + // Log failure audit + } }); }
🧹 Nitpick comments (2)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (2)
11-11: Consider explicit imports over wildcard import.Wildcard imports (
io.vertx.core.*;) can reduce readability and may cause naming conflicts. Consider importing only the specific classes used:Future,Promise,CompositeFuture,Vertx,AsyncResult.
249-249: Use parameterized type instead of rawFuture.
List<Future>uses a raw type. UseList<Future<Void>>for type safety and to avoid compiler warnings.- List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>();
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java
🔇 Additional comments (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
385-407: Typo fix in method name is correct.The rename from
intializeReprocessRestartTriggerMappingtoinitializeReprocessRestartTriggerMappingcorrects the spelling. The initialization logic for the restart trigger mapping appears sound.
Signed-off-by: Muralitharan K <muralitharan@mosip.io>
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
221-221: PreferStringBuilderoverStringBuffer.
ridSbis populated in a sequentialforEachbefore any async work begins and is only read (not written) inside theonCompletelambda. The synchronization overhead ofStringBufferis unnecessary here.♻️ Proposed fix
- StringBuffer ridSb=new StringBuffer(); + StringBuilder ridSb = new StringBuilder();🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java` at line 221, The code uses a synchronized StringBuffer for ridSb in ReprocessorVerticle which is unnecessary; replace the StringBuffer instantiation with a non-synchronized StringBuilder (i.e., change the declaration/initialization of ridSb to use StringBuilder) and ensure all subsequent uses (appends and final read in the onComplete lambda) work with StringBuilder to remove unnecessary synchronization overhead.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`:
- Around line 282-286: Remove the stray extra semicolon after getCode() when
assigning moduleId, avoid the unnecessary enum toString() call by using
ModuleName.RE_PROCESSOR.name() (or otherwise assign the enum's String name) for
moduleName, and replace ridSb.toString().length() with ridSb.length() to avoid
creating a temporary String; these changes should be applied around the
moduleId/moduleName assignment and the call to
auditLogRequestBuilder.createAuditRequestBuilder(...) in ReprocessorVerticle.
- Around line 265-288: The batch currently uses CompositeFuture.all(futures)
which fails fast and triggers the audit before all per-DTO executeBlocking tasks
finish; change CompositeFuture.all(futures) to CompositeFuture.join(futures) so
the handler runs only after every future completes (success or failure), keep
the isBatchSuccessful = ar.succeeded() check, and leave the existing
logging/audit call sequence (regProcLogger.error/ regProcLogger.info and
auditLogRequestBuilder.createAuditRequestBuilder) intact so the batch audit is
emitted only after all worker futures have finished.
---
Duplicate comments:
In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`:
- Around line 249-264: Change the raw List<Future> to a typed List<Future<Void>>
and make the blocking handler explicitly propagate exceptions: inside the
vertx.executeBlocking handler that calls processDTO(dto) wrap the call in
try-catch and call p.fail(e) on exception (and p.complete() on success) so
failures are delivered to the Promise and then to promise.fail(res.cause()) as
already wired; update the list declaration and usages to List<Future<Void>> and
keep Promise<Void> promise = Promise.promise() for each dto.
- Around line 391-401: The code always uses success constants for moduleId and
audit event fields even when isTransactionSuccessful is false; update the logic
in ReprocessorVerticle so moduleId, eventId, eventName and eventType are chosen
based on isTransactionSuccessful (use the appropriate failure constants when
false) before calling
registrationStatusService.updateRegistrationStatusForWorkflowEngine(...) and
before calling auditLogRequestBuilder.createAuditRequestBuilder(...), ensuring
the updateRegistrationStatusForWorkflowEngine and audit call receive the correct
failure codes (refer to variables moduleId, eventId, eventName, eventType,
method registrationStatusService.updateRegistrationStatusForWorkflowEngine and
auditLogRequestBuilder.createAuditRequestBuilder, and the
isTransactionSuccessful flag).
---
Nitpick comments:
In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`:
- Line 221: The code uses a synchronized StringBuffer for ridSb in
ReprocessorVerticle which is unnecessary; replace the StringBuffer instantiation
with a non-synchronized StringBuilder (i.e., change the
declaration/initialization of ridSb to use StringBuilder) and ensure all
subsequent uses (appends and final read in the onComplete lambda) work with
StringBuilder to remove unnecessary synchronization overhead.
| CompositeFuture.all(futures).onComplete(ar -> { | ||
| boolean isBatchSuccessful = ar.succeeded(); | ||
| try { | ||
| if (!isBatchSuccessful) { | ||
| regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), | ||
| null, PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage() | ||
| + ExceptionUtils.getStackTrace(ar.cause())); | ||
| } | ||
| } finally { | ||
| String message = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage() : | ||
| PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage(); | ||
| regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), | ||
| null, message); | ||
| String eventId = isBatchSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString(); | ||
| String eventName = isBatchSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString(); | ||
| String eventType = isBatchSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString(); | ||
| /** Module-Id can be Both Success/Error code */ | ||
| String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() : | ||
| PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();; | ||
| String moduleName = ModuleName.RE_PROCESSOR.toString(); | ||
| auditLogRequestBuilder.createAuditRequestBuilder(message, eventId, eventName, eventType, | ||
| moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):"")); | ||
| } | ||
| regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), | ||
| LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage()); | ||
|
|
||
| /** Module-Id can be Both Success/Error code */ | ||
| String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); | ||
| String moduleName = ModuleName.RE_PROCESSOR.toString(); | ||
| registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); | ||
| String eventId = EventId.RPR_402.toString(); | ||
| String eventName = EventName.UPDATE.toString(); | ||
| String eventType = EventType.BUSINESS.toString(); | ||
|
|
||
| if (!isTransactionSuccessful) | ||
| auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, | ||
| eventType, moduleId, moduleName, registrationId); | ||
| }); |
There was a problem hiding this comment.
CompositeFuture.all() fails fast — batch audit fires before all DTOs complete on partial failure.
CompositeFuture.all() fails immediately when any future fails, while CompositeFuture.join() continues to run all remaining futures even after one fails. With the current all() semantics, if the first DTO's executeBlocking fails, onComplete fires immediately with isBatchSuccessful = false, logging the batch audit while the remaining N-1 worker threads are still executing processDTO. Those threads will still call registrationStatusService.updateRegistrationStatusForWorkflowEngine() concurrently after the batch audit has already been emitted.
For batch-processing semantics (process all, then audit), Future.join() (or CompositeFuture.join()) is more appropriate:
♻️ Proposed fix
- Future.all(futures).onComplete(ar -> {
+ Future.join(futures).onComplete(ar -> {
boolean isBatchSuccessful = ar.succeeded();🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
around lines 265 - 288, The batch currently uses CompositeFuture.all(futures)
which fails fast and triggers the audit before all per-DTO executeBlocking tasks
finish; change CompositeFuture.all(futures) to CompositeFuture.join(futures) so
the handler runs only after every future completes (success or failure), keep
the isBatchSuccessful = ar.succeeded() check, and leave the existing
logging/audit call sequence (regProcLogger.error/ regProcLogger.info and
auditLogRequestBuilder.createAuditRequestBuilder) intact so the batch audit is
emitted only after all worker futures have finished.
| String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() : | ||
| PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();; | ||
| String moduleName = ModuleName.RE_PROCESSOR.toString(); | ||
| auditLogRequestBuilder.createAuditRequestBuilder(message, eventId, eventName, eventType, | ||
| moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):"")); |
There was a problem hiding this comment.
Double semicolon on line 283 and unnecessary toString() call on line 286.
Line 283 has a stray extra semicolon (getCode();;). Line 286 calls ridSb.toString().length(), which allocates a temporary String object just to read the length; ridSb.length() is equivalent and allocation-free.
♻️ Proposed fixes
String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() :
- PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();;
+ PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode();
String moduleName = ModuleName.RE_PROCESSOR.toString();
auditLogRequestBuilder.createAuditRequestBuilder(message, eventId, eventName, eventType,
- moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):""));
+ moduleId, moduleName, (ridSb.length() > 1 ? ridSb.substring(0, ridSb.length()-1) : ""));🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
around lines 282 - 286, Remove the stray extra semicolon after getCode() when
assigning moduleId, avoid the unnecessary enum toString() call by using
ModuleName.RE_PROCESSOR.name() (or otherwise assign the enum's String name) for
moduleName, and replace ridSb.toString().length() with ridSb.length() to avoid
creating a temporary String; these changes should be applied around the
moduleId/moduleName assignment and the call to
auditLogRequestBuilder.createAuditRequestBuilder(...) in ReprocessorVerticle.
Summary by CodeRabbit
Bug Fixes
Refactor