Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@
- `cargo xtask test-slice durability-release` now includes the exact
`wal_process_crashpoints` witness, promoting the process-kill WAL crashpoint
runner from future descriptor to release-gate evidence.
- `cargo xtask dind` now defaults to run mode and includes the exact
`dind_durability_convergence_gate` witness, proving live WAL execution,
read-only WAL recovery, WSC import, and retained-material reveal agree on the
same app-facing receipt and bounded reading while missing or corrupt support
material returns typed obstruction.
- `cargo xtask test-slice durability-release` now includes the exact
`dind_durability_convergence_gate` witness so the release slice also carries
the DIND durability convergence proof.
- `warp-core` trusted runtime hosts now configure runtime WAL through
`TrustedRuntimeWalConfig`, including in-memory and filesystem-backed
adapters. `TrustedRuntimeWalStoreKind` exposes the configured adapter kind as
Expand Down
367 changes: 367 additions & 0 deletions crates/warp-core/tests/causal_wal_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,373 @@ fn wsc_retained_evidence_export_modes() {
));
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct DindDurabilityOutcome {
submission_id: Hash,
canonical_envelope_digest: Hash,
ticket_digest: Hash,
receipt_digest: Hash,
decision: WalTickDecision,
reading_id: Hash,
reading_coordinate_digest: Hash,
reading_payload_digest: Hash,
reading_envelope_digest: Hash,
retained_material_digest: Hash,
retained_payload_len: u64,
}

fn dind_durability_outcome(
acceptance: SubmissionAcceptanceRecord,
receipt: TickReceiptRecord,
reading: ReadingRefRecord,
retained_material: RetainedMaterialRecord,
retained_payload_len: usize,
) -> DindDurabilityOutcome {
DindDurabilityOutcome {
submission_id: acceptance.submission_id,
canonical_envelope_digest: acceptance.canonical_envelope_digest,
ticket_digest: receipt.ticket_digest,
receipt_digest: receipt.receipt_digest,
decision: receipt.decision,
reading_id: reading.reading_id,
reading_coordinate_digest: reading.semantic_coordinate_digest,
reading_payload_digest: reading.payload_digest,
reading_envelope_digest: reading.envelope_digest,
retained_material_digest: retained_material.material_digest,
retained_payload_len: u64::try_from(retained_payload_len).unwrap_or(u64::MAX),
}
}

