From db22d2fb37d907fe235610b3ac4626c06117adc3 Mon Sep 17 00:00:00 2001 From: ty Date: Mon, 16 Dec 2024 13:43:24 -0500 Subject: [PATCH 1/2] Start RTMP forwarding when RTMP source arrives This fixes a ~1 second delay in audio when multistreaming starve rtmp connection if already playing allow passing stream key as app cleanup unused handle_init initialize rtmp connection before aborting --- lib/algora/pipeline.ex | 134 +++++++++++++++++++-------------- lib/algora/pipeline/manager.ex | 56 +++++++++++--- 2 files changed, 122 insertions(+), 68 deletions(-) diff --git a/lib/algora/pipeline.ex b/lib/algora/pipeline.ex index 40aa129..15af887 100644 --- a/lib/algora/pipeline.ex +++ b/lib/algora/pipeline.ex @@ -39,7 +39,11 @@ defmodule Algora.Pipeline do end @impl true - def handle_init(_context, %{app: @app, stream_key: stream_key, client_ref: client_ref}) do + def handle_init(context, %{ + app: @app, + stream_key: stream_key, + client_ref: client_ref + } = params) do Membrane.Logger.info("Starting pipeline #{@app}") if user = Algora.Accounts.get_user_by(stream_key: stream_key) do @@ -65,11 +69,10 @@ defmodule Algora.Pipeline do reconnect: reconnect } - setup_forwarding!(state) setup_extras!(state) Algora.Library.toggle_stream_status(state.video, :live) - structure = [ + actions = [ # child({:src, reconnect}, %Algora.Pipeline.SourceBin{ client_ref: client_ref @@ -103,17 +106,28 @@ defmodule Algora.Pipeline do }) ] - spec = {structure, stream_sync: :sinks, clock_provider: {:src, reconnect}} + {forwarding_refs, forwarding_actions} = forward_rtmp(user, state) + + spec = {actions ++ forwarding_actions, [ + clock_provider: {:src, reconnect}, + stream_sync: :sinks, + group: :rtmp_input, + crash_group_mode: :transient + ]} + + state = %{state | + forwarding: forwarding_refs, + playing: true + } {[spec: spec], state} else - Membrane.Logger.debug("Invalid stream_key") - {[reply: {:error, "invalid stream key"}, terminate: :normal], %{stream_key: stream_key}} + Algora.Pipeline.Manager.AbortPipeline.handle_init(context, params) end end - def handle_init(_context, %{stream_key: stream_key}) do - {[reply: {:error, "invalid app"}, terminate: :normal], %{stream_key: stream_key}} + def handle_init(context, %{client_ref: client_ref} = params) do + Algora.Pipeline.Manager.AbortPipeline.handle_init(context, params) end @impl true @@ -121,8 +135,8 @@ defmodule Algora.Pipeline do {[], state} end - def handle_child_notification(:end_of_stream, :funnel_video, _ctx, %{finalized: true} = state) do - {[], state} + def handle_child_notification(:stream_deleted, {:src, _reconnect}, _ctx, %{finalized: true} = state) do + {[], %{state | playing: false}} end def handle_child_notification(:end_of_stream, :funnel_video, _ctx, state) do @@ -131,7 +145,7 @@ defmodule Algora.Pipeline do state = terminate_later(state) # unlink next tick send(self(), :unlink_all) - {[], state} + {[], %{state | playing: false}} end def handle_child_notification(:finalized, _element, _ctx, %{finalized: true} = state) do @@ -152,9 +166,16 @@ defmodule Algora.Pipeline do {[{:reply, state.video.id}], state} end + def handle_call({:resume_rtmp, _params}, _ctx, %{playing: true} = state) do + Membrane.Logger.error("Tried to resume pipeline while playing #{inspect(state)}") + {[reply: {:error, :playing}], state} + end + def handle_call({:resume_rtmp, %{client_ref: client_ref}}, _ctx, state) do - state = cancel_terminate(state) reconnect = state.reconnect + 1 + state = + cancel_terminate(state) + |> Map.put(:reconnect, reconnect) Membrane.Logger.info("Attempting reconnection for video #{state.video.uuid}") @@ -180,12 +201,23 @@ defmodule Algora.Pipeline do ] send(self(), :link_tracks) - setup_forwarding!(state) Algora.Library.toggle_stream_status(state.video, :resumed) - spec = {actions, clock_provider: {:src, reconnect}, group: :rtmp_input, crash_group_mode: :temporary} + {forwarding_refs, forwarding_actions} = forward_rtmp(state.user, state) + + spec = {actions ++ forwarding_actions, [ + clock_provider: {:src, reconnect}, + group: :rtmp_input, + crash_group_mode: :transient + ]} - {[spec: spec, reply: :ok], %{state | reconnect: reconnect}} + state = %{state | + reconnect: reconnect, + forwarding: forwarding_refs, + playing: true + } + + {[spec: spec, reply: :ok], state} end def handle_info(:link_tracks, _ctx, %{reconnect: reconnect} = state) do @@ -243,42 +275,9 @@ defmodule Algora.Pipeline do {Enum.filter(actions, & &1), %{state | forwarding: []}} end - def handle_info({:forward_rtmp, url, ref}, _ctx, state) do - ref = {ref, state.reconnect} - if Enum.member?(state.forwarding, ref) do - {[], state} - else - spec = [ - # - child(ref, %Membrane.RTMP.Sink{rtmp_url: url}), - - # - get_child(:tee_audio) - |> via_in(Pad.ref(:audio, 0), toilet_capacity: 10_000) - |> get_child(ref), - - # - get_child(:tee_video) - |> via_in(Pad.ref(:video, 0), toilet_capacity: 10_000) - |> get_child(ref) - ] - - {[spec: spec], %{state | forwarding: [ref | state.forwarding]}} - end - end - def handle_info(:multicast_algora, _ctx, state) do user = Algora.Accounts.get_user_by!(handle: "algora") - destinations = Algora.Accounts.list_active_destinations(user.id) - - for destination <- destinations do - url = - URI.new!(destination.rtmp_url) - |> URI.append_path("/" <> destination.stream_key) - |> URI.to_string() - - send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_#{destination.id}")}) - end + {refs, actions} = forward_rtmp(user, state) if url = Algora.Accounts.get_restream_ws_url(user) do Task.Supervisor.start_child( @@ -294,9 +293,10 @@ defmodule Algora.Pipeline do ) end - {[], state} + {[spec: actions], %{state | forwarding: state.forwarding ++ refs}} end + def handle_info( {:metadata_message, %Messages.SetDataFrame{} = message}, _ctx, @@ -454,17 +454,39 @@ defmodule Algora.Pipeline do ) end - defp setup_forwarding!(%{video: video}) do - destinations = Algora.Accounts.list_active_destinations(video.user_id) - - for destination <- destinations do + defp forward_rtmp(user, %{reconnect: reconnect}) do + Algora.Accounts.list_active_destinations(user.id) + |> Enum.reduce({[], []}, fn(destination, {refs, actions}) -> + forward_ref = String.to_atom("rtmp_sink_#{destination.id}") + ref = {forward_ref, reconnect} url = URI.new!(destination.rtmp_url) |> URI.append_path("/" <> destination.stream_key) |> URI.to_string() - send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_#{destination.id}")}) - end + { + [ref|refs], + actions ++ [{ + [ + # + child(ref, %Membrane.RTMP.Sink{rtmp_url: url}), + + # + get_child({:video_reconnect, reconnect}) + |> via_in(Pad.ref(:video, 0), toilet_capacity: 10_000) + |> get_child(ref), + + # + get_child({:audio_reconnect, reconnect}) + |> via_in(Pad.ref(:audio, 0), toilet_capacity: 10_000) + |> get_child(ref) + ], + [ + group: forward_ref, crash_group_mode: :transient + ] + }] + } + end) end defp setup_extras!(%{video: video, user: user}) do diff --git a/lib/algora/pipeline/manager.ex b/lib/algora/pipeline/manager.ex index 1960e42..d5271b2 100644 --- a/lib/algora/pipeline/manager.ex +++ b/lib/algora/pipeline/manager.ex @@ -1,10 +1,37 @@ defmodule Algora.Pipeline.Manager do + + defmodule AbortPipeline do + use Membrane.Pipeline + require Membrane.Logger + + def handle_init(_ctx, %{stream_key: stream_key, client_ref: client_ref} = params) do + Membrane.Logger.error("Aborted stream key: #{stream_key}") + spec = [ + # + child(:abort, %Algora.Pipeline.SourceBin{ + client_ref: client_ref + }), + + # + get_child(:abort) + |> via_out(:audio) + |> child(:video, Membrane.Testing.Sink), + + # + get_child(:abort) + |> via_out(:video) + |> child(:audio, Membrane.Testing.Sink), + ] + {[spec: spec, terminate: :normal], params} + end + end + use GenServer @app "live" - def handle_new_client(_client_ref, "", ""), do: - {__MODULE__.Abort, "Invalid stream key and app"} + def handle_new_client(client_ref, "", ""), do: + handle_new_client(client_ref, @app, "") def handle_new_client(client_ref, stream_key, ""), do: handle_new_client(client_ref, @app, stream_key) @@ -20,16 +47,21 @@ defmodule Algora.Pipeline.Manager do {:ok, pid} = with true <- Algora.config([:resume_rtmp]), {pid, metadata} when is_pid(pid) <- :syn.lookup(:pipelines, stream_key) do - :ok = __MODULE__.resume_rtmp(pid, %{ params | video_uuid: metadata[:video_uuid] }) - {:ok, pid} - else - _ -> - if Algora.config([:flame, :backend]) == FLAME.LocalBackend do - Algora.Pipeline.Supervisor.start_child([self(), params]) - else - FLAME.place_child(Algora.Pipeline.Pool, {__MODULE__, [self(), params]}) - end - end + case resume_rtmp(pid, %{params | video_uuid: metadata[:video_uuid]}) do + :ok -> + {:ok, pid} + {:error, reason} -> + {:ok, _sub, pid} = Membrane.Pipeline.start_link(AbortPipeline, params) + {:ok, pid} + end + else + _ -> + if Algora.config([:flame, :backend]) == FLAME.LocalBackend do + Algora.Pipeline.Supervisor.start_child([self(), params]) + else + FLAME.place_child(Algora.Pipeline.Pool, {__MODULE__, [self(), params]}) + end + end {Algora.Pipeline.ClientHandler, %{pipeline: pid}} end From f6860691cd539483590930014ce9e64a3e2b24e6 Mon Sep 17 00:00:00 2001 From: ty Date: Mon, 16 Dec 2024 22:00:37 -0500 Subject: [PATCH 2/2] move abort pipeline to own file --- lib/algora/pipeline.ex | 5 +++-- lib/algora/pipeline/abort_pipeline.ex | 26 ++++++++++++++++++++++++++ lib/algora/pipeline/manager.ex | 27 +-------------------------- 3 files changed, 30 insertions(+), 28 deletions(-) create mode 100644 lib/algora/pipeline/abort_pipeline.ex diff --git a/lib/algora/pipeline.ex b/lib/algora/pipeline.ex index 15af887..933ae6a 100644 --- a/lib/algora/pipeline.ex +++ b/lib/algora/pipeline.ex @@ -6,6 +6,7 @@ defmodule Algora.Pipeline do alias Membrane.RTMP.Messages alias Algora.{Admin, Library} + alias Algora.Pipeline.AbortPipeline alias Algora.Pipeline.HLS.LLController @segment_duration_seconds 6 @@ -122,12 +123,12 @@ defmodule Algora.Pipeline do {[spec: spec], state} else - Algora.Pipeline.Manager.AbortPipeline.handle_init(context, params) + Algora.Pipeline.AbortPipeline.handle_init(context, params) end end def handle_init(context, %{client_ref: client_ref} = params) do - Algora.Pipeline.Manager.AbortPipeline.handle_init(context, params) + Algora.Pipeline.AbortPipeline.handle_init(context, params) end @impl true diff --git a/lib/algora/pipeline/abort_pipeline.ex b/lib/algora/pipeline/abort_pipeline.ex new file mode 100644 index 0000000..bab8212 --- /dev/null +++ b/lib/algora/pipeline/abort_pipeline.ex @@ -0,0 +1,26 @@ +defmodule Algora.Pipeline.AbortPipeline do + use Membrane.Pipeline + require Membrane.Logger + + def handle_init(_ctx, %{stream_key: stream_key, client_ref: client_ref} = params) do + Membrane.Logger.error("Aborted stream key: #{stream_key}") + spec = [ + # + child(:abort, %Algora.Pipeline.SourceBin{ + client_ref: client_ref + }), + + # + get_child(:abort) + |> via_out(:audio) + |> child(:video, Membrane.Testing.Sink), + + # + get_child(:abort) + |> via_out(:video) + |> child(:audio, Membrane.Testing.Sink), + ] + {[spec: spec, terminate: :normal], params} + end +end + diff --git a/lib/algora/pipeline/manager.ex b/lib/algora/pipeline/manager.ex index d5271b2..8d4aa35 100644 --- a/lib/algora/pipeline/manager.ex +++ b/lib/algora/pipeline/manager.ex @@ -1,30 +1,5 @@ defmodule Algora.Pipeline.Manager do - - defmodule AbortPipeline do - use Membrane.Pipeline - require Membrane.Logger - - def handle_init(_ctx, %{stream_key: stream_key, client_ref: client_ref} = params) do - Membrane.Logger.error("Aborted stream key: #{stream_key}") - spec = [ - # - child(:abort, %Algora.Pipeline.SourceBin{ - client_ref: client_ref - }), - - # - get_child(:abort) - |> via_out(:audio) - |> child(:video, Membrane.Testing.Sink), - - # - get_child(:abort) - |> via_out(:video) - |> child(:audio, Membrane.Testing.Sink), - ] - {[spec: spec, terminate: :normal], params} - end - end + alias Algora.Pipeline.AbortPipeline use GenServer