-
Notifications
You must be signed in to change notification settings - Fork 45
"Running Membrane in Elixir application" guide #1070
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
cf7e242
WIP guide
varsill d0aed08
Format the markadown code. Move section about batch processing on top…
varsill 457986d
Move guide to useful_concepts/
varsill 1ebed1b
Tag snippets as Elixir code
varsill eafa94b
Move guide to useful_concepts/
varsill 1b62adc
WIP
varsill 2435f0b
Improve grammar
varsill f6af6c9
Improve formatting
varsill c3aee79
Fix snippets for Oban. Move batch processing section down. Add descri…
varsill c4c126b
Add MyProject.Pipeline
varsill 86f69fe
Add link to Oban's installation guide
varsill 607bafb
Improve formatting
varsill 5de3969
Merge branch 'master' into varsill/application_guide
varsill d02817a
Change YOLO into RF-DETR
varsill 4eeaa4d
Fix the oban example
varsill ec1e10d
add missing muxer
varsill bfe2347
Revoke rtsp example
varsill 09b1120
Remove @segment_duration
varsill 73a6863
Fix the rtsp pipeline
varsill 44237de
Fix the rtsp pipeline
varsill 1cfb2c2
Merge branch 'master' into varsill/application_guide
varsill File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
274 changes: 274 additions & 0 deletions
274
guides/useful_concepts/running_membrane_in_elixir_application.md
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,274 @@ | ||
| # Using Membrane in Elixir Application | ||
|
|
||
| This guide outlines best practices for integrating Membrane Pipelines into your Elixir application, specifically focusing on how to attach pipelines to your application's supervision tree. | ||
|
|
||
| In most cases, pipelines are used in one of the following scenarios: | ||
|
|
||
| - Static Orchestration: Maintaining a single, permanent pipeline that always runs with your application (e.g. mixing output from multiple IP cameras into a single HLS stream). | ||
|
|
||
| - Dynamic Orchestration: Spawning pipelines on-demand based on user actions (e.g. starting a unique pipeline for every user joining a conference room). | ||
|
|
||
| - Batch Processing: Handling "offline" tasks with a clear start and end (e.g. transcoding a stream inside an MP4 file). | ||
|
|
||
| ## Static Orchestration: Supervisor on Startup | ||
|
|
||
| This approach is ideal when your pipeline architecture is fixed and needs to run continuously from the moment your application starts. | ||
| Imagine an application that monitors a security camera feed to detect people or vehicles in real-time. Such an application could consist of two components: | ||
|
|
||
| - The Membrane Pipeline which connects to an RTSP stream, decodes the video, and extracts raw frames. It sends these frames to the external process (via a Unix socket or standard input), receives the transformed video back, re-encodes it, and broadcasts the final stream using HLS. | ||
|
|
||
| - The OS Process being a Python script running a machine learning model like [RF-DETR](https://github.com/roboflow/rf-detr). It reads raw video frames and performs object segmentation, coloring the pixels that correspond to detected objects. | ||
|
|
||
| The pipeline could look similar to this one: | ||
|
|
||
| ```elixir | ||
| defmodule MyProject.Pipeline do | ||
| use Membrane.Pipeline | ||
|
|
||
| @impl true | ||
| def handle_init(_ctx, rtsp_url) do | ||
| spec = [ | ||
| child(:source, %Membrane.RTSP.Source{ | ||
| transport: :tcp, | ||
| allowed_media_types: [:video], | ||
| stream_uri: rtsp_url, | ||
| on_connection_closed: :send_eos | ||
| }) | ||
| ] | ||
|
|
||
| {[spec: spec], %{}} | ||
| end | ||
|
|
||
| @impl true | ||
| def handle_child_notification({:set_up_tracks, tracks}, _child, _ctx, state) do | ||
| hls_config = %Membrane.HTTPAdaptiveStream.SinkBin{ | ||
| manifest_module: Membrane.HTTPAdaptiveStream.HLS, | ||
| target_window_duration: Membrane.Time.seconds(120), | ||
| storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{ | ||
| directory: "output_hls" | ||
| } | ||
| } | ||
|
|
||
| video_track = Enum.find(tracks, &(&1.type == :video)) | ||
|
|
||
| spec = [ | ||
| get_child(:source) | ||
| |> via_out(Pad.ref(:output, video_track.control_path)) | ||
| |> child(:depayloader, Membrane.H264.RTP.Depayloader) | ||
| |> child(:parser, Membrane.H264.Parser) | ||
| |> child(:decoder, Membrane.H264.FFmpeg.Decoder) | ||
| # filter talking with the side-car Python OS process running RF-DETR model | ||
| |> child(:segmentation_filter, MyProject.ObjectSegmentationFilter) | ||
| |> child(:encoder, %Membrane.H264.FFmpeg.Encoder{preset: :fast}) | ||
| |> via_in(:input, options: [encoding: :H264, segment_duration: Membrane.Time.seconds(10)]) | ||
| |> child(:hls, hls_config) | ||
| ] | ||
|
|
||
| {[spec: spec], state} | ||
| end | ||
|
|
||
| @impl true | ||
| def handle_child_notification(_notificaiton, _child, _ctx, state), do: {[], state} | ||
| end | ||
| ``` | ||
|
|
||
| Then you could create a dedicated supervisor to manage the pipeline alongside any necessary "side-car" processes such as a [`MuonTrap.Daemon`](https://hexdocs.pm/muontrap/readme.html) wrapping an external OS command. | ||
| This allows you to treat the pipeline and its dependencies as a single unit. | ||
|
|
||
| ```elixir | ||
| defmodule MyProject.InfrastructureSupervisor do | ||
| use Supervisor | ||
|
|
||
| def start_link(args) do | ||
| Supervisor.start_link(__MODULE__, args, name: __MODULE__) | ||
| end | ||
|
|
||
| @impl true | ||
| def init(args) do | ||
| children = [ | ||
| %{ | ||
| id: Pipeline, | ||
| start: {Membrane.Pipeline, :start_link, [MyProject.Pipeline, args[:rtsp_url]]}, | ||
| restart: :transient | ||
| }, | ||
| %{ | ||
| id: SideCar, | ||
| start: | ||
| {MuonTrap.Daemon, :start_link, | ||
| ["python", ["run_model.py", "rf-detr-large-2026.pth"], [log_output: :debug]]}, | ||
| restart: :transient | ||
| } | ||
| ] | ||
|
|
||
| Supervisor.init(children, strategy: :one_for_all) | ||
| end | ||
| end | ||
| ``` | ||
|
|
||
| Usage of `:one_for_all` strategy ensures that the `MuonTrap.Deamon` get restarted each time the pipeline restarts and another way around. | ||
| In your specific scenario, you may need to adjust the [`:restart`](https://hexdocs.pm/elixir/1.12/Supervisor.html#module-restart-values-restart) and [`:strategy`](https://hexdocs.pm/elixir/1.12/Supervisor.html#module-start_link-2-init-2-and-strategies) | ||
| options to reflect the dependencies between supervised processes. | ||
|
|
||
| To launch this when your app boots, add the `InfrastructureSupervisor` to the children list in your `application.ex`: | ||
|
|
||
| ```elixir | ||
| defmodule MyProject.Application do | ||
| use Application | ||
|
|
||
| @impl true | ||
| def start(_type, _args) do | ||
| children = [ | ||
| {MyProject.InfrastructureSupervisor, [rtsp_url: "rtsp://user:pass@127.0.0.1:554"]} | ||
| # ... other children required by your application | ||
| ] | ||
|
|
||
| opts = [strategy: :one_for_one, name: MyProject.Supervisor] | ||
| Supervisor.start_link(children, opts) | ||
| end | ||
| end | ||
| ``` | ||
|
|
||
| ## Dynamically spawning the pipelines under the DynamicSupervisor | ||
|
|
||
| While the static approach works perfectly for fixed infrastructure, many applications require more flexibility. | ||
| Consider a scenario where you need to spawn a new pipeline on demand to ingest an RTSP stream from a camera provided by a user. | ||
|
|
||
| Since we have already defined the `InfrastructureSupervisor`, scaling to multiple dynamic pipelines is straightforward. | ||
|
|
||
| Instead of starting the `InfrastructureSupervisor` directly in your application tree, we add a [`DynamicSupervisor`](https://hexdocs.pm/elixir/DynamicSupervisor.html). | ||
| This allows us to spawn new infrastructure "units" (the pipeline + sidecars) on demand. | ||
|
|
||
| ```elixir | ||
| defmodule MyProject.Application do | ||
| use Application | ||
|
|
||
| @impl true | ||
| def start(_type, _args) do | ||
| children = [ | ||
| {DynamicSupervisor, strategy: :one_for_one, name: MyProject.PipelineDynamicSupervisor} | ||
| # ... other children required by your application | ||
| ] | ||
|
|
||
| opts = [strategy: :one_for_one, name: MyProject.Supervisor] | ||
| Supervisor.start_link(children, opts) | ||
| end | ||
| end | ||
| ``` | ||
|
|
||
| Now, whenever a new pipeline is needed you can spawn a new instance of your infrastructure using [`DynamicSupervisor.start_child/2`](https://hexdocs.pm/elixir/1.12/DynamicSupervisor.html#start_child/2). | ||
| It could happen e.g. inside `mount/3` callback of your LiveView: | ||
|
|
||
| ```elixir | ||
| def mount(params, _session, socket) do | ||
| if connected?(socket) do | ||
| {:ok, infrastructure_supervisor_pid} = DynamicSupervisor.start_child( | ||
| MyProject.PipelineDynamicSupervisor, | ||
| {MyProject.InfrastructureSupervisor, params["rtsp_url"]} | ||
| ) | ||
| {:ok, assign(socket, :infrastructure_supervisor_pid, infrastructure_supervisor_pid)} | ||
| else | ||
| {:ok, socket} | ||
| end | ||
| end | ||
| ``` | ||
|
|
||
| When you need to stop the pipeline, you can use [`DynamicSupervisor.terminate_child/2`](https://hexdocs.pm/elixir/1.12/DynamicSupervisor.html#terminate_child/2). | ||
|
|
||
| In case of your LiveView commponent it could happen in its `terminate/2` callback: | ||
|
|
||
| ```elixir | ||
| def terminate(_reason, socket) do | ||
| if pid = socket.assigns[:infrastructure_supervisor_pid] do | ||
| DynamicSupervisor.terminate_child(MyProject.PipelineDynamicSupervisor, pid) | ||
| end | ||
| :ok | ||
| end | ||
| ``` | ||
|
|
||
| ## Batch Processing | ||
|
|
||
| Batch processing is ideal for "offline tasks" where you need to ensure the job completes successfully. | ||
|
|
||
| For example, let's assume we want to rescale H.264 video in MP4 container. To achieve this, we build the pipeline as follows: | ||
|
|
||
| ```elixir | ||
| defmodule TranscodePipeline do | ||
| use Membrane.Pipeline | ||
|
|
||
| alias Membrane.File.{Source, Sink} | ||
| alias Membrane.H264.FFmpeg.{Decoder, Encoder} | ||
| alias Membrane.H264.Parser | ||
| alias Membrane.FFmpeg.SWScale.Converter | ||
|
|
||
| @impl true | ||
| def handle_init(_ctx, opts) do | ||
| spec = [ | ||
| child(:source, %Source{location: opts[:input_path]}) | ||
| |> child(:demuxer, Membrane.MP4.Demuxer.ISOM) | ||
| |> via_out(:output, options: [kind: :video]) | ||
| |> child(:in_parser, %Parser{output_stream_structure: :annexb}) | ||
| |> child(:decoder, Decoder) | ||
| |> child(:converter, %Converter{output_width: 1080, output_height: 720}) | ||
| |> child(:encoder, Encoder) | ||
| |> child(:out_parser, %Parser{output_stream_structure: :avc1}) | ||
| |> child(:muxer, Membrane.MP4.Muxer.ISOM) | ||
| |> child(:sink, %Sink{location: opts[:output_path]}) | ||
| ] | ||
|
|
||
| {[spec: spec], %{}} | ||
| end | ||
|
|
||
| @impl true | ||
| def handle_element_end_of_stream(:sink, :input, _ctx, state), do: {[terminate: :normal], state} | ||
|
|
||
| @impl true | ||
| def handle_element_end_of_stream(_child, _pad, _ctx, state), do: {[], state} | ||
| end | ||
| ``` | ||
|
|
||
| To ensure reliability, we might wrap execution of the pipeline in an [Oban](https://hexdocs.pm/oban/Oban.html) worker. | ||
|
|
||
| Oban is the standard library for background job processing in Elixir. It persists jobs to your database, ensuring that your long-running transcoding tasks are fault-tolerant. | ||
| If the pipeline crashes or the server restarts, Oban will automatically retry the job until it succeeds. | ||
| To install Oban in your project, you can follow this [installation guide](https://hexdocs.pm/oban/installation.html). | ||
|
|
||
| Assuming that Oban is installed in your project and the `:default` queue is configured we can define the Oban worker: | ||
|
|
||
| ```elixir | ||
| defmodule VideoTranscoderWorker do | ||
| use Oban.Worker, queue: :default, unique: [period: 60] | ||
|
|
||
| @impl true | ||
| def perform(%Oban.Job{args: %{"input_path" => input, "output_path" => output}}) do | ||
| {:ok, _supervisor_pid, pipeline_pid} = | ||
| Membrane.Pipeline.start_link( | ||
| TranscodePipeline, | ||
| input_path: input, | ||
| output_path: output | ||
| ) | ||
|
|
||
| ref = Process.monitor(pipeline_pid) | ||
|
|
||
| receive do | ||
| {:DOWN, ^ref, :process, ^pipeline_pid, :normal} -> | ||
| :ok | ||
| end | ||
| end | ||
|
|
||
| @impl true | ||
| def timeout(_job), do: :timer.minutes(5) | ||
| end | ||
| ``` | ||
|
|
||
| Now we can run the Oban worker: | ||
|
|
||
| ```elixir | ||
| %{ | ||
| "input_path" => "input.mp4", | ||
| "output_path" => "output.mp4" | ||
| } | ||
| |> VideoTranscoderWorker.new() | ||
| |> Oban.insert() | ||
| ``` | ||
|
|
||
| When the job terminates successfully, you will see the output `output.mp4` file with transcoded video. | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's link to the latest docs (like https://hexdocs.pm/elixir/Supervisor.html#module-child-specification)