diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bb0a190..5c16d05 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,8 +12,13 @@ jobs: strategy: matrix: - elixir: ["1.19"] - otp: ["28"] + include: + - elixir: "1.15" + otp: "26" + - elixir: "1.17" + otp: "27" + - elixir: "1.19" + otp: "28" steps: - uses: actions/checkout@v4 @@ -31,6 +36,7 @@ jobs: key: ${{ runner.os }}-mix-${{ matrix.elixir }}-${{ matrix.otp }}-${{ hashFiles('mix.lock') }} restore-keys: ${{ runner.os }}-mix-${{ matrix.elixir }}-${{ matrix.otp }}- + - run: epmd -daemon - run: mix deps.get - run: mix compile --warnings-as-errors - run: mix format --check-formatted diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..1dce6e4 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,18 @@ +# Changelog + +## 0.1.0 + +Initial release. + +- GenStage-based producer/consumer pipeline with per-queue concurrency + limits and priority dispatch +- Cluster-wide producer discovery via `PgRegistry` +- Job routing: random for normal jobs, rendezvous hashing for unique jobs +- Unique job deduplication with local ETS tables and cross-node entry + transfer on topology changes +- Graceful shutdown with job redistribution to surviving producers +- Cron scheduling via `Highlander` for cluster-wide singleton execution +- Telemetry events for job lifecycle and queue shutdown +- Testing helpers with `:manual` and `:inline` modes +- Exponential backoff with jitter for retries +- Queue pause/resume across the cluster diff --git a/README.md b/README.md index 21e67f5..a6cfde5 100644 --- a/README.md +++ b/README.md @@ -19,13 +19,14 @@ discovery and `Highlander` for singleton scheduling. - **At-most-once delivery** — each job runs at most once. Jobs are in-memory with no persistence; a VM crash loses queued, scheduled, and executing jobs. -- **Unique jobs (best effort)** — deduplication is backed by a local ETS +- **Unique jobs (best effort)** — deduplication is backed by an ETS table per node. Rendezvous hashing routes the same job fingerprint to - the same producer, keeping dedup checks local. On graceful shutdown, - dedup entries are transferred to their new owners. On node join, - unique checks are blocked until all existing producers have synced - their entries. On crashes, entries on the lost node are gone and - duplicates are possible until the uniqueness period expires. + the same node, and the dedup check runs on that node's Unique server. + On graceful shutdown, dedup entries are transferred to surviving + nodes. On node join, unique checks are blocked until all existing + producers have synced their entries. On crashes, entries on the lost + node are gone and duplicates are possible until the uniqueness period + expires. ## Installation diff --git a/lib/stagehand/queue/producer.ex b/lib/stagehand/queue/producer.ex index 081e8b6..a501730 100644 --- a/lib/stagehand/queue/producer.ex +++ b/lib/stagehand/queue/producer.ex @@ -318,19 +318,18 @@ defmodule Stagehand.Queue.Producer do defp sync_unique_entries(state, all_producers) do unique_name = Module.concat(state.conf.name, Stagehand.Unique) entries = Stagehand.Unique.export(unique_name) + survivors = all_producers -- [self()] - do_sync_unique(unique_name, entries, all_producers) + do_sync_unique(unique_name, entries, survivors) end defp do_sync_unique(_unique_name, [], _producers), do: :ok - defp do_sync_unique(_unique_name, _entries, producers) when length(producers) < 2, do: :ok + defp do_sync_unique(_unique_name, _entries, []), do: :ok - defp do_sync_unique(unique_name, entries, producers) do + defp do_sync_unique(unique_name, entries, survivors) do remote = for {fp, _job, _ts} = entry <- entries, - owner = Stagehand.Router.rendezvous(producers, fp), - owner != self(), - node(owner) != node(), + owner = Stagehand.Router.rendezvous(survivors, fp), reduce: %{} do acc -> Map.update(acc, node(owner), [entry], &[entry | &1]) end diff --git a/lib/stagehand/router.ex b/lib/stagehand/router.ex index 4ea101f..9cc39cb 100644 --- a/lib/stagehand/router.ex +++ b/lib/stagehand/router.ex @@ -45,8 +45,9 @@ defmodule Stagehand.Router do fingerprint = Unique.fingerprint(job) producer_pid = rendezvous(producers, fingerprint) + target_unique = {unique_server, node(producer_pid)} - case Unique.check_and_insert(unique_server, fingerprint, job) do + case Unique.check_and_insert(target_unique, fingerprint, job) do {:ok, job} -> route_normal(job, producer_pid) diff --git a/mix.exs b/mix.exs index 2cb6b3d..58a98e3 100644 --- a/mix.exs +++ b/mix.exs @@ -1,16 +1,23 @@ defmodule Stagehand.MixProject do use Mix.Project + @version "0.1.0" + @source_url "https://github.com/twinn/stagehand" + def project do [ app: :stagehand, - version: "0.1.0", - elixir: "~> 1.19", + version: @version, + elixir: "~> 1.15", start_permanent: Mix.env() == :prod, deps: deps(), + description: description(), + package: package(), + docs: docs(), elixirc_paths: elixirc_paths(Mix.env()), aliases: aliases(), - dialyzer: [plt_add_apps: [:ex_unit]] + dialyzer: [plt_add_apps: [:ex_unit]], + source_url: @source_url ] end @@ -36,6 +43,25 @@ defmodule Stagehand.MixProject do defp elixirc_paths(:test), do: ["lib", "test/support"] defp elixirc_paths(_), do: ["lib"] + defp description do + "An in-memory, GenStage-based background job processing library for Elixir." + end + + defp package do + [ + licenses: ["MIT"], + links: %{"GitHub" => @source_url}, + files: ~w(lib mix.exs README.md LICENSE CHANGELOG.md) + ] + end + + defp docs do + [ + main: "Stagehand", + extras: ["README.md", "CHANGELOG.md", "LICENSE"] + ] + end + defp deps do [ {:crontab, "~> 1.1"}, @@ -43,6 +69,7 @@ defmodule Stagehand.MixProject do {:highlander, "~> 0.2"}, {:pg_registry, "~> 0.4"}, {:telemetry, "~> 1.0"}, + {:ex_doc, "~> 0.34", only: :dev, runtime: false}, {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, {:styler, "~> 1.11", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false} diff --git a/mix.lock b/mix.lock index ca2ea86..6b89c74 100644 --- a/mix.lock +++ b/mix.lock @@ -3,12 +3,18 @@ "credo": {:hex, :credo, "1.7.17", "f92b6aa5b26301eaa5a35e4d48ebf5aa1e7094ac00ae38f87086c562caf8a22f", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1eb5645c835f0b6c9b5410f94b5a185057bcf6d62a9c2b476da971cde8749645"}, "crontab": {:hex, :crontab, "1.2.0", "503611820257939d5d0fd272eb2b454f48a470435a809479ddc2c40bb515495c", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm", "ebd7ef4d831e1b20fa4700f0de0284a04cac4347e813337978e25b4cc5cc2207"}, "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"}, "erlex": {:hex, :erlex, "0.2.8", "cd8116f20f3c0afe376d1e8d1f0ae2452337729f68be016ea544a72f767d9c12", [:mix], [], "hexpm", "9d66ff9fedf69e49dc3fd12831e12a8a37b76f8651dd21cd45fcf5561a8a7590"}, + "ex_doc": {:hex, :ex_doc, "0.40.1", "67542e4b6dde74811cfd580e2c0149b78010fd13001fda7cfeb2b2c2ffb1344d", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "bcef0e2d360d93ac19f01a85d58f91752d930c0a30e2681145feea6bd3516e00"}, "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, "gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"}, "highlander": {:hex, :highlander, "0.2.1", "e59b459f857e89daf73f2598bf2b2c0479a435481e6101ea389fd3625919b052", [:mix], [], "hexpm", "5ba19a18358803d82a923511acec8ee85fac30731c5ca056f2f934bc3d3afd9a"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, "libring": {:hex, :libring, "1.7.0", "4f245d2f1476cd7ed8f03740f6431acba815401e40299208c7f5c640e1883bda", [:mix], [], "hexpm", "070e3593cb572e04f2c8470dd0c119bc1817a7a0a7f88229f43cf0345268ec42"}, + "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, + "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, + "makeup_erlang": {:hex, :makeup_erlang, "1.0.3", "4252d5d4098da7415c390e847c814bad3764c94a814a0b4245176215615e1035", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "953297c02582a33411ac6208f2c6e55f0e870df7f80da724ed613f10e6706afd"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "pg_registry": {:hex, :pg_registry, "0.4.0", "edb4de796e2a0224e69d29edb3883c7a2be90a026261fcda2f1d0c1e19189df3", [:mix], [], "hexpm", "2a34ebdce4ce9ea43995aeda7981b0b954165d444f6aa5ce1784e7c5b1708274"}, "styler": {:hex, :styler, "1.11.0", "35010d970689a23c2bcc8e97bd8bf7d20e3561d60c49be84654df5c37d051a9c", [:mix], [], "hexpm", "70f36165d0cf238a32b7a456fdef6a9c72e77e657d7ac4a0ace33aeba3f2b8c0"}, "telemetry": {:hex, :telemetry, "1.4.1", "ab6de178e2b29b58e8256b92b382ea3f590a47152ca3651ea857a6cae05ac423", [:rebar3], [], "hexpm", "2172e05a27531d3d31dd9782841065c50dd5c3c7699d95266b2edd54c2dafa1c"}, diff --git a/test/stagehand/cluster_test.exs b/test/stagehand/cluster_test.exs new file mode 100644 index 0000000..c4b2c59 --- /dev/null +++ b/test/stagehand/cluster_test.exs @@ -0,0 +1,118 @@ +defmodule Stagehand.ClusterTest do + use ExUnit.Case, async: true + + alias Stagehand.Queue.Pipeline + alias Stagehand.Test.Cluster + + setup_all do + {peer, peer_node} = Cluster.spawn_peer() + {:ok, peer: peer, peer_node: peer_node} + end + + setup %{peer: peer, peer_node: peer_node} do + # Restart peer if a previous test stopped it + {peer, peer_node} = + if Node.ping(peer_node) == :pong do + {peer, peer_node} + else + Cluster.spawn_peer() + end + + name = :"stagehand_cluster_#{:erlang.unique_integer([:positive])}" + + start_supervised!( + {Stagehand, name: name, queues: [default: 3], shutdown_grace_period: 5_000}, + id: name + ) + + pg_key = {:stagehand, name, :producers, "default"} + {ref, _} = PgRegistry.monitor(Stagehand.ProducerRegistry, pg_key) + + {:ok, _} = + Cluster.start_stagehand(peer_node, name, + queues: [default: 3], + shutdown_grace_period: 5_000 + ) + + assert_receive {^ref, :join, ^pg_key, [{pid, _}]} when node(pid) == peer_node, 5_000 + PgRegistry.demonitor(Stagehand.ProducerRegistry, ref) + + {:ok, name: name, peer: peer, peer_node: peer_node} + end + + describe "producer discovery" do + test "both nodes see each other's producers", %{name: name, peer_node: peer_node} do + local_producers = Pipeline.producers_for_queue(name, "default") + remote_producers = :erpc.call(peer_node, Pipeline, :producers_for_queue, [name, "default"]) + + assert length(local_producers) == 2 + assert length(remote_producers) == 2 + + local_nodes = local_producers |> Enum.map(&node/1) |> Enum.sort() + assert node() in local_nodes + assert peer_node in local_nodes + end + + test "producers_for_queue returns only local after peer stops", %{name: name, peer: peer, peer_node: peer_node} do + assert length(Pipeline.producers_for_queue(name, "default")) == 2 + + pg_key = {:stagehand, name, :producers, "default"} + {ref, _} = PgRegistry.monitor(Stagehand.ProducerRegistry, pg_key) + + :peer.stop(peer) + + assert_receive {^ref, :leave, ^pg_key, [{pid, _}]} when node(pid) == peer_node, 5_000 + PgRegistry.demonitor(Stagehand.ProducerRegistry, ref) + + producers = Pipeline.producers_for_queue(name, "default") + assert length(producers) == 1 + assert node(hd(producers)) == node() + end + end + + describe "unique job dedup across nodes" do + test "same unique fingerprint is deduplicated", %{name: name, peer_node: peer_node} do + job = %Stagehand.Job{ + worker: SomeWorker, + queue: "default", + args: %{"key" => "cluster_unique"}, + unique: [period: 300, fields: [:worker, :queue, :args]], + state: :available + } + + {:ok, first} = Stagehand.insert(name, job) + refute first.conflict? + + {:ok, second} = :erpc.call(peer_node, Stagehand, :insert, [name, job]) + assert second.conflict? + end + end + + describe "graceful shutdown" do + test "peer shutdown transfers unique entries to survivor", %{name: name, peer_node: peer_node} do + job = %Stagehand.Job{ + worker: SomeWorker, + queue: "default", + args: %{"key" => "transfer_me"}, + unique: [period: 300, fields: [:worker, :queue, :args]], + state: :available + } + + {:ok, _} = Stagehand.insert(name, job) + + pg_key = {:stagehand, name, :producers, "default"} + {ref, _} = PgRegistry.monitor(Stagehand.ProducerRegistry, pg_key) + + :erpc.call(peer_node, Supervisor, :stop, [name]) + + assert_receive {^ref, :leave, ^pg_key, [{pid, _}]} when node(pid) == peer_node, 5_000 + PgRegistry.demonitor(Stagehand.ProducerRegistry, ref) + + unique_name = Module.concat(name, Stagehand.Unique) + _ = :sys.get_state(unique_name) + + {:ok, duplicate} = Stagehand.insert(name, job) + assert duplicate.conflict? + end + end +end diff --git a/test/support/cluster.ex b/test/support/cluster.ex new file mode 100644 index 0000000..a987534 --- /dev/null +++ b/test/support/cluster.ex @@ -0,0 +1,51 @@ +defmodule Stagehand.Test.Cluster do + @moduledoc false + + def spawn_peer do + cookie = Atom.to_charlist(Node.get_cookie()) + name = :"stagehand_peer_#{:erlang.unique_integer([:positive])}" + + {:ok, peer, node} = + :peer.start(%{ + name: name, + args: [~c"-setcookie", cookie] + }) + + :ok = :erpc.call(node, :code, :add_paths, [:code.get_path()]) + {:ok, _} = :erpc.call(node, Application, :ensure_all_started, [:stagehand]) + + {peer, node} + end + + def start_stagehand(node, name, opts \\ []) do + queues = Keyword.get(opts, :queues, default: 5) + grace = Keyword.get(opts, :shutdown_grace_period, 5_000) + caller = self() + + # Spawn a long-lived process on the remote node to own the + # Stagehand supervision tree. erpc callers are short-lived and + # their exit kills linked children. + Node.spawn(node, fn -> + {:ok, _} = + Stagehand.start_link( + name: name, + queues: queues, + shutdown_grace_period: grace + ) + + send(caller, {:stagehand_started, name}) + Process.sleep(:infinity) + end) + + receive do + {:stagehand_started, ^name} -> {:ok, name} + after + 5_000 -> {:error, :timeout} + end + end + + def sync(node) do + :erpc.call(node, :sys, :get_state, [Stagehand.ProducerRegistry]) + :sys.get_state(Stagehand.ProducerRegistry) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 194f282..61663f1 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,2 +1,3 @@ Logger.configure(level: :warning) +{:ok, _} = :net_kernel.start([:"stagehand_test@127.0.0.1"]) ExUnit.start()