Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions lib/remote_persistent_term.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ defmodule RemotePersistentTerm do

Currently only supports gzip (0x1F, 0x8B).
"""
],
version_fallback?: [
type: :boolean,
required: false,
default: false,
doc: """
If true, when deserialization fails, the system will attempt to use previous versions \
of the term until a valid version is found or all versions are exhausted. \
Only currently supported by the S3 fetcher.
"""
]
]

Expand All @@ -77,15 +87,17 @@ defmodule RemotePersistentTerm do
fetcher_state: term(),
refresh_interval: pos_integer(),
current_version: String.t(),
auto_decompress?: boolean()
auto_decompress?: boolean(),
version_fallback?: boolean()
}
defstruct [
:fetcher_mod,
:fetcher_state,
:refresh_interval,
:current_version,
:name,
:auto_decompress?
:auto_decompress?,
:version_fallback?
]

@doc """
Expand Down Expand Up @@ -138,7 +150,8 @@ defmodule RemotePersistentTerm do
fetcher_state: fetcher_state,
refresh_interval: opts[:refresh_interval],
name: name(opts),
auto_decompress?: opts[:auto_decompress?]
auto_decompress?: opts[:auto_decompress?],
version_fallback?: opts[:version_fallback?]
}

if opts[:lazy_init?] do
Expand Down Expand Up @@ -268,9 +281,15 @@ defmodule RemotePersistentTerm do
start_meta,
fn ->
{status, version} =
with {:ok, current_version} <- state.fetcher_mod.current_version(state.fetcher_state),
with {:ok, current_version, updated_fetcher_state} <-
state.fetcher_mod.current_version(state.fetcher_state),
true <- state.current_version != current_version,
:ok <- download_and_store_term(state, deserialize_fun, put_fun) do
:ok <-
download_and_store_term(
%{state | fetcher_state: updated_fetcher_state},
deserialize_fun,
put_fun
) do
{:updated, current_version}
else
false ->
Expand Down Expand Up @@ -307,9 +326,39 @@ defmodule RemotePersistentTerm do

defp download_and_store_term(state, deserialize_fun, put_fun) do
with {:ok, term} <- state.fetcher_mod.download(state.fetcher_state),
{:ok, decompressed} <- maybe_decompress(state, term),
{:ok, deserialized} <- deserialize_fun.(decompressed) do
put_fun.(deserialized)
{:ok, decompressed} <- maybe_decompress(state, term) do
try_deserialize_and_store(state, decompressed, deserialize_fun, put_fun)
end
end

defp try_deserialize_and_store(state, term, deserialize_fun, put_fun) do
case deserialize_fun.(term) do
{:ok, deserialized} ->
put_fun.(deserialized)

{:error, _reason} when state.version_fallback? ->
Logger.error(
"#{state.name} - failed to deserialize remote term, falling back to previous version"
)

try_previous_version(state, deserialize_fun, put_fun)

error ->
error
end
end

defp try_previous_version(state, deserialize_fun, put_fun) do
case state.fetcher_mod.previous_version(state.fetcher_state) do
{:ok, previous_state} ->
download_and_store_term(
%{state | fetcher_state: previous_state},
deserialize_fun,
put_fun
)

{:error, _} = error ->
error
end
end

