-
Notifications
You must be signed in to change notification settings - Fork 0
Add fallback to previous version feature for s3 #19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
a852e2a
7a4bd97
a73a555
de57970
55bfd44
412070c
0977c2b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,9 +14,11 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
| bucket: bucket, | ||
| key: String.t(), | ||
| region: region, | ||
| failover_buckets: [failover_bucket] | nil | ||
| failover_buckets: [failover_bucket] | nil, | ||
| version_id: String.t() | nil, | ||
| versions: [map()] | nil | ||
| } | ||
| defstruct [:bucket, :key, :region, :failover_buckets] | ||
| defstruct [:bucket, :key, :region, :failover_buckets, :version_id, :versions] | ||
|
|
||
| @failover_bucket_schema [ | ||
| bucket: [ | ||
|
|
@@ -91,7 +93,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
| message: "Found latest version of object" | ||
| ) | ||
|
|
||
| {:ok, etag} | ||
| {:ok, etag, %{state | versions: versions}} | ||
| else | ||
| {:error, {:unexpected_response, %{body: reason}}} -> | ||
| {:error, reason} | ||
|
|
@@ -136,9 +138,9 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
| defp list_object_versions(state) do | ||
| res = | ||
| aws_client_request( | ||
| &ExAws.S3.get_bucket_object_versions/2, | ||
| :get_bucket_object_versions, | ||
| state, | ||
| prefix: state.key | ||
| [[prefix: state.key]] | ||
louiscb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ) | ||
|
|
||
| with {:ok, %{body: %{versions: versions}}} <- res do | ||
|
|
@@ -147,7 +149,7 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
| end | ||
|
|
||
| defp get_object(state) do | ||
| aws_client_request(&ExAws.S3.get_object/2, state, state.key) | ||
| aws_client_request(:get_object, state, [state.key, [version_id: state.version_id]]) | ||
| end | ||
|
|
||
| defp find_latest([_ | _] = contents) do | ||
|
|
@@ -166,14 +168,53 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
|
|
||
| defp find_latest(_), do: {:error, :not_found} | ||
|
|
||
| @impl true | ||
| def previous_version(state) do | ||
| Logger.info( | ||
| bucket: state.bucket, | ||
| key: state.key, | ||
| message: "About to fetch previous version of object", | ||
| version_id: state.version_id | ||
| ) | ||
|
|
||
| versions = if state.versions, do: {:ok, state.versions}, else: list_object_versions(state) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not 100% sure how s3 replication works, are the versions guaranteed to be the same across replicated buckets? If not, then it is possible we have stored the versions of one region, and now are trying a fallback region with the original versions
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm but if the bucket fails and it chooses the failover bucket then the but yea I see what you're saying, in the edge case that the version fails to deserialise and then the bucket fails midway ? Seems quite unlikely but I guess could happen. |
||
|
|
||
| with {:ok, versions} <- versions, | ||
| {:ok, previous_version} <- find_previous_version(versions, state.version_id) do | ||
| {:ok, %{state | version_id: previous_version.version_id, versions: versions}} | ||
| else | ||
| {:error, reason} -> | ||
| Logger.error(%{ | ||
| bucket: state.bucket, | ||
| key: state.key, | ||
| reason: inspect(reason), | ||
| message: "Failed to get previous version of object" | ||
| }) | ||
|
|
||
| {:error, reason} | ||
| end | ||
| end | ||
|
|
||
| defp find_previous_version(versions, current_version_id) do | ||
| versions | ||
| |> Enum.sort_by(& &1.last_modified, :desc) | ||
| |> Enum.find(fn version -> | ||
| version.version_id != current_version_id | ||
| end) | ||
| |> case do | ||
| nil -> {:error, :no_previous_version} | ||
| version -> {:ok, version} | ||
| end | ||
| end | ||
|
|
||
| defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do | ||
| perform_request(op, state.bucket, state.region, opts) | ||
| end | ||
|
|
||
| defp aws_client_request( | ||
| op, | ||
| %{ | ||
| failover_buckets: [_|_] = failover_buckets | ||
| failover_buckets: [_ | _] = failover_buckets | ||
| } = state, | ||
| opts | ||
| ) do | ||
|
|
@@ -222,8 +263,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
| end | ||
| end | ||
|
|
||
| defp perform_request(op, bucket, region, opts) do | ||
| op.(bucket, opts) | ||
| defp perform_request(func, bucket, region, opts) do | ||
| apply(ExAws.S3, func, [bucket | opts]) | ||
| |> client().request(region: region) | ||
| end | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.