Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
# Stagehand

GenStage-based background job processing for Elixir. In-memory, no database required.
An in-memory, GenStage-based background job processing library for Elixir.

## Why

Most job processing libraries require a database. Stagehand doesn't. It's built on GenStage and runs entirely in-memory, making it a good fit for applications that need background job processing without the overhead of external dependencies.
Stagehand runs entirely in-memory with no database dependency. It is
built on GenStage and uses `PgRegistry` for cluster-wide producer
discovery and `Highlander` for singleton scheduling.

## Guarantees

- **Graceful shutdown** — executing jobs complete before the node stops. The producer drains in-flight work within a configurable grace period.
- **No new work during shutdown** — the producer snapshots the current cluster membership, then leaves the pg group so no new jobs are routed to it. Any messages already in the mailbox are drained before redistribution.
- **Job redistribution** — on shutdown, scheduled, queued, and in-flight jobs are redistributed to surviving producers. If there are no surviving producers (e.g. the last node shuts down without a replacement joining first), these jobs are lost.
- **At-most-once delivery** — each job runs at most once. Jobs are in-memory with no persistence, so a VM crash loses queued, scheduled, and executing jobs.
- **Unique jobs (best effort)** — deduplication is backed by a local ETS table. A consistent hash ring routes the same job fingerprint to the same producer. When a node joins or leaves, the ring only remaps keys that belong to the changed node — all other fingerprints stay on their current owner, keeping their dedup state intact. On graceful shutdown, the producer snapshots cluster membership, leaves the group, then transfers dedup entries to their new owners using the snapshot. When a new node joins, unique checks are blocked until all existing producers have synced their entries, preventing duplicates during the transition. On crashes, entries on the lost node are gone and duplicates are possible until the uniqueness period expires.
- **Graceful shutdown** — executing jobs complete before the node stops.
The producer drains in-flight work within a configurable grace period.
- **No new work during shutdown** — the producer leaves the cluster
registry so no new jobs are routed to it. Any messages already in the
mailbox are drained before redistribution.
- **Job redistribution** — on shutdown, scheduled, queued, and in-flight
jobs are redistributed to surviving producers. If no surviving
producers exist, these jobs are lost.
- **At-most-once delivery** — each job runs at most once. Jobs are
in-memory with no persistence; a VM crash loses queued, scheduled,
and executing jobs.
- **Unique jobs (best effort)** — deduplication is backed by a local ETS
table per node. Rendezvous hashing routes the same job fingerprint to
the same producer, keeping dedup checks local. On graceful shutdown,
dedup entries are transferred to their new owners. On node join,
unique checks are blocked until all existing producers have synced
their entries. On crashes, entries on the lost node are gone and
duplicates are possible until the uniqueness period expires.

## Installation

