-
Notifications
You must be signed in to change notification settings - Fork 16
feat(audience): HttpTransport — background POST with gzip + retry (SDK-141) #691
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
eefa672
feat(audience): add Gzip utility and HttpTransport with retry
ImmutableJeffrey 0e6c121
fix(audience): align HttpTransport backoff to plan schedule
ImmutableJeffrey 52381a4
style(audience): use digit separators in backoff literals
ImmutableJeffrey c9d301c
refactor(audience): time-aware backoff in HttpTransport
ImmutableJeffrey 9985194
fix(audience): narrow BuildPayload catch to IOException
ImmutableJeffrey 995886d
docs(audience): document BuildPayload null return contract
ImmutableJeffrey d68e4e7
refactor(audience): address HttpTransport review feedback (SDK-141)
ImmutableJeffrey File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
230 changes: 230 additions & 0 deletions
230
src/Packages/Audience/Runtime/Transport/HttpTransport.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,230 @@ | ||
| #nullable enable | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using System.Net; | ||
| using System.Net.Http; | ||
| using System.Net.Http.Headers; | ||
| using System.Text; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace Immutable.Audience | ||
| { | ||
| /// <summary> | ||
| /// Sends queued events from <see cref="DiskStore"/> to the Audience backend. | ||
| /// </summary> | ||
| internal sealed class HttpTransport : IDisposable | ||
| { | ||
| private readonly DiskStore _store; | ||
| private readonly string _url; | ||
| private readonly string _publishableKey; | ||
| private readonly HttpClient _client; | ||
| private readonly Action<AudienceError>? _onError; | ||
| private readonly Func<DateTime> _getUtcNow; | ||
|
|
||
| private int _consecutiveFailures; | ||
| private DateTime? _nextAttemptAt; | ||
|
|
||
| /// <param name="store">Source of event batches.</param> | ||
| /// <param name="publishableKey">Studio API key. Sent as <c>x-immutable-publishable-key</c> on every request.</param> | ||
| /// <param name="onError">Optional failure callback. Exceptions thrown inside it are caught and ignored.</param> | ||
| /// <param name="handler">Optional <see cref="HttpMessageHandler"/>. Callers can supply a custom pipeline (e.g. specific for test purposes). Defaults to the standard handler when null.</param> | ||
| /// <param name="getUtcNow">Optional UTC clock source used for backoff timing (e.g. swappable for deterministic time). Defaults to <c>DateTime.UtcNow</c> when null.</param> | ||
| internal HttpTransport( | ||
| DiskStore store, | ||
| string publishableKey, | ||
| Action<AudienceError>? onError = null, | ||
| HttpMessageHandler? handler = null, | ||
| Func<DateTime>? getUtcNow = null) | ||
| { | ||
| _store = store ?? throw new ArgumentNullException(nameof(store)); | ||
| _publishableKey = publishableKey ?? throw new ArgumentNullException(nameof(publishableKey)); | ||
| _url = Constants.BaseUrl(publishableKey) + Constants.MessagesPath; | ||
| _onError = onError; | ||
| _client = handler != null ? new HttpClient(handler) : new HttpClient(); | ||
| _client.Timeout = TimeSpan.FromSeconds(30); | ||
| _getUtcNow = getUtcNow ?? (() => DateTime.UtcNow); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Attempts to process one batch: reads it from disk, gzips it, and POSTs it. | ||
| /// Returns true if a batch was consumed (outcome irrelevant), false if the queue was empty. | ||
| /// </summary> | ||
| internal async Task<bool> SendBatchAsync(CancellationToken ct = default) | ||
| { | ||
| var batch = _store.ReadBatch(Constants.DefaultFlushSize); | ||
| if (batch.Count == 0) | ||
| return false; | ||
|
|
||
| string? payload; | ||
| try | ||
| { | ||
| payload = BuildPayload(batch); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| // Non-IOException = unrecoverable storage failure (e.g. permissions); | ||
| // retry won't help. Drop the batch, report via onError. | ||
| _store.Delete(batch); | ||
| NotifyError(AudienceErrorCode.FlushFailed, $"Local storage read failed: {ex.Message}"); | ||
| return true; | ||
| } | ||
|
|
||
| if (payload == null) | ||
| { | ||
| // Every file was unreadable (deleted or locked between ReadBatch and now). | ||
| // Drop the refs, return. | ||
| _store.Delete(batch); | ||
| return true; | ||
| } | ||
|
|
||
| var compressed = Gzip.Compress(payload); | ||
|
|
||
| try | ||
| { | ||
| using var request = new HttpRequestMessage(HttpMethod.Post, _url); | ||
| request.Headers.Add("x-immutable-publishable-key", _publishableKey); | ||
| request.Content = new ByteArrayContent(compressed); | ||
| request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); | ||
| request.Content.Headers.Add("Content-Encoding", "gzip"); | ||
|
|
||
| using var response = await _client.SendAsync(request, ct).ConfigureAwait(false); | ||
|
|
||
| var statusCode = (int)response.StatusCode; | ||
|
|
||
| if (statusCode >= 200 && statusCode < 300) | ||
| { | ||
| // 2xx: server acked, drop the batch, healthy state. | ||
| _store.Delete(batch); | ||
| ResetBackoff(); | ||
| } | ||
| else if (statusCode >= 400 && statusCode < 500) | ||
| { | ||
| // 4xx: server rejected the payload. Drop it (retry won't help) and | ||
| // reset backoff — server is healthy, our data was the problem. | ||
| _store.Delete(batch); | ||
| ResetBackoff(); | ||
| NotifyError(AudienceErrorCode.ValidationRejected, | ||
| $"Batch rejected with {statusCode}"); | ||
| } | ||
| else | ||
| { | ||
| // 5xx (or other non-2xx/4xx): server is unhealthy or the response | ||
| // is anomalous. Keep batch on disk, back off, retry later. | ||
| RecordFailure(); | ||
| NotifyError(AudienceErrorCode.FlushFailed, $"Server error {statusCode}, will retry"); | ||
| } | ||
| } | ||
| catch (OperationCanceledException) when (ct.IsCancellationRequested) | ||
| { | ||
| // Caller cancelled the token (e.g. on shutdown). Events stay on | ||
| // disk, no failure recorded. HttpClient timeouts throw the same | ||
| // exception but without ct.IsCancellationRequested set, so they | ||
| // fall through to the Exception branch below and trigger backoff. | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| RecordFailure(); | ||
| NotifyError(AudienceErrorCode.NetworkError, ex.Message); | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| internal int BackoffMs => _consecutiveFailures switch | ||
| { | ||
| <= 0 => 0, | ||
| 1 => 5_000, | ||
| 2 => 10_000, | ||
| 3 => 20_000, | ||
| 4 => 40_000, | ||
| _ => 60_000, | ||
| }; | ||
|
|
||
| /// <summary> | ||
| /// Earliest UTC time at which the next attempt may run. | ||
| /// Null when no backoff is active (never failed, or last attempt succeeded). | ||
| /// </summary> | ||
| internal DateTime? NextAttemptAt => _nextAttemptAt; | ||
|
|
||
| /// <summary> | ||
| /// True while <c>UtcNow < NextAttemptAt</c>. Flips false as the clock | ||
| /// advances; no reset required. | ||
| /// </summary> | ||
| internal bool IsInBackoffWindow => _getUtcNow() < _nextAttemptAt; | ||
|
|
||
| public void Dispose() | ||
| { | ||
| _client.Dispose(); | ||
| } | ||
|
|
||
| private void RecordFailure() | ||
| { | ||
| var now = _getUtcNow(); | ||
| if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff | ||
|
|
||
| _consecutiveFailures++; | ||
| _nextAttemptAt = now.AddMilliseconds(BackoffMs); | ||
| } | ||
|
|
||
| private void ResetBackoff() | ||
| { | ||
| _consecutiveFailures = 0; | ||
| _nextAttemptAt = null; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Reads each path and wraps the concatenated JSON bodies in | ||
| /// <c>{"batch":[msg1,msg2,...]}</c>. | ||
| /// </summary> | ||
| /// <returns> | ||
| /// The batched JSON, or <c>null</c> if every path was unreadable. Caller | ||
| /// treats <c>null</c> as "nothing to send" and deletes the path list. | ||
| /// </returns> | ||
| private static string? BuildPayload(IReadOnlyList<string> paths) | ||
| { | ||
| var sb = new StringBuilder("{\"batch\":["); | ||
| var count = 0; | ||
|
|
||
| for (var i = 0; i < paths.Count; i++) | ||
| { | ||
| try | ||
| { | ||
| var json = File.ReadAllText(paths[i]); | ||
| if (count > 0) sb.Append(','); | ||
| sb.Append(json); | ||
| count++; | ||
| } | ||
| catch (IOException) | ||
| { | ||
| // Transient disk race: the file was deleted or locked between | ||
| // ReadBatch and now. Safe to skip — the remaining paths in the | ||
| // batch may still read fine. Non-IOException failures escape | ||
| // and are handled by the caller (SendBatchAsync) as a batch- | ||
| // wide storage error. | ||
| } | ||
| } | ||
|
|
||
| if (count == 0) return null; | ||
|
ImmutableJeffrey marked this conversation as resolved.
|
||
|
|
||
| sb.Append("]}"); | ||
| return sb.ToString(); | ||
| } | ||
|
|
||
| private void NotifyError(AudienceErrorCode code, string message) | ||
| { | ||
| if (_onError == null) return; | ||
| try | ||
| { | ||
| _onError(new AudienceError(code, message)); | ||
| } | ||
| catch | ||
| { | ||
| // Consumer callback threw. Swallow: the SDK must not surface | ||
| // exceptions through the error-reporting path itself. | ||
| } | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| using System.IO; | ||
| using System.IO.Compression; | ||
| using System.Text; | ||
|
|
||
| namespace Immutable.Audience | ||
| { | ||
| /// <summary> | ||
| /// Gzip compression using <see cref="GZipStream"/> from System.IO.Compression. | ||
| /// Available in Unity 2021+ (.NET Standard 2.1). Pure C#, works on all desktop platforms. | ||
| /// </summary> | ||
| internal static class Gzip | ||
| { | ||
| internal static byte[] Compress(string text) | ||
| { | ||
| var raw = Encoding.UTF8.GetBytes(text); | ||
|
|
||
| using var output = new MemoryStream(); | ||
| using (var gzip = new GZipStream(output, CompressionLevel.Fastest)) | ||
| { | ||
| gzip.Write(raw, 0, raw.Length); | ||
| } | ||
|
|
||
| return output.ToArray(); | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| using System.IO; | ||
| using System.IO.Compression; | ||
| using System.Text; | ||
| using NUnit.Framework; | ||
|
|
||
| namespace Immutable.Audience.Tests | ||
| { | ||
| [TestFixture] | ||
| internal class GzipTests | ||
| { | ||
| [Test] | ||
| public void Compress_ProducesValidGzip_ThatDecompressesToOriginal() | ||
| { | ||
| const string original = "{\"type\":\"track\",\"eventName\":\"test\"}"; | ||
|
|
||
| var compressed = Gzip.Compress(original); | ||
|
|
||
| // Decompress and verify round-trip | ||
| using var input = new MemoryStream(compressed); | ||
| using var gzip = new GZipStream(input, CompressionMode.Decompress); | ||
| using var reader = new StreamReader(gzip, Encoding.UTF8); | ||
| var decompressed = reader.ReadToEnd(); | ||
|
|
||
| Assert.AreEqual(original, decompressed); | ||
| } | ||
|
|
||
| [Test] | ||
| public void Compress_OutputIsSmallerThanInput_ForRealisticPayload() | ||
| { | ||
| // Repeated field names compress well in JSON batches. | ||
| var sb = new StringBuilder("{\"batch\":["); | ||
| for (var i = 0; i < 20; i++) | ||
| { | ||
| if (i > 0) sb.Append(','); | ||
| sb.Append($"{{\"type\":\"track\",\"eventName\":\"level_complete\",\"anonymousId\":\"anon-{i}\"}}"); | ||
| } | ||
|
|
||
| sb.Append("]}"); | ||
| var payload = sb.ToString(); | ||
|
|
||
| var compressed = Gzip.Compress(payload); | ||
|
|
||
| Assert.Less(compressed.Length, Encoding.UTF8.GetByteCount(payload), "gzip should compress a batch of similar JSON events"); | ||
| } | ||
|
|
||
| [Test] | ||
| public void Compress_EmptyString_ProducesValidGzip() | ||
| { | ||
| var compressed = Gzip.Compress(""); | ||
|
|
||
| using var input = new MemoryStream(compressed); | ||
| using var gzip = new GZipStream(input, CompressionMode.Decompress); | ||
| using var reader = new StreamReader(gzip, Encoding.UTF8); | ||
| var decompressed = reader.ReadToEnd(); | ||
|
|
||
| Assert.AreEqual("", decompressed); | ||
| } | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.