Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 79 additions & 56 deletions lib/algora/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Algora.Pipeline do
alias Membrane.RTMP.Messages

alias Algora.{Admin, Library}
alias Algora.Pipeline.AbortPipeline
alias Algora.Pipeline.HLS.LLController

@segment_duration_seconds 6
Expand Down Expand Up @@ -39,7 +40,11 @@ defmodule Algora.Pipeline do
end

@impl true
def handle_init(_context, %{app: @app, stream_key: stream_key, client_ref: client_ref}) do
def handle_init(context, %{
app: @app,
stream_key: stream_key,
client_ref: client_ref
} = params) do
Membrane.Logger.info("Starting pipeline #{@app}")

if user = Algora.Accounts.get_user_by(stream_key: stream_key) do
Expand All @@ -65,11 +70,10 @@ defmodule Algora.Pipeline do
reconnect: reconnect
}

setup_forwarding!(state)
setup_extras!(state)
Algora.Library.toggle_stream_status(state.video, :live)

structure = [
actions = [
#
child({:src, reconnect}, %Algora.Pipeline.SourceBin{
client_ref: client_ref
Expand Down Expand Up @@ -103,26 +107,37 @@ defmodule Algora.Pipeline do
})
]

spec = {structure, stream_sync: :sinks, clock_provider: {:src, reconnect}}
{forwarding_refs, forwarding_actions} = forward_rtmp(user, state)

spec = {actions ++ forwarding_actions, [
clock_provider: {:src, reconnect},
stream_sync: :sinks,
group: :rtmp_input,
crash_group_mode: :transient
]}

state = %{state |
forwarding: forwarding_refs,
playing: true
}

{[spec: spec], state}
else
Membrane.Logger.debug("Invalid stream_key")
{[reply: {:error, "invalid stream key"}, terminate: :normal], %{stream_key: stream_key}}
Algora.Pipeline.AbortPipeline.handle_init(context, params)
end
end

def handle_init(_context, %{stream_key: stream_key}) do
{[reply: {:error, "invalid app"}, terminate: :normal], %{stream_key: stream_key}}
def handle_init(context, %{client_ref: client_ref} = params) do
Algora.Pipeline.AbortPipeline.handle_init(context, params)
end

@impl true
def handle_child_notification({:track_playable, :video}, _element, _ctx, state) do
{[], state}
end

def handle_child_notification(:end_of_stream, :funnel_video, _ctx, %{finalized: true} = state) do
{[], state}
def handle_child_notification(:stream_deleted, {:src, _reconnect}, _ctx, %{finalized: true} = state) do
{[], %{state | playing: false}}
end

def handle_child_notification(:end_of_stream, :funnel_video, _ctx, state) do
Expand All @@ -131,7 +146,7 @@ defmodule Algora.Pipeline do
state = terminate_later(state)
# unlink next tick
send(self(), :unlink_all)
{[], state}
{[], %{state | playing: false}}
end

def handle_child_notification(:finalized, _element, _ctx, %{finalized: true} = state) do
Expand All @@ -152,9 +167,16 @@ defmodule Algora.Pipeline do
{[{:reply, state.video.id}], state}
end

def handle_call({:resume_rtmp, _params}, _ctx, %{playing: true} = state) do
Membrane.Logger.error("Tried to resume pipeline while playing #{inspect(state)}")
{[reply: {:error, :playing}], state}
end

def handle_call({:resume_rtmp, %{client_ref: client_ref}}, _ctx, state) do
state = cancel_terminate(state)
reconnect = state.reconnect + 1
state =
cancel_terminate(state)
|> Map.put(:reconnect, reconnect)

Membrane.Logger.info("Attempting reconnection for video #{state.video.uuid}")

Expand All @@ -180,12 +202,23 @@ defmodule Algora.Pipeline do
]

send(self(), :link_tracks)
setup_forwarding!(state)
Algora.Library.toggle_stream_status(state.video, :resumed)

spec = {actions, clock_provider: {:src, reconnect}, group: :rtmp_input, crash_group_mode: :temporary}
{forwarding_refs, forwarding_actions} = forward_rtmp(state.user, state)

spec = {actions ++ forwarding_actions, [
clock_provider: {:src, reconnect},
group: :rtmp_input,
crash_group_mode: :transient
]}

{[spec: spec, reply: :ok], %{state | reconnect: reconnect}}
state = %{state |
reconnect: reconnect,
forwarding: forwarding_refs,
playing: true
}

{[spec: spec, reply: :ok], state}
end

def handle_info(:link_tracks, _ctx, %{reconnect: reconnect} = state) do
Expand Down Expand Up @@ -243,42 +276,9 @@ defmodule Algora.Pipeline do
{Enum.filter(actions, & &1), %{state | forwarding: []}}
end

def handle_info({:forward_rtmp, url, ref}, _ctx, state) do
ref = {ref, state.reconnect}
if Enum.member?(state.forwarding, ref) do
{[], state}
else
spec = [
#
child(ref, %Membrane.RTMP.Sink{rtmp_url: url}),

#
get_child(:tee_audio)
|> via_in(Pad.ref(:audio, 0), toilet_capacity: 10_000)
|> get_child(ref),

#
get_child(:tee_video)
|> via_in(Pad.ref(:video, 0), toilet_capacity: 10_000)
|> get_child(ref)
]