Expand Down
8 changes: 7 additions & 1 deletion lib/remote_persistent_term/fetcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ defmodule RemotePersistentTerm.Fetcher do
Check the current version of the remote term. Used to avoid downloading the
same term multiple times.
"""
@callback current_version(state()) :: {:ok, version()} | {:error, term()}
@callback current_version(state()) :: {:ok, version(), state()} | {:error, term()}

@doc """
Download the term from the remote source.
"""
@callback download(state()) :: {:ok, term()} | {:error, term()}

@doc """
Get the previous version of the remote term.
Returns a new state that can be used to fetch the previous version.
"""
@callback previous_version(state()) :: {:ok, state()} | {:error, term()}
end
7 changes: 5 additions & 2 deletions lib/remote_persistent_term/fetcher/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ defmodule RemotePersistentTerm.Fetcher.Http do
end

@impl true
def current_version(_state) do
{:ok, DateTime.utc_now() |> DateTime.to_string()}
def current_version(state) do
{:ok, DateTime.utc_now() |> DateTime.to_string(), state}
end

@impl true
Expand All @@ -81,6 +81,9 @@ defmodule RemotePersistentTerm.Fetcher.Http do
end
end

@impl true
def previous_version(_state), do: {:error, :not_supported}

defp response_status(url, status) do
if status < 300 do
Logger.info("successfully downloaded remote term from #{url} with status #{status}")
Expand Down
59 changes: 50 additions & 9 deletions lib/remote_persistent_term/fetcher/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
bucket: bucket,
key: String.t(),
region: region,
failover_buckets: [failover_bucket] | nil
failover_buckets: [failover_bucket] | nil,
version_id: String.t() | nil,
versions: [map()] | nil
}
defstruct [:bucket, :key, :region, :failover_buckets]
defstruct [:bucket, :key, :region, :failover_buckets, :version_id, :versions]

@failover_bucket_schema [
bucket: [
Expand Down Expand Up @@ -91,7 +93,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
message: "Found latest version of object"
)

{:ok, etag}
{:ok, etag, %{state | versions: versions}}
else
{:error, {:unexpected_response, %{body: reason}}} ->
{:error, reason}
Expand Down Expand Up @@ -136,9 +138,9 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
defp list_object_versions(state) do
res =
aws_client_request(
&ExAws.S3.get_bucket_object_versions/2,
:get_bucket_object_versions,
state,
prefix: state.key
[[prefix: state.key]]
)

with {:ok, %{body: %{versions: versions}}} <- res do
Expand All @@ -147,7 +149,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
end

defp get_object(state) do
aws_client_request(&ExAws.S3.get_object/2, state, state.key)
aws_client_request(:get_object, state, [state.key, [version_id: state.version_id]])
end

defp find_latest([_ | _] = contents) do
Expand All @@ -166,14 +168,53 @@ defmodule RemotePersistentTerm.Fetcher.S3 do

defp find_latest(_), do: {:error, :not_found}

@impl true
def previous_version(state) do
Logger.info(
bucket: state.bucket,
key: state.key,
message: "About to fetch previous version of object",
version_id: state.version_id
)

versions = if state.versions, do: {:ok, state.versions}, else: list_object_versions(state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure how s3 replication works, are the versions guaranteed to be the same across replicated buckets? If not, then it is possible we have stored the versions of one region, and now are trying a fallback region with the original versions

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm but if the bucket fails and it chooses the failover bucket then the list_object_versions will be called to the failover bucket, so the versions it gets will be from the new bucket.

but yea I see what you're saying, in the edge case that the version fails to deserialise and then the bucket fails midway ? Seems quite unlikely but I guess could happen.


with {:ok, versions} <- versions,
{:ok, previous_version} <- find_previous_version(versions, state.version_id) do
{:ok, %{state | version_id: previous_version.version_id, versions: versions}}
else
{:error, reason} ->
Logger.error(%{
bucket: state.bucket,
key: state.key,
reason: inspect(reason),
message: "Failed to get previous version of object"
})

{:error, reason}
end
end

defp find_previous_version(versions, current_version_id) do
versions
|> Enum.sort_by(& &1.last_modified, :desc)
|> Enum.find(fn version ->
version.version_id != current_version_id
end)
|> case do
nil -> {:error, :no_previous_version}
version -> {:ok, version}
end
end

defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do
perform_request(op, state.bucket, state.region, opts)
end

defp aws_client_request(
op,
%{
failover_buckets: [_|_] = failover_buckets
failover_buckets: [_ | _] = failover_buckets
} = state,
opts
) do
Expand Down Expand Up @@ -222,8 +263,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do
end
end

defp perform_request(op, bucket, region, opts) do
op.(bucket, opts)
defp perform_request(func, bucket, region, opts) do
apply(ExAws.S3, func, [bucket | opts])
|> client().request(region: region)
end

Expand Down
9 changes: 6 additions & 3 deletions lib/remote_persistent_term/fetcher/static.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule RemotePersistentTerm.Fetcher.Static do
@moduledoc """
A macro to help define a valid `RemotePersistentTerm.Fetcher` which
A macro to help define a valid `RemotePersistentTerm.Fetcher` which
always returns some hardcoded static data.

Mostly intended for testing purposes.
Expand All @@ -22,10 +22,13 @@ defmodule RemotePersistentTerm.Fetcher.Static do
def init(_), do: {:ok, []}

@impl true
def current_version(_), do: {:ok, unquote(Keyword.get(opts, :version, "1"))}
def current_version(state), do: {:ok, unquote(Keyword.get(opts, :version, "1")), state}

@impl true
def download(_), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))}
def download(state), do: {:ok, unquote(Macro.escape(Keyword.fetch!(opts, :data)))}

@impl true
def previous_version(_), do: {:error, :not_supported}
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule RemotePersistentTerm.MixProject do
use Mix.Project

@name "RemotePersistentTerm"
@version "0.12.0"
@version "0.13.0"
@repo_url "https://github.com/AppMonet/remote_persistent_term"

def project do
Expand Down
56 changes: 55 additions & 1 deletion test/remote_persistent_term/fetcher/s3_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ defmodule RemotePersistentTerm.Fetcher.S3Test do
log =
capture_log(fn ->
result = S3.current_version(state)
assert {:ok, "current-etag"} = result
assert {:ok, "current-etag", _updated_state} = result
end)

assert log =~ "bucket: \"#{@bucket}\""
Expand Down Expand Up @@ -253,4 +253,58 @@ defmodule RemotePersistentTerm.Fetcher.S3Test do
assert log =~ "Downloaded object from S3"
end
end

describe "previous_version/1" do
test "finds the correct previous version when given a current version ID" do
versions = [
%{
version_id: "v3",
last_modified: "2025-05-08T09:58:38.000Z",
is_latest: "true"
},
%{
version_id: "v2",
last_modified: "2025-04-02T10:21:18.000Z",
is_latest: "false"
},
%{
version_id: "v1",
last_modified: "2025-04-02T09:10:37.000Z",
is_latest: "false"
}
]

expect(AwsClientMock, :request, fn operation, opts ->
assert operation.bucket == @bucket
assert operation.resource == "versions"
assert operation.params == [prefix: @key]
assert opts == [region: @region]
{:ok, %{body: %{versions: versions}}}
end)

state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v3"}
assert {:ok, %{version_id: "v2"}} = S3.previous_version(state)
end

test "returns error when there are no previous versions" do
versions = [
%{
version_id: "v1",
last_modified: "2025-04-02T09:10:37.000Z",
is_latest: "true"
}
]

expect(AwsClientMock, :request, fn operation, opts ->
assert operation.bucket == @bucket
assert operation.resource == "versions"
assert operation.params == [prefix: @key]
assert opts == [region: @region]
{:ok, %{body: %{versions: versions}}}
end)

state = %S3{bucket: @bucket, key: @key, region: @region, version_id: "v1"}
assert {:error, :no_previous_version} = S3.previous_version(state)
end
end
end