#[test]
fn dind_durability_convergence_gate() {
let label = "dind-durability-convergence";
let dir = temp_wal_dir(label);
let mut store = must_ok(FilesystemWalStore::open(&dir, WalSegmentId::from_raw(1)));
let writer_epoch = must_ok(store.acquire_writer_epoch(writer_epoch_request()));
let acceptance = submission_acceptance(label);
let receipt = receipt_record(label, WalTickDecision::Applied);
let correlation = correlation_record(label);
let retained_payload = b"dind convergence retained reading bytes";
let retained_digest = blake3::hash(retained_payload).into();
let retained_coordinate = digest("coordinate:dind-durability-convergence");
let retained_material = RetainedMaterialRecord {
material_digest: retained_digest,
semantic_coordinate_digest: retained_coordinate,
kind: RetainedMaterialKind::ReadingPayload,
posture: EvidenceMaterialPosture::Present,
};
let reading = ReadingRefRecord {
reading_id: digest("reading:dind-durability-convergence"),
semantic_coordinate_digest: retained_coordinate,
payload_digest: retained_digest,
envelope_digest: digest("envelope:dind-durability-convergence"),
posture: EvidenceMaterialPosture::Present,
};
let expected_outcome = dind_durability_outcome(
acceptance,
receipt,
reading,
retained_material,
retained_payload.len(),
);

let submission_tx = durable_submission_transaction(label, Lsn::from_raw(0));
let tick_tx = durable_tick_transaction(label, Lsn::from_raw(2), WalTickDecision::Applied);
let reading_tx = must_ok(build_retained_reading_transaction(
builder(
transaction_id("tx:dind-durability-convergence:reading"),
Lsn::from_raw(5),
WalAppendAuthority::TrustedScheduler,
WalTransactionKind::SchedulerTick,
),
std::slice::from_ref(&retained_material),
reading,
vec![frontier(
AffectedFrontierKind::ReadingIndex,
"dind-convergence:reading:before",
"dind-convergence:reading:after",
)],
));
let mut live_state = RecoveredState::default();
for transaction in [&submission_tx, &tick_tx, &reading_tx] {
live_state = must_ok(apply_committed_transaction(live_state, transaction));
}

must_ok(store.append_transaction(submission_tx));
must_ok(store.append_transaction(tick_tx));
must_ok(store.append_transaction(reading_tx));
must_ok(store.seal_segment(epoch_id(), WalSegmentId::from_raw(1)));
let segment_path = store.segment_path();
let segment_bytes = must_ok(fs::read(&segment_path));
let last_commit = must_some(store.read_commits().last().cloned());
let manifest = WalManifest {
manifest_digest: digest("dind-durability-convergence:manifest"),
last_committed_lsn: Some(last_commit.last_lsn),
last_commit_digest: Some(last_commit.commit_digest),
sealed_segment_count: 1,
};
must_ok(store.publish_manifest(epoch_id(), manifest));

assert_eq!(live_state.applied_transactions.len(), 3);
assert_eq!(
live_state
.frontiers
.get(&AffectedFrontierKind::RuntimeState),
Some(&digest("state:dind-durability-convergence:after"))
);
assert_eq!(
live_state
.frontiers
.get(&AffectedFrontierKind::ReadingIndex),
Some(&digest("dind-convergence:reading:after"))
);

let report = must_ok(recover_filesystem_store(&dir, RecoveryAccessMode::ReadOnly));
let recovered_submissions = must_ok(recover_submission_index(&report));
let recovered_receipts = must_ok(recover_receipt_index(&report));
let recovered_retention = must_ok(recover_retention_index(&report));
let recovered_outcome = dind_durability_outcome(
must_some(
recovered_submissions
.get(&acceptance.submission_id)
.map(|entry| entry.acceptance),
),
TickReceiptRecord {
submission_id: acceptance.submission_id,
ticket_digest: must_some(
recovered_receipts
.ticket_by_submission
.get(&acceptance.submission_id)
.copied(),
),
receipt_digest: must_some(
recovered_receipts
.receipt_by_submission
.get(&acceptance.submission_id)
.copied(),
),
decision: must_some(
recovered_receipts
.decisions_by_receipt
.get(&receipt.receipt_digest)
.copied(),
),
},
must_some(
recovered_retention
.reading_by_id
.get(&reading.reading_id)
.copied(),
),
must_some(
recovered_retention
.material_by_digest
.get(&retained_material.material_digest)
.copied(),
),
retained_payload.len(),
);
assert_eq!(recovered_outcome, expected_outcome);
assert!(retained_material_obstructions(
&recovered_retention,
&BTreeSet::from([retained_material.material_digest]),
)
.is_empty());

let certificate = build_recovery_certificate(
&report,
None,
0,
digest("dind-durability-convergence:frontier"),
digest("dind-durability-convergence:indexes"),
);
let writer_epoch = WalWriterEpoch::from_writer_epoch(&writer_epoch);
let projection = project_filesystem_wal_recovery(
&dir,
&report,
std::slice::from_ref(&writer_epoch),
Some(&certificate),
);
assert_eq!(projection.posture, WalRecoveryProjectionPosture::Present);
let root = must_some(projection.root);
let segment = root.segments[0].clone();
let root_identity_digest = root.identity_digest();

let self_contained = must_ok(wsc_self_contained_wal_export(
&root,
&[WscSelfContainedWalSegmentMaterial {
segment_id: segment.segment_id,
segment_bytes: segment_bytes.clone(),
}],
&[WscSelfContainedRetainedMaterial {
material: retained_material,
material_bytes: retained_payload.to_vec(),
}],
wsc_records(
&[retained_material],
&[reading],
&[acceptance],
&[receipt],
&[correlation],
),
));
let imported_self_contained = must_ok(validate_wsc_self_contained_wal_export(
&self_contained,
&root,
));
let self_contained_outcome = dind_durability_outcome(
imported_self_contained.accepted_submissions[0],
imported_self_contained.receipts[0],
imported_self_contained.retention.readings[0],
imported_self_contained.retention.materials[0],
imported_self_contained.retained_payloads[0]
.material_bytes
.len(),
);
assert_eq!(self_contained_outcome, expected_outcome);
assert_eq!(
imported_self_contained.root_identity_digest,
root_identity_digest
);
assert_eq!(
imported_self_contained.retained_payloads[0].material_bytes,
retained_payload
);

let mut cas_store = MemoryTier::new();
let segment_content_hash = *cas_store.put(&segment_bytes).as_bytes();
let retained_content_hash = *cas_store.put(retained_payload).as_bytes();
let cas_addressed = must_ok(wsc_cas_addressed_wal_export(
&root,
&[WscCasAddressedWalSegmentMaterial {
segment_id: segment.segment_id,
content_hash: segment_content_hash,
semantic_coordinate_digest: digest("dind-durability-convergence:segment"),
byte_len: u64::try_from(segment_bytes.len()).unwrap_or(u64::MAX),
}],
&[WscCasAddressedRetainedMaterialReference {
material_kind: RetainedMaterialKind::ReadingPayload,
content_hash: retained_content_hash,
semantic_coordinate_digest: retained_coordinate,
byte_len: u64::try_from(retained_payload.len()).unwrap_or(u64::MAX),
}],
wsc_records(
&[retained_material],
&[reading],
&[acceptance],
&[receipt],
&[correlation],
),
));
let imported_cas = must_ok(validate_wsc_cas_addressed_wal_export(
&cas_addressed,
&root,
&EchoCasAvailability(&cas_store),
));
let cas_outcome = dind_durability_outcome(
imported_cas.accepted_submissions[0],
imported_cas.receipts[0],
imported_cas.retention.readings[0],
imported_cas.retention.materials[0],
usize::try_from(imported_cas.cas_references.retained_materials[0].byte_len)
.unwrap_or(usize::MAX),
);
assert_eq!(cas_outcome, expected_outcome);
assert_eq!(imported_cas.root_identity_digest, root_identity_digest);
assert_eq!(
imported_cas.cas_references.retained_materials[0].content_hash,
retained_content_hash
);

let mut segment_only_cas = MemoryTier::new();
assert_eq!(
*segment_only_cas.put(&segment_bytes).as_bytes(),
segment_content_hash
);
let missing_retained = must_err(
validate_wsc_cas_addressed_wal_export(
&cas_addressed,
&root,
&EchoCasAvailability(&segment_only_cas),
),
"missing retained material must obstruct rather than diverge",
);
assert!(matches!(
missing_retained,
WscCasAddressedWalImportError::MissingCasBlob {
content_hash,
semantic_coordinate_digest,
} if content_hash == retained_content_hash
&& semantic_coordinate_digest == retained_coordinate
));

let mut corrupted_retained_bytes = retained_payload.to_vec();
corrupted_retained_bytes[0] ^= 0x7f;
let corrupt_retained = must_err(
wsc_self_contained_wal_export(
&root,
&[WscSelfContainedWalSegmentMaterial {
segment_id: segment.segment_id,
segment_bytes: segment_bytes.clone(),
}],
&[WscSelfContainedRetainedMaterial {
material: retained_material,
material_bytes: corrupted_retained_bytes,
}],
wsc_records(
&[retained_material],
&[reading],
&[acceptance],
&[receipt],
&[correlation],
),
),
"corrupt embedded retained material must obstruct rather than diverge",
);
assert!(matches!(
corrupt_retained,
WscSelfContainedWalExportError::RetainedMaterialDigestMismatch {
expected,
actual
} if expected == retained_digest && actual != expected
));

let mut corrupted_segment_bytes = segment_bytes;
let Some(last_byte) = corrupted_segment_bytes.last_mut() else {
panic!("DIND convergence WAL fixture unexpectedly produced empty segment bytes");
};
*last_byte ^= 0x7f;
let corrupt_self_contained = must_ok(wsc_self_contained_wal_export(
&root,
&[WscSelfContainedWalSegmentMaterial {
segment_id: segment.segment_id,
segment_bytes: corrupted_segment_bytes,
}],
&[WscSelfContainedRetainedMaterial {
material: retained_material,
material_bytes: retained_payload.to_vec(),
}],
wsc_records(
&[retained_material],
&[reading],
&[acceptance],
&[receipt],
&[correlation],
),
));
let corrupt_segment = must_err(
validate_wsc_self_contained_wal_export(&corrupt_self_contained, &root),
"corrupt embedded WAL material must obstruct rather than diverge",
);
assert!(matches!(
corrupt_segment,
WscSelfContainedWalImportError::SegmentRecovery {
segment_id,
error: WalRecoveryError::Store(WalStoreError::SegmentRecordDigestMismatch),
} if segment_id == segment.segment_id
));
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

fn root_with_absolute_locator(root: &WalRoot, path: &str) -> WalRoot {
let mut root = root.clone();
root.segments[0].storage_locator =
Expand Down
Loading
Loading