Skip to content

Commit 69440fe

Browse files
fix(audience): guard SendBatch against overlapping timer ticks
System.Threading.Timer does not serialise callbacks — with a 5s flush interval and HTTP requests that can run up to the 30s timeout, a slow network stacks six overlapping callbacks, each on its own ThreadPool thread blocked inside SendBatchAsync. Gate SendBatch with Interlocked.CompareExchange so subsequent ticks return in microseconds while a send is in flight. The guard is cleared in finally so a throw inside GetResult or RescheduleSendTimer still releases the next tick, and defensively in Shutdown so a WaitOne that times out on a still-running callback cannot strand the flag at 1 across an Init/Shutdown cycle. Exposes SendBatch via an internal testing hook so the guard can be exercised without a real timer; the new test blocks the HTTP handler, fires an overlapping tick, and asserts only one request reaches the wire. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f1d6685 commit 69440fe

2 files changed

Lines changed: 93 additions & 12 deletions

File tree

src/Packages/Audience/Runtime/ImmutableAudience.cs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ public static class ImmutableAudience
2626
private static volatile bool _initialized;
2727
private static readonly object _initLock = new object();
2828

29+
// Guard against overlapping timer ticks. System.Threading.Timer fires
30+
// callbacks on independent ThreadPool threads and does not serialise
31+
// them; without this gate, a slow SendBatchAsync (up to the HTTP
32+
// timeout) would stack on every interval tick, each tick holding its
33+
// own thread blocked on a pending request.
34+
private static int _sendInFlight;
35+
2936
// AudienceUnityHooks sets this at SubsystemRegistration so Unity studios
3037
// can omit PersistentDataPath from AudienceConfig and Init will fill it
3138
// from Application.persistentDataPath. Non-Unity callers must still set
@@ -484,6 +491,12 @@ public static void Shutdown()
484491
_sendTimer = null;
485492
}
486493

494+
// Clear the in-flight guard in case the WaitOne above timed out
495+
// with a SendBatch callback still running: without this, a later
496+
// Init would leave _sendInFlight stranded at 1 and suppress every
497+
// tick of the new timer.
498+
Interlocked.Exchange(ref _sendInFlight, 0);
499+
487500
_queue?.Shutdown();
488501

489502
// Best-effort final send, capped so a slow network can't hang quit.
@@ -553,6 +566,10 @@ internal static void ResetState()
553566

554567
internal static void FlushQueueToDiskForTesting() => _queue?.FlushSync();
555568

569+
// Invokes the timer callback body directly so the overlapping-tick
570+
// guard can be exercised without a real timer.
571+
internal static void SendBatchForTesting() => SendBatch();
572+
556573
// -----------------------------------------------------------------
557574
// Private
558575
// -----------------------------------------------------------------
@@ -578,23 +595,36 @@ private static void Enqueue(Dictionary<string, object>? msg)
578595

579596
private static void SendBatch()
580597
{
581-
var transport = _transport;
582-
if (transport == null) return;
598+
// CAS in the guard before doing any work; a previous tick still
599+
// running means skip entirely, including the reschedule — the
600+
// in-flight tick will reschedule on its own finally path.
601+
if (Interlocked.CompareExchange(ref _sendInFlight, 1, 0) != 0)
602+
return;
583603

584-
if (!transport.IsInBackoffWindow)
604+
try
585605
{
586-
try
587-
{
588-
transport.SendBatchAsync().ConfigureAwait(false).GetAwaiter().GetResult();
589-
}
590-
catch (Exception ex)
606+
var transport = _transport;
607+
if (transport == null) return;
608+
609+
if (!transport.IsInBackoffWindow)
591610
{
592-
// ThreadPool timer thread; no caller above to catch.
593-
Log.Warn($"SendBatch unexpected exception: {ex.GetType().Name}: {ex.Message}");
611+
try
612+
{
613+
transport.SendBatchAsync().ConfigureAwait(false).GetAwaiter().GetResult();
614+
}
615+
catch (Exception ex)
616+
{
617+
// ThreadPool timer thread; no caller above to catch.
618+
Log.Warn($"SendBatch unexpected exception: {ex.GetType().Name}: {ex.Message}");
619+
}
594620
}
595-
}
596621

597-
RescheduleSendTimer(transport);
622+
RescheduleSendTimer(transport);
623+
}
624+
finally
625+
{
626+
Interlocked.Exchange(ref _sendInFlight, 0);
627+
}
598628
}
599629

600630
// Realigns the timer to NextAttemptAt so we don't repoll through a long backoff window.

src/Packages/Audience/Tests/Runtime/ImmutableAudienceTests.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,5 +850,56 @@ public void FullToAnonymous_FutureTracksOmitUserId()
850850
Assert.IsFalse(trackFiles[0].ContainsKey("userId"),
851851
"Track under Anonymous consent must not carry userId");
852852
}
853+
854+
// -----------------------------------------------------------------
855+
// SendBatch — overlapping timer tick guard
856+
// -----------------------------------------------------------------
857+
858+
[Test]
859+
public void SendBatch_ConcurrentTicks_OnlyOneReachesTransport()
860+
{
861+
var handler = new BlockingHandler();
862+
var config = MakeConfig();
863+
config.HttpHandler = handler;
864+
865+
ImmutableAudience.Init(config);
866+
ImmutableAudience.Track("event_to_send");
867+
ImmutableAudience.FlushQueueToDiskForTesting();
868+
869+
// Kick off one SendBatch on a worker — it will block inside the
870+
// handler until we signal, holding _sendInFlight = 1.
871+
var blocked = Task.Run(() => ImmutableAudience.SendBatchForTesting());
872+
873+
// Give the worker enough time to enter the handler's SendAsync.
874+
Assert.IsTrue(handler.EnteredSendAsync.Wait(TimeSpan.FromSeconds(2)),
875+
"first SendBatch should have reached the HTTP handler");
876+
877+
// Second tick while the first is still in flight — must return
878+
// immediately without issuing another request.
879+
ImmutableAudience.SendBatchForTesting();
880+
881+
// Release the blocked send and let it finish.
882+
handler.Release.Set();
883+
Assert.IsTrue(blocked.Wait(TimeSpan.FromSeconds(5)),
884+
"blocked SendBatch should finish after release");
885+
886+
Assert.AreEqual(1, handler.RequestCount,
887+
"overlapping tick must not issue a second HTTP request");
888+
}
889+
890+
private class BlockingHandler : HttpMessageHandler
891+
{
892+
public readonly ManualResetEventSlim EnteredSendAsync = new ManualResetEventSlim(false);
893+
public readonly ManualResetEventSlim Release = new ManualResetEventSlim(false);
894+
public int RequestCount;
895+
896+
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken ct)
897+
{
898+
Interlocked.Increment(ref RequestCount);
899+
EnteredSendAsync.Set();
900+
await Task.Run(() => Release.Wait(ct), ct).ConfigureAwait(false);
901+
return new HttpResponseMessage(HttpStatusCode.ServiceUnavailable);
902+
}
903+
}
853904
}
854905
}

0 commit comments

Comments
 (0)