Skip to content
14 changes: 14 additions & 0 deletions lib/stagehand/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Stagehand.Application do
@moduledoc false

use Application

@impl true
def start(_type, _args) do
children = [
{PgRegistry, Stagehand.ProducerRegistry}
]

Supervisor.start_link(children, strategy: :one_for_one, name: Stagehand.ApplicationSupervisor)
end
end
27 changes: 14 additions & 13 deletions lib/stagehand/plugins/cron.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ defmodule Stagehand.Plugins.Cron do
@moduledoc """
Plugin for scheduling periodic/cron jobs.

Uses highlander pattern — only the leader node (first in sorted :pg members)
fires cron jobs, preventing duplicate scheduling across the cluster.
Uses `Highlander` to ensure a single cron process runs across the
cluster, preventing duplicate scheduling.

## Usage

Expand Down Expand Up @@ -31,10 +31,19 @@ defmodule Stagehand.Plugins.Cron do

defstruct [:conf, :crontab, :timer_ref]

def start_link(opts) do
def child_spec(opts) do
conf = opts[:conf]
name = {:via, PgRegistry, {:pg, {:stagehand_cron, conf.name}}}
GenServer.start_link(__MODULE__, opts, name: name)

inner = %{
id: {__MODULE__, conf.name},
start: {__MODULE__, :start_link, [opts]}
}

Highlander.child_spec(inner)
end

def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end

@impl true
Expand All @@ -60,7 +69,6 @@ defmodule Stagehand.Plugins.Cron do
now = NaiveDateTime.utc_now()

for {cron, worker, opts} <- state.crontab,
leader?(state),
Crontab.DateChecker.matches_date?(cron, now) do
job = worker.new(%{}, Keyword.put(opts, :meta, %{"cron" => true}))
Stagehand.insert(state.conf.name, job)
Expand All @@ -73,13 +81,6 @@ defmodule Stagehand.Plugins.Cron do

# -- Private --

defp leader?(%{conf: conf}) do
case PgRegistry.get_members(:pg, {:stagehand_cron, conf.name}) do
[] -> true
members -> self() == Enum.min(members)
end
end

defp parse_entry({expression, worker}), do: parse_entry({expression, worker, []})

defp parse_entry({expression, worker, opts}) do
Expand Down
22 changes: 7 additions & 15 deletions lib/stagehand/queue/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ defmodule Stagehand.Queue.Pipeline do
conf = opts[:conf]
limit = opts[:limit] || 10

producer_name = producer_name(conf.name, queue)
consumer_name = consumer_name(conf.name, queue)

shutdown_grace = conf.shutdown_grace_period + 1_000
Expand All @@ -43,16 +42,15 @@ defmodule Stagehand.Queue.Pipeline do
type: :supervisor,
shutdown: shutdown_grace
},
# Producer starts second — {:via, PgRegistry, ...} joins the pg
# group automatically. On shutdown (reverse order) it stops first,
# leaves the group in terminate, then drains executing jobs.
# Producer starts second and registers with PgRegistry in init/1.
# On shutdown (reverse order) it stops first, leaves the group in
# terminate, then drains executing jobs.
%{
id: Producer,
start:
{Producer, :start_link,
[
[
name: producer_name,
queue: queue,
conf: conf,
consumer: consumer_name,
Expand All @@ -67,19 +65,13 @@ defmodule Stagehand.Queue.Pipeline do
end

@doc """
Local producer name (for consumer subscription on the same node).
"""
@spec producer_name(atom(), atom() | binary()) :: {:via, module(), term()}
def producer_name(stagehand_name, queue) do
{:via, PgRegistry, {:pg, {:stagehand, stagehand_name, :producers, to_string(queue)}}}
end

@doc """
Get all producer pids for a queue across the cluster.
Returns all producer pids for a queue across the cluster.
"""
@spec producers_for_queue(atom(), atom() | binary()) :: [pid()]
def producers_for_queue(stagehand_name, queue) do
PgRegistry.get_members(:pg, {:stagehand, stagehand_name, :producers, to_string(queue)})
for {pid, _} <-
PgRegistry.lookup(Stagehand.ProducerRegistry, {:stagehand, stagehand_name, :producers, to_string(queue)}),
do: pid
end

@doc """
Expand Down
26 changes: 13 additions & 13 deletions lib/stagehand/queue/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ defmodule Stagehand.Queue.Producer do
]

def start_link(opts) do
name = opts[:name]
GenStage.start_link(__MODULE__, opts, name: name)
server_opts = if opts[:name], do: [name: opts[:name]], else: []
GenStage.start_link(__MODULE__, opts, server_opts)
end

@doc """
Expand Down Expand Up @@ -100,8 +100,9 @@ defmodule Stagehand.Queue.Producer do
conf = opts[:conf]

pg_group = {:stagehand, conf.name, :producers, queue}
existing = :pg.get_members(:pg, pg_group)
:pg.monitor(:pg, pg_group)
{:ok, _} = PgRegistry.register(Stagehand.ProducerRegistry, pg_group, nil)
{_ref, existing} = PgRegistry.monitor(Stagehand.ProducerRegistry, pg_group)
existing = for {pid, _} <- existing, do: pid

# If there are existing producers on other nodes, tell Unique to block
# check_and_insert until all of them have synced their entries to us.
Expand Down Expand Up @@ -239,14 +240,15 @@ defmodule Stagehand.Queue.Producer do
{:stop, :normal, state}
end

def handle_info({_ref, :join, _group, pids}, state) do
def handle_info({_ref, :join, _group, entries}, state) do
pg_key = {:stagehand, state.conf.name, :producers, state.queue}
sync_unique_entries(state, PgRegistry.get_members(:pg, pg_key))
members = for {pid, _} <- PgRegistry.lookup(Stagehand.ProducerRegistry, pg_key), do: pid
sync_unique_entries(state, members)

# Signal sync complete to new producers' Unique servers
unique_name = Module.concat(state.conf.name, Stagehand.Unique)

for pid <- pids, node(pid) != node() do
for {pid, _} <- entries, node(pid) != node() do
try do
Stagehand.Unique.sync_complete({unique_name, node(pid)})
catch
Expand All @@ -257,7 +259,7 @@ defmodule Stagehand.Queue.Producer do
{:noreply, [], state}
end

def handle_info({_ref, :leave, _group, _pids}, state) do
def handle_info({_ref, :leave, _group, _entries}, state) do
{:noreply, [], state}
end

Expand All @@ -277,10 +279,10 @@ defmodule Stagehand.Queue.Producer do
pg_key = {:stagehand, state.conf.name, :producers, state.queue}

# Snapshot membership while we're still in the group (needed for hash ring)
all_producers = PgRegistry.get_members(:pg, pg_key)
all_producers = for {pid, _} <- PgRegistry.lookup(Stagehand.ProducerRegistry, pg_key), do: pid

# Leave so no new jobs are routed to us
PgRegistry.unregister_name({:pg, pg_key})
PgRegistry.unregister(Stagehand.ProducerRegistry, pg_key)

# Drain any in-flight enqueue messages that arrived before we left
state = drain_mailbox(state)
Expand Down Expand Up @@ -321,11 +323,9 @@ defmodule Stagehand.Queue.Producer do
defp do_sync_unique(_unique_name, _entries, producers) when length(producers) < 2, do: :ok

defp do_sync_unique(unique_name, entries, producers) do
ring = Enum.reduce(producers, HashRing.new(), &HashRing.add_node(&2, &1))

remote =
for {fp, _job, _ts} = entry <- entries,
owner = HashRing.key_to_node(ring, fp),
owner = Stagehand.Router.rendezvous(producers, fp),
owner != self(),
node(owner) != node(),
reduce: %{} do
Expand Down
8 changes: 6 additions & 2 deletions lib/stagehand/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ defmodule Stagehand.Router do
unique_server = Module.concat(conf.name, Unique)
fingerprint = Unique.fingerprint(job)

ring = Enum.reduce(producers, HashRing.new(), &HashRing.add_node(&2, &1))
producer_pid = HashRing.key_to_node(ring, fingerprint)
producer_pid = rendezvous(producers, fingerprint)

case Unique.check_and_insert(unique_server, fingerprint, job) do
{:ok, job} ->
Expand All @@ -55,6 +54,11 @@ defmodule Stagehand.Router do
end
end

@doc false
def rendezvous(pids, key) do
Enum.max_by(pids, fn pid -> :erlang.phash2({node(pid), key}) end)
end

defp schedule_delay(%Job{scheduled_at: nil}), do: 0

defp schedule_delay(%Job{scheduled_at: scheduled_at}) do
Expand Down
7 changes: 4 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule Stagehand.MixProject do

def application do
[
extra_applications: [:logger]
extra_applications: [:logger],
mod: {Stagehand.Application, []}
]
end

Expand All @@ -40,8 +41,8 @@ defmodule Stagehand.MixProject do
[
{:crontab, "~> 1.1"},
{:gen_stage, "~> 1.2"},
{:libring, "~> 1.7"},
{:pg_registry, "~> 0.2.2"},
{:highlander, "~> 0.2"},
{:pg_registry, "~> 0.4"},
{:telemetry, "~> 1.0"},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:styler, "~> 1.11", only: [:dev, :test], runtime: false},
Expand Down
3 changes: 2 additions & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
"erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"},
"file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"},
"gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"},
"highlander": {:hex, :highlander, "0.2.1", "e59b459f857e89daf73f2598bf2b2c0479a435481e6101ea389fd3625919b052", [:mix], [], "hexpm", "5ba19a18358803d82a923511acec8ee85fac30731c5ca056f2f934bc3d3afd9a"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"},
"pg_registry": {:hex, :pg_registry, "0.2.2", "1dd46a90dd53d9af47ace311c767b2d2ebe621b9f77a7eea072283d8ca713a91", [:mix], [], "hexpm", "22b83dd2e9ffe970d3fbb380ab56cb98cf88a486798a9ecbe6cc6392fa1fc5cd"},
"pg_registry": {:hex, :pg_registry, "0.4.0", "edb4de796e2a0224e69d29edb3883c7a2be90a026261fcda2f1d0c1e19189df3", [:mix], [], "hexpm", "2a34ebdce4ce9ea43995aeda7981b0b954165d444f6aa5ce1784e7c5b1708274"},
"styler": {:hex, :styler, "1.11.0", "35010d970689a23c2bcc8e97bd8bf7d20e3561d60c49be84654df5c37d051a9c", [:mix], [], "hexpm", "70f36165d0cf238a32b7a456fdef6a9c72e77e657d7ac4a0ace33aeba3f2b8c0"},
"telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"},
}
50 changes: 13 additions & 37 deletions test/stagehand/plugins/cron_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Stagehand.Plugins.CronTest do
use ExUnit.Case, async: true
use ExUnit.Case, async: false

alias Stagehand.Plugins.Cron

Expand Down Expand Up @@ -48,7 +48,10 @@ defmodule Stagehand.Plugins.CronTest do
end

defp cron_pid(name) do
PgRegistry.whereis_name({:pg, {:stagehand_cron, name}})
highlander_pid = :global.whereis_name({Highlander, {Cron, name}})
%{pid: sup_pid} = :sys.get_state(highlander_pid)
[{_, pid, _, _}] = Supervisor.which_children(sup_pid)
pid
end

defp tick(pid) do
Expand All @@ -61,7 +64,6 @@ defmodule Stagehand.Plugins.CronTest do
test "inserts jobs matching the current minute" do
name = start_stagehand(crontab: [{"* * * * *", EveryMinuteWorker}])

# Trigger a tick manually instead of waiting for the real timer
tick(cron_pid(name))

jobs = Stagehand.Testing.all_enqueued(name, worker: EveryMinuteWorker)
Expand Down Expand Up @@ -102,8 +104,6 @@ defmodule Stagehand.Plugins.CronTest do
describe "aliases" do
test "@daily alias is parsed" do
name = start_stagehand(crontab: [{"@daily", HourlyWorker}])

# If parsing failed, start_supervised would have crashed
assert is_pid(cron_pid(name))
end

Expand All @@ -113,40 +113,16 @@ defmodule Stagehand.Plugins.CronTest do
end
end

describe "leader election" do
test "only the leader fires jobs" do
describe "singleton" do
test "cron is registered globally via Highlander" do
name = start_stagehand(crontab: [{"* * * * *", EveryMinuteWorker}])
pid = cron_pid(name)

# Start a second cron process and manually join the same pg group.
# Can't use {:via, PgRegistry, ...} since the name is already taken.
conf = %Stagehand.Config{
name: name,
queues: [default: 5],
shutdown_grace_period: @timeout,
testing: :manual
}

{:ok, cron2} =
GenServer.start_link(Cron,
conf: conf,
crontab: [{"* * * * *", EveryMinuteWorker}]
)

:pg.join(:pg, {:stagehand_cron, name}, cron2)

members = PgRegistry.get_members(:pg, {:stagehand_cron, name})
assert length(members) == 2

leader = Enum.min(members)
non_leader = Enum.max(members)

# Only the leader should insert
tick(leader)
assert length(Stagehand.Testing.all_enqueued(name, worker: EveryMinuteWorker)) == 1

# The non-leader should not insert
tick(non_leader)
assert length(Stagehand.Testing.all_enqueued(name, worker: EveryMinuteWorker)) == 1
# Highlander registers under {Highlander, child_spec.id}
highlander_pid = :global.whereis_name({Highlander, {Cron, name}})
assert is_pid(highlander_pid)
assert is_pid(pid)
assert pid != highlander_pid
end
end

Expand Down
1 change: 0 additions & 1 deletion test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
:pg.start(:pg)
ExUnit.start()
Loading