
Make asset downloading concurrent-safe #1989

Open
shi-eric wants to merge 3 commits into newton-physics:main from shi-eric:shi-eric/concurrent-safe-asset-download

Conversation

@shi-eric
Member

@shi-eric shi-eric commented Mar 7, 2026

Description

Make download_git_folder() safe under concurrent multi-process/thread access using a temp-directory + safe-rename pattern, inspired by Warp's kernel cache approach.

Each caller now clones into a PID/TID-unique temp directory, then atomically renames it to the final cache location. If another process already placed the cache, the rename is a no-op and the temp directory is discarded. Orphaned temp directories from crashed processes are cleaned up automatically after 1 hour.
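In outline, the pattern looks like the following sketch. The helper names mirror those added in this PR, but the bodies are illustrative assumptions, not the committed implementation; `fetch_cached` and `produce` are made-up stand-ins for the real download logic:

```python
import os
import shutil
import threading
from pathlib import Path


def _temp_cache_path(cache_folder: Path) -> Path:
    # PID + thread ID make the staging directory unique per caller.
    return Path(f"{cache_folder}_p{os.getpid()}_t{threading.get_ident()}")


def _safe_rename(src: Path, dst: Path) -> None:
    try:
        os.rename(src, dst)  # atomic on a single filesystem
    except OSError:
        # Assume another caller already placed dst; discard our redundant copy.
        shutil.rmtree(src, ignore_errors=True)


def fetch_cached(cache_folder: Path, produce) -> Path:
    """Populate cache_folder via produce(temp_dir) if it is missing."""
    if cache_folder.exists():
        return cache_folder
    temp_dir = _temp_cache_path(cache_folder)
    temp_dir.mkdir(parents=True)
    try:
        produce(temp_dir)  # e.g. git clone into temp_dir
        _safe_rename(temp_dir, cache_folder)
    finally:
        # No-op if the rename already moved temp_dir into place.
        shutil.rmtree(temp_dir, ignore_errors=True)
    return cache_folder
```

Because the rename is atomic, concurrent callers never observe a half-populated cache directory: they see either nothing or the fully staged result.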

This eliminates the need for the manually maintained pre-download list in unittest_parallel.py, which was a fragile workaround for the lack of concurrency safety (forgetting to update it caused flaky test failures).

Closes #1987

Changes

  • newton/_src/utils/download_assets.py:
    • Add _safe_rename() — tolerates races where another process creates the destination first
    • Add _temp_cache_path() — returns a PID/TID-unique temp path
    • Add _cleanup_stale_temp_dirs() — removes orphaned temp dirs older than 1 hour
    • Modify download_git_folder() to clone into temp dir, then safe-rename into final cache location
  • newton/tests/test_download_assets.py — add tests for all new helpers + concurrent download test
  • newton/tests/thirdparty/unittest_parallel.py — remove _parallel_download(), asset lists, menagerie folder lists, and related imports

Checklist

  • New or existing tests cover these changes
  • The documentation is up to date with these changes
  • CHANGELOG.md has been updated (if user-facing change)

Test plan

Unit tests:

uv run --extra dev python -m unittest newton.tests.test_download_assets -v

All 10 tests pass:

  • TestSafeRename — rename success and destination-exists cases
  • TestTempCachePath — PID/TID uniqueness across threads
  • TestCleanupStaleTempDirs — removes old orphans, preserves recent, ignores unrelated
  • TestDownloadAssets.test_download_and_refresh — basic download + stale refresh + force refresh
  • TestDownloadAssets.test_concurrent_download — 4 threads racing to download the same asset
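The shape of the concurrency test can be sketched with plain directories standing in for real assets (`race_to_publish` and all names here are hypothetical; the committed test races `download_git_folder` instead):

```python
import os
import shutil
import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def race_to_publish(base: Path, n_threads: int = 4) -> Path:
    """Each thread stages content in its own temp dir, then races to rename."""
    dst = base / "asset"

    def worker(i: int) -> None:
        # The index suffix keeps temp dirs unique even if a pool thread runs twice.
        tmp = Path(f"{dst}_p{os.getpid()}_t{threading.get_ident()}_{i}")
        tmp.mkdir()
        (tmp / "f.txt").write_text("v1\n")
        try:
            os.rename(tmp, dst)  # exactly one rename can win
        except OSError:
            shutil.rmtree(tmp, ignore_errors=True)  # a rival won; discard ours

    with ThreadPoolExecutor(n_threads) as pool:
        list(pool.map(worker, range(n_threads)))  # list() surfaces worker errors
    return dst
```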

Additionally, a standalone stress test (not committed) was used to validate with real assets and spawned processes. It clears the cache, spawns 8 worker processes that all race to download anybotics_anymal_d, verifies no leftover temp directories, and loads the asset into a Newton model to verify integrity. Passed across multiple rounds:

Stress test script (not committed)
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 The Newton Developers
# SPDX-License-Identifier: Apache-2.0

"""Stress test for concurrent asset downloading.

Clears the asset cache, spawns many processes that all race to download the
same asset, then verifies the asset is usable by loading a Newton model from
it. This exercises the temp-dir + safe-rename concurrency mechanism.

Usage:
    python newton/tests/stress_test_concurrent_download.py
    python newton/tests/stress_test_concurrent_download.py --workers 16
    python newton/tests/stress_test_concurrent_download.py --asset unitree_h1
"""

from __future__ import annotations

import argparse
import multiprocessing
import os
import sys
import tempfile
import time


def download_worker(args):
    """Worker function that downloads an asset and returns (pid, path, elapsed, error)."""
    asset_name, cache_dir = args
    pid = os.getpid()
    start = time.perf_counter()
    try:
        from newton._src.utils.download_assets import download_asset

        path = download_asset(asset_name, cache_dir=cache_dir)
        elapsed = time.perf_counter() - start
        return (pid, str(path), elapsed, None)
    except Exception as e:
        elapsed = time.perf_counter() - start
        return (pid, None, elapsed, str(e))


def verify_asset_usable(asset_name, cache_dir):
    """Load the downloaded asset into a Newton model to verify it's not corrupted."""
    from newton._src.utils.download_assets import download_asset

    asset_path = download_asset(asset_name, cache_dir=cache_dir)

    # Determine what file to load based on what's available in the asset
    usda_candidates = [
        "usd_structured",  # structured USD dir (e.g. unitree_h1, unitree_g1)
        "usd",  # flat USD dir (e.g. anymal_d)
    ]
    urdf_candidates = [
        "urdf",  # URDF dir (e.g. franka_emika_panda)
    ]

    import newton

    builder = newton.ModelBuilder()

    # Try USD first
    for subdir in usda_candidates:
        candidate = asset_path / subdir
        if candidate.is_dir():
            usda_files = list(candidate.glob("*.usda")) + list(candidate.glob("*.usd"))
            if usda_files:
                asset_file = str(usda_files[0])
                print(f"  Loading USD: {asset_file}")
                builder.add_usd(asset_file)
                model = builder.finalize()
                print(f"  Model built: {model.body_count} bodies, {model.joint_count} joints")
                return True

    # Try URDF
    for subdir in urdf_candidates:
        candidate = asset_path / subdir
        if candidate.is_dir():
            urdf_files = list(candidate.glob("*.urdf"))
            if urdf_files:
                asset_file = str(urdf_files[0])
                print(f"  Loading URDF: {asset_file}")
                builder.add_urdf(asset_file)
                model = builder.finalize()
                print(f"  Model built: {model.body_count} bodies, {model.joint_count} joints")
                return True

    # For assets like manipulation_objects that just have model.usda
    model_file = asset_path / "model.usda"
    if model_file.exists():
        print(f"  Loading USD: {model_file}")
        from pxr import Usd

        stage = Usd.Stage.Open(str(model_file))
        prim_count = len(list(stage.TraverseAll()))
        print(f"  USD stage opened: {prim_count} prims")
        return True

    print(f"  Warning: No loadable model found in {asset_path}, skipping model verification")
    print(f"  Asset directory contents: {list(asset_path.iterdir())}")
    return True  # Asset was downloaded, just no model format we know how to load


def main():
    parser = argparse.ArgumentParser(description="Stress test concurrent asset downloading")
    parser.add_argument(
        "--workers",
        type=int,
        default=8,
        help="Number of concurrent worker processes (default: 8)",
    )
    parser.add_argument(
        "--asset",
        default="anybotics_anymal_d",
        help="Asset name to download (default: anybotics_anymal_d)",
    )
    parser.add_argument(
        "--rounds",
        type=int,
        default=3,
        help="Number of rounds to repeat the stress test (default: 3)",
    )
    args = parser.parse_args()

    # Use a fresh temp directory as cache to ensure clean state
    cache_dir = tempfile.mkdtemp(prefix="nwtn_stress_")
    print(f"Cache directory: {cache_dir}")
    print(f"Asset: {args.asset}")
    print(f"Workers: {args.workers}")
    print(f"Rounds: {args.rounds}")
    print()

    total_failures = 0

    for round_num in range(1, args.rounds + 1):
        print(f"{'='*60}")
        print(f"Round {round_num}/{args.rounds}")
        print(f"{'='*60}")

        # Clear cache before each round
        from newton._src.utils.download_assets import _safe_rmtree

        _safe_rmtree(cache_dir)
        os.makedirs(cache_dir, exist_ok=True)
        print(f"Cleared cache: {cache_dir}")

        # Spawn workers
        worker_args = [(args.asset, cache_dir)] * args.workers
        print(f"Spawning {args.workers} workers...")
        round_start = time.perf_counter()

        ctx = multiprocessing.get_context("spawn")
        with ctx.Pool(args.workers) as pool:
            results = pool.map(download_worker, worker_args)

        round_elapsed = time.perf_counter() - round_start

        # Report results
        successes = 0
        failures = 0
        for pid, path, elapsed, error in results:
            if error:
                print(f"  FAIL  pid={pid}  elapsed={elapsed:.2f}s  error={error}")
                failures += 1
            else:
                print(f"  OK    pid={pid}  elapsed={elapsed:.2f}s  path={path}")
                successes += 1

        print(f"\nRound {round_num}: {successes}/{args.workers} succeeded in {round_elapsed:.2f}s")

        if failures:
            print(f"  {failures} FAILURES")
            total_failures += failures
            continue

        # Check for leftover temp directories (should all be cleaned up)
        from pathlib import Path

        leftovers = [
            e.name for e in Path(cache_dir).iterdir() if "_p" in e.name and ("_t" in e.name or "_stale_p" in e.name)
        ]
        if leftovers:
            print(f"\n  FAIL: {len(leftovers)} leftover temp/stale directories found:")
            for name in leftovers:
                print(f"    {name}")
            total_failures += 1
        else:
            print("\n  No leftover temp/stale directories (cleanup verified)")

        # Verify the asset is usable
        print("\nVerifying asset integrity...")
        try:
            verify_asset_usable(args.asset, cache_dir)
            print("  Asset verification PASSED")
        except Exception as e:
            print(f"  Asset verification FAILED: {e}")
            total_failures += 1

        print()

    # Final cleanup
    from newton._src.utils.download_assets import _safe_rmtree

    _safe_rmtree(cache_dir)

    print(f"{'='*60}")
    if total_failures:
        print(f"FAILED: {total_failures} total failures across {args.rounds} rounds")
        sys.exit(1)
    else:
        print(f"PASSED: All {args.rounds} rounds completed successfully")
        sys.exit(0)


if __name__ == "__main__":
    main()

Stress test output (8 workers, 2 rounds)
Round 1: 8/8 succeeded in 7.83s
  No leftover temp/stale directories (cleanup verified)
  Loading USD: .../anybotics_anymal_d/usd/anymal_d.usda
  Model built: 17 bodies, 17 joints
  Asset verification PASSED

Round 2: 8/8 succeeded in 8.77s
  No leftover temp/stale directories (cleanup verified)
  Loading USD: .../anybotics_anymal_d/usd/anymal_d.usda
  Model built: 17 bodies, 17 joints
  Asset verification PASSED

PASSED: All 2 rounds completed successfully

Summary by CodeRabbit

  • New Features: per-run temp staging for downloads with automatic cleanup of stale cache artifacts
  • Bug Fixes: safer download flow with atomic handoff, robust rename handling, and guaranteed temp-artifact cleanup to prevent cache corruption during failures or concurrent runs
  • Tests: added concurrency and cleanup tests validating download safety, rename behavior, and temp-dir handling
  • Chores: removed parallel pre-download orchestration used in tests

@shi-eric shi-eric self-assigned this Mar 7, 2026
@coderabbitai
Contributor

coderabbitai bot commented Mar 7, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Clones assets into per-process/thread temporary directories, atomically renames them into the shared cache, cleans up stale temp artifacts, adds concurrency-safe helpers, expands unit tests for rename/temp/cleanup/concurrency, and removes the test-suite parallel pre-download orchestration.

Changes

  • Atomic Download Infrastructure — newton/_src/utils/download_assets.py: adds _safe_rename, _temp_cache_path, _cleanup_stale_temp_dirs, _TEMP_DIR_RE, and related imports, and changes the download flow to stage downloads into per-process/thread temp dirs, perform an atomic rename with stale-cache handoff, and ensure finally-block cleanup. Public API signatures are unchanged.
  • Download Safety & Concurrency Tests — newton/tests/test_download_assets.py: adds tests exposing and validating _safe_rename, _safe_rmtree, _temp_cache_path, _cleanup_stale_temp_dirs, and download_git_folder; covers rename behavior, temp-path uniqueness, stale-dir cleanup, and a concurrent download test.
  • Test Harness Simplification — newton/tests/thirdparty/unittest_parallel.py: removes the _parallel_download helper and all pre-download orchestration calls from the test harness, eliminating prior parallel asset pre-download steps.

Sequence Diagram

sequenceDiagram
    participant Client as Client
    participant Download as download_git_folder()
    participant TempFS as TempDir
    participant Git as Git
    participant Cache as CacheFS
    participant Cleanup as Cleanup

    Client->>Download: request download_git_folder(url, cache_folder, force_refresh)
    Download->>TempFS: _temp_cache_path() -> create per-process/thread temp_dir
    TempFS-->>Download: temp_dir created
    Download->>Git: clone repository into temp_dir
    Git-->>Download: clone complete
    Download->>TempFS: write temp stamp (.newton_last_check) in temp_dir
    alt existing cache present & new version detected
        Download->>Cache: rename existing cache -> stale path (atomic)
        Cache-->>Download: stale cache staged
    end
    Download->>Cache: _safe_rename(temp_dir -> cache_folder) (atomic)
    Cache-->>Download: new cache in place
    Download->>Cleanup: _cleanup_stale_temp_dirs(cache_folder)
    Cleanup-->>Download: stale/temp artifacts removed
    Download-->>Client: return final cached path

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes


Suggested labels

automation

Suggested reviewers

  • eric-heiden
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 70.37%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title check — ✅ Passed: the title 'Make asset downloading concurrent-safe' directly and concisely describes the main objective of the PR.
  • Linked Issues check — ✅ Passed: the PR fully implements all coding objectives from #1987: concurrent-safe asset downloads via temp dirs with safe-rename, cleanup of stale temp dirs, and removal of manual preload lists.
  • Out of Scope Changes check — ✅ Passed: all changes align with PR objectives (download concurrency safety, related test coverage, removal of manual preload workarounds); no out-of-scope changes detected.



@shi-eric shi-eric marked this pull request as draft March 7, 2026 20:35
@codecov

codecov bot commented Mar 7, 2026

Codecov Report

❌ Patch coverage is 82.35294% with 12 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines:

  • newton/_src/utils/download_assets.py — 82.35% patch coverage, 12 lines missing ⚠️


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
newton/tests/test_download_assets.py (1)

121-132: The committed regression test still misses the multiprocessing case.

This only exercises thread contention. The behavior being fixed here depends on PID-scoped temp directories and cross-process rename handoff, so the original #1987 multiprocessing failure mode is still unguarded in CI. Please add a small multi-process smoke test as well.

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@newton/_src/utils/download_assets.py`:
- Around line 82-111: The cleanup currently deletes any matching temp dir older
than max_age in _cleanup_stale_temp_dirs, which can remove a still-active
download; instead parse the owner PID from the temp-dir suffix (the existing
_TEMP_DIR_RE contains the p{pid} token) and check that process liveness before
deleting: extract pid from the dir name, then test whether that pid is alive
(e.g. try os.kill(pid, 0) on POSIX, or use psutil.pid_exists(pid) if available /
fall back appropriately) and only call _safe_rmtree(entry) if the owner process
is not alive; keep the existing age check as an additional guard.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yml

Review profile: CHILL

Plan: Pro

Run ID: 3b097046-05e8-42a5-b8e9-48dd416f5bb8

📥 Commits

Reviewing files that changed from the base of the PR and between ad6d706 and 50f1f82.

📒 Files selected for processing (3)
  • newton/_src/utils/download_assets.py
  • newton/tests/test_download_assets.py
  • newton/tests/thirdparty/unittest_parallel.py
💤 Files with no reviewable changes (1)
  • newton/tests/thirdparty/unittest_parallel.py

@shi-eric shi-eric force-pushed the shi-eric/concurrent-safe-asset-download branch from 50f1f82 to f0e2bd9 Compare March 7, 2026 21:25
Contributor

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (2)
newton/tests/test_download_assets.py (1)

121-132: Thread-level concurrency test is good; consider documenting process-level coverage.

This test validates thread-safety using ThreadPoolExecutor, which is valuable. However, the PR's primary goal is multiprocess-safe downloads (as used by the parallel test runner). The temp path includes both PID and thread ID, so thread-level testing alone doesn't fully exercise the PID-based uniqueness.

The PR description mentions a standalone stress test with 8 worker processes was run successfully. Consider either:

  1. Adding a brief docstring note that process-level concurrency was validated externally, or
  2. Adding a ProcessPoolExecutor variant (though this may be overkill for unit tests)
newton/_src/utils/download_assets.py (1)

333-344: Consider exception safety for stale_dir rename.

The stale-cache rename at line 337 catches FileNotFoundError, but if os.rename raises a different OSError (e.g., permission error), it will propagate up and skip the subsequent _safe_rename(temp_dir, cache_folder) call. The temp_dir cleanup in finally will handle the temp directory, but the stale_dir may be left in an inconsistent state.

Consider also catching and logging (or ignoring) other OSError variants to ensure the workflow proceeds:

🛡️ Proposed robustness improvement
         if cache_folder.exists():
             try:
                 os.rename(cache_folder, stale_dir)
-            except FileNotFoundError:
+            except OSError:
                 # Another thread already moved/removed it
                 pass

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yml

Review profile: CHILL

Plan: Pro

Run ID: f0bd22c6-76e1-430b-b01e-fb4ca96412fa

📥 Commits

Reviewing files that changed from the base of the PR and between 50f1f82 and f0e2bd9.

📒 Files selected for processing (3)
  • newton/_src/utils/download_assets.py
  • newton/tests/test_download_assets.py
  • newton/tests/thirdparty/unittest_parallel.py
💤 Files with no reviewable changes (1)
  • newton/tests/thirdparty/unittest_parallel.py

@shi-eric shi-eric force-pushed the shi-eric/concurrent-safe-asset-download branch from f0e2bd9 to 68231ac Compare March 7, 2026 22:45
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@newton/_src/utils/download_assets.py`:
- Around line 329-337: The current logic renames an existing cache_folder to
stale_dir before calling _safe_rename(temp_dir, cache_folder), which can move a
winner's freshly published cache; fix this by recording the identity/state of
cache_folder (e.g., os.stat() metadata such as st_ino/st_mtime or a hash of its
contents) into a variable before cloning, and before creating stale_dir only
proceed with os.rename(cache_folder, stale_dir) if the current cache_folder
still matches that recorded identity; if it does not match (meaning another
caller published a new cache), skip the stale swap and simply discard the
temp_dir (or let _safe_rename handle no-op), and add a deterministic regression
test that simulates the "winner already published" race to assert that the
published cache is not moved/removed (reference cache_folder, temp_dir,
stale_dir, and _safe_rename in your changes).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yml

Review profile: CHILL

Plan: Pro

Run ID: 9d4bdb29-2c6d-495a-beef-44bea4b83469

📥 Commits

Reviewing files that changed from the base of the PR and between f0e2bd9 and 68231ac.

📒 Files selected for processing (3)
  • newton/_src/utils/download_assets.py
  • newton/tests/test_download_assets.py
  • newton/tests/thirdparty/unittest_parallel.py
💤 Files with no reviewable changes (1)
  • newton/tests/thirdparty/unittest_parallel.py

@shi-eric shi-eric marked this pull request as ready for review March 7, 2026 23:26
@shi-eric shi-eric added the testing Issues that are related to unit and other types of testing label Mar 8, 2026
Member

@adenzler-nvidia adenzler-nvidia left a comment

Review Summary

The core temp-dir + safe-rename pattern is well-designed and the removal of the fragile pre-download list is a clear improvement. A few concurrency issues worth addressing before merge.

Critical (2): TOCTOU race on the post-rename existence check; finally block can mask the original exception.

Important (3): Bare os.rename for stale-dir move misses race errors; orphaned stale_dir not cleaned on failure; missing test coverage for _safe_rename retry/exhaustion paths.

Nits (2): Lone # 3. numbering; force_refresh semantics subtly changed under concurrency.

return cache_folder / folder_path

# Should not happen, but handle gracefully
raise RuntimeError(f"Failed to place cache folder at {cache_folder}")
Member

Critical — TOCTOU race makes this path reachable under concurrency

This is not actually unreachable. Consider:

  1. Thread A completes _safe_rename(temp_dir, cache_folder) — succeeds.
  2. Thread B calls os.rename(cache_folder, stale_dir_B) — moves A's result away.
  3. Thread A checks if cache_folder.exists(): — returns False.
  4. Thread A raises this RuntimeError even though the download succeeded.

This is a TOCTOU race in the exact concurrent scenario this PR targets.

Suggestion: return cache_folder / folder_path unconditionally after _safe_rename (the path will be valid by the time the caller actually uses it, since whichever thread won the race placed valid content there). Or re-check with a brief retry.
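The interleaving above can be replayed deterministically with plain directories (all paths and names here are illustrative, not the module's actual cache layout):

```python
import os
import tempfile
from pathlib import Path

base = Path(tempfile.mkdtemp())
cache_folder = base / "asset"

# Thread A publishes its freshly staged temp dir into the cache.
temp_a = base / "asset_pA_t1"
temp_a.mkdir()
os.rename(temp_a, cache_folder)

# Thread B, refreshing, evicts the cache into its stale dir first.
stale_b = base / "asset_stale_pB_t2"
os.rename(cache_folder, stale_b)

# Thread A's post-rename existence check now fails despite A's success.
assert not cache_folder.exists()
```

Since B will place its own result at `cache_folder` momentarily, returning the path unconditionally is safe: the path is valid by the time any caller dereferences it.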

Member Author

Fixed — removed the post-rename existence check entirely. After _safe_rename either succeeds or returns silently (another thread won), cache_folder / folder_path is valid regardless. The check + RuntimeError was dead weight that introduced a needless race.

finally:
    # Always clean up temp dir
    if temp_dir.exists():
        _safe_rmtree(temp_dir)
Member

Critical — finally cleanup can mask the original exception

If the try block raises (e.g. GitCommandError) and then _safe_rmtree(temp_dir) also raises (e.g. PermissionError on Windows where git processes hold locks), Python replaces the original exception with the cleanup exception. The root cause is completely lost.

finally:
    try:
        if temp_dir.exists():
            _safe_rmtree(temp_dir)
    except OSError as cleanup_err:
        print(f"Warning: failed to clean up temp dir {temp_dir}: {cleanup_err}")
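The masking behavior itself can be demonstrated in isolation (illustrative code, not the PR's):

```python
def masked() -> None:
    try:
        raise ValueError("clone failed")  # the real root cause
    finally:
        # An exception raised here replaces the in-flight ValueError.
        raise PermissionError("cleanup failed")


def guarded() -> None:
    try:
        raise ValueError("clone failed")
    finally:
        try:
            raise PermissionError("cleanup failed")
        except OSError:
            pass  # swallow the cleanup failure so the root cause propagates
```

Calling masked() surfaces only the PermissionError (the original ValueError survives merely as its __context__), while guarded() propagates the original ValueError.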

Member Author

Fixed — the finally block now wraps each cleanup call in try/except OSError: pass so a cleanup failure can't mask the original exception.

stale_dir = Path(f"{cache_folder}_stale_p{os.getpid()}_t{threading.get_ident()}")
if cache_folder.exists():
    try:
        os.rename(cache_folder, stale_dir)
Member

Important — bare os.rename misses race conditions that _safe_rename already handles

Only FileNotFoundError is caught here. If stale_dir already exists (e.g. same thread runs this path twice without cleanup completing), os.rename raises FileExistsError (Windows) or OSError(ENOTEMPTY) (Linux). This propagates as a confusing "Failed to download git folder: [Errno 39] Directory not empty" even though the download itself succeeded.

Consider using _safe_rename(cache_folder, stale_dir) here — it already handles these exact races.
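The collision is easy to reproduce directly (a stand-alone illustrative snippet, not the PR's code):

```python
import os
import tempfile
from pathlib import Path


def rename_collision() -> type:
    """Rename a dir onto an existing non-empty dir and report the error type."""
    base = Path(tempfile.mkdtemp())
    src = base / "cache"
    src.mkdir()
    (src / "f").write_text("x")
    dst = base / "cache_stale"
    dst.mkdir()
    (dst / "leftover").write_text("y")
    try:
        os.rename(src, dst)
    except FileNotFoundError:
        return FileNotFoundError  # the only case the original except caught
    except OSError as e:
        return type(e)  # ENOTEMPTY on Linux, FileExistsError on Windows
    return type(None)
```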

Member Author

Added a _safe_rmtree(stale_dir) pre-cleanup before the os.rename call to handle leftover stale dirs from a previous same-thread invocation.

Note: using _safe_rename here (as suggested) would be wrong — if stale_dir already exists, _safe_rename returns silently without moving cache_folder, which would skip the stale-dir eviction entirely. The bare os.rename is intentional since we want the move to actually happen; the pre-cleanup addresses the collision case.

_safe_rename(temp_dir, cache_folder)
# Clean up stale dir
if stale_dir.exists():
    _safe_rmtree(stale_dir)
Member

Important — orphaned stale_dir if _safe_rename(temp_dir, cache_folder) raises

If _safe_rename(temp_dir, cache_folder) on line 337 raises after exhausting retries, the exception propagates to the except handlers which don't reference stale_dir. The finally block only cleans temp_dir. The stale dir lingers until _cleanup_stale_temp_dirs runs (up to 1 hour).

Consider adding stale_dir cleanup to the finally block (define stale_dir before the try, initialize to None, and clean up if it exists).

Member Author

Fixed — stale_dir is now initialized to None before the try block and cleaned up in the finally block alongside temp_dir.

_safe_rmtree(cache_folder)

# 3. Download if not cached (or if cache was just cleared)
# 3. Download into a process/thread-unique temp directory, then rename
Member

Nit: the old code had # 1., # 2., # 3. comments providing a visual outline. The PR removes 1 and 2 but keeps # 3. here. A lone # 3. with no predecessors is confusing — drop the number.

Member Author

Fixed — dropped the number.


def test_rename_destination_exists(self):
    """Rename is a no-op when destination already exists."""
    src = os.path.join(self.base, "src_dir")
Member

Important — missing test coverage for _safe_rename retry and exhaustion paths

The retry loop (transient OSError succeeds on retry) and exhaustion path (raises after N attempts) are the mechanical heart of the concurrency safety but have no tests. Also missing: ENOTEMPTY path. These are cheap to add via unittest.mock.patch("os.rename", ...):

  1. Mock os.rename to raise a generic OSError on first call, succeed on second → verify rename succeeds.
  2. Mock os.rename to raise OSError on all attempts → verify it raises after exhausting retries.
  3. Mock os.rename to raise OSError(errno.ENOTEMPTY, ...) → verify it returns silently (same as FileExistsError).
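Roughly along these lines, using a local stand-in for `_safe_rename` (the real helper lives in the PR's `download_assets.py`, and the retry count here is arbitrary):

```python
import errno
import os
import unittest
from unittest import mock


def _safe_rename(src, dst, attempts: int = 3) -> None:
    """Local stand-in mirroring the retry/race behavior under test."""
    for i in range(attempts):
        try:
            os.rename(src, dst)
            return
        except FileExistsError:
            return
        except OSError as e:
            if e.errno == errno.ENOTEMPTY:
                return
            if i == attempts - 1:
                raise


class TestSafeRenameRetries(unittest.TestCase):
    def test_rename_retries_on_transient_error(self):
        # First call fails with a generic OSError, second succeeds.
        with mock.patch("os.rename", side_effect=[OSError("busy"), None]) as m:
            _safe_rename("a", "b")
        self.assertEqual(m.call_count, 2)

    def test_rename_raises_after_exhausting_retries(self):
        with mock.patch("os.rename", side_effect=OSError("busy")):
            with self.assertRaises(OSError):
                _safe_rename("a", "b")

    def test_rename_enotempty_returns_silently(self):
        err = OSError(errno.ENOTEMPTY, "directory not empty")
        with mock.patch("os.rename", side_effect=err):
            _safe_rename("a", "b")  # no exception expected
```

Patching `os.rename` at the module level keeps the tests free of real filesystem timing, so the retry and exhaustion paths are exercised deterministically.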

Member Author

Added three tests:

  • test_rename_retries_on_transient_error — mock os.rename to raise on first call, succeed on second
  • test_rename_raises_after_exhausting_retries — mock to always raise, verify exception after all attempts
  • test_rename_enotempty_returns_silently — mock ENOTEMPTY, verify silent return

@shi-eric shi-eric force-pushed the shi-eric/concurrent-safe-asset-download branch from 68231ac to ac3bf35 Compare March 9, 2026 17:19
Contributor

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
newton/tests/test_download_assets.py (1)

123-134: Consider asserting no orphaned temp directories remain.

The test validates cache integrity (file content is correct) but doesn't verify that the concurrency mechanism properly cleans up all temporary artifacts. Leftover _p*_t* directories would indicate a cleanup bug.

♻️ Suggested enhancement
         with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
             futures = [executor.submit(download) for _ in range(4)]
             for f in concurrent.futures.as_completed(futures):
                 f.result()
+
+        # Verify no orphaned temp/stale directories remain
+        import re
+        temp_pattern = re.compile(r"_(?:stale_)?p\d+_t\d+$")
+        for entry in Path(self.cache_dir).iterdir():
+            self.assertIsNone(
+                temp_pattern.search(entry.name),
+                f"Orphaned temp/stale directory found: {entry.name}",
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@newton/tests/test_download_assets.py` around lines 123 - 134, The
test_concurrent_download currently checks cache integrity but not cleanup; after
all futures complete, add an assertion that no orphaned temporary directories
remain in the cache directory by scanning cache_dir (the same cache_dir passed
to download_git_folder) for any entries matching the temp pattern (e.g., names
like "_p*_t*" or whatever temp-prefix your implementation uses) and assert the
glob/list is empty; implement this by using
pathlib.Path(self.cache_dir).glob(...) or similar and failing the test if any
matches are found to ensure download_git_folder cleans up temp artifacts under
concurrent runs.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yml

Review profile: CHILL

Plan: Pro

Run ID: 33e35848-d611-4620-bc48-c488741d7bb1

📥 Commits

Reviewing files that changed from the base of the PR and between 68231ac and ac3bf35.

📒 Files selected for processing (3)
  • newton/_src/utils/download_assets.py
  • newton/tests/test_download_assets.py
  • newton/tests/thirdparty/unittest_parallel.py
💤 Files with no reviewable changes (1)
  • newton/tests/thirdparty/unittest_parallel.py

Member

@adenzler-nvidia adenzler-nvidia left a comment

Nice work — the temp-dir + safe-rename pattern is sound and the test coverage on the helpers is thorough. Two issues in the error handling that I think should be addressed before merge:

  1. Successful download reported as failure — After _safe_rename succeeds, if _safe_rmtree(stale_dir) raises, the exception falls through to except Exception and wraps it in RuntimeError("Failed to download git folder: ..."). The cache is valid at that point, but the caller sees a failure.

  2. Silent cleanup failures can cause unbounded disk growth — The finally block uses bare except OSError: pass, and _cleanup_stale_temp_dirs has the same pattern on its outer parent.iterdir() call. If both cleanup paths fail (e.g., permissions misconfiguration on CI), orphaned temp/stale directories accumulate with zero diagnostic output. Replacing pass with warnings.warn(...) preserves the "don't mask the original exception" intent while leaving a debugging trail.

_safe_rename(temp_dir, cache_folder)
# Clean up stale dir
if stale_dir.exists():
    _safe_rmtree(stale_dir)
Member

If _safe_rmtree(stale_dir) raises here (permission error, NFS I/O error, etc.), the exception propagates to the except Exception handler below, which wraps it as RuntimeError("Failed to download git folder: ..."). But _safe_rename already succeeded — the cache is valid and the download worked.

Suggestion: wrap this in its own try/except so a cleanup failure can't cause a successful download to be reported as failed:

try:
    if stale_dir.exists():
        _safe_rmtree(stale_dir)
except OSError as e:
    import warnings
    warnings.warn(f"Failed to remove stale directory {stale_dir}: {e}", stacklevel=2)

Member Author

Fixed — wrapped the stale-dir cleanup in try/except so a cleanup failure after a successful rename doesn't propagate to the except handler and get misreported as a download failure. Orphaned stale dirs get cleaned up by _cleanup_stale_temp_dirs on the next call.

Also switched the stale-dir move itself to use _safe_rename(cache_folder, stale_dir) which retries on transient PermissionError (Windows file locks) instead of failing immediately.

try:
    if d is not None and d.exists():
        _safe_rmtree(d)
except OSError:
    pass
Member

Bare except OSError: pass here means that if cleanup fails (e.g., wrong permissions on the cache dir), orphaned temp/stale directories accumulate silently. Combined with the same pattern in _cleanup_stale_temp_dirs (the outer except OSError: pass on parent.iterdir()), both cleanup mechanisms can fail simultaneously with zero diagnostics.

Consider replacing pass with warnings.warn(...) — this still avoids masking the original exception but leaves a trail:

except OSError as cleanup_err:
    import warnings
    warnings.warn(
        f"Failed to clean up temporary directory {d}: {cleanup_err}",
        stacklevel=2,
    )

Member Author

Skipping this one. The silent catch here is intentional — this is the finally block whose sole purpose is to avoid masking the original exception (or losing a successful return). Orphaned dirs are already handled by _cleanup_stale_temp_dirs on the next call, so there's no diagnostic gap. Adding warnings would create noise for a transient condition that's already covered.

@shi-eric shi-eric marked this pull request as draft March 11, 2026 06:43
Each caller now clones into a PID/TID-unique temp directory, then
atomically renames it to the final cache location. If another process
already placed the cache, the rename is a no-op and the temp directory
is discarded. Orphaned temp directories from crashed processes are
cleaned up automatically after 1 hour.

This eliminates the need for the manually maintained pre-download list
in unittest_parallel.py, which was a fragile workaround for the lack
of concurrency safety.

Closes newton-physics#1987
- Remove TOCTOU race: return unconditionally after _safe_rename instead
  of re-checking cache_folder.exists()
- Wrap finally cleanup in try/except to avoid masking the original
  exception when _safe_rmtree fails
- Clean up stale_dir in finally block to prevent orphaned directories
  when _safe_rename exhausts retries
- Pre-clean stale_dir before os.rename to handle leftover from previous
  same-thread calls
- Drop lone "# 3." comment numbering
- Add tests for _safe_rename retry, exhaustion, and ENOTEMPTY paths
- Use _safe_rename for stale-dir move so transient PermissionError
  (Windows file locks) is retried instead of propagating as a download
  failure
- Wrap stale-dir cleanup in try/except so a cleanup failure after a
  successful rename doesn't get misreported as a download failure
@shi-eric shi-eric force-pushed the shi-eric/concurrent-safe-asset-download branch from ac3bf35 to c23c51f Compare March 13, 2026 05:34
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1


ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yml

Review profile: CHILL

Plan: Pro

Run ID: df97548d-cc7a-444e-b37e-0e23625adeff

📥 Commits

Reviewing files that changed from the base of the PR and between ac3bf35 and c23c51f.

📒 Files selected for processing (1)
  • newton/_src/utils/download_assets.py

Comment on lines +334 to +336
if cache_folder.exists():
    _safe_rename(cache_folder, stale_dir)
_safe_rename(temp_dir, cache_folder)
Contributor

⚠️ Potential issue | 🟠 Major

Handle TOCTOU on stale-swap to avoid false failures under concurrent refresh.

Between Line 334 (cache_folder.exists()) and Line 335 (_safe_rename(cache_folder, stale_dir)), another caller can move/remove cache_folder. In that case, this caller can raise at Line 352-353 even though a concurrent caller successfully published a valid cache.

🔧 Proposed fix
         if cache_folder.exists():
-            _safe_rename(cache_folder, stale_dir)
+            try:
+                _safe_rename(cache_folder, stale_dir)
+            except FileNotFoundError:
+                # Lost race: another concurrent caller already moved/replaced it.
+                pass
         _safe_rename(temp_dir, cache_folder)

Also applies to: 352-353

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@newton/_src/utils/download_assets.py` around lines 334 - 336, The current
stale-swap has a TOCTOU race: between checking cache_folder.exists() and calling
_safe_rename(cache_folder, stale_dir) another process may have moved/created the
folder; wrap the stale rename and the final temp->cache swap in try/except to
handle missing/exists races atomically: when calling _safe_rename(cache_folder,
stale_dir) catch and ignore FileNotFoundError (or OSError indicating the source
is gone) because a concurrent publisher likely succeeded, and when calling
_safe_rename(temp_dir, cache_folder) treat EEXIST/AlreadyExists as success (or
re-check cache_folder afterwards and skip replace if it now exists), keeping the
use of cache_folder, stale_dir, temp_dir and _safe_rename as the referenced
symbols.

@shi-eric shi-eric marked this pull request as ready for review March 13, 2026 06:19

Labels

testing Issues that are related to unit and other types of testing

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Make asset downloading safe under multiprocessing

3 participants