diff --git a/README.md b/README.md index 784ee98..7201307 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Therefore, this project was created because, while [`docling`](https://github.co - Async OCR requests and batch PDF processing using the Z.AI API. - Concurrent figure downloads for each PDF. -- Fast processing: approximately 25 seconds per batch of 32 PDFs. Speed depends on the z.ai API availability. See the cost section for more details on spending. +- Fast processing with separate controls for total pipeline concurrency and OCR API concurrency. > [!note] > This tool was designed to be used with academic papers written in English. Parsing other PDFs, heavy in tables or figures, or in other languages rather than English has not been tested. @@ -45,9 +45,11 @@ paperdown --input path/to/paper.pdf My preferred method is batch directory processing: ```bash -paperdown --input pdf/ --output md/ --workers 4 --overwrite +paperdown --input pdf/ --output md/ --workers 32 --ocr-workers 2 --overwrite ``` +`--workers` controls how many PDFs are processed concurrently in batch mode. `--ocr-workers` controls concurrent OCR API calls. Effective OCR concurrency is `min(--workers, --ocr-workers)`. + ## Installation Install from crates.io: @@ -87,6 +89,7 @@ Options: --timeout HTTP timeout in seconds for OCR requests and figure downloads. [default: 180] --max-download-bytes Maximum allowed size (bytes) for each downloaded figure file. [default: 20971520] --workers Maximum number of PDFs processed concurrently in batch mode. [default: 32] + --ocr-workers Maximum number of concurrent OCR API calls in batch mode; effective OCR concurrency is min(--workers, --ocr-workers). [default: 2] -v, --verbose Enable verbose progress messages on stderr. --overwrite Replace existing managed output artifacts (index.md, figures/, and tables/ when enabled). --normalize-tables Normalize OCR HTML tables into Markdown and store raw HTML under tables/. diff --git a/src/cli.rs b/src/cli.rs index 9872e53..0340e00 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -72,6 +72,14 @@ pub struct Cli { )] pub workers: usize, + #[arg( + long = "ocr-workers", + default_value_t = 2usize, + value_parser = parse_positive_usize, + help = "Maximum number of concurrent OCR API calls in batch mode; effective OCR concurrency is min(--workers, --ocr-workers)." + )] + pub ocr_workers: usize, + #[arg( short = 'v', long, @@ -96,10 +104,7 @@ pub struct Cli { } pub fn default_workers() -> usize { - let cpu = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(4); - (cpu * 4).clamp(4, 32) + 32 } fn parse_positive_usize(value: &str) -> Result { @@ -118,9 +123,8 @@ mod tests { use clap::{CommandFactory, Parser}; #[test] - fn default_workers_formula_bounds() { - let workers = default_workers(); - assert!((4..=32).contains(&workers)); + fn default_workers_is_32() { + assert_eq!(default_workers(), 32); } #[test] @@ -132,6 +136,7 @@ mod tests { assert_eq!(cli.timeout, 180); assert_eq!(cli.max_download_bytes, 20_971_520); assert_eq!(cli.workers, default_workers()); + assert_eq!(cli.ocr_workers, 2); assert!(!cli.verbose); assert!(!cli.overwrite); assert!(!cli.normalize_tables); @@ -151,6 +156,9 @@ mod tests { .is_err() ); assert!(Cli::try_parse_from(["paperdown", "--input", "in.pdf", "--workers", "0"]).is_err()); + assert!( + Cli::try_parse_from(["paperdown", "--input", "in.pdf", "--ocr-workers", "0"]).is_err() + ); } #[test] @@ -166,5 +174,6 @@ mod tests { assert!(env_second.is_some()); assert!(file_first.unwrap() < env_second.unwrap()); assert!(help.contains("single .pdf file or a directory")); + assert!(help.contains("min(--workers, --ocr-workers)")); } } diff --git a/src/core.rs b/src/core.rs index 4ca44b9..b4bd9f5 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,11 +1,13 @@ use anyhow::{Context, Result, anyhow}; use serde::Serialize; use serde_json::{Value, json}; +use std::future::Future; use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; +use tokio::sync::Semaphore; mod assets; mod input; @@ -56,6 +58,17 @@ pub async fn process_pdf( output_root: &Path, env_file: &Path, options: ProcessPdfOptions, +) -> Result { + process_pdf_with_ocr_limiter(pdf_path, output_root, env_file, options, None).await +} + +#[doc(hidden)] +pub async fn process_pdf_with_ocr_limiter( + pdf_path: &Path, + output_root: &Path, + env_file: &Path, + options: ProcessPdfOptions, + ocr_limiter: Option>, ) -> Result { let run_started = Instant::now(); let pdf_path = pdf_path @@ -78,7 +91,10 @@ pub async fn process_pdf( let payload = ocr::build_payload(&pdf_path).await?; fire(&options.progress, ProgressEvent::OcrStarted); let ocr_started = Instant::now(); - let response = ocr::call_layout_parsing(&client, &api_key, payload).await?; + let response = run_with_ocr_limiter(ocr_limiter, async { + ocr::call_layout_parsing(&client, &api_key, payload).await + }) + .await?; let ocr_seconds = ocr_started.elapsed(); fire(&options.progress, ProgressEvent::OcrFinished); @@ -169,6 +185,71 @@ fn round3(duration: Duration) -> f64 { ((duration.as_secs_f64() * 1000.0).round()) / 1000.0 } +async fn run_with_ocr_limiter(limiter: Option>, future: F) -> Result +where + F: Future>, +{ + if let Some(limiter) = limiter { + let _permit = limiter + .acquire_owned() + .await + .context("OCR limiter closed unexpectedly")?; + future.await + } else { + future.await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + use tokio::time::{Duration, sleep}; + + #[test] + fn run_with_ocr_limiter_caps_parallelism() { + let runtime = tokio::runtime::Runtime::new().expect("runtime"); + runtime.block_on(async { + let limiter = Arc::new(Semaphore::new(2)); + let active = Arc::new(AtomicUsize::new(0)); + let peak = Arc::new(AtomicUsize::new(0)); + + let mut tasks = Vec::new(); + for _ in 0..8 { + let limiter = Some(limiter.clone()); + let active = active.clone(); + let peak = peak.clone(); + tasks.push(tokio::spawn(async move { + run_with_ocr_limiter(limiter, async { + let current = active.fetch_add(1, Ordering::SeqCst) + 1; + loop { + let seen = peak.load(Ordering::SeqCst); + if current <= seen { + break; + } + if peak + .compare_exchange(seen, current, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + break; + } + } + sleep(Duration::from_millis(30)).await; + active.fetch_sub(1, Ordering::SeqCst); + Ok::<(), anyhow::Error>(()) + }) + .await + })); + } + + for task in tasks { + task.await.expect("join").expect("task result"); + } + assert!(peak.load(Ordering::SeqCst) <= 2); + }); + } +} + #[cfg(feature = "internal-testing")] #[doc(hidden)] pub mod testing { diff --git a/src/core/ocr.rs b/src/core/ocr.rs index d183246..c2ec04a 100644 --- a/src/core/ocr.rs +++ b/src/core/ocr.rs @@ -21,9 +21,18 @@ pub(crate) async fn call_layout_parsing( client: &reqwest::Client, api_key: &str, payload: Value, +) -> Result { + call_layout_parsing_at_url(client, api_key, payload, API_URL).await +} + +pub(crate) async fn call_layout_parsing_at_url( + client: &reqwest::Client, + api_key: &str, + payload: Value, + api_url: &str, ) -> Result { let response = client - .post(API_URL) + .post(api_url) .header("Authorization", format!("Bearer {api_key}")) .json(&payload) .send() @@ -33,6 +42,11 @@ pub(crate) async fn call_layout_parsing( let status = response.status(); let text = response.text().await?; if !status.is_success() { + if status.as_u16() == 429 { + return Err(anyhow!( + "Z.AI OCR rate limit (HTTP 429). Lower --ocr-workers (e.g. 1) or reduce concurrent jobs sharing this API key." + )); + } return Err(anyhow!( "Z.AI OCR request failed with HTTP {}: {}", status.as_u16(), @@ -64,3 +78,51 @@ pub(crate) fn validate_layout_response(data: Value) -> Result<(String, Vec Result { } let workers = args.workers.min(pdfs.len()).max(1); - eprintln!("Processing {} PDFs with {} workers...", pdfs.len(), workers); + let ocr_workers = effective_ocr_workers(workers, args.ocr_workers); + eprintln!( + "Processing {} PDFs with {} workers (OCR concurrency: {})...", + pdfs.len(), + workers, + ocr_workers + ); let semaphore = Arc::new(Semaphore::new(workers)); + let ocr_semaphore = Arc::new(Semaphore::new(ocr_workers)); let results = stream::iter(pdfs.into_iter().map(|pdf| { let permit_pool = semaphore.clone(); + let ocr_limiter = ocr_semaphore.clone(); let output = args.output.clone(); let env_file = args.env_file.clone(); let progress = progress.clone(); @@ -75,7 +83,14 @@ async fn run() -> Result { }; async move { let _permit = permit_pool.acquire_owned().await.expect("semaphore"); - let res = core::process_pdf(&pdf, &output, &env_file, options).await; + let res = core::process_pdf_with_ocr_limiter( + &pdf, + &output, + &env_file, + options, + Some(ocr_limiter), + ) + .await; (pdf, res) } })) @@ -111,6 +126,10 @@ fn stderr_is_tty() -> bool { std::io::stderr().is_terminal() } +fn effective_ocr_workers(workers: usize, ocr_workers: usize) -> usize { + workers.min(ocr_workers).max(1) +} + fn format_error_for_stderr(message: &str) -> String { if stderr_is_tty() { return message.replace("--overwrite", "\x1b[1;33m--overwrite\x1b[0m"); @@ -299,4 +318,11 @@ mod tests { callback(ProgressEvent::FigureDownloadFinished); callback(ProgressEvent::FigureDownloadFinished); } + + #[test] + fn effective_ocr_workers_caps_to_total_workers() { + assert_eq!(effective_ocr_workers(32, 2), 2); + assert_eq!(effective_ocr_workers(8, 32), 8); + assert_eq!(effective_ocr_workers(1, 2), 1); + } } diff --git a/tests/cli_coverage.rs b/tests/cli_coverage.rs index 36e4e12..16ec953 100644 --- a/tests/cli_coverage.rs +++ b/tests/cli_coverage.rs @@ -56,7 +56,9 @@ fn cli_batch_reports_failed_count() { "--env-file", env_file.to_str().unwrap(), "--workers", - "2", + "1", + "--ocr-workers", + "5", ]) .output() .unwrap(); @@ -66,4 +68,5 @@ fn cli_batch_reports_failed_count() { let stderr = String::from_utf8_lossy(&output.stderr); assert!(stdout.contains("Batch Complete processed: 0 failed: 2 figures: 0")); assert!(stderr.contains("failed:")); + assert!(stderr.contains("OCR concurrency: 1")); } diff --git a/tests/cli_existing_output.rs b/tests/cli_existing_output.rs index d51cb67..e6e11f0 100644 --- a/tests/cli_existing_output.rs +++ b/tests/cli_existing_output.rs @@ -47,6 +47,7 @@ fn batch_existing_outputs_fail_before_env_or_ocr() { assert!(stderr.contains("a.pdf")); assert!(stderr.contains("b.pdf")); assert!(stderr.contains("Re-run with --overwrite")); + assert!(stderr.contains("OCR concurrency:")); assert!(!stderr.contains("ZAI_API_KEY")); assert!(!stdout.contains("\u{1b}["));