Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Possible sections are:

### Fixed:
- avoid Z.AI OCR rate-limit failures in large batch runs by introducing OCR-specific concurrency control (`--ocr-workers`) and clearer HTTP 429 guidance ([#7](https://github.com/atsyplenkov/paperdown/issues/7))
- align skip and output-reuse behavior with marker-based semantics: skip only when `<output>/<pdf_stem>/log.jsonl` exists; otherwise refresh managed artifacts and continue processing ([#11](https://github.com/atsyplenkov/paperdown/issues/11))

## [0.2.0] - 2026-03-18

Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ paperdown --input pdf/ --output md/ --workers 32 --ocr-workers 2 --overwrite

`--workers` controls how many PDFs are processed concurrently in batch mode. `--ocr-workers` controls concurrent OCR API calls. Effective OCR concurrency is `min(--workers, --ocr-workers)`.

Without `--overwrite`, an existing `<output>/<pdf_stem>/log.jsonl` marker skips the PDF. If the log marker is missing, `paperdown` treats the PDF as unprocessed and refreshes managed artifacts (`index.md`, `figures/`, and `tables/` when `--normalize-tables` is enabled). With `--overwrite`, `paperdown` replaces the whole `<output>/<pdf_stem>/` folder before processing.

## Installation

Install from crates.io:
Expand Down Expand Up @@ -91,7 +93,7 @@ Options:
--workers <WORKERS> Maximum number of PDFs processed concurrently in batch mode. [default: 32]
--ocr-workers <OCR_WORKERS> Maximum number of concurrent OCR API calls in batch mode; effective OCR concurrency is min(--workers, --ocr-workers). [default: 2]
-v, --verbose Enable verbose progress messages on stderr.
--overwrite Replace existing managed output artifacts (index.md, figures/, and tables/ when enabled).
--overwrite Replace the whole <output>/<pdf_stem>/ folder before processing.
--normalize-tables Normalize OCR HTML tables into Markdown and store raw HTML under tables/.
-h, --help Print help (see a summary with '-h')
-V, --version Print version
Expand Down
16 changes: 13 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ paperdown --input pdf/ --output md/ --workers 4\n \
paperdown --input pdf/ --output md/ --overwrite\n \
paperdown --input pdf/ --output md/ --normalize-tables\n\n\
Notes:\n \
Without --overwrite, existing index.md or figures/ causes a failure.\n \
When --normalize-tables is enabled, existing tables/ also causes a failure.\n \
Without --overwrite, an existing <output>/<pdf_stem>/log.jsonl marker skips the PDF.\n \
If the log marker is missing, paperdown treats the PDF as unprocessed and refreshes managed artifacts (index.md, figures/, and tables/ when enabled).\n \
With --overwrite, the whole <output>/<pdf_stem>/ folder is replaced.\n \
Progress bars are shown on stderr only when running in a TTY."
)]
pub struct Cli {
Expand Down Expand Up @@ -91,7 +92,7 @@ pub struct Cli {
#[arg(
long,
action = ArgAction::SetTrue,
help = "Replace existing managed output artifacts (index.md and figures/)."
help = "Replace the whole <output>/<pdf_stem>/ folder before processing."
)]
pub overwrite: bool,

Expand Down Expand Up @@ -168,6 +169,15 @@ mod tests {
assert!(help.contains("Examples:"));
assert!(help.contains("--overwrite"));
assert!(help.contains("--normalize-tables"));
assert!(help.contains(
"Without --overwrite, an existing <output>/<pdf_stem>/log.jsonl marker skips the PDF."
));
assert!(help.contains(
"If the log marker is missing, paperdown treats the PDF as unprocessed and refreshes managed artifacts (index.md, figures/, and tables/ when enabled)."
));
assert!(
help.contains("With --overwrite, the whole <output>/<pdf_stem>/ folder is replaced.")
);
let file_first = help.find("1) ZAI_API_KEY from --env-file");
let env_second = help.find("2) ZAI_API_KEY from environment");
assert!(file_first.is_some());
Expand Down
85 changes: 54 additions & 31 deletions src/core/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,28 @@ pub(crate) struct PreparedOutput {
pub(crate) log_path: PathBuf,
}

/// Deletes whatever currently sits at `path`; a missing path is not an error.
///
/// The entry is inspected with `symlink_metadata`, so the decision is based on
/// the entry itself: directories are removed recursively, and everything else
/// is removed with `remove_file`.
///
/// # Errors
/// Returns any I/O error other than `NotFound` from the metadata lookup, and
/// any error from the removal itself.
fn remove_path_if_exists(path: &Path) -> Result<()> {
    let entry = match std::fs::symlink_metadata(path) {
        Ok(entry) => entry,
        // Nothing to delete.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(e) => return Err(e.into()),
    };

    if entry.is_dir() {
        std::fs::remove_dir_all(path)?;
    } else {
        std::fs::remove_file(path)?;
    }
    Ok(())
}

/// Checks that `stem` can be used as a single folder name under the output root.
///
/// # Errors
/// Returns `Invalid output stem: <stem>` when the stem is empty, is `.` or
/// `..`, or contains a `/` or `\` separator.
fn validate_output_stem(stem: &str) -> Result<()> {
    let is_reserved = matches!(stem, "" | "." | "..");
    let has_separator = stem.contains(|c: char| matches!(c, '/' | '\\'));

    if is_reserved || has_separator {
        return Err(anyhow::anyhow!("Invalid output stem: {stem}"));
    }
    Ok(())
}

pub(crate) fn prepare_output_paths(
output_root: &Path,
pdf_path: &Path,
Expand All @@ -24,8 +46,12 @@ pub(crate) fn prepare_output_paths(
.file_stem()
.and_then(|s| s.to_str())
.ok_or_else(|| anyhow::anyhow!("Invalid PDF filename: {}", pdf_path.display()))?;
validate_output_stem(stem)?;

let output_dir = output_root.join(stem);
if overwrite {
remove_path_if_exists(&output_dir)?;
}
std::fs::create_dir_all(&output_dir)?;

let markdown_path = output_dir.join("index.md");
Expand All @@ -34,41 +60,16 @@ pub(crate) fn prepare_output_paths(
let log_path = output_dir.join("log.jsonl");

if !overwrite {
if markdown_path.exists() {
if log_path.is_file() {
return Err(anyhow::anyhow!(
"Output already exists: {}. Re-run with --overwrite",
markdown_path.display()
log_path.display()
));
}
if figures_dir.exists() {
return Err(anyhow::anyhow!(
"Output already exists: {}. Re-run with --overwrite",
figures_dir.display()
));
}
if normalize_tables && tables_dir.exists() {
return Err(anyhow::anyhow!(
"Output already exists: {}. Re-run with --overwrite",
tables_dir.display()
));
}
} else {
if markdown_path.exists() {
std::fs::remove_file(&markdown_path)?;
}
if figures_dir.exists() {
if figures_dir.is_dir() {
std::fs::remove_dir_all(&figures_dir)?;
} else {
std::fs::remove_file(&figures_dir)?;
}
}
if normalize_tables && tables_dir.exists() {
if tables_dir.is_dir() {
std::fs::remove_dir_all(&tables_dir)?;
} else {
std::fs::remove_file(&tables_dir)?;
}
remove_path_if_exists(&markdown_path)?;
remove_path_if_exists(&figures_dir)?;
if normalize_tables {
remove_path_if_exists(&tables_dir)?;
}
}

Expand Down Expand Up @@ -125,3 +126,25 @@ pub(crate) async fn atomic_write_bytes(path: &Path, content: &[u8]) -> Result<()
fs::rename(&temp_path, path).await?;
Ok(())
}

#[cfg(test)]
mod tests {
    use super::validate_output_stem;

    /// Returns the error text produced for a stem that must be rejected.
    fn rejection_message(stem: &str) -> String {
        validate_output_stem(stem).unwrap_err().to_string()
    }

    #[test]
    fn validate_output_stem_rejects_backslash() {
        assert!(rejection_message("a\\b").contains("Invalid output stem"));
    }

    #[test]
    fn validate_output_stem_rejects_forward_slash() {
        assert!(rejection_message("a/b").contains("Invalid output stem"));
    }

    #[test]
    fn validate_output_stem_accepts_normal_stem() {
        assert!(validate_output_stem("paper").is_ok());
    }
}
147 changes: 138 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ async fn run() -> Result<i32> {
};

if pdfs.len() == 1 {
if !args.overwrite && has_existing_log_marker(&args.output, &pdfs[0]) {
print_single_skip_summary_stdout(&pdfs[0]);
return Ok(0);
}
if args.verbose {
eprintln!("Processing 1 PDF: {}", pdfs[0].display());
}
Expand All @@ -57,18 +61,40 @@ async fn run() -> Result<i32> {
return Ok(0);
}

let workers = args.workers.min(pdfs.len()).max(1);
let total_inputs = pdfs.len();
let mut skipped_count = 0usize;
let mut process_pdfs = Vec::new();
for pdf in pdfs {
if !args.overwrite && has_existing_log_marker(&args.output, &pdf) {
skipped_count += 1;
} else {
process_pdfs.push(pdf);
}
}

if process_pdfs.is_empty() {
let counts = batch_accounting(total_inputs, 0, skipped_count, 0, 0);
print_batch_summary_stdout(
counts.processed,
counts.skipped,
counts.failed,
counts.figures,
);
return Ok(0);
}

let workers = args.workers.min(process_pdfs.len()).max(1);
let ocr_workers = effective_ocr_workers(workers, args.ocr_workers);
eprintln!(
"Processing {} PDFs with {} workers (OCR concurrency: {})...",
pdfs.len(),
process_pdfs.len(),
workers,
ocr_workers
);

let semaphore = Arc::new(Semaphore::new(workers));
let ocr_semaphore = Arc::new(Semaphore::new(ocr_workers));
let results = stream::iter(pdfs.into_iter().map(|pdf| {
let results = stream::iter(process_pdfs.into_iter().map(|pdf| {
let permit_pool = semaphore.clone();
let ocr_limiter = ocr_semaphore.clone();
let output = args.output.clone();
Expand Down Expand Up @@ -118,8 +144,20 @@ async fn run() -> Result<i32> {
}
}

print_batch_summary_stdout(success_count, failed_count, downloaded_figures);
Ok(if failed_count > 0 { 1 } else { 0 })
let counts = batch_accounting(
total_inputs,
success_count,
skipped_count,
failed_count,
downloaded_figures,
);
print_batch_summary_stdout(
counts.processed,
counts.skipped,
counts.failed,
counts.figures,
);
Ok(if counts.failed > 0 { 1 } else { 0 })
}

fn stderr_is_tty() -> bool {
Expand All @@ -141,6 +179,21 @@ fn stdout_is_tty() -> bool {
std::io::stdout().is_terminal()
}

/// Reports whether `<output_root>/<pdf stem>/log.jsonl` exists as a regular file.
///
/// Returns `false` when `pdf` has no file stem.
fn has_existing_log_marker(output_root: &Path, pdf: &Path) -> bool {
    match pdf.file_stem() {
        Some(stem) => output_root.join(stem).join("log.jsonl").is_file(),
        None => false,
    }
}

/// Prints a one-line "Skipped <pdf>" notice to stdout.
///
/// The label is wrapped in ANSI bold-yellow escapes only when stdout is a TTY.
fn print_single_skip_summary_stdout(pdf: &Path) {
    if !stdout_is_tty() {
        // Plain output for pipes and redirected stdout.
        println!("Skipped {}", display_path(pdf));
        return;
    }
    println!("\x1b[1;33mSkipped\x1b[0m {}", display_path(pdf));
}

fn print_single_summary_stdout(summary: &PdfSummary) {
if stdout_is_tty() {
println!(
Expand All @@ -165,18 +218,48 @@ fn print_single_summary_stdout(summary: &PdfSummary) {
}
}

fn print_batch_summary_stdout(processed: usize, failed: usize, figures: usize) {
/// Per-category tallies reported in the batch summary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BatchAccounting {
    /// PDFs that were run and succeeded.
    processed: usize,
    /// PDFs that were not run because they were skipped up front.
    skipped: usize,
    /// PDFs that were run and failed.
    failed: usize,
    /// Figure count for the batch; not part of the invariant check.
    figures: usize,
}

/// Bundles the batch tallies after checking that they cover every input.
///
/// # Panics
/// Panics with `batch accounting invariant violated` when
/// `processed + skipped + failed` differs from `total_inputs`.
fn batch_accounting(
    total_inputs: usize,
    processed: usize,
    skipped: usize,
    failed: usize,
    figures: usize,
) -> BatchAccounting {
    // Every input must land in exactly one of the three outcome buckets.
    let accounted_for = processed + skipped + failed;
    assert_eq!(
        accounted_for, total_inputs,
        "batch accounting invariant violated"
    );

    BatchAccounting {
        processed,
        skipped,
        failed,
        figures,
    }
}

fn print_batch_summary_stdout(processed: usize, skipped: usize, failed: usize, figures: usize) {
if stdout_is_tty() {
let color = if failed == 0 {
"\x1b[1;32m"
} else {
"\x1b[1;33m"
};
println!(
"{color}Batch Complete\x1b[0m processed: \x1b[1m{processed}\x1b[0m failed: \x1b[1m{failed}\x1b[0m figures: \x1b[1m{figures}\x1b[0m"
"{color}Batch Complete\x1b[0m processed: \x1b[1m{processed}\x1b[0m skipped: \x1b[1m{skipped}\x1b[0m failed: \x1b[1m{failed}\x1b[0m figures: \x1b[1m{figures}\x1b[0m"
);
} else {
println!("Batch Complete processed: {processed} failed: {failed} figures: {figures}");
println!(
"Batch Complete processed: {processed} skipped: {skipped} failed: {failed} figures: {figures}"
);
}
}

Expand Down Expand Up @@ -298,7 +381,8 @@ mod tests {
log_path: "/tmp/out/paper/log.jsonl".to_string(),
};
print_single_summary_stdout(&summary);
print_batch_summary_stdout(2, 1, 4);
print_single_skip_summary_stdout(Path::new(&summary.pdf));
print_batch_summary_stdout(2, 1, 1, 4);
}

#[test]
Expand All @@ -325,4 +409,49 @@ mod tests {
assert_eq!(effective_ocr_workers(8, 32), 8);
assert_eq!(effective_ocr_workers(1, 2), 1);
}

#[test]
fn has_existing_log_marker_returns_true_when_log_file_exists() {
    let scratch = tempfile::tempdir().expect("tempdir");
    let root = scratch.path();

    // The input PDF lives next to its output folder inside the same temp dir.
    let pdf = root.join("paper.pdf");
    std::fs::write(&pdf, b"%PDF-1.4").expect("create pdf");

    // Lay down <root>/paper/log.jsonl, the marker the function looks for.
    let stem_dir = root.join("paper");
    std::fs::create_dir_all(&stem_dir).expect("create log dir");
    std::fs::write(stem_dir.join("log.jsonl"), b"{}\n").expect("write log marker");

    assert!(has_existing_log_marker(root, &pdf));
}

#[test]
fn has_existing_log_marker_returns_false_when_log_file_missing() {
    let scratch = tempfile::tempdir().expect("tempdir");
    let root = scratch.path();

    // Only the PDF is created; no <root>/paper/log.jsonl is written.
    let pdf = root.join("paper.pdf");
    std::fs::write(&pdf, b"%PDF-1.4").expect("create pdf");

    let found = has_existing_log_marker(root, &pdf);
    assert!(!found);
}

// Nested under `main::tests`, so the test's path includes those segments.
mod main {
    use super::*;

    mod tests {
        use super::*;

        /// A mix of processed, skipped and failed PDFs that sums to the input
        /// count is returned field-for-field.
        #[test]
        fn batch_accounting_mixed_outcomes_is_consistent() {
            let expected = BatchAccounting {
                processed: 2,
                skipped: 1,
                failed: 2,
                figures: 7,
            };
            assert_eq!(batch_accounting(5, 2, 1, 2, 7), expected);
        }
    }
}
}
Loading
Loading