Expand Down Expand Up @@ -41,7 +54,7 @@ config :my_app, Stagehand,
config :my_app, Stagehand, testing: :manual
```

Add Stagehand to your supervision tree:
Add Stagehand to the application supervision tree:

```elixir
children = [
Expand All @@ -63,7 +76,7 @@ defmodule MyApp.EmailWorker do
end
```

Insert jobs:
Inserting jobs:

```elixir
%{"to" => "user@example.com", "body" => "hello"}
Expand All @@ -74,16 +87,16 @@ Insert jobs:
### Return values

- `:ok` or `{:ok, value}` — job succeeded
- `{:error, reason}` — job failed, will retry if attempts remain
- `{:error, reason}` — job failed, retries if attempts remain
- `{:snooze, seconds}` — re-enqueue after delay
- `{:cancel, reason}` — stop, no more retries

### Options

- `:queue` — queue name (default `:default`)
- `:max_attempts` — retry limit (default `20`)
- `:priority` — 0-9, lower is higher (default `0`)
- `:unique` — uniqueness config or `false`
- `:priority` — 0-9, lower is higher priority (default `0`)
- `:unique` — uniqueness configuration or `false`
- `:schedule_in` — delay in seconds or `{amount, :seconds | :minutes | :hours | :days}`
- `:scheduled_at` — specific `DateTime`

Expand Down
23 changes: 11 additions & 12 deletions lib/stagehand.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
defmodule Stagehand do
@moduledoc """
Stagehand is a GenStage-based background job processing library,
In-memory, no database required.
An in-memory, GenStage-based background job processing library.

## Configuration

Expand Down Expand Up @@ -37,7 +36,7 @@ defmodule Stagehand do
alias Stagehand.Router

@doc """
Start a Stagehand supervision tree.
Starts a Stagehand supervision tree.
"""
@spec start_link(keyword()) :: Supervisor.on_start()
def start_link(opts) when is_list(opts) do
Expand All @@ -56,7 +55,7 @@ defmodule Stagehand do
end

@doc """
Insert a job for execution.
Inserts a job for execution.
"""
@spec insert(atom(), Stagehand.Job.t()) :: {:ok, Stagehand.Job.t()} | {:error, term()}
def insert(name \\ __MODULE__, %Stagehand.Job{} = job) do
Expand All @@ -76,7 +75,7 @@ defmodule Stagehand do
end

@doc """
Insert multiple jobs at once.
Inserts multiple jobs at once.
"""
@spec insert_all(atom(), [Stagehand.Job.t()]) :: {:ok, [Stagehand.Job.t()]} | {:error, [term()]}
def insert_all(name \\ __MODULE__, jobs) when is_list(jobs) do
Expand All @@ -93,7 +92,7 @@ defmodule Stagehand do
end

@doc """
Cancel a job by its ref. For available jobs, removes from the producer.
Cancels a job by its ref. For available jobs, removes from the producer.
Returns `:ok` on success or `{:error, :not_found}`.
"""
@spec cancel_job(atom(), Stagehand.Job.t()) :: :ok | {:error, :not_found} | :not_found
Expand All @@ -114,15 +113,15 @@ defmodule Stagehand do
end

@doc """
Retry a job by re-inserting it.
Retries a job by re-inserting it.
"""
@spec retry_job(atom(), Stagehand.Job.t()) :: {:ok, Stagehand.Job.t()} | {:error, term()}
def retry_job(name \\ __MODULE__, %Stagehand.Job{} = job) do
insert(name, %{job | attempt: 0, errors: [], state: :available})
end

@doc """
Pause a queue. Broadcasts to all producers for this queue.
Pauses a queue across all producers in the cluster.
"""
@spec pause_queue(atom(), keyword()) :: :ok
def pause_queue(name \\ __MODULE__, opts) do
Expand All @@ -136,7 +135,7 @@ defmodule Stagehand do
end

@doc """
Resume a paused queue. Broadcasts to all producers for this queue.
Resumes a paused queue across all producers in the cluster.
"""
@spec resume_queue(atom(), keyword()) :: :ok
def resume_queue(name \\ __MODULE__, opts) do
Expand All @@ -150,7 +149,7 @@ defmodule Stagehand do
end

@doc """
Drain a queue, returning all pending jobs from all producers.
Drains a queue, returning all pending jobs from all producers.
"""
@spec drain_queue(atom(), keyword()) :: [Stagehand.Job.t()]
def drain_queue(name \\ __MODULE__, opts) do
Expand All @@ -162,7 +161,7 @@ defmodule Stagehand do
end

@doc """
Check the status of a queue across all producers.
Returns the aggregated status of a queue across all producers.
"""
@spec check_queue(atom(), keyword()) :: map()
def check_queue(name \\ __MODULE__, opts) do
Expand All @@ -182,7 +181,7 @@ defmodule Stagehand do
end

@doc """
Get the configuration for a Stagehand instance.
Returns the configuration for a Stagehand instance.
"""
@spec config(atom()) :: Config.t() | {:error, {:not_running, atom()}}
def config(name \\ __MODULE__) do
Expand Down
4 changes: 2 additions & 2 deletions lib/stagehand/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ defmodule Stagehand.Config do
]

@doc """
Build a config struct from the given options.
Builds a config struct from the given options.

Options can be passed directly or read from Application config:
Options can be passed directly or read from application config:

Config.new(otp_app: :my_app, name: Stagehand)
"""
Expand Down
4 changes: 2 additions & 2 deletions lib/stagehand/queue/consumer_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
defmodule Stagehand.Queue.ConsumerSupervisor do
@moduledoc """
ConsumerSupervisor that spawns an Executor task for each job received
from the Producer. Starts without a subscription the Producer calls
`GenStage.async_subscribe/2` after joining the pg group.
from the Producer. Starts without a subscription; the Producer calls
`GenStage.async_subscribe/2` after registering with the cluster.
"""

use ConsumerSupervisor
Expand Down
2 changes: 1 addition & 1 deletion lib/stagehand/queue/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Stagehand.Queue.Executor do
alias Stagehand.Telemetry

@doc """
Execute a job. This is the entry point called by the ConsumerSupervisor.
Executes a job. Called by the `ConsumerSupervisor` for each dispatched job.
"""
@spec run(Job.t()) :: term()
def run(%Job{worker: worker} = job) do
Expand Down
11 changes: 6 additions & 5 deletions lib/stagehand/queue/pipeline.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
defmodule Stagehand.Queue.Pipeline do
@moduledoc """
Supervisor for a single queue's GenStage pipeline: ConsumerSupervisor + Producer.
Supervisor for a single queue's GenStage pipeline.

Children are ordered so that on shutdown (reverse order) the Producer
stops first — leaving the pg group and draining executing jobs — before
the ConsumerSupervisor is stopped.
Manages a `ConsumerSupervisor` and a `Producer`. On shutdown, the
producer stops first (reverse child order), leaves the cluster
registry, and drains executing jobs before the consumer supervisor
is stopped.
"""

use Supervisor
Expand Down Expand Up @@ -75,7 +76,7 @@ defmodule Stagehand.Queue.Pipeline do
end

@doc """
Get the consumer supervisor process name.
Returns the consumer supervisor process name.
"""
@spec consumer_name(atom(), atom() | binary()) :: {:via, module(), term()}
def consumer_name(stagehand_name, queue) do
Expand Down
25 changes: 14 additions & 11 deletions lib/stagehand/queue/producer.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
defmodule Stagehand.Queue.Producer do
@moduledoc """
GenStage producer for a single queue. Thin demand buffer — receives jobs,
queues them, dispatches when consumers have demand.
GenStage producer for a single queue.

Receives jobs, buffers them in a priority queue, and dispatches to
consumers as demand arrives. Registers with `PgRegistry` for
cluster-wide discovery.
"""

use GenStage
Expand All @@ -24,15 +27,15 @@ defmodule Stagehand.Queue.Producer do
end

@doc """
Enqueue a job into this producer.
Enqueues a job into this producer.
"""
@spec enqueue(GenServer.server(), Stagehand.Job.t()) :: {:ok, Stagehand.Job.t()}
def enqueue(producer, %Stagehand.Job{} = job) do
GenStage.call(producer, {:enqueue, job})
end

@doc """
Schedule a job to be enqueued after `delay_ms` milliseconds.
Schedules a job to be enqueued after `delay_ms` milliseconds.
The timer is tracked so the job can be redistributed on shutdown.
"""
@spec schedule(GenServer.server(), Stagehand.Job.t(), non_neg_integer()) :: {:ok, Stagehand.Job.t()}
Expand All @@ -41,48 +44,48 @@ defmodule Stagehand.Queue.Producer do
end

@doc """
Cancel a job by ref. Removes from available queue if present.
Cancels a job by ref. Removes from the available queue if present.
"""
@spec cancel(GenServer.server(), reference()) :: :ok | :not_found
def cancel(producer, ref) when is_reference(ref) do
GenStage.call(producer, {:cancel, ref})
end

@doc """
Pause this queue — stops dispatching jobs.
Pauses this queue. Jobs continue to be accepted but are not dispatched.
"""
@spec pause(GenServer.server()) :: :ok
def pause(producer) do
GenStage.call(producer, :pause)
end

@doc """
Resume a paused queue.
Resumes a paused queue and dispatches any buffered jobs.
"""
@spec resume(GenServer.server()) :: :ok
def resume(producer) do
GenStage.call(producer, :resume)
end

@doc """
Update the concurrency limit (handled by the consumer supervisor, not here).
Returns current queue info.
Returns current queue info including pause state, available count,
and executing count.
"""
@spec check(GenServer.server()) :: map()
def check(producer) do
GenStage.call(producer, :check)
end

@doc """
Drain all available jobs synchronously. Returns the list of jobs.
Drains all available jobs synchronously. Returns the list of jobs.
"""
@spec drain(GenServer.server()) :: [Stagehand.Job.t()]
def drain(producer) do
GenStage.call(producer, :drain)
end

@doc """
Initiate graceful shutdown. Stops accepting new jobs and waits for
Initiates graceful shutdown. Stops accepting new jobs and waits for
executing jobs to finish before terminating.
"""
@spec shutdown(GenServer.server()) :: :ok
Expand Down
9 changes: 5 additions & 4 deletions lib/stagehand/router.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
defmodule Stagehand.Router do
@moduledoc """
Routes jobs to the correct producer.
Routes jobs to producers.

- Normal jobs: random producer from the :pg group
- Unique jobs: consistent hash → deterministic producer
Normal jobs are sent to a random producer for the queue. Unique jobs
are routed deterministically via rendezvous hashing so that the same
fingerprint always reaches the same producer.
"""

alias Stagehand.Job
Expand All @@ -12,7 +13,7 @@ defmodule Stagehand.Router do
alias Stagehand.Unique

@doc """
Route and insert a job.
Routes a job to a producer and inserts it.
"""
@spec route(Job.t(), Stagehand.Config.t()) :: {:ok, Job.t()} | {:error, :no_producers}
def route(%Job{} = job, conf) do
Expand Down
6 changes: 4 additions & 2 deletions lib/stagehand/supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
defmodule Stagehand.Supervisor do
@moduledoc """
Top-level supervisor for a Stagehand instance. Manages the registry,
hash ring, unique server, queue manager, and plugins.
Top-level supervisor for a Stagehand instance.

Manages the local process registry, unique job deduplication server,
queue pipelines, and plugins.
"""

use Supervisor
Expand Down
Loading
Loading