diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 00000000..d304ff32 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,3 @@ +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b22e9643..edd0219d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,28 +29,31 @@ jobs: image: hexpm/elixir:${{ matrix.image }} steps: - - name: Checkout - uses: actions/checkout@v5 - - - name: Hex and Rebar setup - run: | - mix local.hex --force - mix local.rebar --force - - - name: Restore deps and _build cache - uses: actions/cache@v4 - with: - path: | - deps - _build - key: deps-${{ runner.os }}-${{ matrix.image }}-${{ hashFiles('**/mix.lock') }} - restore-keys: | - deps-${{ runner.os }}-${{ matrix.image }} - - - name: Install dependencies - run: mix deps.get --only test - - - name: Run tests - run: | - epmd -daemon - mix test + - name: Checkout + uses: actions/checkout@v5 + + - name: Hex and Rebar setup + run: | + mix local.hex --force + mix local.rebar --force + + - name: Restore deps and _build cache + uses: actions/cache@v4 + with: + path: | + deps + _build + key: deps-${{ runner.os }}-${{ matrix.image }}-${{ hashFiles('**/mix.lock') }} + restore-keys: | + deps-${{ runner.os }}-${{ matrix.image }} + + - name: Install dependencies + run: mix deps.get --only test + + - name: Check formatted + run: mix format --check-formatted + + - name: Run tests + run: | + epmd -daemon + mix test diff --git a/lib/phoenix/pubsub/application.ex b/lib/phoenix/pubsub/application.ex index b8eef3ba..f5b95dcd 100644 --- a/lib/phoenix/pubsub/application.ex +++ b/lib/phoenix/pubsub/application.ex @@ -16,4 +16,4 @@ defmodule Phoenix.PubSub.Application do [] end end -end \ No newline at end of file +end diff --git a/lib/phoenix/pubsub/pg2.ex b/lib/phoenix/pubsub/pg2.ex index 823ae5e5..9e5b2dc5 100644 --- a/lib/phoenix/pubsub/pg2.ex +++ b/lib/phoenix/pubsub/pg2.ex @@ -64,16 +64,19 @@ defmodule Phoenix.PubSub.PG2 do broadcast_pool_size = Keyword.get(opts, :broadcast_pool_size, pool_size) if pool_size < broadcast_pool_size do - {:error, "the :pool_size option must be greater than or equal to the :broadcast_pool_size option"} + {:error, + "the :pool_size option must be greater than or equal to the :broadcast_pool_size option"} else adapter_name = Keyword.fetch!(opts, :adapter_name) - Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size, broadcast_pool_size}, name: :"#{adapter_name}_supervisor") + + Supervisor.start_link(__MODULE__, {name, adapter_name, pool_size, broadcast_pool_size}, + name: :"#{adapter_name}_supervisor" + ) end end @impl true def init({name, adapter_name, pool_size, broadcast_pool_size}) do - listener_groups = groups(adapter_name, pool_size) broadcast_groups = groups(adapter_name, broadcast_pool_size) diff --git a/lib/phoenix/tracker/clock.ex b/lib/phoenix/tracker/clock.ex index a0200f23..ee2a6c62 100644 --- a/lib/phoenix/tracker/clock.ex +++ b/lib/phoenix/tracker/clock.ex @@ -2,13 +2,13 @@ defmodule Phoenix.Tracker.Clock do @moduledoc false alias Phoenix.Tracker.State - @type context :: State.context - @type clock :: {State.name, context} + @type context :: State.context() + @type clock :: {State.name(), context} @doc """ Returns a list of replicas from a list of contexts. """ - @spec clockset_replicas([clock]) :: [State.name] + @spec clockset_replicas([clock]) :: [State.name()] def clockset_replicas(clockset) do for {replica, _} <- clockset, do: replica end @@ -18,8 +18,10 @@ defmodule Phoenix.Tracker.Clock do """ @spec append_clock([clock], clock) :: [clock] def append_clock(clockset, {_, clock}) when map_size(clock) == 0, do: clockset + def append_clock(clockset, {node, clock}) do big_clock = combine_clocks(clockset) + cond do dominates?(clock, big_clock) -> [{node, clock}] dominates?(big_clock, clock) -> clockset @@ -32,6 +34,7 @@ defmodule Phoenix.Tracker.Clock do """ @spec dominates?(context, context) :: boolean def dominates?(c1, c2) when map_size(c1) < map_size(c2), do: false + def dominates?(c1, c2) do Enum.reduce_while(c2, true, fn {replica, clock}, true -> if Map.get(c1, replica, 0) >= clock do @@ -47,6 +50,7 @@ defmodule Phoenix.Tracker.Clock do """ def dominates_or_equal?(c1, c2) when c1 == %{} and c2 == %{}, do: true def dominates_or_equal?(c1, _c2) when c1 == %{}, do: false + def dominates_or_equal?(c1, c2) do Enum.reduce_while(c1, true, fn {replica, clock}, true -> if clock >= Map.get(c2, replica, 0) do @@ -87,7 +91,7 @@ defmodule Phoenix.Tracker.Clock do if dominates?(clock, clock2) do {set, true} else - {[{node2, clock2}| set], insert || !dominates?(clock2, clock)} + {[{node2, clock2} | set], insert || !dominates?(clock2, clock)} end end) |> case do diff --git a/lib/phoenix/tracker/delta_generation.ex b/lib/phoenix/tracker/delta_generation.ex index d1161f0e..60e15296 100644 --- a/lib/phoenix/tracker/delta_generation.ex +++ b/lib/phoenix/tracker/delta_generation.ex @@ -8,28 +8,35 @@ defmodule Phoenix.Tracker.DeltaGeneration do Falls back to extracting entire crdt if unable to match delta. """ - @spec extract(State.t, [State.delta], State.name, State.context) :: State.delta | State.t + @spec extract(State.t(), [State.delta()], State.name(), State.context()) :: + State.delta() | State.t() def extract(%State{mode: :normal} = state, generations, remote_ref, remote_context) do case delta_fulfilling_clock(generations, remote_context) do {delta, index} -> - if index, do: Logger.debug "#{inspect state.replica}: sending delta generation #{index + 1}" + if index, + do: Logger.debug("#{inspect(state.replica)}: sending delta generation #{index + 1}") + State.extract(delta, remote_ref, remote_context) + nil -> - Logger.debug "#{inspect state.replica}: falling back to sending entire crdt" + Logger.debug("#{inspect(state.replica)}: falling back to sending entire crdt") State.extract(state, remote_ref, remote_context) end end - @spec push(State.t, [State.delta], State.delta, [pos_integer]) :: [State.delta] + @spec push(State.t(), [State.delta()], State.delta(), [pos_integer]) :: [State.delta()] def push(%State{mode: :normal} = parent, [] = _generations, %State{mode: :delta} = delta, opts) do parent.delta |> List.duplicate(Enum.count(opts)) |> do_push(delta, opts, {delta, []}) end + def push(%State{mode: :normal} = _parent, generations, %State{mode: :delta} = delta, opts) do do_push(generations, delta, opts, {delta, []}) end + defp do_push([], _delta, [], {_prev, acc}), do: Enum.reverse(acc) + defp do_push([gen | generations], delta, [gen_max | opts], {prev, acc}) do case State.merge_deltas(gen, delta) do {:ok, merged} -> @@ -47,7 +54,7 @@ defmodule Phoenix.Tracker.DeltaGeneration do @doc """ Prunes permanently downed replicates from the delta generation list """ - @spec remove_down_replicas([State.delta], Replica.replica_ref) :: [State.delta] + @spec remove_down_replicas([State.delta()], Replica.replica_ref()) :: [State.delta()] def remove_down_replicas(generations, replica_ref) do Enum.map(generations, fn %State{mode: :delta} = gen -> State.remove_down_replicas(gen, replica_ref) diff --git a/lib/phoenix/tracker/shard.ex b/lib/phoenix/tracker/shard.ex index e1d04a0e..5b688f06 100644 --- a/lib/phoenix/tracker/shard.ex +++ b/lib/phoenix/tracker/shard.ex @@ -4,36 +4,37 @@ defmodule Phoenix.Tracker.Shard do alias Phoenix.Tracker.{Clock, State, Replica, DeltaGeneration} require Logger - @type presence :: {key :: String.t, meta :: map()} - @type topic :: String.t + @type presence :: {key :: String.t(), meta :: map()} + @type topic :: String.t() - @callback init(Keyword.t) :: {:ok, pid} | {:error, reason :: term} - @callback handle_diff(%{topic => {joins :: [presence], leaves :: [presence]}}, state :: term) :: {:ok, state :: term} + @callback init(Keyword.t()) :: {:ok, pid} | {:error, reason :: term} + @callback handle_diff(%{topic => {joins :: [presence], leaves :: [presence]}}, state :: term) :: + {:ok, state :: term} @callback handle_info(message :: term, state :: term) :: {:noreply, state :: term} @optional_callbacks handle_info: 2 @type t :: %{ - shard_name: String.t(), - pubsub_server: atom(), - tracker: module, - tracker_state: any, - replica: Replica.t(), - report_events_to: any, - namespaced_topic: String.t(), - log_level: boolean | atom, - replicas: map, - pending_clockset: [], - presences: State.t(), - broadcast_period: integer, - max_silent_periods: integer(), - silent_periods: integer(), - down_period: integer, - permdown_period: integer, - clock_sample_periods: integer, - deltas: [State.delta()], - max_delta_sizes: integer, - current_sample_count: integer - } + shard_name: String.t(), + pubsub_server: atom(), + tracker: module, + tracker_state: any, + replica: Replica.t(), + report_events_to: any, + namespaced_topic: String.t(), + log_level: boolean | atom, + replicas: map, + pending_clockset: [], + presences: State.t(), + broadcast_period: integer, + max_silent_periods: integer(), + silent_periods: integer(), + down_period: integer, + permdown_period: integer, + clock_sample_periods: integer, + deltas: [State.delta()], + max_delta_sizes: integer, + current_sample_count: integer + } ## Used by Phoenix.Tracker for dispatching to appropriate shard @spec name_for_number(atom, non_neg_integer) :: atom @@ -58,12 +59,15 @@ defmodule Phoenix.Tracker.Shard do def untrack(server_pid, pid, topic, key) when is_pid(pid) do GenServer.call(server_pid, {:untrack, pid, topic, key}) end + def untrack(server_pid, pid) when is_pid(pid) do GenServer.call(server_pid, {:untrack, pid}) end - @spec update(pid, pid, topic, term, map() | (map() -> map())) :: {:ok, ref :: binary} | {:error, reason :: term} - def update(server_pid, pid, topic, key, meta) when is_pid(pid) and (is_map(meta) or is_function(meta)) do + @spec update(pid, pid, topic, term, map() | (map() -> map())) :: + {:ok, ref :: binary} | {:error, reason :: term} + def update(server_pid, pid, topic, key, meta) + when is_pid(pid) and (is_map(meta) or is_function(meta)) do GenServer.call(server_pid, {:update, pid, topic, key, meta}) end @@ -98,98 +102,110 @@ defmodule Phoenix.Tracker.Shard do tracker_name = Keyword.fetch!(pool_opts, :name) name = name_for_number(tracker_name, number) shard_opts = Keyword.put(pool_opts, :name, name) - GenServer.start_link(__MODULE__, + + GenServer.start_link( + __MODULE__, [tracker, tracker_opts, shard_opts], name: name) end def init([tracker, tracker_opts, shard_opts]) do Process.flag(:trap_exit, true) - shard_name = Keyword.fetch!(shard_opts, :name) - pubsub_server = Keyword.fetch!(shard_opts, :pubsub_server) - broadcast_period = shard_opts[:broadcast_period] || 1500 - max_silent_periods = shard_opts[:max_silent_periods] || 10 - down_period = shard_opts[:down_period] - || (broadcast_period * max_silent_periods * 2) - permdown_period = shard_opts[:permdown_period] || 1_200_000 + shard_name = Keyword.fetch!(shard_opts, :name) + pubsub_server = Keyword.fetch!(shard_opts, :pubsub_server) + broadcast_period = shard_opts[:broadcast_period] || 1500 + max_silent_periods = shard_opts[:max_silent_periods] || 10 + + down_period = + shard_opts[:down_period] || + broadcast_period * max_silent_periods * 2 + + permdown_period = shard_opts[:permdown_period] || 1_200_000 clock_sample_periods = shard_opts[:clock_sample_periods] || 2 - log_level = Keyword.get(shard_opts, :log_level, false) - max_delta_sizes = shard_opts[:max_delta_sizes] || [100, 1000, 10_000] + log_level = Keyword.get(shard_opts, :log_level, false) + max_delta_sizes = shard_opts[:max_delta_sizes] || [100, 1000, 10_000] with :ok <- validate_down_period(down_period, broadcast_period), :ok <- validate_permdown_period(permdown_period, down_period), {:ok, tracker_state} <- tracker.init(tracker_opts) do - - node_name = Phoenix.PubSub.node_name(pubsub_server) + node_name = Phoenix.PubSub.node_name(pubsub_server) namespaced_topic = namespaced_topic(shard_name) - replica = Replica.new(node_name) + replica = Replica.new(node_name) subscribe(pubsub_server, namespaced_topic) send_stuttered_heartbeat(self(), broadcast_period) - {:ok, %{shard_name: shard_name, - pubsub_server: pubsub_server, - tracker: tracker, - tracker_state: tracker_state, - replica: replica, - report_events_to: shard_opts[:report_events_to], - namespaced_topic: namespaced_topic, - log_level: log_level, - replicas: %{}, - pending_clockset: [], - presences: State.new(Replica.ref(replica), shard_name), - broadcast_period: broadcast_period, - max_silent_periods: max_silent_periods, - silent_periods: max_silent_periods, - down_period: down_period, - permdown_period: permdown_period, - clock_sample_periods: clock_sample_periods, - deltas: [], - max_delta_sizes: max_delta_sizes, - current_sample_count: clock_sample_periods}} + {:ok, + %{ + shard_name: shard_name, + pubsub_server: pubsub_server, + tracker: tracker, + tracker_state: tracker_state, + replica: replica, + report_events_to: shard_opts[:report_events_to], + namespaced_topic: namespaced_topic, + log_level: log_level, + replicas: %{}, + pending_clockset: [], + presences: State.new(Replica.ref(replica), shard_name), + broadcast_period: broadcast_period, + max_silent_periods: max_silent_periods, + silent_periods: max_silent_periods, + down_period: down_period, + permdown_period: permdown_period, + clock_sample_periods: clock_sample_periods, + deltas: [], + max_delta_sizes: max_delta_sizes, + current_sample_count: clock_sample_periods + }} end end - def validate_down_period(d_period, b_period) when d_period < (2 * b_period) do + def validate_down_period(d_period, b_period) when d_period < 2 * b_period do {:error, "down_period must be at least twice as large as the broadcast_period"} end + def validate_down_period(_d_period, _b_period), do: :ok def validate_permdown_period(p_period, d_period) when p_period <= d_period do {:error, "permdown_period must be at least larger than the down_period"} end - def validate_permdown_period(_p_period, _d_period), do: :ok + def validate_permdown_period(_p_period, _d_period), do: :ok defp send_stuttered_heartbeat(pid, interval) do Process.send_after(pid, :heartbeat, Enum.random(0..trunc(interval * 0.25))) end def handle_info(:heartbeat, state) do - {:noreply, state - |> broadcast_delta_heartbeat() - |> request_transfer_from_replicas_needing_synced() - |> detect_downs() - |> schedule_next_heartbeat()} + {:noreply, + state + |> broadcast_delta_heartbeat() + |> request_transfer_from_replicas_needing_synced() + |> detect_downs() + |> schedule_next_heartbeat()} end def handle_info({:pub, :heartbeat, {name, vsn}, :empty, clocks}, state) do - {:noreply, state - |> put_pending_clock(clocks) - |> handle_heartbeat({name, vsn})} + {:noreply, + state + |> put_pending_clock(clocks) + |> handle_heartbeat({name, vsn})} end + def handle_info({:pub, :heartbeat, {name, vsn}, delta, clocks}, state) do state = handle_heartbeat(state, {name, vsn}) {presences, joined, left} = State.merge(state.presences, delta) - {:noreply, state - |> report_diff(joined, left) - |> put_presences(presences) - |> put_pending_clock(clocks) - |> push_delta_generation(delta)} + {:noreply, + state + |> report_diff(joined, left) + |> put_presences(presences) + |> put_pending_clock(clocks) + |> push_delta_generation(delta)} end def handle_info({:pub, :transfer_req, ref, {name, _vsn}, {_, clocks}}, state) do - log state, fn -> "#{state.replica.name}: transfer_req from #{inspect name}" end + log(state, fn -> "#{state.replica.name}: transfer_req from #{inspect(name)}" end) delta = DeltaGeneration.extract(state.presences, state.deltas, name, clocks) msg = {:pub, :transfer_ack, ref, Replica.ref(state.replica), delta} direct_broadcast(state, name, msg) @@ -198,19 +214,20 @@ defmodule Phoenix.Tracker.Shard do end def handle_info({:pub, :transfer_ack, _ref, {name, _vsn}, remote_presences}, state) do - log(state, fn -> "#{state.replica.name}: transfer_ack from #{inspect name}" end) + log(state, fn -> "#{state.replica.name}: transfer_ack from #{inspect(name)}" end) {presences, joined, left} = State.merge(state.presences, remote_presences) - {:noreply, state - |> report_diff(joined, left) - |> push_delta_generation(remote_presences) - |> put_presences(presences)} + {:noreply, + state + |> report_diff(joined, left) + |> push_delta_generation(remote_presences) + |> put_presences(presences)} end def handle_info({:pub, :graceful_permdown, {_name, _vsn} = ref}, state) do case Replica.fetch_by_ref(state.replicas, ref) do {:ok, replica} -> {:noreply, state |> down(replica) |> permdown(replica)} - :error -> {:noreply, state} + :error -> {:noreply, state} end end @@ -228,7 +245,7 @@ defmodule Phoenix.Tracker.Shard do raise ArgumentError, """ expected #{state.tracker}.handle_info/2 to return {:noreply, state}, but got: - #{inspect other} + #{inspect(other)} """ end else @@ -245,6 +262,7 @@ defmodule Phoenix.Tracker.Shard do nil -> {state, ref} = put_presence(state, pid, topic, key, meta) {:reply, {:ok, ref}, state} + _ -> {:reply, {:error, {:already_tracked, pid, topic, key}}, state} end @@ -252,9 +270,11 @@ defmodule Phoenix.Tracker.Shard do def handle_call({:untrack, pid, topic, key}, _from, state) do new_state = drop_presence(state, pid, topic, key) + if State.get_by_pid(new_state.presences, pid) == [] do Process.unlink(pid) end + {:reply, :ok, new_state} end @@ -263,7 +283,8 @@ defmodule Phoenix.Tracker.Shard do {:reply, :ok, drop_presence(state, pid)} end - def handle_call({:update, pid, topic, key, meta_updater}, _from, state) when is_function(meta_updater) do + def handle_call({:update, pid, topic, key, meta_updater}, _from, state) + when is_function(meta_updater) do handle_update({pid, topic, key, meta_updater}, state) end @@ -318,6 +339,7 @@ defmodule Phoenix.Tracker.Shard do Process.link(pid) ref = random_ref() meta = Map.put(meta, :phx_ref, ref) + new_state = state |> report_diff_join(topic, key, meta, prev_meta) @@ -337,6 +359,7 @@ defmodule Phoenix.Tracker.Shard do state end end + defp drop_presence(state, pid) do leaves = State.get_by_pid(state.presences, pid) @@ -356,10 +379,14 @@ defmodule Phoenix.Tracker.Shard do {replicas, %Replica{vsn: ^vsn, status: :down}, %Replica{vsn: ^vsn, status: :up} = upped} -> up(%{state | replicas: replicas}, upped) - {replicas, %Replica{vsn: old, status: :up} = downed, %Replica{vsn: ^vsn, status: :up} = upped} when old != vsn -> + {replicas, %Replica{vsn: old, status: :up} = downed, + %Replica{vsn: ^vsn, status: :up} = upped} + when old != vsn -> %{state | replicas: replicas} |> down(downed) |> permdown(downed) |> up(upped) - {replicas, %Replica{vsn: old, status: :down} = downed, %Replica{vsn: ^vsn, status: :up} = upped} when old != vsn -> + {replicas, %Replica{vsn: old, status: :down} = downed, + %Replica{vsn: ^vsn, status: :up} = upped} + when old != vsn -> %{state | replicas: replicas} |> permdown(downed) |> up(upped) end end @@ -370,12 +397,13 @@ defmodule Phoenix.Tracker.Shard do %{state | pending_clockset: [], current_sample_count: state.clock_sample_periods} end + defp request_transfer_from_replicas_needing_synced(state) do %{state | current_sample_count: state.current_sample_count - 1} end defp request_transfer(state, {name, _vsn}) do - log state, fn -> "#{state.replica.name}: transfer_req from #{name}" end + log(state, fn -> "#{state.replica.name}: transfer_req from #{name}" end) ref = make_ref() msg = {:pub, :transfer_req, ref, Replica.ref(state.replica), clock(state)} direct_broadcast(state, name, msg) @@ -406,7 +434,7 @@ defmodule Phoenix.Tracker.Shard do defp clock(state), do: State.clocks(state.presences) - @spec clockset_to_sync(t) :: [State.name] + @spec clockset_to_sync(t) :: [State.name()] defp clockset_to_sync(state) do my_ref = Replica.ref(state.replica) @@ -422,7 +450,7 @@ defmodule Phoenix.Tracker.Shard do defp up(state, remote_replica) do report_event(state, {:replica_up, remote_replica.name}) - log state, fn -> "#{state.replica.name}: replica up from #{inspect remote_replica.name}" end + log(state, fn -> "#{state.replica.name}: replica up from #{inspect(remote_replica.name)}" end) {presences, joined, []} = State.replica_up(state.presences, Replica.ref(remote_replica)) state @@ -432,7 +460,11 @@ defmodule Phoenix.Tracker.Shard do defp down(state, remote_replica) do report_event(state, {:replica_down, remote_replica.name}) - log state, fn -> "#{state.replica.name}: replica down from #{inspect remote_replica.name}" end + + log(state, fn -> + "#{state.replica.name}: replica down from #{inspect(remote_replica.name)}" + end) + {presences, [], left} = State.replica_down(state.presences, Replica.ref(remote_replica)) state @@ -442,7 +474,7 @@ defmodule Phoenix.Tracker.Shard do defp permdown(state, %Replica{name: name} = remote_replica) do report_event(state, {:replica_permdown, name}) - log state, fn -> "#{state.replica.name}: permanent replica down detected #{name}" end + log(state, fn -> "#{state.replica.name}: permanent replica down detected #{name}" end) replica_ref = Replica.ref(remote_replica) presences = State.remove_down_replicas(state.presences, replica_ref) deltas = DeltaGeneration.remove_down_replicas(state.deltas, replica_ref) @@ -451,12 +483,14 @@ defmodule Phoenix.Tracker.Shard do {:ok, _replica} -> replicas = Map.delete(state.replicas, name) %{state | presences: presences, replicas: replicas, deltas: deltas} + _ -> %{state | presences: presences, deltas: deltas} end end defp report_event(%{report_events_to: nil}, _event), do: :ok + defp report_event(%{report_events_to: pid} = state, event) do send(pid, {event, state.replica.name}) end @@ -470,7 +504,12 @@ defmodule Phoenix.Tracker.Shard do end defp direct_broadcast(state, target_node, msg) do - Phoenix.PubSub.direct_broadcast!(target_node, state.pubsub_server, state.namespaced_topic, msg) + Phoenix.PubSub.direct_broadcast!( + target_node, + state.pubsub_server, + state.namespaced_topic, + msg + ) end defp broadcast_delta_heartbeat(%{presences: presences} = state) do @@ -479,30 +518,45 @@ defmodule Phoenix.Tracker.Shard do delta = presences.delta new_presences = presences |> State.reset_delta() |> State.compact() - broadcast_from(state, self(), {:pub, :heartbeat, Replica.ref(state.replica), delta, clock(state)}) + broadcast_from( + state, + self(), + {:pub, :heartbeat, Replica.ref(state.replica), delta, clock(state)} + ) + %{state | presences: new_presences, silent_periods: 0} |> push_delta_generation(delta) state.silent_periods >= state.max_silent_periods -> - broadcast_from(state, self(), {:pub, :heartbeat, Replica.ref(state.replica), :empty, clock(state)}) + broadcast_from( + state, + self(), + {:pub, :heartbeat, Replica.ref(state.replica), :empty, clock(state)} + ) + %{state | silent_periods: 0} - true -> update_in(state.silent_periods, &(&1 + 1)) + true -> + update_in(state.silent_periods, &(&1 + 1)) end end defp report_diff(state, [], []), do: state + defp report_diff(state, joined, left) do - join_diff = Enum.reduce(joined, %{}, fn {{topic, _pid, key}, meta, _}, acc -> - Map.update(acc, topic, {[{key, meta}], []}, fn {joins, leaves} -> - {[{key, meta} | joins], leaves} + join_diff = + Enum.reduce(joined, %{}, fn {{topic, _pid, key}, meta, _}, acc -> + Map.update(acc, topic, {[{key, meta}], []}, fn {joins, leaves} -> + {[{key, meta} | joins], leaves} + end) end) - end) - full_diff = Enum.reduce(left, join_diff, fn {{topic, _pid, key}, meta, _}, acc -> - Map.update(acc, topic, {[], [{key, meta}]}, fn {joins, leaves} -> - {joins, [{key, meta} | leaves]} + + full_diff = + Enum.reduce(left, join_diff, fn {{topic, _pid, key}, meta, _}, acc -> + Map.update(acc, topic, {[], [{key, meta}]}, fn {joins, leaves} -> + {joins, [{key, meta} | leaves]} + end) end) - end) full_diff |> state.tracker.handle_diff(state.tracker_state) @@ -514,6 +568,7 @@ defmodule Phoenix.Tracker.Shard do |> state.tracker.handle_diff(state.tracker_state) |> handle_tracker_result(state) end + defp report_diff_join(state, topic, key, meta, prev_meta) do %{topic => {[{key, meta}], [{key, prev_meta}]}} |> state.tracker.handle_diff(state.tracker_state) @@ -523,11 +578,12 @@ defmodule Phoenix.Tracker.Shard do defp handle_tracker_result({:ok, tracker_state}, state) do %{state | tracker_state: tracker_state} end + defp handle_tracker_result(other, state) do raise ArgumentError, """ expected #{state.tracker}.handle_diff/2 to return {:ok, state}, but got: - #{inspect other} + #{inspect(other)} """ end @@ -535,6 +591,7 @@ defmodule Phoenix.Tracker.Shard do case State.get_by_pid(state.presences, pid, topic, key) do nil -> {:reply, {:error, :nopresence}, state} + {{_topic, _pid, ^key}, prev_meta, {_replica, _}} -> {state, ref} = put_update(state, pid, topic, key, meta_updater.(prev_meta), prev_meta) {:reply, {:ok, ref}, state} @@ -544,6 +601,7 @@ defmodule Phoenix.Tracker.Shard do defp push_delta_generation(state, {%State{mode: :normal}, _}) do %{state | deltas: []} end + defp push_delta_generation(%{deltas: deltas} = state, %State{mode: :delta} = delta) do new_deltas = DeltaGeneration.push(state.presences, deltas, delta, state.max_delta_sizes) %{state | deltas: new_deltas} diff --git a/test/phoenix/distributed_pubsub_test.exs b/test/phoenix/distributed_pubsub_test.exs index 6fdbe19a..13ce1c16 100644 --- a/test/phoenix/distributed_pubsub_test.exs +++ b/test/phoenix/distributed_pubsub_test.exs @@ -57,7 +57,8 @@ defmodule Phoenix.PubSub.DistributedTest do assert_receive {@node3, :ping} end - test "broadcast is received by other node that was broadcast from node that has broadcast_pool_size < pool_size", config do + test "broadcast is received by other node that was broadcast from node that has broadcast_pool_size < pool_size", + config do # node4 has pool_size = 1, message is sent from node3 that has pool_size = 4, broadcast_pool_size = 1 spy_on_pubsub(@node1, config.pubsub, self(), config.topic) spy_on_pubsub(@node2, config.pubsub, self(), config.topic) diff --git a/test/phoenix/pubsub_test.exs b/test/phoenix/pubsub_test.exs index ff2e1323..a5baa172 100644 --- a/test/phoenix/pubsub_test.exs +++ b/test/phoenix/pubsub_test.exs @@ -15,7 +15,8 @@ defmodule Phoenix.PubSub.UnitTest do {:error, {{:shutdown, {:failed_to_start_child, Phoenix.PubSub.PG2, message}}, _}} = start_supervised({Phoenix.PubSub, opts}) - assert ^message = "the :pool_size option must be greater than or equal to the :broadcast_pool_size option" + assert ^message = + "the :pool_size option must be greater than or equal to the :broadcast_pool_size option" end defp name do diff --git a/test/phoenix/tracker/clock_test.exs b/test/phoenix/tracker/clock_test.exs index 2125352f..17c82619 100644 --- a/test/phoenix/tracker/clock_test.exs +++ b/test/phoenix/tracker/clock_test.exs @@ -16,20 +16,22 @@ defmodule Phoenix.TrackerClockTest do clock1 = {:a, %{a: 1, b: 2, c: 3}} clock2 = {:b, %{b: 2, c: 3, d: 1}} clock3 = {:c, %{a: 1, b: 2}} - assert [clock2, clock3] == Clock.append_clock([clock2, clock3], clock1) |> Enum.sort - assert [clock1, clock2] == Clock.append_clock([clock1, clock2], clock3) |> Enum.sort - assert [clock1, clock2] == Clock.append_clock([clock1, clock2], clock1) |> Enum.sort - assert [clock1, clock2] == Clock.append_clock([clock1, clock2], clock2) |> Enum.sort - assert [clock1, clock2, clock3] == Clock.append_clock([clock1, clock3], clock2) |> Enum.sort - assert [:b, :c] == [clock2, clock3] - |> Clock.append_clock(clock1) - |> Clock.clockset_replicas() - |> Enum.sort() + assert [clock2, clock3] == Clock.append_clock([clock2, clock3], clock1) |> Enum.sort() + assert [clock1, clock2] == Clock.append_clock([clock1, clock2], clock3) |> Enum.sort() + assert [clock1, clock2] == Clock.append_clock([clock1, clock2], clock1) |> Enum.sort() + assert [clock1, clock2] == Clock.append_clock([clock1, clock2], clock2) |> Enum.sort() + assert [clock1, clock2, clock3] == Clock.append_clock([clock1, clock3], clock2) |> Enum.sort() + + assert [:b, :c] == + [clock2, clock3] + |> Clock.append_clock(clock1) + |> Clock.clockset_replicas() + |> Enum.sort() end test "upperbound" do assert Clock.upperbound(%{a: 1, b: 2, c: 2}, %{a: 3, b: 1, d: 2}) == - %{a: 3, b: 2, c: 2, d: 2} + %{a: 3, b: 2, c: 2, d: 2} assert Clock.upperbound(%{}, %{a: 3, b: 1, d: 2}) == %{a: 3, b: 1, d: 2} assert Clock.upperbound(%{a: 3, b: 1, d: 2}, %{}) == %{a: 3, b: 1, d: 2} @@ -37,7 +39,7 @@ defmodule Phoenix.TrackerClockTest do test "lowerbound" do assert Clock.lowerbound(%{a: 1, b: 2, c: 2}, %{a: 3, b: 1, d: 2}) == - %{a: 1, b: 1, c: 2, d: 2} + %{a: 1, b: 1, c: 2, d: 2} assert Clock.lowerbound(%{}, %{a: 3, b: 1, d: 2}) == %{a: 3, b: 1, d: 2} assert Clock.lowerbound(%{a: 3, b: 1, d: 2}, %{}) == %{a: 3, b: 1, d: 2} diff --git a/test/phoenix/tracker/delta_generation_test.exs b/test/phoenix/tracker/delta_generation_test.exs index 0d11e943..6ae8bc18 100644 --- a/test/phoenix/tracker/delta_generation_test.exs +++ b/test/phoenix/tracker/delta_generation_test.exs @@ -105,7 +105,10 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do s3 = new(:s3, config) s2 = State.join(s2, new_pid(), "lobby", "user2", %{}) assert [gen1, gen1, gen1] = gens = push(s1, [], s2.delta, [5, 10, 15]) - assert [pruned_gen1, pruned_gen1, pruned_gen1] = DeltaGeneration.remove_down_replicas(gens, :s2) + + assert [pruned_gen1, pruned_gen1, pruned_gen1] = + DeltaGeneration.remove_down_replicas(gens, :s2) + assert {s3, [], []} = State.merge(s3, pruned_gen1) assert State.get_by_topic(s3, "lobby") == [] end @@ -119,7 +122,7 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do expected_values = %{{:r1, 1} => {pid, "lobby", "user1", %{}}} assert DeltaGeneration.extract(s2, [], :r2, %{r1: 1}) == - {expected_state, expected_values} + {expected_state, expected_values} end test "delta is extracted for the first delta with dominating clocks", config do @@ -141,15 +144,21 @@ defmodule Phoenix.Tracker.DeltaGenerationTest do d2 = State.reset_delta(s2).delta {d1_left, d1_right} = d1.range - d3 = %{d1 | - range: {Map.put(d1_left, :r2, 0), Map.put(d1_right, :r2, 1)}, - values: Map.put(d1.values, {:r2, 1}, {pid, "lobby", "user2", %{}})} + + d3 = %{ + d1 + | range: {Map.put(d1_left, :r2, 0), Map.put(d1_right, :r2, 1)}, + values: Map.put(d1.values, {:r2, 1}, {pid, "lobby", "user2", %{}}) + } + s3 = %{s2 | delta: d3} - expected_delta = %{d3 | - clouds: %{}, - range: {%{r2: 0}, %{r2: 1}}, - values: %{{:r2, 1} => {pid, "lobby", "user2", %{}}}} + expected_delta = %{ + d3 + | clouds: %{}, + range: {%{r2: 0}, %{r2: 1}}, + values: %{{:r2, 1} => {pid, "lobby", "user2", %{}}} + } assert DeltaGeneration.extract(s3, [d2, d3], :r3, %{r2: 0}) == expected_delta end diff --git a/test/phoenix/tracker/replica_test.exs b/test/phoenix/tracker/replica_test.exs index e6f5e199..2e2bdf9e 100644 --- a/test/phoenix/tracker/replica_test.exs +++ b/test/phoenix/tracker/replica_test.exs @@ -29,8 +29,9 @@ defmodule Phoenix.Tracker.ReplicaTest do test "put_heartbeat/2 with previously tracked name" do for status <- [:up, :down] do existing_node = Replica.new("existing") |> Map.put(:status, status) + assert {replicas, ^existing_node, updated_node} = - Replica.put_heartbeat(%{ "existing" => existing_node}, Replica.ref(existing_node)) + Replica.put_heartbeat(%{"existing" => existing_node}, Replica.ref(existing_node)) assert replicas["existing"] == updated_node assert updated_node.name == existing_node.name @@ -42,9 +43,17 @@ defmodule Phoenix.Tracker.ReplicaTest do end test "detect_down/4 with temporarily downed node" do - {replicas, nil, tempdown_node} = Replica.put_heartbeat(%{}, Replica.ref(Replica.new("tempdown"))) + {replicas, nil, tempdown_node} = + Replica.put_heartbeat(%{}, Replica.ref(Replica.new("tempdown"))) + assert {replicas, ^tempdown_node, updated_tempdown} = - Replica.detect_down(replicas, tempdown_node, 5, 10, tempdown_node.last_heartbeat_at + 6) + Replica.detect_down( + replicas, + tempdown_node, + 5, + 10, + tempdown_node.last_heartbeat_at + 6 + ) assert Map.fetch(replicas, "tempdown") == {:ok, updated_tempdown} assert updated_tempdown.name == "tempdown" @@ -53,9 +62,17 @@ defmodule Phoenix.Tracker.ReplicaTest do end test "detect_down/4 with permanently downed node removes from replicas map" do - {replicas, nil, tempdown_node} = Replica.put_heartbeat(%{}, Replica.ref(Replica.new("tempdown"))) + {replicas, nil, tempdown_node} = + Replica.put_heartbeat(%{}, Replica.ref(Replica.new("tempdown"))) + assert {replicas, ^tempdown_node, updated_tempdown} = - Replica.detect_down(replicas, tempdown_node, 5, 10, tempdown_node.last_heartbeat_at + 11) + Replica.detect_down( + replicas, + tempdown_node, + 5, + 10, + tempdown_node.last_heartbeat_at + 11 + ) assert Map.fetch(replicas, "tempdown") == :error assert updated_tempdown.name == "tempdown" @@ -65,8 +82,9 @@ defmodule Phoenix.Tracker.ReplicaTest do test "detect_down/4 with up node" do {replicas, nil, up_node} = Replica.put_heartbeat(%{}, Replica.ref(Replica.new("up"))) + assert {replicas, ^up_node, ^up_node} = - Replica.detect_down(replicas, up_node, 5, 10, up_node.last_heartbeat_at) + Replica.detect_down(replicas, up_node, 5, 10, up_node.last_heartbeat_at) assert Map.fetch(replicas, "up") == {:ok, up_node} assert up_node.status == :up diff --git a/test/phoenix/tracker/shard_replication_test.exs b/test/phoenix/tracker/shard_replication_test.exs index c0ec0780..4f0c817d 100644 --- a/test/phoenix/tracker/shard_replication_test.exs +++ b/test/phoenix/tracker/shard_replication_test.exs @@ -23,16 +23,15 @@ defmodule Phoenix.Tracker.ShardReplicationTest do test "heartbeats", %{shard: shard} do subscribe_to_server(shard) - assert_heartbeat from: @primary + assert_heartbeat(from: @primary) flush() - assert_heartbeat from: @primary + assert_heartbeat(from: @primary) flush() - assert_heartbeat from: @primary + assert_heartbeat(from: @primary) end test "gossip from unseen node triggers nodeup and transfer request", - %{shard: shard, topic: topic, tracker: tracker} do - + %{shard: shard, topic: topic, tracker: tracker} do assert list(shard, topic) == [] subscribe_to_server(shard) drop_gossips(shard) @@ -40,28 +39,29 @@ defmodule Phoenix.Tracker.ShardReplicationTest do start_shard(@node1, name: tracker) track_presence(@node1, shard, spawn_pid(), topic, "node1", %{}) flush() - assert_heartbeat from: @node1 + assert_heartbeat(from: @node1) resume_gossips(shard) # primary sends transfer_req to node1 after seeing behind - ref = assert_transfer_req to: @node1, from: @primary + ref = assert_transfer_req(to: @node1, from: @primary) # node1 fulfills transfer request and sends transfer_ack to primary - assert_transfer_ack ref, from: @node1 - assert_heartbeat to: @node1, from: @primary + assert_transfer_ack(ref, from: @node1) + assert_heartbeat(to: @node1, from: @primary) assert [{"node1", _}] = list(shard, topic) end test "requests for transfer collapses clocks", - %{shard: shard, topic: topic, tracker: tracker} do - + %{shard: shard, topic: topic, tracker: tracker} do subscribe_to_server(shard) subscribe(topic) + for node <- [@node1, @node2] do spy_on_server(node, self(), shard) start_shard(node, name: tracker) assert_receive {{:replica_up, ^node}, @primary}, @timeout assert_receive {{:replica_up, @primary}, ^node}, @timeout end + assert_receive {{:replica_up, @node2}, @node1}, @timeout assert_receive {{:replica_up, @node1}, @node2}, @timeout @@ -72,10 +72,12 @@ defmodule Phoenix.Tracker.ShardReplicationTest do track_presence(@node2, shard, spawn_pid(), topic, "node2", %{}) # node1 sends delta broadcast to node2 - assert_receive {@node1, {:pub, :heartbeat, {@node2, _vsn}, %State{mode: :delta}, _clocks}}, @timeout + assert_receive {@node1, {:pub, :heartbeat, {@node2, _vsn}, %State{mode: :delta}, _clocks}}, + @timeout # node2 sends delta broadcast to node1 - assert_receive {@node2, {:pub, :heartbeat, {@node1, _vsn}, %State{mode: :delta}, _clocks}}, @timeout + assert_receive {@node2, {:pub, :heartbeat, {@node1, _vsn}, %State{mode: :delta}, _clocks}}, + @timeout flush() resume_gossips(shard) @@ -87,23 +89,23 @@ defmodule Phoenix.Tracker.ShardReplicationTest do assert_receive {:pub, :transfer_ack, ^ref, {^node, _vsn}, _state}, @timeout # wait for local sync - assert_join ^topic, "node1", %{} - assert_join ^topic, "node1.2", %{} - assert_join ^topic, "node2", %{} - assert_heartbeat from: @node1 - assert_heartbeat from: @node2 + assert_join(^topic, "node1", %{}) + assert_join(^topic, "node1.2", %{}) + assert_join(^topic, "node2", %{}) + assert_heartbeat(from: @node1) + assert_heartbeat(from: @node2) assert [{"node1", _}, {"node1.2", _}, {"node2", _}] = list(shard, topic) end test "old pids from a node are permdowned when the node comes back up", - %{shard: shard, shard_pid: shard_pid, topic: topic, tracker: tracker} do + %{shard: shard, shard_pid: shard_pid, topic: topic, tracker: tracker} do track_presence(@primary, shard, self(), topic, @primary, %{}) {node1_node, {:ok, node1_shard}} = start_shard(@node1, name: tracker) track_presence(@node1, shard, node1_node, topic, @node1, %{}) spy_on_server(@node1, self(), shard) - assert_heartbeat to: @node1, from: @primary + assert_heartbeat(to: @node1, from: @primary) {node2_node, {:ok, node2_shard}} = start_shard(@node2, name: tracker) track_presence(@node2, shard, node2_node, topic, @node2, %{}) @@ -119,8 +121,8 @@ defmodule Phoenix.Tracker.ShardReplicationTest do flush() spy_on_server(@node1, self(), shard) - assert_heartbeat to: @node2, from: @primary - assert_heartbeat to: @node1, from: @node2 + assert_heartbeat(to: @node2, from: @primary) + assert_heartbeat(to: @node1, from: @node2) refute {@node1, node1_node} in get_values(@primary, shard_pid) refute {@node1, node1_node} in get_values(@node1, node1_shard) @@ -129,65 +131,90 @@ defmodule Phoenix.Tracker.ShardReplicationTest do # TODO split into multiple test cases test "tempdowns with nodeups of new vsn, and permdowns", - %{shard: shard, topic: topic, tracker: tracker} do - + %{shard: shard, topic: topic, tracker: tracker} do subscribe_to_server(shard) subscribe(topic) {node1_node, {:ok, node1_server}} = start_shard(@node1, name: tracker) {_node2_node, {:ok, _node2_server}} = start_shard(@node2, name: tracker) + for node <- [@node1, @node2] do track_presence(node, shard, spawn_pid(), topic, node, %{}) - assert_join ^topic, ^node, %{} + assert_join(^topic, ^node, %{}) end - assert_map %{@node1 => %Replica{status: :up, vsn: vsn_before}, - @node2 => %Replica{status: :up}}, replicas(shard), 2 + + assert_map( + %{@node1 => %Replica{status: :up, vsn: vsn_before}, @node2 => %Replica{status: :up}}, + replicas(shard), + 2 + ) # tempdown netsplit flush() :ok = :sys.suspend(node1_server) - assert_leave ^topic, @node1, %{} - assert_map %{@node1 => %Replica{status: :down, vsn: ^vsn_before}, - @node2 => %Replica{status: :up}}, replicas(shard), 2 + assert_leave(^topic, @node1, %{}) + + assert_map( + %{@node1 => %Replica{status: :down, vsn: ^vsn_before}, @node2 => %Replica{status: :up}}, + replicas(shard), + 2 + ) + flush() :ok = :sys.resume(node1_server) - assert_join ^topic, @node1, %{} - assert_heartbeat from: @node1 - assert_map %{@node1 => %Replica{status: :up, vsn: ^vsn_before}, - @node2 => %Replica{status: :up}}, replicas(shard), 2 + assert_join(^topic, @node1, %{}) + assert_heartbeat(from: @node1) + + assert_map( + %{@node1 => %Replica{status: :up, vsn: ^vsn_before}, @node2 => %Replica{status: :up}}, + replicas(shard), + 2 + ) # tempdown crash Process.unlink(node1_node) Process.exit(node1_server, :kill) - assert_leave ^topic, @node1, %{} - assert_map %{@node1 => %Replica{status: :down}, - @node2 => %Replica{status: :up}}, replicas(shard), 2 + assert_leave(^topic, @node1, %{}) + + assert_map( + %{@node1 => %Replica{status: :down}, @node2 => %Replica{status: :up}}, + replicas(shard), + 2 + ) # tempdown => nodeup with new vsn {node1_node, {:ok, node1_server}} = start_shard(@node1, name: tracker) track_presence(@node1, shard, spawn_pid(), topic, "node1-back", %{}) - assert_join ^topic, "node1-back", %{} + assert_join(^topic, "node1-back", %{}) assert [{@node2, _}, {"node1-back", _}] = list(shard, topic) - assert_map %{@node1 => %Replica{status: :up, vsn: new_vsn}, - @node2 => %Replica{status: :up}}, replicas(shard), 2 + + assert_map( + %{@node1 => %Replica{status: :up, vsn: new_vsn}, @node2 => %Replica{status: :up}}, + replicas(shard), + 2 + ) + assert vsn_before != new_vsn # tempdown again Process.unlink(node1_node) Process.exit(node1_server, :kill) - assert_leave ^topic, "node1-back", %{} - assert_map %{@node1 => %Replica{status: :down}, - @node2 => %Replica{status: :up}}, replicas(shard), 2 + assert_leave(^topic, "node1-back", %{}) + + assert_map( + %{@node1 => %Replica{status: :down}, @node2 => %Replica{status: :up}}, + replicas(shard), + 2 + ) # tempdown => permdown flush() for _ <- 0..trunc(@permdown / @heartbeat), do: assert_heartbeat(from: @primary) - assert_map %{@node2 => %Replica{status: :up}}, replicas(shard), 1 + assert_map(%{@node2 => %Replica{status: :up}}, replicas(shard), 1) end test "node detects and locally broadcasts presence_join/leave", - %{shard: shard, topic: topic, tracker: tracker} do - + %{shard: shard, topic: topic, tracker: tracker} do local_presence = spawn_pid() remote_pres = spawn_pid() @@ -195,52 +222,54 @@ defmodule Phoenix.Tracker.ShardReplicationTest do subscribe(topic) assert list(shard, topic) == [] {:ok, _ref} = Shard.track(shard, self(), topic, "me", %{name: "me"}) - assert_join ^topic, "me", %{name: "me"} + assert_join(^topic, "me", %{name: "me"}) assert [{"me", %{name: "me", phx_ref: _}}] = list(shard, topic) - {:ok, _ref} = Shard.track(shard, local_presence , topic, "me2", %{name: "me2"}) - assert_join ^topic, "me2", %{name: "me2"} - assert [{"me", %{name: "me", phx_ref: _}}, - {"me2",%{name: "me2", phx_ref: _}}] = - list(shard, topic) + {:ok, _ref} = Shard.track(shard, local_presence, topic, "me2", %{name: "me2"}) + assert_join(^topic, "me2", %{name: "me2"}) + + assert [{"me", %{name: "me", phx_ref: _}}, {"me2", %{name: "me2", phx_ref: _}}] = + list(shard, topic) # remote joins assert replicas(shard) == %{} start_shard(@node1, name: tracker) track_presence(@node1, shard, remote_pres, topic, "node1", %{name: "s1"}) - assert_join ^topic, "node1", %{name: "s1"} - assert_map %{@node1 => %Replica{status: :up}}, replicas(shard), 1 - assert [{"me", %{name: "me", phx_ref: _}}, - {"me2",%{name: "me2", phx_ref: _}}, - {"node1", %{name: "s1", phx_ref: _}}] = - list(shard, topic) + assert_join(^topic, "node1", %{name: "s1"}) + assert_map(%{@node1 => %Replica{status: :up}}, replicas(shard), 1) + + assert [ + {"me", %{name: "me", phx_ref: _}}, + {"me2", %{name: "me2", phx_ref: _}}, + {"node1", %{name: "s1", phx_ref: _}} + ] = + list(shard, topic) # local leaves Process.exit(local_presence, :kill) - assert_leave ^topic, "me2", %{name: "me2"} - assert [{"me", %{name: "me", phx_ref: _}}, - {"node1", %{name: "s1", phx_ref: _}}] = - list(shard, topic) + assert_leave(^topic, "me2", %{name: "me2"}) + + assert [{"me", %{name: "me", phx_ref: _}}, {"node1", %{name: "s1", phx_ref: _}}] = + list(shard, topic) # remote leaves Process.exit(remote_pres, :kill) - assert_leave ^topic, "node1", %{name: "s1"} + assert_leave(^topic, "node1", %{name: "s1"}) assert [{"me", %{name: "me", phx_ref: _}}] = list(shard, topic) end test "detects nodedown and locally broadcasts leaves", - %{shard: shard, topic: topic, tracker: tracker} do - + %{shard: shard, topic: topic, tracker: tracker} do local_presence = spawn_pid() subscribe(topic) {node_pid, {:ok, node1_server}} = start_shard(@node1, name: tracker) assert list(shard, topic) == [] - {:ok, _ref} = Shard.track(shard, local_presence , topic, "local1", %{name: "l1"}) - assert_join ^topic, "local1", %{} + {:ok, _ref} = Shard.track(shard, local_presence, topic, "local1", %{name: "l1"}) + assert_join(^topic, "local1", %{}) track_presence(@node1, shard, spawn_pid(), topic, "node1", %{name: "s1"}) - assert_join ^topic, "node1", %{name: "s1"} + assert_join(^topic, "node1", %{name: "s1"}) assert %{@node1 => %Replica{status: :up}} = replicas(shard) assert [{"local1", _}, {"node1", _}] = list(shard, topic) assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) @@ -248,24 +277,22 @@ defmodule Phoenix.Tracker.ShardReplicationTest do # nodedown Process.unlink(node_pid) Process.exit(node1_server, :kill) - assert_leave ^topic, "node1", %{name: "s1"} + assert_leave(^topic, "node1", %{name: "s1"}) assert %{@node1 => %Replica{status: :down}} = replicas(shard) assert [{"local1", _}] = list(shard, topic) assert [{"local1", _}, {"node1", _}] = dirty_list(shard, topic) - :timer.sleep(@permdown + 2*@heartbeat) + :timer.sleep(@permdown + 2 * @heartbeat) assert [{"local1", _}] = dirty_list(shard, topic) end - test "untrack with no tracked topic is a noop", - %{shard: shard, topic: topic} do + %{shard: shard, topic: topic} do assert Shard.untrack(shard, self(), topic, "foo") == :ok end test "untrack with topic", - %{shard: shard, topic: topic} do - + %{shard: shard, topic: topic} do Shard.track(shard, self(), topic, "user1", %{name: "user1"}) Shard.track(shard, self(), "another:topic", "user2", %{name: "user2"}) assert [{"user1", %{name: "user1"}}] = list(shard, topic) @@ -280,8 +307,7 @@ defmodule Phoenix.Tracker.ShardReplicationTest do end test "untrack from all topics", - %{shard: shard, topic: topic} do - + %{shard: shard, topic: topic} do Shard.track(shard, self(), topic, "user1", %{name: "user1"}) Shard.track(shard, self(), "another:topic", "user2", %{name: "user2"}) assert [{"user1", %{name: "user1"}}] = list(shard, topic) @@ -294,25 +320,26 @@ defmodule Phoenix.Tracker.ShardReplicationTest do end test "updating presence sends join/leave and phx_ref_prev", - %{shard: shard, topic: topic} do - + %{shard: shard, topic: topic} do subscribe(topic) {:ok, _ref} = Shard.track(shard, self(), topic, "u1", %{name: "u1"}) assert [{"u1", %{name: "u1", phx_ref: ref}}] = list(shard, topic) {:ok, _ref} = Shard.update(shard, self(), topic, "u1", %{name: "u1-updated"}) - assert_leave ^topic, "u1", %{name: "u1", phx_ref: ^ref} - assert_join ^topic, "u1", %{name: "u1-updated", phx_ref_prev: ^ref} + assert_leave(^topic, "u1", %{name: "u1", phx_ref: ^ref}) + assert_join(^topic, "u1", %{name: "u1-updated", phx_ref_prev: ^ref}) end test "updating presence sends join/leave and phx_ref_prev with profer diffs if function for update used", - %{shard: shard, topic: topic} do - + %{shard: shard, topic: topic} do subscribe(topic) {:ok, _ref} = Shard.track(shard, self(), topic, "u1", %{browser: "Chrome", status: "online"}) assert [{"u1", %{browser: "Chrome", status: "online", phx_ref: ref}}] = list(shard, topic) - {:ok, _ref} = Shard.update(shard, self(), topic, "u1", fn meta -> Map.put(meta, :status, "away") end) - assert_leave ^topic, "u1", %{browser: "Chrome", status: "online", phx_ref: ^ref} - assert_join ^topic, "u1", %{browser: "Chrome", status: "away", phx_ref_prev: ^ref} + + {:ok, _ref} = + Shard.update(shard, self(), topic, "u1", fn meta -> Map.put(meta, :status, "away") end) + + assert_leave(^topic, "u1", %{browser: "Chrome", status: "online", phx_ref: ^ref}) + assert_join(^topic, "u1", %{browser: "Chrome", status: "away", phx_ref_prev: ^ref}) end test "updating with no prior presence", %{shard: shard, topic: topic} do @@ -322,8 +349,10 @@ defmodule Phoenix.Tracker.ShardReplicationTest do test "duplicate tracking", %{shard: shard, topic: topic} do pid = self() assert {:ok, _ref} = Shard.track(shard, pid, topic, "u1", %{}) + assert {:error, {:already_tracked, ^pid, ^topic, "u1"}} = - Shard.track(shard, pid, topic, "u1", %{}) + Shard.track(shard, pid, topic, "u1", %{}) + assert {:ok, _ref} = Shard.track(shard, pid, "another:topic", "u1", %{}) assert {:ok, _ref} = Shard.track(shard, pid, topic, "anotherkey", %{}) @@ -333,17 +362,17 @@ defmodule Phoenix.Tracker.ShardReplicationTest do end test "graceful exits with permdown", - %{shard: shard, topic: topic, tracker: tracker} do + %{shard: shard, topic: topic, tracker: tracker} do subscribe(topic) {_node_pid, {:ok, _node1_server}} = start_shard(@node1, name: tracker) track_presence(@node1, shard, spawn_pid(), topic, "node1", %{name: "s1"}) - assert_join ^topic, "node1", %{name: "s1"} + assert_join(^topic, "node1", %{name: "s1"}) assert %{@node1 => %Replica{status: :up}} = replicas(shard) assert [{"node1", _}] = list(shard, topic) # graceful permdown {_, :ok} = graceful_permdown(@node1, shard) - assert_leave ^topic, "node1", %{name: "s1"} + assert_leave(^topic, "node1", %{name: "s1"}) assert [] = list(shard, topic) assert replicas(shard) == %{} end @@ -425,6 +454,7 @@ defmodule Phoenix.Tracker.ShardReplicationTest do def assert_transfer_ack(ref, opts) do from = Keyword.fetch!(opts, :from) + if to = opts[:to] do assert_receive {^to, {:pub, :transfer_ack, ^ref, {^from, _vsn}, _state}}, @timeout else @@ -434,6 +464,7 @@ defmodule Phoenix.Tracker.ShardReplicationTest do def assert_heartbeat(opts) do from = Keyword.fetch!(opts, :from) + if to = opts[:to] do assert_receive {^to, {:pub, :heartbeat, {^from, _vsn}, _delta, _clocks}}, @timeout else diff --git a/test/phoenix/tracker/shard_test.exs b/test/phoenix/tracker/shard_test.exs index f6338af2..227b3256 100644 --- a/test/phoenix/tracker/shard_test.exs +++ b/test/phoenix/tracker/shard_test.exs @@ -3,14 +3,16 @@ defmodule Phoenix.Tracker.ShardTest do @opts [pubsub_server: nil, name: nil] test "validates down_period" do - opts = Keyword.merge(@opts, [down_period: 1]) + opts = Keyword.merge(@opts, down_period: 1) + assert Phoenix.Tracker.Shard.init([nil, nil, opts]) == - {:error, "down_period must be at least twice as large as the broadcast_period"} + {:error, "down_period must be at least twice as large as the broadcast_period"} end test "validates permdown_period" do - opts = Keyword.merge(@opts, [permdown_period: 1_200_00, down_period: 1_200_000]) + opts = Keyword.merge(@opts, permdown_period: 1_200_00, down_period: 1_200_000) + assert Phoenix.Tracker.Shard.init([nil, nil, opts]) == - {:error, "permdown_period must be at least larger than the down_period"} + {:error, "permdown_period must be at least larger than the down_period"} end end diff --git a/test/shared/pubsub_test.exs b/test/shared/pubsub_test.exs index 82bae280..f5cf1dec 100644 --- a/test/shared/pubsub_test.exs +++ b/test/shared/pubsub_test.exs @@ -41,9 +41,16 @@ defmodule Phoenix.PubSubTest do setup config do size = config[:pool_size] || 1 - registry_size = config[:registry_size] || config[:registry_pool_size] || config[:pool_size] || 1 + + registry_size = + config[:registry_size] || config[:registry_pool_size] || config[:pool_size] || 1 + {adapter, adapter_opts} = Application.get_env(:phoenix_pubsub, :test_adapter) - adapter_opts = [adapter: adapter, name: config.test, pool_size: size, registry_size: registry_size] ++ adapter_opts + + adapter_opts = + [adapter: adapter, name: config.test, pool_size: size, registry_size: registry_size] ++ + adapter_opts + start_supervised!({Phoenix.PubSub, adapter_opts}) opts = %{ @@ -184,7 +191,8 @@ defmodule Phoenix.PubSubTest do assert_ets_duplicate_count(config.pubsub, 2) assert :persistent_term.get(config.adapter_name) == - {config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3", :"#{config.adapter_name}_4"} + {config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3", + :"#{config.adapter_name}_4"} end @tag pool_size: 3 @@ -193,7 +201,7 @@ defmodule Phoenix.PubSubTest do assert_ets_duplicate_count(config.pubsub, 3) assert :persistent_term.get(config.adapter_name) == - {config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3"} + {config.adapter_name, :"#{config.adapter_name}_2", :"#{config.adapter_name}_3"} end defp assert_ets_duplicate_count(pubsub, count) do diff --git a/test/support/cluster.ex b/test/support/cluster.ex index 85892854..f18d0306 100644 --- a/test/support/cluster.ex +++ b/test/support/cluster.ex @@ -11,7 +11,7 @@ defmodule Phoenix.PubSub.Cluster do # Allow spawned nodes to fetch all code from this node :erl_boot_server.start([]) - allow_boot to_charlist("127.0.0.1") + allow_boot(to_charlist("127.0.0.1")) nodes |> Enum.map(&Task.async(fn -> spawn_node(&1) end)) @@ -26,6 +26,7 @@ defmodule Phoenix.PubSub.Cluster do start_pubsub(node, opts) {:ok, node} end + defp spawn_node(node_host) do spawn_node({node_host, []}) end @@ -58,6 +59,7 @@ defmodule Phoenix.PubSub.Cluster do defp ensure_applications_started(node) do rpc(node, Application, :ensure_all_started, [:mix]) rpc(node, Mix, :env, [Mix.env()]) + for {app_name, _, _} <- Application.loaded_applications() do rpc(node, Application, :ensure_all_started, [app_name]) end @@ -65,6 +67,7 @@ defmodule Phoenix.PubSub.Cluster do defp start_pubsub(node, opts) do opts = [name: Phoenix.PubSubTest, pool_size: 4] |> Keyword.merge(opts) + args = [ [{Phoenix.PubSub, opts}], [strategy: :one_for_one] @@ -78,6 +81,6 @@ defmodule Phoenix.PubSub.Cluster do |> to_string |> String.split("@") |> Enum.at(0) - |> String.to_atom + |> String.to_atom() end end diff --git a/test/support/node_case.ex b/test/support/node_case.ex index b8d96cbd..9b12963d 100644 --- a/test/support/node_case.ex +++ b/test/support/node_case.ex @@ -28,16 +28,18 @@ defmodule Phoenix.PubSub.NodeCase do end def handle_diff(diff, state) do - for {topic, {joins, leaves}} <- diff do + for {topic, {joins, leaves}} <- diff do for {key, meta} <- joins do msg = %{topic: topic, event: "presence_join", payload: {key, meta}} Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg) end + for {key, meta} <- leaves do msg = %{topic: topic, event: "presence_leave", payload: {key, meta}} Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg) end end + {:ok, state} end @@ -95,7 +97,7 @@ defmodule Phoenix.PubSub.NodeCase do broadcast_period: @heartbeat, max_silent_periods: 2, permdown_period: @permdown, - shard_number: 0, + shard_number: 0 ] end @@ -105,21 +107,22 @@ defmodule Phoenix.PubSub.NodeCase do end) end - def spy_on_server(node_name, pubsub_server \\ @pubsub, - target_pid, tracker_server) do - spy_on_pubsub(node_name, pubsub_server, target_pid, - "phx_presence:#{tracker_server}") + def spy_on_server(node_name, pubsub_server \\ @pubsub, target_pid, tracker_server) do + spy_on_pubsub(node_name, pubsub_server, target_pid, "phx_presence:#{tracker_server}") end def spy_on_pubsub(node_name, server \\ @pubsub, target_pid, topic) do call_node(node_name, fn -> Phoenix.PubSub.subscribe(server, topic) + loop = fn next -> receive do - msg -> send target_pid, {node_name, msg} + msg -> send(target_pid, {node_name, msg}) end + next.(next) end + loop.(loop) end) end @@ -127,20 +130,22 @@ defmodule Phoenix.PubSub.NodeCase do defmacro assert_join(topic, key, meta, timeout \\ @timeout) do quote do assert_receive %{ - event: "presence_join", - topic: unquote(topic), - payload: {unquote(key), unquote(meta)} - }, unquote(timeout) + event: "presence_join", + topic: unquote(topic), + payload: {unquote(key), unquote(meta)} + }, + unquote(timeout) end end defmacro assert_leave(topic, key, meta, timeout \\ @timeout) do quote do assert_receive %{ - event: "presence_leave", - topic: unquote(topic), - payload: {unquote(key), unquote(meta)} - }, unquote(timeout) + event: "presence_leave", + topic: unquote(topic), + payload: {unquote(key), unquote(meta)} + }, + unquote(timeout) end end @@ -172,14 +177,16 @@ defmodule Phoenix.PubSub.NodeCase do parent = self() ref = make_ref() - pid = Node.spawn_link(node, fn -> - result = func.() - send parent, {ref, result} - ref = Process.monitor(parent) - receive do - {:DOWN, ^ref, :process, _, _} -> :ok - end - end) + pid = + Node.spawn_link(node, fn -> + result = func.() + send(parent, {ref, result}) + ref = Process.monitor(parent) + + receive do + {:DOWN, ^ref, :process, _, _} -> :ok + end + end) receive do {^ref, result} -> {pid, result}