diff --git a/src/Packages/Audience/Runtime/Transport/HttpTransport.cs b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs new file mode 100644 index 000000000..093d15c65 --- /dev/null +++ b/src/Packages/Audience/Runtime/Transport/HttpTransport.cs @@ -0,0 +1,230 @@ +#nullable enable + +using System; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Immutable.Audience +{ + /// + /// Sends queued events from to the Audience backend. + /// + internal sealed class HttpTransport : IDisposable + { + private readonly DiskStore _store; + private readonly string _url; + private readonly string _publishableKey; + private readonly HttpClient _client; + private readonly Action? _onError; + private readonly Func _getUtcNow; + + private int _consecutiveFailures; + private DateTime? _nextAttemptAt; + + /// Source of event batches. + /// Studio API key. Sent as x-immutable-publishable-key on every request. + /// Optional failure callback. Exceptions thrown inside it are caught and ignored. + /// Optional . Callers can supply a custom pipeline (e.g. specific for test purposes). Defaults to the standard handler when null. + /// Optional UTC clock source used for backoff timing (e.g. swappable for deterministic time). Defaults to DateTime.UtcNow when null. + internal HttpTransport( + DiskStore store, + string publishableKey, + Action? onError = null, + HttpMessageHandler? handler = null, + Func? getUtcNow = null) + { + _store = store ?? throw new ArgumentNullException(nameof(store)); + _publishableKey = publishableKey ?? throw new ArgumentNullException(nameof(publishableKey)); + _url = Constants.BaseUrl(publishableKey) + Constants.MessagesPath; + _onError = onError; + _client = handler != null ? new HttpClient(handler) : new HttpClient(); + _client.Timeout = TimeSpan.FromSeconds(30); + _getUtcNow = getUtcNow ?? (() => DateTime.UtcNow); + } + + /// + /// Attempts to process one batch: reads it from disk, gzips it, and POSTs it. + /// Returns true if a batch was consumed (outcome irrelevant), false if the queue was empty. + /// + internal async Task SendBatchAsync(CancellationToken ct = default) + { + var batch = _store.ReadBatch(Constants.DefaultFlushSize); + if (batch.Count == 0) + return false; + + string? payload; + try + { + payload = BuildPayload(batch); + } + catch (Exception ex) + { + // Non-IOException = unrecoverable storage failure (e.g. permissions); + // retry won't help. Drop the batch, report via onError. + _store.Delete(batch); + NotifyError(AudienceErrorCode.FlushFailed, $"Local storage read failed: {ex.Message}"); + return true; + } + + if (payload == null) + { + // Every file was unreadable (deleted or locked between ReadBatch and now). + // Drop the refs, return. + _store.Delete(batch); + return true; + } + + var compressed = Gzip.Compress(payload); + + try + { + using var request = new HttpRequestMessage(HttpMethod.Post, _url); + request.Headers.Add("x-immutable-publishable-key", _publishableKey); + request.Content = new ByteArrayContent(compressed); + request.Content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); + request.Content.Headers.Add("Content-Encoding", "gzip"); + + using var response = await _client.SendAsync(request, ct).ConfigureAwait(false); + + var statusCode = (int)response.StatusCode; + + if (statusCode >= 200 && statusCode < 300) + { + // 2xx: server acked, drop the batch, healthy state. + _store.Delete(batch); + ResetBackoff(); + } + else if (statusCode >= 400 && statusCode < 500) + { + // 4xx: server rejected the payload. Drop it (retry won't help) and + // reset backoff — server is healthy, our data was the problem. + _store.Delete(batch); + ResetBackoff(); + NotifyError(AudienceErrorCode.ValidationRejected, + $"Batch rejected with {statusCode}"); + } + else + { + // 5xx (or other non-2xx/4xx): server is unhealthy or the response + // is anomalous. Keep batch on disk, back off, retry later. + RecordFailure(); + NotifyError(AudienceErrorCode.FlushFailed, $"Server error {statusCode}, will retry"); + } + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // Caller cancelled the token (e.g. on shutdown). Events stay on + // disk, no failure recorded. HttpClient timeouts throw the same + // exception but without ct.IsCancellationRequested set, so they + // fall through to the Exception branch below and trigger backoff. + } + catch (Exception ex) + { + RecordFailure(); + NotifyError(AudienceErrorCode.NetworkError, ex.Message); + } + + return true; + } + + internal int BackoffMs => _consecutiveFailures switch + { + <= 0 => 0, + 1 => 5_000, + 2 => 10_000, + 3 => 20_000, + 4 => 40_000, + _ => 60_000, + }; + + /// + /// Earliest UTC time at which the next attempt may run. + /// Null when no backoff is active (never failed, or last attempt succeeded). + /// + internal DateTime? NextAttemptAt => _nextAttemptAt; + + /// + /// True while UtcNow < NextAttemptAt. Flips false as the clock + /// advances; no reset required. + /// + internal bool IsInBackoffWindow => _getUtcNow() < _nextAttemptAt; + + public void Dispose() + { + _client.Dispose(); + } + + private void RecordFailure() + { + var now = _getUtcNow(); + if (now < _nextAttemptAt) return; // inside prior window — don't compound backoff + + _consecutiveFailures++; + _nextAttemptAt = now.AddMilliseconds(BackoffMs); + } + + private void ResetBackoff() + { + _consecutiveFailures = 0; + _nextAttemptAt = null; + } + + /// + /// Reads each path and wraps the concatenated JSON bodies in + /// {"batch":[msg1,msg2,...]}. + /// + /// + /// The batched JSON, or null if every path was unreadable. Caller + /// treats null as "nothing to send" and deletes the path list. + /// + private static string? BuildPayload(IReadOnlyList paths) + { + var sb = new StringBuilder("{\"batch\":["); + var count = 0; + + for (var i = 0; i < paths.Count; i++) + { + try + { + var json = File.ReadAllText(paths[i]); + if (count > 0) sb.Append(','); + sb.Append(json); + count++; + } + catch (IOException) + { + // Transient disk race: the file was deleted or locked between + // ReadBatch and now. Safe to skip — the remaining paths in the + // batch may still read fine. Non-IOException failures escape + // and are handled by the caller (SendBatchAsync) as a batch- + // wide storage error. + } + } + + if (count == 0) return null; + + sb.Append("]}"); + return sb.ToString(); + } + + private void NotifyError(AudienceErrorCode code, string message) + { + if (_onError == null) return; + try + { + _onError(new AudienceError(code, message)); + } + catch + { + // Consumer callback threw. Swallow: the SDK must not surface + // exceptions through the error-reporting path itself. + } + } + } +} \ No newline at end of file diff --git a/src/Packages/Audience/Runtime/Utility/Gzip.cs b/src/Packages/Audience/Runtime/Utility/Gzip.cs new file mode 100644 index 000000000..c54d4cd33 --- /dev/null +++ b/src/Packages/Audience/Runtime/Utility/Gzip.cs @@ -0,0 +1,26 @@ +using System.IO; +using System.IO.Compression; +using System.Text; + +namespace Immutable.Audience +{ + /// + /// Gzip compression using from System.IO.Compression. + /// Available in Unity 2021+ (.NET Standard 2.1). Pure C#, works on all desktop platforms. + /// + internal static class Gzip + { + internal static byte[] Compress(string text) + { + var raw = Encoding.UTF8.GetBytes(text); + + using var output = new MemoryStream(); + using (var gzip = new GZipStream(output, CompressionLevel.Fastest)) + { + gzip.Write(raw, 0, raw.Length); + } + + return output.ToArray(); + } + } +} \ No newline at end of file diff --git a/src/Packages/Audience/Tests/Runtime/GzipTests.cs b/src/Packages/Audience/Tests/Runtime/GzipTests.cs new file mode 100644 index 000000000..8d509be44 --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/GzipTests.cs @@ -0,0 +1,59 @@ +using System.IO; +using System.IO.Compression; +using System.Text; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class GzipTests + { + [Test] + public void Compress_ProducesValidGzip_ThatDecompressesToOriginal() + { + const string original = "{\"type\":\"track\",\"eventName\":\"test\"}"; + + var compressed = Gzip.Compress(original); + + // Decompress and verify round-trip + using var input = new MemoryStream(compressed); + using var gzip = new GZipStream(input, CompressionMode.Decompress); + using var reader = new StreamReader(gzip, Encoding.UTF8); + var decompressed = reader.ReadToEnd(); + + Assert.AreEqual(original, decompressed); + } + + [Test] + public void Compress_OutputIsSmallerThanInput_ForRealisticPayload() + { + // Repeated field names compress well in JSON batches. + var sb = new StringBuilder("{\"batch\":["); + for (var i = 0; i < 20; i++) + { + if (i > 0) sb.Append(','); + sb.Append($"{{\"type\":\"track\",\"eventName\":\"level_complete\",\"anonymousId\":\"anon-{i}\"}}"); + } + + sb.Append("]}"); + var payload = sb.ToString(); + + var compressed = Gzip.Compress(payload); + + Assert.Less(compressed.Length, Encoding.UTF8.GetByteCount(payload), "gzip should compress a batch of similar JSON events"); + } + + [Test] + public void Compress_EmptyString_ProducesValidGzip() + { + var compressed = Gzip.Compress(""); + + using var input = new MemoryStream(compressed); + using var gzip = new GZipStream(input, CompressionMode.Decompress); + using var reader = new StreamReader(gzip, Encoding.UTF8); + var decompressed = reader.ReadToEnd(); + + Assert.AreEqual("", decompressed); + } + } +} \ No newline at end of file diff --git a/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs new file mode 100644 index 000000000..8e5f18620 --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/HttpTransportTests.cs @@ -0,0 +1,387 @@ +using System; +using System.IO; +using System.IO.Compression; +using System.Net; +using System.Net.Http; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class HttpTransportTests + { + private string _testDir; + private DiskStore _store; + + // Controllable clock for timing-sensitive tests. Tests that care about + // backoff windows or NextAttemptAt pass `getUtcNow: _getUtcNow` to the transport + // and use Advance(ms) to move time forward deterministically. + private DateTime _utcNow; + private Func _getUtcNow; + + [SetUp] + public void SetUp() + { + _testDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(_testDir); + _store = new DiskStore(_testDir); + _utcNow = new DateTime(2026, 4, 18, 12, 0, 0, DateTimeKind.Utc); + _getUtcNow = () => _utcNow; + } + + private void Advance(int milliseconds) => + _utcNow = _utcNow.AddMilliseconds(milliseconds); + + [TearDown] + public void TearDown() + { + if (Directory.Exists(_testDir)) + Directory.Delete(_testDir, recursive: true); + } + + [Test] + public async Task SendBatchAsync_200_DeletesFilesFromDisk() + { + _store.Write("{\"type\":\"track\",\"eventName\":\"a\"}"); + _store.Write("{\"type\":\"track\",\"eventName\":\"b\"}"); + + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":2,\"rejected\":0}"); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + var sent = await transport.SendBatchAsync(); + + Assert.IsTrue(sent); + Assert.AreEqual(0, _store.Count(), "files should be deleted after 200"); + } + + [Test] + public async Task SendBatchAsync_200_SendsGzippedPayloadWithCorrectHeaders() + { + _store.Write("{\"type\":\"track\",\"eventName\":\"test\"}"); + + byte[] capturedBody = null; + string capturedKey = null; + string capturedContentType = null; + // Read body inside the callback — the request content is disposed after SendAsync returns. + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":0}", + onRequest: req => + { + capturedKey = string.Join("", req.Headers.GetValues("x-immutable-publishable-key")); + capturedContentType = req.Content.Headers.ContentType.MediaType; + capturedBody = req.Content.ReadAsByteArrayAsync().Result; + }); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual("pk_imapik-test-key1", capturedKey); + Assert.AreEqual("application/json", capturedContentType); + + var decompressed = DecompressGzip(capturedBody); + StringAssert.StartsWith("{\"batch\":[", decompressed); + StringAssert.EndsWith("]}", decompressed); + StringAssert.Contains("\"eventName\":\"test\"", decompressed); + } + + [Test] + public async Task SendBatchAsync_200_UsesCorrectUrlForTestKey() + { + _store.Write("{\"type\":\"track\"}"); + + HttpRequestMessage captured = null; + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":0}", + onRequest: req => captured = req); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + await transport.SendBatchAsync(); + + StringAssert.StartsWith(Constants.SandboxBaseUrl, captured.RequestUri.ToString()); + } + + [Test] + public async Task SendBatchAsync_200_UsesCorrectUrlForProdKey() + { + _store.Write("{\"type\":\"track\"}"); + + HttpRequestMessage captured = null; + var handler = new MockHandler(HttpStatusCode.OK, "{\"accepted\":1,\"rejected\":0}", + onRequest: req => captured = req); + using var transport = new HttpTransport(_store, "pk_imapik-prodkey", handler: handler); + + await transport.SendBatchAsync(); + + StringAssert.StartsWith(Constants.ProductionBaseUrl, captured.RequestUri.ToString()); + } + + [Test] + public async Task SendBatchAsync_EmptyQueue_ReturnsFalse() + { + var handler = new MockHandler(HttpStatusCode.OK, "{}"); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", handler: handler); + + var sent = await transport.SendBatchAsync(); + + Assert.IsFalse(sent); + Assert.AreEqual(0, handler.CallCount, "should not make HTTP call when queue is empty"); + } + + [Test] + public async Task SendBatchAsync_4xx_DeletesFilesAndResetsBackoff() + { + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(HttpStatusCode.BadRequest, ""); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(0, _store.Count(), "4xx should delete files — won't succeed on retry"); + Assert.IsFalse(transport.IsInBackoffWindow); + Assert.IsNotNull(reportedError); + Assert.AreEqual(AudienceErrorCode.ValidationRejected, reportedError.Code); + } + + [Test] + public async Task SendBatchAsync_5xx_KeepsFilesAndIncreasesBackoff() + { + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(1, _store.Count(), "5xx should keep files for retry"); + Assert.IsTrue(transport.IsInBackoffWindow); + Assert.AreEqual(5000, transport.BackoffMs, "first failure = 5s backoff"); + Assert.IsNotNull(reportedError); + Assert.AreEqual(AudienceErrorCode.FlushFailed, reportedError.Code); + } + + [Test] + public async Task BackoffMs_EscalatesOnlyAfterWindowElapsed() + { + _store.Write("{\"type\":\"track\"}"); + var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + handler: handler, getUtcNow: _getUtcNow); + + // Schedule: 5s → 10s → 20s → 40s → 60s cap. + // Each escalation requires the previous window to have elapsed. + await transport.SendBatchAsync(); + Assert.AreEqual(5_000, transport.BackoffMs); + + Advance(5_001); + await transport.SendBatchAsync(); + Assert.AreEqual(10_000, transport.BackoffMs); + + Advance(10_001); + await transport.SendBatchAsync(); + Assert.AreEqual(20_000, transport.BackoffMs); + + Advance(20_001); + await transport.SendBatchAsync(); + Assert.AreEqual(40_000, transport.BackoffMs); + + Advance(40_001); + await transport.SendBatchAsync(); + Assert.AreEqual(60_000, transport.BackoffMs, "reaches 60s cap after 40s step"); + + Advance(60_001); + await transport.SendBatchAsync(); + Assert.AreEqual(60_000, transport.BackoffMs, "stays at cap"); + } + + [Test] + public async Task BackoffMs_DoesNotEscalateWhileInsidePreviousWindow() + { + _store.Write("{\"type\":\"track\"}"); + var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + handler: handler, getUtcNow: _getUtcNow); + + await transport.SendBatchAsync(); + Assert.AreEqual(5_000, transport.BackoffMs); + var firstDeadline = transport.NextAttemptAt; + Assert.IsNotNull(firstDeadline); + + // Caller ignores the window and retries immediately. Must not escalate. + Advance(100); + await transport.SendBatchAsync(); + Assert.AreEqual(5_000, transport.BackoffMs, + "failures inside the previous window must not escalate backoff"); + Assert.AreEqual(firstDeadline, transport.NextAttemptAt, + "NextAttemptAt should not move when the window hasn't elapsed"); + + // Another premature retry — still no escalation. + Advance(3_000); + await transport.SendBatchAsync(); + Assert.AreEqual(5_000, transport.BackoffMs); + + // Wait out the window, fail again → now we escalate. + _utcNow = firstDeadline.Value.AddMilliseconds(1); + await transport.SendBatchAsync(); + Assert.AreEqual(10_000, transport.BackoffMs); + } + + [Test] + public async Task BackoffMs_ResetsAfterSuccess() + { + _store.Write("{\"type\":\"track\"}"); + + var callCount = 0; + var handler = new MockHandler(() => + { + callCount++; + // Fail twice, then succeed. + return callCount <= 2 + ? new HttpResponseMessage(HttpStatusCode.InternalServerError) + : new HttpResponseMessage(HttpStatusCode.OK) + { Content = new StringContent("{\"accepted\":1,\"rejected\":0}") }; + }); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + handler: handler, getUtcNow: _getUtcNow); + + await transport.SendBatchAsync(); + Assert.AreEqual(5_000, transport.BackoffMs); + + Advance(5_001); + await transport.SendBatchAsync(); + Assert.AreEqual(10_000, transport.BackoffMs); + + Advance(10_001); + await transport.SendBatchAsync(); + Assert.AreEqual(0, transport.BackoffMs, "backoff resets after success"); + Assert.IsFalse(transport.IsInBackoffWindow); + } + + [Test] + public async Task SendBatchAsync_NetworkError_KeepsFilesAndBacksOff() + { + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(() => throw new HttpRequestException("connection refused")); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + await transport.SendBatchAsync(); + + Assert.AreEqual(1, _store.Count(), "network error should keep files for retry"); + Assert.IsTrue(transport.IsInBackoffWindow); + Assert.IsNotNull(reportedError); + Assert.AreEqual(AudienceErrorCode.NetworkError, reportedError.Code); + } + + [Test] + public async Task SendBatchAsync_HttpClientTimeout_TreatedAsNetworkError() + { + // Regression guard: HttpClient.Timeout throws TaskCanceledException, which + // derives from OperationCanceledException. Without a `when (ct.IsCancellationRequested)` + // guard, timeouts would be silently swallowed as "shutdown" — no backoff, no error + // callback, next cycle hot-loops. This test ensures timeouts flow through the + // NetworkError path. + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(() => throw new TaskCanceledException("Request timed out")); + AudienceError reportedError = null; + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: e => reportedError = e, handler: handler); + + // Pass default CancellationToken so ct.IsCancellationRequested is false — this + // simulates a real HttpClient timeout (not a caller-initiated cancellation). + await transport.SendBatchAsync(); + + Assert.AreEqual(1, _store.Count(), "timeout should keep files for retry"); + Assert.IsTrue(transport.IsInBackoffWindow, "timeout must increment failures and engage backoff"); + Assert.IsNotNull(reportedError, "NetworkError callback must fire on timeout"); + Assert.AreEqual(AudienceErrorCode.NetworkError, reportedError.Code); + } + + [Test] + public async Task IsInBackoffWindow_ClearsAfterNextAttemptAtElapses() + { + _store.Write("{\"type\":\"track\"}"); + + var now = new DateTime(2026, 4, 17, 12, 0, 0, DateTimeKind.Utc); + var handler = new MockHandler(HttpStatusCode.InternalServerError, ""); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + handler: handler, getUtcNow: () => now); + + await transport.SendBatchAsync(); + + Assert.IsTrue(transport.IsInBackoffWindow, "within window immediately after failure"); + Assert.AreEqual(now.AddMilliseconds(5_000), transport.NextAttemptAt); + + // Advance the clock just before NextAttemptAt — still backing off. + now = now.AddMilliseconds(4_999); + Assert.IsTrue(transport.IsInBackoffWindow); + + // Advance past NextAttemptAt — window closed, next send may proceed. + now = now.AddMilliseconds(2); + Assert.IsFalse(transport.IsInBackoffWindow, "window closes at NextAttemptAt"); + } + + [Test] + public async Task SendBatchAsync_ErrorCallbackThrows_DoesNotCrash() + { + _store.Write("{\"type\":\"track\"}"); + + var handler = new MockHandler(HttpStatusCode.BadRequest, ""); + using var transport = new HttpTransport(_store, "pk_imapik-test-key1", + onError: _ => throw new InvalidOperationException("callback bug"), + handler: handler); + + Assert.DoesNotThrowAsync(() => transport.SendBatchAsync()); + } + + private static string DecompressGzip(byte[] data) + { + using var input = new MemoryStream(data); + using var gzip = new GZipStream(input, CompressionMode.Decompress); + using var reader = new StreamReader(gzip, Encoding.UTF8); + return reader.ReadToEnd(); + } + + /// + /// Minimal HttpMessageHandler that returns a canned response. + /// Optionally captures the request for inspection. + /// + private class MockHandler : HttpMessageHandler + { + private readonly Func _factory; + private readonly Action _onRequest; + + internal int CallCount { get; private set; } + + internal MockHandler(HttpStatusCode status, string body, Action onRequest = null) + { + _factory = () => new HttpResponseMessage(status) + { + Content = new StringContent(body) + }; + _onRequest = onRequest; + } + + internal MockHandler(Func factory, Action onRequest = null) + { + _factory = factory; + _onRequest = onRequest; + } + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken ct) + { + CallCount++; + _onRequest?.Invoke(request); + return Task.FromResult(_factory()); + } + } + } +} \ No newline at end of file