-
Notifications
You must be signed in to change notification settings - Fork 275
Open
Description
ffmpeg:
let mut packet = ffmpeg::Packet::empty(); // Initialized OUTSIDE the loop
loop {
// ffmpeg would just write on top of the packet, not checking if it already has data
match packet.read(&mut ictx) { // π leak here
Ok(_) => { /* ... */ }
}
// ...
packet_tx.blocking_send(packet.clone()); // RAII won't help u here
}we should prob make packet.read unsafe
also adding an helper in Input.read_packet would be a better and safer api which can look like this
impl Input{
fn read_next_packet(ictx: &mut ffmpeg::format::context::Input) -> Result<ffmpeg::Packet> {
let mut packet = ffmpeg::Packet::empty();
packet.read(ictx)?;
Ok(packet)
}
}Full MRE
ffmpeg version n8.0.1 Copyright (c) 2000-2025 the FFmpeg developers
built with gcc 15.2.1 (GCC) 20251112
configuration: --prefix=/usr --disable-debug --disable-static --disable-stripping --enable-amf --enable-avisynth --enable-cuda-llvm --enable-lto --enable-fontconfig --enable-frei0r --enable-gmp --enable-gnutls --enable-gpl --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libdav1d --enable-libdrm --enable-libdvdnav --enable-libdvdread --enable-libfreetype --enable-libfribidi --enable-libglslang --enable-libgsm --enable-libharfbuzz --enable-libiec61883 --enable-libjack --enable-libjxl --enable-libmodplug --enable-libmp3lame --enable-libopencore_amrnb --enable-libopencore_amrwb --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libplacebo --enable-libpulse --enable-librav1e --enable-librsvg --enable-librubberband --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libsvtav1 --enable-libtheora --enable-libv4l2 --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpl --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxcb --enable-libxml2 --enable-libxvid --enable-libzimg --enable-libzmq --enable-nvdec --enable-nvenc --enable-opencl --enable-opengl --enable-shared --enable-vapoursynth --enable-version3 --enable-vulkan
libavutil 60. 8.100 / 60. 8.100
libavcodec 62. 11.100 / 62. 11.100
libavformat 62. 3.100 / 62. 3.100
libavdevice 62. 1.100 / 62. 1.100
libavfilter 11. 4.100 / 11. 4.100
libswscale 9. 1.100 / 9. 1.100
libswresample 6. 1.100 / 6. 1.100
use anyhow::{bail, Result};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::time::Duration;
use tokio::sync::mpsc;
fn main() -> Result<()> {
ffmpeg::init()?;
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
ctrlc::set_handler(move || {
println!("Ctrl+C received");
shutdown_clone.store(true, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
rt.block_on(async {
let (packet_tx, mut packet_rx) = mpsc::channel::<ffmpeg::Packet>(1);
// Dummy receiver - consumes packets to keep the channel moving
let rx_handle = tokio::spawn(async move {
let mut count = 0u64;
let mut last_log = std::time::Instant::now();
while let Some(_) = packet_rx.recv().await {
count += 1;
if last_log.elapsed() > Duration::from_secs(5) {
println!("Received {} packets so far...", count);
last_log = std::time::Instant::now();
}
}
println!("Receiver finished. Total packets: {}", count);
});
// Producer - Running in blocking thread as per original architecture
let shutdown_loop = shutdown.clone();
let producer_handle = tokio::task::spawn_blocking(move || {
while !shutdown_loop.load(Ordering::SeqCst) {
// Using "testsrc" to generate dummy video frames locally
if let Err(e) = run_dummy_leak_loop("lavfi", "testsrc=duration=100:size=1280x720:rate=30", &shutdown_loop, packet_tx.clone()) {
eprintln!("Stream error: {}", e);
if !shutdown_loop.load(Ordering::SeqCst) {
std::thread::sleep(Duration::from_secs(1));
}
}
}
});
let _ = producer_handle.await;
let _ = rx_handle.await;
Ok(())
})
}
fn run_dummy_leak_loop(
format: &str,
url: &str,
shutdown_sig: &Arc<AtomicBool>,
packet_tx: mpsc::Sender<ffmpeg::Packet>,
) -> Result<()> {
let mut opts = ffmpeg::Dictionary::new();
println!("Opening dummy source: {}", url);
let mut ictx = ffmpeg::format::input_with_dictionary(&url, opts)?;
let video_stream_index = ictx
.streams()
.best(ffmpeg::media::Type::Video)
.ok_or_else(|| anyhow::anyhow!("No video stream found"))?
.index();
println!("Source opened. Starting leak loop...");
let mut packet_count = 0u64;
// --- THE LEAK SOURCE ---
// Creating the packet outside the loop. When packet.read() is called inside
// the loop, it may not correctly free the previous buffer allocated by the
// C API, causing memory to climb.
let mut packet = ffmpeg::Packet::empty();
loop {
if shutdown_sig.load(Ordering::SeqCst) {
break;
}
match packet.read(&mut ictx) {
Ok(_) => { /* Successfully read into the existing packet buffer */ }
Err(ffmpeg::Error::Eof) => {
println!("Source finished (EOF)");
break;
}
Err(e) => {
eprintln!("Failed to read packet: {}", e);
bail!("Read error");
}
}
if packet.stream() != video_stream_index {
continue;
}
packet_count += 1;
// Cloning the packet sends a reference-counted handle or a copy
// to the receiver, but the 'mut packet' above persists.
if let Err(_) = packet_tx.blocking_send(packet.clone()) {
break;
}
}
println!("Loop exited. Packets processed: {}", packet_count);
Ok(())
}Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels