Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ jobs:

strategy:
matrix:
elixir: ["1.19"]
otp: ["28"]
include:
- elixir: "1.15"
otp: "26"
- elixir: "1.17"
otp: "27"
- elixir: "1.19"
otp: "28"

steps:
- uses: actions/checkout@v4
Expand All @@ -31,6 +36,7 @@ jobs:
key: ${{ runner.os }}-mix-${{ matrix.elixir }}-${{ matrix.otp }}-${{ hashFiles('mix.lock') }}
restore-keys: ${{ runner.os }}-mix-${{ matrix.elixir }}-${{ matrix.otp }}-

- run: epmd -daemon
- run: mix deps.get
- run: mix compile --warnings-as-errors
- run: mix format --check-formatted
Expand Down
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Changelog

## 0.1.0

Initial release.

- GenStage-based producer/consumer pipeline with per-queue concurrency
limits and priority dispatch
- Cluster-wide producer discovery via `PgRegistry`
- Job routing: random for normal jobs, rendezvous hashing for unique jobs
- Unique job deduplication with local ETS tables and cross-node entry
transfer on topology changes
- Graceful shutdown with job redistribution to surviving producers
- Cron scheduling via `Highlander` for cluster-wide singleton execution
- Telemetry events for job lifecycle and queue shutdown
- Testing helpers with `:manual` and `:inline` modes
- Exponential backoff with jitter for retries
- Queue pause/resume across the cluster
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ discovery and `Highlander` for singleton scheduling.
- **At-most-once delivery** — each job runs at most once. Jobs are
in-memory with no persistence; a VM crash loses queued, scheduled,
and executing jobs.
- **Unique jobs (best effort)** — deduplication is backed by a local ETS
- **Unique jobs (best effort)** — deduplication is backed by an ETS
table per node. Rendezvous hashing routes the same job fingerprint to
the same producer, keeping dedup checks local. On graceful shutdown,
dedup entries are transferred to their new owners. On node join,
unique checks are blocked until all existing producers have synced
their entries. On crashes, entries on the lost node are gone and
duplicates are possible until the uniqueness period expires.
the same node, and the dedup check runs on that node's Unique server.
On graceful shutdown, dedup entries are transferred to surviving
nodes. On node join, unique checks are blocked until all existing
producers have synced their entries. On crashes, entries on the lost
node are gone and duplicates are possible until the uniqueness period
expires.

## Installation

Expand Down
11 changes: 5 additions & 6 deletions lib/stagehand/queue/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -318,19 +318,18 @@ defmodule Stagehand.Queue.Producer do
defp sync_unique_entries(state, all_producers) do
unique_name = Module.concat(state.conf.name, Stagehand.Unique)
entries = Stagehand.Unique.export(unique_name)
survivors = all_producers -- [self()]

do_sync_unique(unique_name, entries, all_producers)
do_sync_unique(unique_name, entries, survivors)
end

defp do_sync_unique(_unique_name, [], _producers), do: :ok
defp do_sync_unique(_unique_name, _entries, producers) when length(producers) < 2, do: :ok
defp do_sync_unique(_unique_name, _entries, []), do: :ok

defp do_sync_unique(unique_name, entries, producers) do
defp do_sync_unique(unique_name, entries, survivors) do
remote =
for {fp, _job, _ts} = entry <- entries,
owner = Stagehand.Router.rendezvous(producers, fp),
owner != self(),
node(owner) != node(),
owner = Stagehand.Router.rendezvous(survivors, fp),
reduce: %{} do
acc -> Map.update(acc, node(owner), [entry], &[entry | &1])
end
Expand Down
3 changes: 2 additions & 1 deletion lib/stagehand/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ defmodule Stagehand.Router do
fingerprint = Unique.fingerprint(job)

producer_pid = rendezvous(producers, fingerprint)
target_unique = {unique_server, node(producer_pid)}

case Unique.check_and_insert(unique_server, fingerprint, job) do
case Unique.check_and_insert(target_unique, fingerprint, job) do
{:ok, job} ->
route_normal(job, producer_pid)

Expand Down
33 changes: 30 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
defmodule Stagehand.MixProject do
use Mix.Project

@version "0.1.0"
@source_url "https://github.com/twinn/stagehand"

def project do
[
app: :stagehand,
version: "0.1.0",
elixir: "~> 1.19",
version: @version,
elixir: "~> 1.15",
start_permanent: Mix.env() == :prod,
deps: deps(),
description: description(),
package: package(),
docs: docs(),
elixirc_paths: elixirc_paths(Mix.env()),
aliases: aliases(),
dialyzer: [plt_add_apps: [:ex_unit]]
dialyzer: [plt_add_apps: [:ex_unit]],
source_url: @source_url
]
end

Expand All @@ -36,13 +43,33 @@ defmodule Stagehand.MixProject do
defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_), do: ["lib"]

defp description do
"An in-memory, GenStage-based background job processing library for Elixir."
end

defp package do
[
licenses: ["MIT"],
links: %{"GitHub" => @source_url},
files: ~w(lib mix.exs README.md LICENSE CHANGELOG.md)
]
end

defp docs do
[
main: "Stagehand",
extras: ["README.md", "CHANGELOG.md", "LICENSE"]
]
end

defp deps do
[
{:crontab, "~> 1.1"},
{:gen_stage, "~> 1.2"},
{:highlander, "~> 0.2"},
{:pg_registry, "~> 0.4"},
{:telemetry, "~> 1.0"},
{:ex_doc, "~> 0.34", only: :dev, runtime: false},
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:styler, "~> 1.11", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false}
Expand Down
6 changes: 6 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@
"credo": {:hex, :credo, "1.7.17", "f92b6aa5b26301eaa5a35e4d48ebf5aa1e7094ac00ae38f87086c562caf8a22f", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1eb5645c835f0b6c9b5410f94b5a185057bcf6d62a9c2b476da971cde8749645"},
"crontab": {:hex, :crontab, "1.2.0", "503611820257939d5d0fd272eb2b454f48a470435a809479ddc2c40bb515495c", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm", "ebd7ef4d831e1b20fa4700f0de0284a04cac4347e813337978e25b4cc5cc2207"},
"dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"},
"earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"},
"erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"},
"ex_doc": {:hex, :ex_doc, "0.40.1", "67542e4b6dde74811cfd580e2c0149b78010fd13001fda7cfeb2b2c2ffb1344d", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "bcef0e2d360d93ac19f01a85d58f91752d930c0a30e2681145feea6bd3516e00"},
"file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"},
"gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"},
"highlander": {:hex, :highlander, "0.2.1", "e59b459f857e89daf73f2598bf2b2c0479a435481e6101ea389fd3625919b052", [:mix], [], "hexpm", "5ba19a18358803d82a923511acec8ee85fac30731c5ca056f2f934bc3d3afd9a"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"},
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
"makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.3", "4252d5d4098da7415c390e847c814bad3764c94a814a0b4245176215615e1035", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "953297c02582a33411ac6208f2c6e55f0e870df7f80da724ed613f10e6706afd"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
"pg_registry": {:hex, :pg_registry, "0.4.0", "edb4de796e2a0224e69d29edb3883c7a2be90a026261fcda2f1d0c1e19189df3", [:mix], [], "hexpm", "2a34ebdce4ce9ea43995aeda7981b0b954165d444f6aa5ce1784e7c5b1708274"},
"styler": {:hex, :styler, "1.11.0", "35010d970689a23c2bcc8e97bd8bf7d20e3561d60c49be84654df5c37d051a9c", [:mix], [], "hexpm", "70f36165d0cf238a32b7a456fdef6a9c72e77e657d7ac4a0ace33aeba3f2b8c0"},
"telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"},
Expand Down
118 changes: 118 additions & 0 deletions test/stagehand/cluster_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
defmodule Stagehand.ClusterTest do
  use ExUnit.Case, async: true

  alias Stagehand.Queue.Pipeline
  alias Stagehand.Test.Cluster

  # One peer node is spawned per module and shared by the tests below.
  setup_all do
    {peer, peer_node} = Cluster.spawn_peer()
    {:ok, peer: peer, peer_node: peer_node}
  end

  # Each test gets its own uniquely named Stagehand instance, started on this
  # node and on the peer, and begins only once the peer's producer for the
  # "default" queue has been announced to the local registry.
  setup %{peer: peer, peer_node: peer_node} do
    # A previous test may have stopped the peer; spawn a replacement if so.
    {peer, peer_node} =
      case Node.ping(peer_node) do
        :pong -> {peer, peer_node}
        _unreachable -> Cluster.spawn_peer()
      end

    name = :"stagehand_cluster_#{:erlang.unique_integer([:positive])}"

    start_supervised!(
      {Stagehand, name: name, queues: [default: 3], shutdown_grace_period: 5_000},
      id: name
    )

    await_peer_event(name, :join, peer_node, fn ->
      {:ok, _} =
        Cluster.start_stagehand(peer_node, name,
          queues: [default: 3],
          shutdown_grace_period: 5_000
        )
    end)

    {:ok, name: name, peer: peer, peer_node: peer_node}
  end

  describe "producer discovery" do
    test "both nodes see each other's producers", %{name: name, peer_node: peer_node} do
      local_producers = Pipeline.producers_for_queue(name, "default")

      remote_producers =
        :erpc.call(peer_node, Pipeline, :producers_for_queue, [name, "default"])

      assert length(local_producers) == 2
      assert length(remote_producers) == 2

      local_nodes = for producer <- local_producers, do: node(producer)
      assert node() in local_nodes
      assert peer_node in local_nodes
    end

    test "producers_for_queue returns only local after peer stops",
         %{name: name, peer: peer, peer_node: peer_node} do
      assert length(Pipeline.producers_for_queue(name, "default")) == 2

      await_peer_event(name, :leave, peer_node, fn -> :peer.stop(peer) end)

      producers = Pipeline.producers_for_queue(name, "default")
      assert length(producers) == 1
      assert node(hd(producers)) == node()
    end
  end

  describe "unique job dedup across nodes" do
    test "same unique fingerprint is deduplicated", %{name: name, peer_node: peer_node} do
      job = unique_job("cluster_unique")

      {:ok, first} = Stagehand.insert(name, job)
      refute first.conflict?

      {:ok, second} = :erpc.call(peer_node, Stagehand, :insert, [name, job])
      assert second.conflict?
    end
  end

  describe "graceful shutdown" do
    test "peer shutdown transfers unique entries to survivor",
         %{name: name, peer_node: peer_node} do
      job = unique_job("transfer_me")

      {:ok, _} = Stagehand.insert(name, job)

      await_peer_event(name, :leave, peer_node, fn ->
        :erpc.call(peer_node, Supervisor, :stop, [name])
      end)

      # A synchronous round-trip to the local Unique server before re-inserting,
      # so messages it received earlier have been handled.
      unique_name = Module.concat(name, Stagehand.Unique)
      _ = :sys.get_state(unique_name)

      {:ok, duplicate} = Stagehand.insert(name, job)
      assert duplicate.conflict?
    end
  end

  # Builds an available unique job on the "default" queue whose args carry `key`.
  defp unique_job(key) do
    %Stagehand.Job{
      worker: SomeWorker,
      queue: "default",
      args: %{"key" => key},
      unique: [period: 300, fields: [:worker, :queue, :args]],
      state: :available
    }
  end

  # Monitors the "default" producer group of `name`, runs `trigger`, then waits
  # (up to 5 seconds) for an `event` (:join or :leave) message about a producer
  # living on `peer_node`, and finally removes the monitor.
  defp await_peer_event(name, event, peer_node, trigger) do
    pg_key = {:stagehand, name, :producers, "default"}
    {ref, _} = PgRegistry.monitor(Stagehand.ProducerRegistry, pg_key)

    trigger.()

    assert_receive {^ref, ^event, ^pg_key, [{pid, _}]} when node(pid) == peer_node, 5_000
    PgRegistry.demonitor(Stagehand.ProducerRegistry, ref)
  end
end
51 changes: 51 additions & 0 deletions test/support/cluster.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Stagehand.Test.Cluster do
  @moduledoc false

  # Helpers for multi-node tests: spawning a peer node, starting a Stagehand
  # instance on it, and synchronising with the producer registry.

  # How long start_stagehand/3 waits for the remote instance to report in.
  @start_timeout 5_000

  # Starts a peer node that shares this node's cookie and code paths, boots
  # the :stagehand application on it, and returns `{peer, node}`: the handle
  # returned by `:peer.start/1` and the peer's node name.
  #
  # Crashes (match error) if the peer cannot be started or the application
  # fails to boot on it.
  def spawn_peer do
    cookie = Atom.to_charlist(Node.get_cookie())
    name = :"stagehand_peer_#{:erlang.unique_integer([:positive])}"

    {:ok, peer, node} =
      :peer.start(%{
        name: name,
        args: [~c"-setcookie", cookie]
      })

    :ok = :erpc.call(node, :code, :add_paths, [:code.get_path()])
    {:ok, _} = :erpc.call(node, Application, :ensure_all_started, [:stagehand])

    {peer, node}
  end

  # Starts a Stagehand instance called `name` on `node`.
  #
  # Options:
  #   * `:queues` - queue configuration (default: `[default: 5]`)
  #   * `:shutdown_grace_period` - grace period in ms (default: `5_000`)
  #
  # Returns `{:ok, name}` once the remote instance is up,
  # `{:error, reason}` if the remote owner process dies before reporting
  # (start failure, unreachable node, ...), or `{:error, :timeout}` if nothing
  # is heard within 5 seconds.
  def start_stagehand(node, name, opts \\ []) do
    queues = Keyword.get(opts, :queues, default: 5)
    grace = Keyword.get(opts, :shutdown_grace_period, 5_000)
    caller = self()

    # Spawn a long-lived process on the remote node to own the
    # Stagehand supervision tree. erpc callers are short-lived and
    # their exit kills linked children.
    #
    # The owner is monitored so that a failed start is reported right away
    # with its real reason instead of surfacing as a timeout.
    {_owner, monitor} =
      Node.spawn_monitor(node, fn ->
        {:ok, _} =
          Stagehand.start_link(
            name: name,
            queues: queues,
            shutdown_grace_period: grace
          )

        send(caller, {:stagehand_started, name})
        Process.sleep(:infinity)
      end)

    receive do
      {:stagehand_started, ^name} ->
        # The owner keeps running; drop the monitor so a later :DOWN message
        # does not land in the test process mailbox.
        Process.demonitor(monitor, [:flush])
        {:ok, name}

      {:DOWN, ^monitor, :process, _pid, reason} ->
        {:error, reason}
    after
      @start_timeout ->
        Process.demonitor(monitor, [:flush])
        {:error, :timeout}
    end
  end

  # Performs a synchronous `:sys.get_state/1` round-trip to the
  # Stagehand.ProducerRegistry process on `node` and then on the local node.
  # Returns the local registry's state.
  def sync(node) do
    :erpc.call(node, :sys, :get_state, [Stagehand.ProducerRegistry])
    :sys.get_state(Stagehand.ProducerRegistry)
  end
end
1 change: 1 addition & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Logger.configure(level: :warning)

# The cluster tests spawn peer nodes, so this VM must be distributed.
# Accept a VM that is already distributed (for example when the tests are
# launched with --sname/--name) instead of failing on a match error; any
# other failure to start distribution still aborts the run.
case :net_kernel.start([:"stagehand_test@127.0.0.1"]) do
  {:ok, _pid} -> :ok
  {:error, {:already_started, _pid}} -> :ok
end

ExUnit.start()
Loading