{[spec: spec], %{state | forwarding: [ref | state.forwarding]}}
end
end

def handle_info(:multicast_algora, _ctx, state) do
user = Algora.Accounts.get_user_by!(handle: "algora")
destinations = Algora.Accounts.list_active_destinations(user.id)

for destination <- destinations do
url =
URI.new!(destination.rtmp_url)
|> URI.append_path("/" <> destination.stream_key)
|> URI.to_string()

send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_#{destination.id}")})
end
{refs, actions} = forward_rtmp(user, state)

if url = Algora.Accounts.get_restream_ws_url(user) do
Task.Supervisor.start_child(
Expand All @@ -294,9 +294,10 @@ defmodule Algora.Pipeline do
)
end

{[], state}
{[spec: actions], %{state | forwarding: state.forwarding ++ refs}}
end


def handle_info(
{:metadata_message, %Messages.SetDataFrame{} = message},
_ctx,
Expand Down Expand Up @@ -454,17 +455,39 @@ defmodule Algora.Pipeline do
)
end

defp setup_forwarding!(%{video: video}) do
destinations = Algora.Accounts.list_active_destinations(video.user_id)

for destination <- destinations do
defp forward_rtmp(user, %{reconnect: reconnect}) do
Algora.Accounts.list_active_destinations(user.id)
|> Enum.reduce({[], []}, fn(destination, {refs, actions}) ->
forward_ref = String.to_atom("rtmp_sink_#{destination.id}")
ref = {forward_ref, reconnect}
url =
URI.new!(destination.rtmp_url)
|> URI.append_path("/" <> destination.stream_key)
|> URI.to_string()

send(self(), {:forward_rtmp, url, String.to_atom("rtmp_sink_#{destination.id}")})
end
{
[ref|refs],
actions ++ [{
[
#
child(ref, %Membrane.RTMP.Sink{rtmp_url: url}),

#
get_child({:video_reconnect, reconnect})
|> via_in(Pad.ref(:video, 0), toilet_capacity: 10_000)
|> get_child(ref),

#
get_child({:audio_reconnect, reconnect})
|> via_in(Pad.ref(:audio, 0), toilet_capacity: 10_000)
|> get_child(ref)
],
[
group: forward_ref, crash_group_mode: :transient
]
}]
}
end)
end

defp setup_extras!(%{video: video, user: user}) do
Expand Down
26 changes: 26 additions & 0 deletions lib/algora/pipeline/abort_pipeline.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Algora.Pipeline.AbortPipeline do
use Membrane.Pipeline
require Membrane.Logger

def handle_init(_ctx, %{stream_key: stream_key, client_ref: client_ref} = params) do
Membrane.Logger.error("Aborted stream key: #{stream_key}")
spec = [
#
child(:abort, %Algora.Pipeline.SourceBin{
client_ref: client_ref
}),

#
get_child(:abort)
|> via_out(:audio)
|> child(:video, Membrane.Testing.Sink),

#
get_child(:abort)
|> via_out(:video)
|> child(:audio, Membrane.Testing.Sink),
]
{[spec: spec, terminate: :normal], params}
end
end

31 changes: 19 additions & 12 deletions lib/algora/pipeline/manager.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
defmodule Algora.Pipeline.Manager do
alias Algora.Pipeline.AbortPipeline

use GenServer

@app "live"

def handle_new_client(_client_ref, "", ""), do:
{__MODULE__.Abort, "Invalid stream key and app"}
def handle_new_client(client_ref, "", ""), do:
handle_new_client(client_ref, @app, "")

def handle_new_client(client_ref, stream_key, ""), do:
handle_new_client(client_ref, @app, stream_key)
Expand All @@ -20,16 +22,21 @@ defmodule Algora.Pipeline.Manager do
{:ok, pid} =
with true <- Algora.config([:resume_rtmp]),
{pid, metadata} when is_pid(pid) <- :syn.lookup(:pipelines, stream_key) do
:ok = __MODULE__.resume_rtmp(pid, %{ params | video_uuid: metadata[:video_uuid] })
{:ok, pid}
else
_ ->
if Algora.config([:flame, :backend]) == FLAME.LocalBackend do
Algora.Pipeline.Supervisor.start_child([self(), params])
else
FLAME.place_child(Algora.Pipeline.Pool, {__MODULE__, [self(), params]})
end
end
case resume_rtmp(pid, %{params | video_uuid: metadata[:video_uuid]}) do
:ok ->
{:ok, pid}
{:error, reason} ->
{:ok, _sub, pid} = Membrane.Pipeline.start_link(AbortPipeline, params)
{:ok, pid}
end
else
_ ->
if Algora.config([:flame, :backend]) == FLAME.LocalBackend do
Algora.Pipeline.Supervisor.start_child([self(), params])
else
FLAME.place_child(Algora.Pipeline.Pool, {__MODULE__, [self(), params]})
end
end

{Algora.Pipeline.ClientHandler, %{pipeline: pid}}
end
Expand Down