diff --git a/src/Packages/Audience/Runtime/Transport.meta b/src/Packages/Audience/Runtime/Transport.meta new file mode 100644 index 000000000..48c3d6b2e --- /dev/null +++ b/src/Packages/Audience/Runtime/Transport.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: b7e3a1f2d4c548a89e5f6b7c8d9e0f1a +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/src/Packages/Audience/Runtime/Transport/DiskStore.cs b/src/Packages/Audience/Runtime/Transport/DiskStore.cs new file mode 100644 index 000000000..6e691521b --- /dev/null +++ b/src/Packages/Audience/Runtime/Transport/DiskStore.cs @@ -0,0 +1,103 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; + +namespace Immutable.Audience +{ + /// + /// File-per-event persistent store. Each event is written as an atomic + /// {ticks}_{uuid}.json file inside imtbl_audience/queue/. + /// + internal sealed class DiskStore + { + private readonly string _queueDir; + + internal DiskStore(string persistentDataPath) + { + _queueDir = Path.Combine(persistentDataPath, "imtbl_audience", "queue"); + Directory.CreateDirectory(_queueDir); + } + + /// Atomically writes as a new event file. + internal void Write(string json) + { + var fileName = $"{DateTime.UtcNow.Ticks}_{Guid.NewGuid():N}.json"; + var finalPath = Path.Combine(_queueDir, fileName); + var tmpPath = finalPath + ".tmp"; + + File.WriteAllText(tmpPath, json); + + try + { + File.Move(tmpPath, finalPath); + } + catch (IOException) + { + // Destination already exists (unlikely but safe to handle) + File.Delete(finalPath); + File.Move(tmpPath, finalPath); + } + } + + /// + /// Returns up to file paths, oldest first. + /// Files older than days are deleted and excluded. + /// + internal IReadOnlyList ReadBatch(int maxSize) + { + if (maxSize <= 0) + return Array.Empty(); + + maxSize = Math.Min(maxSize, Constants.MaxBatchSize); + + var cutoff = DateTime.UtcNow.AddDays(-Constants.StaleEventDays); + + var result = new List(); + + // Sort by filename (ticks prefix) → oldest first + var files = Directory.GetFiles(_queueDir, "*.json") + .OrderBy(f => Path.GetFileName(f), StringComparer.Ordinal); + + foreach (var path in files) + { + if (result.Count >= maxSize) + break; + + // Stale check: parse ticks from filename prefix + var name = Path.GetFileNameWithoutExtension(path); + var underscoreIdx = name.IndexOf('_'); + if (underscoreIdx > 0 && long.TryParse(name.Substring(0, underscoreIdx), out var ticks)) + { + var fileTime = new DateTime(ticks, DateTimeKind.Utc); + if (fileTime < cutoff) + { + TryDelete(path); + continue; + } + } + + result.Add(path); + } + + return result; + } + + /// Deletes the event files at . + internal void Delete(IEnumerable paths) + { + foreach (var path in paths) + TryDelete(path); + } + + /// Returns the total number of event files currently on disk. + internal int Count() => Directory.GetFiles(_queueDir, "*.json").Length; + + private static void TryDelete(string path) + { + try { File.Delete(path); } + catch (IOException) { } + catch (UnauthorizedAccessException) { } + } + } +} diff --git a/src/Packages/Audience/Runtime/Transport/DiskStore.cs.meta b/src/Packages/Audience/Runtime/Transport/DiskStore.cs.meta new file mode 100644 index 000000000..5dd9a52d3 --- /dev/null +++ b/src/Packages/Audience/Runtime/Transport/DiskStore.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: c8f4b2e1a3d547b09f6e7c8d9a0b1c2d +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/src/Packages/Audience/Runtime/Transport/EventQueue.cs b/src/Packages/Audience/Runtime/Transport/EventQueue.cs new file mode 100644 index 000000000..0b7491a5f --- /dev/null +++ b/src/Packages/Audience/Runtime/Transport/EventQueue.cs @@ -0,0 +1,136 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; + +namespace Immutable.Audience +{ + /// + /// Thread-safe, disk-persistent batch event queue for the Audience SDK. + /// + /// Enqueue is lock-free and safe to call from any thread. A background + /// drain thread moves events from the in-memory + /// to , flushing either on a time interval or when the + /// in-memory batch reaches . + /// + /// Call before process exit to flush remaining events + /// and stop the drain thread cleanly. + /// + internal sealed class EventQueue : IDisposable + { + private readonly DiskStore _store; + private readonly int _flushIntervalMs; + private readonly int _flushSize; + + private readonly ConcurrentQueue _memory = new ConcurrentQueue(); + private readonly CancellationTokenSource _cts = new CancellationTokenSource(); + private readonly Thread _drainThread; + private readonly ManualResetEventSlim _flushGate = new ManualResetEventSlim(false); + + // Volatile so all threads see the shutdown signal immediately. + private volatile bool _disposed; + + /// Pre-created for this queue. + /// How often to drain to disk regardless of batch size. + /// Drain to disk immediately when this many events are queued. + internal EventQueue(DiskStore store, int flushIntervalSeconds, int flushSize) + { + _store = store ?? throw new ArgumentNullException(nameof(store)); + _flushIntervalMs = Math.Max(1, flushIntervalSeconds) * 1000; + _flushSize = Math.Max(1, flushSize); + + _drainThread = new Thread(DrainLoop) + { + IsBackground = true, + Name = "imtbl-audience-drain" + }; + _drainThread.Start(); + } + + /// Enqueues a JSON-serialised event. Lock-free; safe from any thread. + internal void Enqueue(string json) + { + if (_disposed) return; + + _memory.Enqueue(json); + + // Signal the drain thread early if we've hit the flush-size threshold + if (_memory.Count >= _flushSize) + _flushGate.Set(); + } + + /// + /// Drains the in-memory queue and persists all events to disk immediately. + /// Blocks until the drain is complete. + /// + internal void FlushSync() + { + DrainMemoryToDisk(); + } + + /// + /// Flushes all pending events to disk and stops the drain thread. + /// Safe to call multiple times. + /// + internal void Shutdown() + { + if (_disposed) return; + + // Stop accepting new events first — closes the race window where + // events enqueued between Cancel and final drain would be lost. + _disposed = true; + + // Signal the drain thread to exit, then wait for it. + _cts.Cancel(); + _flushGate.Set(); + _drainThread.Join(TimeSpan.FromSeconds(5)); + + // Final drain: anything enqueued before _disposed was set. + DrainMemoryToDisk(); + } + + // ----------------------------------------------------------------- + // Background drain loop + // ----------------------------------------------------------------- + + private void DrainLoop() + { + while (!_cts.IsCancellationRequested) + { + // Wait for flush gate or interval timeout + _flushGate.Wait(_flushIntervalMs); + _flushGate.Reset(); + + if (_cts.IsCancellationRequested) + break; + + DrainMemoryToDisk(); + } + } + + private void DrainMemoryToDisk() + { + while (_memory.TryDequeue(out var json)) + { + try + { + _store.Write(json); + } + catch (Exception) + { + // Best-effort: if we can't write, discard rather than block the drain + } + } + } + + // ----------------------------------------------------------------- + // IDisposable + // ----------------------------------------------------------------- + + public void Dispose() + { + Shutdown(); + _cts.Dispose(); + _flushGate.Dispose(); + } + } +} diff --git a/src/Packages/Audience/Runtime/Transport/EventQueue.cs.meta b/src/Packages/Audience/Runtime/Transport/EventQueue.cs.meta new file mode 100644 index 000000000..9d97874ed --- /dev/null +++ b/src/Packages/Audience/Runtime/Transport/EventQueue.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: d9e5c3f2b4e658c10a7f8d9e0b1c2d3e +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/src/Packages/Audience/Tests/Runtime/DiskStoreTests.cs b/src/Packages/Audience/Tests/Runtime/DiskStoreTests.cs new file mode 100644 index 000000000..d9c37dbc7 --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/DiskStoreTests.cs @@ -0,0 +1,164 @@ +using System; +using System.IO; +using System.Linq; +using System.Threading; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class DiskStoreTests + { + private string _testDir; + private DiskStore _store; + + [SetUp] + public void SetUp() + { + _testDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(_testDir); + _store = new DiskStore(_testDir); + } + + [TearDown] + public void TearDown() + { + if (Directory.Exists(_testDir)) + Directory.Delete(_testDir, recursive: true); + } + + [Test] + public void Write_CreatesJsonFile_InQueueDirectory() + { + _store.Write("{\"event\":\"test\"}"); + + var queueDir = Path.Combine(_testDir, "imtbl_audience", "queue"); + var files = Directory.GetFiles(queueDir, "*.json"); + Assert.AreEqual(1, files.Length, "should have written exactly one event file"); + } + + [Test] + public void Write_FileContents_MatchInputJson() + { + const string json = "{\"event\":\"pageview\",\"userId\":\"u1\"}"; + _store.Write(json); + + var queueDir = Path.Combine(_testDir, "imtbl_audience", "queue"); + var file = Directory.GetFiles(queueDir, "*.json").Single(); + Assert.AreEqual(json, File.ReadAllText(file)); + } + + [Test] + public void ReadBatch_ReturnsOldestFirst() + { + // Write events with a small delay to guarantee different ticks in filenames + _store.Write("{\"seq\":1}"); + Thread.Sleep(10); + _store.Write("{\"seq\":2}"); + Thread.Sleep(10); + _store.Write("{\"seq\":3}"); + + var batch = _store.ReadBatch(10); + + Assert.AreEqual(3, batch.Count); + // Filenames are {ticks}_{uuid}.json — lexicographic sort == oldest first + var names = batch.Select(Path.GetFileName).ToList(); + Assert.That(names, Is.Ordered.Ascending); + } + + [Test] + public void ReadBatch_RespectsMaxSize() + { + for (var i = 0; i < 5; i++) + { + _store.Write($"{{\"i\":{i}}}"); + Thread.Sleep(5); + } + + var batch = _store.ReadBatch(3); + Assert.AreEqual(3, batch.Count); + } + + [Test] + public void ReadBatch_ClampsToMaxBatchSize() + { + for (var i = 0; i < Constants.MaxBatchSize + 10; i++) + _store.Write($"{{\"i\":{i}}}"); + + var batch = _store.ReadBatch(Constants.MaxBatchSize + 10); + Assert.LessOrEqual(batch.Count, Constants.MaxBatchSize); + } + + [Test] + public void ReadBatch_ExcludesAndDeletesStaleFiles() + { + _store.Write("{\"fresh\":true}"); + + // Manually plant a stale file (ticks from 31 days ago) + var staleTime = DateTime.UtcNow.AddDays(-(Constants.StaleEventDays + 1)); + var staleName = $"{staleTime.Ticks}_{Guid.NewGuid():N}.json"; + var queueDir = Path.Combine(_testDir, "imtbl_audience", "queue"); + File.WriteAllText(Path.Combine(queueDir, staleName), "{\"stale\":true}"); + + var batch = _store.ReadBatch(10); + + Assert.AreEqual(1, batch.Count, "stale file should be excluded from batch"); + Assert.IsFalse(File.Exists(Path.Combine(queueDir, staleName)), "stale file should be deleted"); + } + + [Test] + public void Delete_RemovesSpecifiedFiles() + { + _store.Write("{\"a\":1}"); + _store.Write("{\"b\":2}"); + + var batch = _store.ReadBatch(10); + Assert.AreEqual(2, batch.Count); + + _store.Delete(batch); + + Assert.AreEqual(0, _store.Count()); + } + + [Test] + public void Count_ReflectsNumberOfFilesOnDisk() + { + Assert.AreEqual(0, _store.Count()); + + _store.Write("{\"x\":1}"); + _store.Write("{\"x\":2}"); + + Assert.AreEqual(2, _store.Count()); + } + + [Test] + public void ReadBatch_EmptyQueue_ReturnsEmpty() + { + var batch = _store.ReadBatch(10); + Assert.AreEqual(0, batch.Count); + } + + [Test] + public void ReadBatch_ZeroMaxSize_ReturnsEmpty() + { + _store.Write("{\"x\":1}"); + var batch = _store.ReadBatch(0); + Assert.AreEqual(0, batch.Count); + } + + [Test] + public void CrashRecovery_PicksUpFilesFromPreviousRun() + { + // Simulate a previous run by writing a file directly + var queueDir = Path.Combine(_testDir, "imtbl_audience", "queue"); + var survivingName = $"{DateTime.UtcNow.Ticks}_{Guid.NewGuid():N}.json"; + File.WriteAllText(Path.Combine(queueDir, survivingName), "{\"survived\":true}"); + + // Create a new DiskStore instance pointing at the same path (simulates restart) + var store2 = new DiskStore(_testDir); + var batch = store2.ReadBatch(10); + + Assert.AreEqual(1, batch.Count, "crash-surviving file should be picked up on next init"); + } + } +} diff --git a/src/Packages/Audience/Tests/Runtime/DiskStoreTests.cs.meta b/src/Packages/Audience/Tests/Runtime/DiskStoreTests.cs.meta new file mode 100644 index 000000000..d646d0000 --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/DiskStoreTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: e0f6d4a3b5c769d21b8e9f0a1b2c3d4e +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/src/Packages/Audience/Tests/Runtime/EventQueueTests.cs b/src/Packages/Audience/Tests/Runtime/EventQueueTests.cs new file mode 100644 index 000000000..c9ec7ccd3 --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/EventQueueTests.cs @@ -0,0 +1,130 @@ +using System; +using System.IO; +using System.Threading; +using NUnit.Framework; + +namespace Immutable.Audience.Tests +{ + [TestFixture] + internal class EventQueueTests + { + private string _testDir; + private DiskStore _store; + + [SetUp] + public void SetUp() + { + _testDir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()); + Directory.CreateDirectory(_testDir); + _store = new DiskStore(_testDir); + } + + [TearDown] + public void TearDown() + { + if (Directory.Exists(_testDir)) + Directory.Delete(_testDir, recursive: true); + } + + [Test] + public void Enqueue_ThenFlushSync_PersistesEventToDisk() + { + using var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); + + queue.Enqueue("{\"event\":\"track\"}"); + queue.FlushSync(); + + Assert.AreEqual(1, _store.Count(), "event should be on disk after FlushSync"); + } + + [Test] + public void Enqueue_MultipleEvents_AllPersistedAfterFlush() + { + using var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); + + for (var i = 0; i < 10; i++) + queue.Enqueue($"{{\"i\":{i}}}"); + + queue.FlushSync(); + + Assert.AreEqual(10, _store.Count()); + } + + [Test] + public void FlushSize_Trigger_DrainsToDiskAutomatically() + { + const int flushSize = 5; + using var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: flushSize); + + for (var i = 0; i < flushSize; i++) + queue.Enqueue($"{{\"i\":{i}}}"); + + // Give the background thread time to drain + var deadline = DateTime.UtcNow.AddSeconds(3); + while (_store.Count() < flushSize && DateTime.UtcNow < deadline) + Thread.Sleep(20); + + Assert.AreEqual(flushSize, _store.Count(), + "reaching FlushSize should trigger automatic drain without explicit FlushSync"); + } + + [Test] + public void Shutdown_FlushesRemainingEvents() + { + var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); + + queue.Enqueue("{\"event\":\"a\"}"); + queue.Enqueue("{\"event\":\"b\"}"); + + queue.Shutdown(); + + Assert.AreEqual(2, _store.Count(), "Shutdown should flush all remaining in-memory events"); + } + + [Test] + public void Shutdown_CalledTwice_DoesNotThrow() + { + var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); + queue.Shutdown(); + Assert.DoesNotThrow(() => queue.Shutdown()); + } + + [Test] + public void Enqueue_AfterShutdown_IsIgnored() + { + var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100); + queue.Shutdown(); + + queue.Enqueue("{\"event\":\"ignored\"}"); + + Assert.AreEqual(0, _store.Count(), "events enqueued after Shutdown should be discarded"); + } + + [Test] + public void IntervalFlush_DrainsToDiskWithoutExplicitCall() + { + // Very short interval to make the test fast + using var queue = new EventQueue(_store, flushIntervalSeconds: 1, flushSize: 100); + + queue.Enqueue("{\"event\":\"interval_flush\"}"); + + // Wait slightly longer than the flush interval + var deadline = DateTime.UtcNow.AddSeconds(4); + while (_store.Count() < 1 && DateTime.UtcNow < deadline) + Thread.Sleep(50); + + Assert.AreEqual(1, _store.Count(), "interval flush should have persisted event to disk"); + } + + [Test] + public void Dispose_FlushesAndStopsDrainThread() + { + using (var queue = new EventQueue(_store, flushIntervalSeconds: 60, flushSize: 100)) + { + queue.Enqueue("{\"event\":\"dispose_test\"}"); + } // Dispose called here + + Assert.AreEqual(1, _store.Count(), "Dispose should flush events to disk"); + } + } +} diff --git a/src/Packages/Audience/Tests/Runtime/EventQueueTests.cs.meta b/src/Packages/Audience/Tests/Runtime/EventQueueTests.cs.meta new file mode 100644 index 000000000..63cd7437d --- /dev/null +++ b/src/Packages/Audience/Tests/Runtime/EventQueueTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: f1a7e5b4c6d870e32c9f0a1b2c3d4e5f +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: