Skip to content

Packet leak when reading to a "not empty" packet. #260

@nrbnlulu

Description

@nrbnlulu

ffmpeg:

let mut packet = ffmpeg::Packet::empty(); // Initialized OUTSIDE the loop, so every read below reuses this one packet

loop {
    // Reported behavior: the read writes on top of the packet without checking whether it already holds data
    match packet.read(&mut ictx) {  // <-- leak reported here
        Ok(_) => { /* ... */ }
    }
    // ...
    packet_tx.blocking_send(packet.clone()); // RAII does not help here: `packet` itself is never dropped between reads
}

We should probably make `packet.read` unsafe.
Also, adding a helper such as `Input::read_next_packet` would give a better and safer API; it could look like this:

impl Input{
    /// Proposed helper: reads the next packet from `ictx` into a freshly created packet.
    ///
    /// A new `ffmpeg::Packet::empty()` is created on every call, so `read` is never
    /// handed a packet that an earlier call already filled.
    ///
    /// Returns the packet on success; an error from `read` is propagated with `?`.
    // NOTE(review): takes `ictx` as an explicit argument instead of `self` even though
    // it sits inside `impl Input` — confirm the intended receiver.
    fn read_next_packet(ictx: &mut ffmpeg::format::context::Input) -> Result<ffmpeg::Packet> {
        let mut packet = ffmpeg::Packet::empty();
        packet.read(ictx)?;
        Ok(packet) 
    }
}
Full MRE
ffmpeg version n8.0.1 Copyright (c) 2000-2025 the FFmpeg developers
built with gcc 15.2.1 (GCC) 20251112
configuration: --prefix=/usr --disable-debug --disable-static --disable-stripping --enable-amf --enable-avisynth --enable-cuda-llvm --enable-lto --enable-fontconfig --enable-frei0r --enable-gmp --enable-gnutls --enable-gpl --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libdav1d --enable-libdrm --enable-libdvdnav --enable-libdvdread --enable-libfreetype --enable-libfribidi --enable-libglslang --enable-libgsm --enable-libharfbuzz --enable-libiec61883 --enable-libjack --enable-libjxl --enable-libmodplug --enable-libmp3lame --enable-libopencore_amrnb --enable-libopencore_amrwb --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libplacebo --enable-libpulse --enable-librav1e --enable-librsvg --enable-librubberband --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libsrt --enable-libssh --enable-libsvtav1 --enable-libtheora --enable-libv4l2 --enable-libvidstab --enable-libvmaf --enable-libvorbis --enable-libvpl --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxcb --enable-libxml2 --enable-libxvid --enable-libzimg --enable-libzmq --enable-nvdec --enable-nvenc --enable-opencl --enable-opengl --enable-shared --enable-vapoursynth --enable-version3 --enable-vulkan
libavutil      60.  8.100 / 60.  8.100
libavcodec     62. 11.100 / 62. 11.100
libavformat    62.  3.100 / 62.  3.100
libavdevice    62.  1.100 / 62.  1.100
libavfilter    11.  4.100 / 11.  4.100
libswscale      9.  1.100 /  9.  1.100
libswresample   6.  1.100 /  6.  1.100

use anyhow::{bail, Result};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use tokio::sync::mpsc;

fn main() -> Result<()> {
    ffmpeg::init()?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();

    ctrlc::set_handler(move || {
        println!("Ctrl+C received");
        shutdown_clone.store(true, Ordering::SeqCst);
    })
    .expect("Error setting Ctrl-C handler");

    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;

    rt.block_on(async {
        let (packet_tx, mut packet_rx) = mpsc::channel::<ffmpeg::Packet>(1);

        // Dummy receiver - consumes packets to keep the channel moving
        let rx_handle = tokio::spawn(async move {
            let mut count = 0u64;
            let mut last_log = std::time::Instant::now();
            while let Some(_) = packet_rx.recv().await {
                count += 1;
                if last_log.elapsed() > Duration::from_secs(5) {
                    println!("Received {} packets so far...", count);
                    last_log = std::time::Instant::now();
                }
            }
            println!("Receiver finished. Total packets: {}", count);
        });

        // Producer - Running in blocking thread as per original architecture
        let shutdown_loop = shutdown.clone();
        let producer_handle = tokio::task::spawn_blocking(move || {
            while !shutdown_loop.load(Ordering::SeqCst) {
                // Using "testsrc" to generate dummy video frames locally
                if let Err(e) = run_dummy_leak_loop("lavfi", "testsrc=duration=100:size=1280x720:rate=30", &shutdown_loop, packet_tx.clone()) {
                    eprintln!("Stream error: {}", e);
                    if !shutdown_loop.load(Ordering::SeqCst) {
                        std::thread::sleep(Duration::from_secs(1));
                    }
                }
            }
        });

        let _ = producer_handle.await;
        let _ = rx_handle.await;
        Ok(())
    })
}

fn run_dummy_leak_loop(
    format: &str,
    url: &str,
    shutdown_sig: &Arc<AtomicBool>,
    packet_tx: mpsc::Sender<ffmpeg::Packet>,
) -> Result<()> {
    let mut opts = ffmpeg::Dictionary::new();
    
    println!("Opening dummy source: {}", url);
    let mut ictx = ffmpeg::format::input_with_dictionary(&url, opts)?;

    let video_stream_index = ictx
        .streams()
        .best(ffmpeg::media::Type::Video)
        .ok_or_else(|| anyhow::anyhow!("No video stream found"))?
        .index();

    println!("Source opened. Starting leak loop...");

    let mut packet_count = 0u64;
    
    // --- THE LEAK SOURCE ---
    // Creating the packet outside the loop. When packet.read() is called inside 
    // the loop, it may not correctly free the previous buffer allocated by the 
    // C API, causing memory to climb.
    let mut packet = ffmpeg::Packet::empty(); 

    loop {
        if shutdown_sig.load(Ordering::SeqCst) {
            break;
        }

        match packet.read(&mut ictx) {
            Ok(_) => { /* Successfully read into the existing packet buffer */ }
            Err(ffmpeg::Error::Eof) => {
                println!("Source finished (EOF)");
                break;
            }
            Err(e) => {
                eprintln!("Failed to read packet: {}", e);
                bail!("Read error");
            }
        }

        if packet.stream() != video_stream_index {
            continue;
        }

        packet_count += 1;
        
        // Cloning the packet sends a reference-counted handle or a copy 
        // to the receiver, but the 'mut packet' above persists.
        if let Err(_) = packet_tx.blocking_send(packet.clone()) {
            break;
        }
    }

    println!("Loop exited. Packets processed: {}", packet_count);
    Ok(())
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions