Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Therefore, this project was created because, while [`docling`](https://github.co

- Async OCR requests and batch PDF processing using the Z.AI API.
- Concurrent figure downloads for each PDF.
- Fast processing: approximately 25 seconds per batch of 32 PDFs. Speed depends on the z.ai API availability. See the cost section for more details on spending.
- Fast processing with separate controls for total pipeline concurrency and OCR API concurrency.

> [!note]
> This tool was designed for academic papers written in English. Parsing other kinds of PDFs — those heavy in tables or figures, or written in languages other than English — has not been tested.
Expand All @@ -45,9 +45,11 @@ paperdown --input path/to/paper.pdf
My preferred method is batch directory processing:

```bash
paperdown --input pdf/ --output md/ --workers 4 --overwrite
paperdown --input pdf/ --output md/ --workers 32 --ocr-workers 2 --overwrite
```

`--workers` controls how many PDFs are processed concurrently in batch mode. `--ocr-workers` controls concurrent OCR API calls. Effective OCR concurrency is `min(--workers, --ocr-workers)`.

## Installation

Install from crates.io:
Expand Down Expand Up @@ -87,6 +89,7 @@ Options:
--timeout <TIMEOUT> HTTP timeout in seconds for OCR requests and figure downloads. [default: 180]
--max-download-bytes <MAX_DOWNLOAD_BYTES> Maximum allowed size (bytes) for each downloaded figure file. [default: 20971520]
--workers <WORKERS> Maximum number of PDFs processed concurrently in batch mode. [default: 32]
--ocr-workers <OCR_WORKERS> Maximum number of concurrent OCR API calls in batch mode; effective OCR concurrency is min(--workers, --ocr-workers). [default: 2]
-v, --verbose Enable verbose progress messages on stderr.
--overwrite Replace existing managed output artifacts (index.md, figures/, and tables/ when enabled).
--normalize-tables Normalize OCR HTML tables into Markdown and store raw HTML under tables/.
Expand Down
23 changes: 16 additions & 7 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ pub struct Cli {
)]
pub workers: usize,

#[arg(
long = "ocr-workers",
default_value_t = 2usize,
value_parser = parse_positive_usize,
help = "Maximum number of concurrent OCR API calls in batch mode; effective OCR concurrency is min(--workers, --ocr-workers)."
)]
pub ocr_workers: usize,

#[arg(
short = 'v',
long,
Expand All @@ -96,10 +104,7 @@ pub struct Cli {
}

pub fn default_workers() -> usize {
let cpu = std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
(cpu * 4).clamp(4, 32)
32
}

fn parse_positive_usize(value: &str) -> Result<usize, String> {
Expand All @@ -118,9 +123,8 @@ mod tests {
use clap::{CommandFactory, Parser};

#[test]
fn default_workers_formula_bounds() {
let workers = default_workers();
assert!((4..=32).contains(&workers));
fn default_workers_is_32() {
assert_eq!(default_workers(), 32);
}

#[test]
Expand All @@ -132,6 +136,7 @@ mod tests {
assert_eq!(cli.timeout, 180);
assert_eq!(cli.max_download_bytes, 20_971_520);
assert_eq!(cli.workers, default_workers());
assert_eq!(cli.ocr_workers, 2);
assert!(!cli.verbose);
assert!(!cli.overwrite);
assert!(!cli.normalize_tables);
Expand All @@ -151,6 +156,9 @@ mod tests {
.is_err()
);
assert!(Cli::try_parse_from(["paperdown", "--input", "in.pdf", "--workers", "0"]).is_err());
assert!(
Cli::try_parse_from(["paperdown", "--input", "in.pdf", "--ocr-workers", "0"]).is_err()
);
}

#[test]
Expand All @@ -166,5 +174,6 @@ mod tests {
assert!(env_second.is_some());
assert!(file_first.unwrap() < env_second.unwrap());
assert!(help.contains("single .pdf file or a directory"));
assert!(help.contains("min(--workers, --ocr-workers)"));
}
}
83 changes: 82 additions & 1 deletion src/core.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use anyhow::{Context, Result, anyhow};
use serde::Serialize;
use serde_json::{Value, json};
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tokio::sync::Semaphore;

mod assets;
mod input;
Expand Down Expand Up @@ -56,6 +58,17 @@ pub async fn process_pdf(
output_root: &Path,
env_file: &Path,
options: ProcessPdfOptions,
) -> Result<PdfSummary> {
process_pdf_with_ocr_limiter(pdf_path, output_root, env_file, options, None).await
}

#[doc(hidden)]
pub async fn process_pdf_with_ocr_limiter(
pdf_path: &Path,
output_root: &Path,
env_file: &Path,
options: ProcessPdfOptions,
ocr_limiter: Option<Arc<Semaphore>>,
) -> Result<PdfSummary> {
let run_started = Instant::now();
let pdf_path = pdf_path
Expand All @@ -78,7 +91,10 @@ pub async fn process_pdf(
let payload = ocr::build_payload(&pdf_path).await?;
fire(&options.progress, ProgressEvent::OcrStarted);
let ocr_started = Instant::now();
let response = ocr::call_layout_parsing(&client, &api_key, payload).await?;
let response = run_with_ocr_limiter(ocr_limiter, async {
ocr::call_layout_parsing(&client, &api_key, payload).await
})
.await?;
let ocr_seconds = ocr_started.elapsed();
fire(&options.progress, ProgressEvent::OcrFinished);

Expand Down Expand Up @@ -169,6 +185,71 @@ fn round3(duration: Duration) -> f64 {
((duration.as_secs_f64() * 1000.0).round()) / 1000.0
}

/// Awaits `future`, waiting for a limiter permit first when one is supplied.
///
/// With `Some(limiter)`, a permit is acquired from the semaphore before the
/// future is awaited and stays held until the future has produced its result.
/// With `None`, the future is awaited directly.
///
/// # Errors
///
/// Returns the future's own error, or an error carrying the context
/// "OCR limiter closed unexpectedly" when acquiring the permit fails.
async fn run_with_ocr_limiter<T, F>(limiter: Option<Arc<Semaphore>>, future: F) -> Result<T>
where
    F: Future<Output = Result<T>>,
{
    // Bind the permit to a named local (not `_`) so it is only released when
    // this function returns, i.e. after the wrapped future has finished.
    let _held_permit = match limiter {
        Some(semaphore) => Some(
            semaphore
                .acquire_owned()
                .await
                .context("OCR limiter closed unexpectedly")?,
        ),
        None => None,
    };
    future.await
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::time::{Duration, sleep};

    /// Pushes eight futures through a two-permit limiter and checks that the
    /// highest number observed running at once never exceeded two.
    #[test]
    fn run_with_ocr_limiter_caps_parallelism() {
        let runtime = tokio::runtime::Runtime::new().expect("runtime");
        runtime.block_on(async {
            let limiter = Arc::new(Semaphore::new(2));
            // `active` counts futures currently running inside the limiter;
            // `peak` records the largest value `active` has reached.
            let active = Arc::new(AtomicUsize::new(0));
            let peak = Arc::new(AtomicUsize::new(0));

            let mut tasks = Vec::new();
            for _ in 0..8 {
                let limiter = Some(limiter.clone());
                let active = active.clone();
                let peak = peak.clone();
                tasks.push(tokio::spawn(async move {
                    run_with_ocr_limiter(limiter, async {
                        let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                        // Atomic maximum: keeps `peak` at the largest `current`
                        // seen so far without a manual compare-exchange loop.
                        peak.fetch_max(current, Ordering::SeqCst);
                        // Stay "active" briefly so the spawned tasks overlap.
                        sleep(Duration::from_millis(30)).await;
                        active.fetch_sub(1, Ordering::SeqCst);
                        Ok::<(), anyhow::Error>(())
                    })
                    .await
                }));
            }

            for task in tasks {
                task.await.expect("join").expect("task result");
            }
            assert!(peak.load(Ordering::SeqCst) <= 2);
        });
    }
}

#[cfg(feature = "internal-testing")]
#[doc(hidden)]
pub mod testing {
Expand Down
64 changes: 63 additions & 1 deletion src/core/ocr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,18 @@ pub(crate) async fn call_layout_parsing(
client: &reqwest::Client,
api_key: &str,
payload: Value,
) -> Result<Value> {
call_layout_parsing_at_url(client, api_key, payload, API_URL).await
}

pub(crate) async fn call_layout_parsing_at_url(
client: &reqwest::Client,
api_key: &str,
payload: Value,
api_url: &str,
) -> Result<Value> {
let response = client
.post(API_URL)
.post(api_url)
.header("Authorization", format!("Bearer {api_key}"))
.json(&payload)
.send()
Expand All @@ -33,6 +42,11 @@ pub(crate) async fn call_layout_parsing(
let status = response.status();
let text = response.text().await?;
if !status.is_success() {
if status.as_u16() == 429 {
return Err(anyhow!(
"Z.AI OCR rate limit (HTTP 429). Lower --ocr-workers (e.g. 1) or reduce concurrent jobs sharing this API key."
));
}
return Err(anyhow!(
"Z.AI OCR request failed with HTTP {}: {}",
status.as_u16(),
Expand Down Expand Up @@ -64,3 +78,51 @@ pub(crate) fn validate_layout_response(data: Value) -> Result<(String, Vec<Value
let usage = data.get("usage").filter(|v| v.is_object()).cloned();
Ok((markdown, layout_details, usage))
}

#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

// Checks that an HTTP 429 reply from the OCR endpoint is turned into an error
// whose text names the rate limit and points the user at `--ocr-workers`.
#[test]
fn call_layout_parsing_429_returns_actionable_error() {
let runtime = tokio::runtime::Runtime::new().expect("runtime");
runtime.block_on(async {
// Port 0 asks the OS for any free port; the actual address is read back
// from the listener and used to build the request URL below.
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
let addr = listener.local_addr().expect("local addr");

// One-shot stand-in server: accepts a single connection, performs one read of
// up to 4096 bytes (the request contents are ignored), then writes a fixed
// 429 response with a JSON error body.
let server = tokio::spawn(async move {
let (mut stream, _) = listener.accept().await.expect("accept");
let mut read_buf = [0u8; 4096];
let _ = stream.read(&mut read_buf).await.expect("read request");
let body = r#"{"error":{"code":"1302","message":"Rate limit reached for requests"}}"#;
let response = format!(
"HTTP/1.1 429 Too Many Requests\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
body.len(),
body
);
stream
.write_all(response.as_bytes())
.await
.expect("write response");
});

// Call the URL-parameterised entry point against the local server and keep
// only the rendered error message for the assertions.
let client = reqwest::Client::new();
let err = call_layout_parsing_at_url(
&client,
"test-key",
json!({"model": "glm-ocr", "file": "data:application/pdf;base64,AA=="}),
&format!("http://{addr}"),
)
.await
.expect_err("expected 429 error")
.to_string();

// Join the server task before asserting so a failure inside it also fails the test.
server.await.expect("server done");
assert!(err.contains("Z.AI OCR rate limit (HTTP 429)"));
assert!(err.contains("--ocr-workers"));
});
}
}
30 changes: 28 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,19 @@ async fn run() -> Result<i32> {
}

let workers = args.workers.min(pdfs.len()).max(1);
eprintln!("Processing {} PDFs with {} workers...", pdfs.len(), workers);
let ocr_workers = effective_ocr_workers(workers, args.ocr_workers);
eprintln!(
"Processing {} PDFs with {} workers (OCR concurrency: {})...",
pdfs.len(),
workers,
ocr_workers
);

let semaphore = Arc::new(Semaphore::new(workers));
let ocr_semaphore = Arc::new(Semaphore::new(ocr_workers));
let results = stream::iter(pdfs.into_iter().map(|pdf| {
let permit_pool = semaphore.clone();
let ocr_limiter = ocr_semaphore.clone();
let output = args.output.clone();
let env_file = args.env_file.clone();
let progress = progress.clone();
Expand All @@ -75,7 +83,14 @@ async fn run() -> Result<i32> {
};
async move {
let _permit = permit_pool.acquire_owned().await.expect("semaphore");
let res = core::process_pdf(&pdf, &output, &env_file, options).await;
let res = core::process_pdf_with_ocr_limiter(
&pdf,
&output,
&env_file,
options,
Some(ocr_limiter),
)
.await;
(pdf, res)
}
}))
Expand Down Expand Up @@ -111,6 +126,10 @@ fn stderr_is_tty() -> bool {
std::io::stderr().is_terminal()
}

/// Computes the OCR concurrency actually used for a batch run.
///
/// The result is the smaller of `workers` and `ocr_workers`, raised to 1 when
/// that minimum would otherwise be 0.
fn effective_ocr_workers(workers: usize, ocr_workers: usize) -> usize {
    let capped = std::cmp::min(workers, ocr_workers);
    std::cmp::max(capped, 1)
}

fn format_error_for_stderr(message: &str) -> String {
if stderr_is_tty() {
return message.replace("--overwrite", "\x1b[1;33m--overwrite\x1b[0m");
Expand Down Expand Up @@ -299,4 +318,11 @@ mod tests {
callback(ProgressEvent::FigureDownloadFinished);
callback(ProgressEvent::FigureDownloadFinished);
}

#[test]
fn effective_ocr_workers_caps_to_total_workers() {
    // Each row: (total workers, requested OCR workers, expected effective value).
    let cases: [(usize, usize, usize); 3] = [(32, 2, 2), (8, 32, 8), (1, 2, 1)];
    for (workers, ocr_workers, expected) in cases {
        assert_eq!(effective_ocr_workers(workers, ocr_workers), expected);
    }
}
}
5 changes: 4 additions & 1 deletion tests/cli_coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ fn cli_batch_reports_failed_count() {
"--env-file",
env_file.to_str().unwrap(),
"--workers",
"2",
"1",
"--ocr-workers",
"5",
])
.output()
.unwrap();
Expand All @@ -66,4 +68,5 @@ fn cli_batch_reports_failed_count() {
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(stdout.contains("Batch Complete processed: 0 failed: 2 figures: 0"));
assert!(stderr.contains("failed:"));
assert!(stderr.contains("OCR concurrency: 1"));
}
1 change: 1 addition & 0 deletions tests/cli_existing_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ fn batch_existing_outputs_fail_before_env_or_ocr() {
assert!(stderr.contains("a.pdf"));
assert!(stderr.contains("b.pdf"));
assert!(stderr.contains("Re-run with --overwrite"));
assert!(stderr.contains("OCR concurrency:"));

assert!(!stderr.contains("ZAI_API_KEY"));
assert!(!stdout.contains("\u{1b}["));
Expand Down
Loading