diff --git a/src/Core/src/Eventuous.Subscriptions/Filters/ConsumerFilter.cs b/src/Core/src/Eventuous.Subscriptions/Filters/ConsumerFilter.cs index 22ad566c..01dbf483 100644 --- a/src/Core/src/Eventuous.Subscriptions/Filters/ConsumerFilter.cs +++ b/src/Core/src/Eventuous.Subscriptions/Filters/ConsumerFilter.cs @@ -1,15 +1,12 @@ // Copyright (C) Ubiquitous AS. All rights reserved // Licensed under the Apache License, Version 2.0. -using System.Runtime.CompilerServices; - namespace Eventuous.Subscriptions.Filters; using Consumers; using Context; public class ConsumerFilter(IMessageConsumer consumer) : ConsumeFilter { - [MethodImpl(MethodImplOptions.AggressiveInlining)] protected override ValueTask Send(IMessageConsumeContext context, LinkedListNode? next) => consumer.Consume(context); } diff --git a/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj b/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj index 5c7deb19..a4dd9384 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj +++ b/src/SqlServer/src/Eventuous.SqlServer/Eventuous.SqlServer.csproj @@ -12,7 +12,9 @@ + + @@ -26,4 +28,8 @@ + + + + diff --git a/src/SqlServer/src/Eventuous.SqlServer/Schema.cs b/src/SqlServer/src/Eventuous.SqlServer/Schema.cs index 28984de2..a8c5ee8d 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/Schema.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/Schema.cs @@ -8,14 +8,16 @@ namespace Eventuous.SqlServer; public class Schema(string schema = Schema.DefaultSchema) { public const string DefaultSchema = "eventuous"; - public string AppendEvents => $"{schema}.append_events"; - public string ReadStreamForwards => $"{schema}.read_stream_forwards"; - public string ReadStreamSub => $"{schema}.read_stream_sub"; - public string ReadAllForwards => $"{schema}.read_all_forwards"; - public string CheckStream => $"{schema}.check_stream"; - public string StreamExists => $"SELECT CAST(IIF(EXISTS(SELECT 1 FROM {schema}.Streams WHERE StreamName = (@name)), 1, 0) AS BIT)"; - public string GetCheckpointSql => $"SELECT Position FROM {schema}.Checkpoints where Id=(@checkpointId)"; - public string AddCheckpointSql => $"INSERT INTO {schema}.Checkpoints (Id) VALUES ((@checkpointId))"; + public string AppendEvents => $"{schema}.append_events"; + public string ReadStreamForwards => $"{schema}.read_stream_forwards"; + public string ReadStreamBackwards => $"{schema}.read_stream_backwards"; + public string ReadStreamSub => $"{schema}.read_stream_sub"; + public string ReadAllForwards => $"{schema}.read_all_forwards"; + public string ReadAllBackwards => $"{schema}.read_all_backwards"; + public string CheckStream => $"{schema}.check_stream"; + public string StreamExists => $"SELECT CAST(IIF(EXISTS(SELECT 1 FROM {schema}.Streams WHERE StreamName = (@name)), 1, 0) AS BIT)"; + public string GetCheckpointSql => $"SELECT Position FROM {schema}.Checkpoints where Id=(@checkpointId)"; + public string AddCheckpointSql => $"INSERT INTO {schema}.Checkpoints (Id) VALUES ((@checkpointId))"; public string UpdateCheckpointSql => $"UPDATE {schema}.Checkpoints set Position=(@position) where Id=(@checkpointId)"; diff --git a/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadAllBackwards.sql b/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadAllBackwards.sql new file mode 100644 index 00000000..f1c0d657 --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadAllBackwards.sql @@ -0,0 +1,15 @@ +CREATE OR ALTER PROCEDURE __schema__.read_all_backwards + @from_position bigint, + @count int + AS +BEGIN + +SELECT TOP (@count) + MessageId, MessageType, StreamPosition, GlobalPosition, + JsonData, JsonMetadata, Created, StreamName +FROM __schema__.Messages +INNER JOIN __schema__.Streams ON Messages.StreamId = Streams.StreamId +WHERE Messages.GlobalPosition <= @from_position +ORDER BY Messages.GlobalPosition DESC + +END \ No newline at end of file diff --git a/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadStreamBackwards.sql b/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadStreamBackwards.sql new file mode 100644 index 00000000..ea9039e7 --- /dev/null +++ b/src/SqlServer/src/Eventuous.SqlServer/Scripts/ReadStreamBackwards.sql @@ -0,0 +1,23 @@ +CREATE OR ALTER PROCEDURE __schema__.read_stream_backwards + @stream_name NVARCHAR(850), + @count INT +AS +BEGIN + +DECLARE @current_version int, @stream_id int + +SELECT @current_version = Version, @stream_id = StreamId +FROM __schema__.Streams +WHERE StreamName = @stream_name + +IF @stream_id IS NULL + THROW 50001, 'StreamNotFound', 1; + +SELECT TOP (@count) + MessageId, MessageType, StreamPosition, GlobalPosition, + JsonData, JsonMetadata, Created +FROM __schema__.Messages +WHERE StreamId = @stream_id AND StreamPosition <= @current_version +ORDER BY Messages.GlobalPosition DESC + +END \ No newline at end of file diff --git a/src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs b/src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs index 9a1324a3..5582a14e 100644 --- a/src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs +++ b/src/SqlServer/src/Eventuous.SqlServer/SqlServerStore.cs @@ -59,8 +59,23 @@ CancellationToken cancellationToken } } - public Task ReadEventsBackwards(StreamName stream, int count, CancellationToken cancellationToken) - => throw new NotImplementedException(); + public async Task ReadEventsBackwards(StreamName stream, int count, CancellationToken cancellationToken) { + await using var connection = await OpenConnection(cancellationToken).NoContext(); + + await using var cmd = connection.GetStoredProcCommand(_schema.ReadStreamBackwards) + .Add("@stream_name", SqlDbType.NVarChar, stream.ToString()) + .Add("@count", SqlDbType.Int, count); + + try { + await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext(); + + var result = reader.ReadEvents(cancellationToken); + + return await result.Select(x => ToStreamEvent(x)).ToArrayAsync(cancellationToken).NoContext(); + } catch (SqlException e) when (e.Message.StartsWith("StreamNotFound")) { + throw new StreamNotFound(stream); + } + } public async Task AppendEvents( StreamName stream, @@ -130,11 +145,7 @@ CancellationToken cancellationToken ) => throw new NotImplementedException(); - public Task DeleteStream( - StreamName stream, - ExpectedStreamVersion expectedVersion, - CancellationToken cancellationToken - ) + public Task DeleteStream(StreamName stream, ExpectedStreamVersion expectedVersion, CancellationToken cancellationToken) => throw new NotImplementedException(); StreamEvent ToStreamEvent(PersistedEvent evt) { diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs index 304c380e..0760c4e6 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/IntegrationFixture.cs @@ -16,6 +16,7 @@ public sealed class IntegrationFixture : IAsyncLifetime { public IFixture Auto { get; } = new Fixture().Customize(new NodaTimeCustomization()); public GetSqlServerConnection GetConnection { get; private set; } = null!; public Faker Faker { get; } = new(); + public Schema Schema { get; set; } public string SchemaName { get; } @@ -35,15 +36,17 @@ public sealed class IntegrationFixture : IAsyncLifetime { public async Task InitializeAsync() { _sqlServer = new SqlEdgeBuilder() .WithImage("mcr.microsoft.com/azure-sql-edge:latest") + // .WithAutoRemove(false) + // .WithCleanUp(false) .Build(); await _sqlServer.StartAsync(); - var schema = new Schema(SchemaName); + Schema = new Schema(SchemaName); var connString = _sqlServer.GetConnectionString(); GetConnection = () => GetConn(connString); - await schema.CreateSchema(GetConnection); + await Schema.CreateSchema(GetConnection); DefaultEventSerializer.SetDefaultSerializer(Serializer); - EventStore = new SqlServerStore(GetConnection, new SqlServerStoreOptions(SchemaName), Serializer); + EventStore = new SqlServerStore(GetConnection, new SqlServerStoreOptions(SchemaName), Serializer); ActivitySource.AddActivityListener(_listener); return; diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs index de2d2834..d0907ac1 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Fixtures/SubscriptionFixture.cs @@ -20,7 +20,7 @@ public abstract class SubscriptionFixture : IClassFixture protected SubscriptionFixture( IntegrationFixture fixture, - ITestOutputHelper outputHelper, + ITestOutputHelper output, T handler, bool subscribeToAll, bool autoStart = true, @@ -31,7 +31,7 @@ protected SubscriptionFixture( _subscribeToAll = subscribeToAll; Stream = new StreamName(fixture.Auto.Create()); SchemaName = fixture.GetSchemaName(); - _loggerFactory = TestHelpers.Logging.GetLoggerFactory(outputHelper, logLevel); + _loggerFactory = TestHelpers.Logging.GetLoggerFactory(output, logLevel); _listener = new LoggingEventListener(_loggerFactory); SubscriptionId = $"test-{Guid.NewGuid():N}"; Handler = handler; @@ -39,6 +39,7 @@ protected SubscriptionFixture( } protected string SubscriptionId { get; } + protected Schema Schema { get; set; } protected ValueTask Start() => Subscription.SubscribeWithLog(Log); @@ -51,8 +52,8 @@ protected SubscriptionFixture( readonly ILoggerFactory _loggerFactory; public virtual async Task InitializeAsync() { - var schema = new Schema(SchemaName); - await schema.CreateSchema(_fixture.GetConnection); + Schema = new Schema(SchemaName); + await Schema.CreateSchema(_fixture.GetConnection); CheckpointStoreOptions = new SqlServerCheckpointStoreOptions { Schema = SchemaName }; CheckpointStore = new SqlServerCheckpointStore(_fixture.GetConnection, CheckpointStoreOptions); diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/LoadTest.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/LoadTest.cs new file mode 100644 index 00000000..42df2be3 --- /dev/null +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/LoadTest.cs @@ -0,0 +1,72 @@ +using System.Data; +using Eventuous.SqlServer; +using Eventuous.SqlServer.Extensions; +using Eventuous.Sut.App; +using Eventuous.Sut.Domain; +using Eventuous.Sut.Subs; +using Eventuous.Tests.SqlServer.Fixtures; +using Hypothesist; + +namespace Eventuous.Tests.SqlServer.Subscriptions; + +public class LoadTest : SubscriptionFixture { + readonly IntegrationFixture _fixture; + readonly BookingService _service; + + public LoadTest(IntegrationFixture fixture, ITestOutputHelper output) + : base(fixture, output, new TestEventHandler(), true, autoStart: false, logLevel: LogLevel.Debug) { + _fixture = fixture; + var eventStore = new SqlServerStore(fixture.GetConnection, new SqlServerStoreOptions(SchemaName)); + var store = new AggregateStore(eventStore); + _service = new BookingService(store); + } + + [Fact] + public async Task ProduceAndConsumeManyEvents() { + const int count = 55000; + Handler.AssertThat().Any(_ => true); + + var generateTask = Task.Run(() => GenerateAndHandleCommands(count)); + + await Start(); + await Task.Delay(TimeSpan.FromMinutes(7)); + await Stop(); + Handler.Count.Should().Be(count); + + var checkpoint = await CheckpointStore.GetLastCheckpoint(SubscriptionId, default); + checkpoint.Position.Value.Should().Be(count - 1); + + await using var connection = _fixture.GetConnection(); + await connection.OpenAsync(); + + await using var cmd = connection.GetStoredProcCommand(Schema.ReadAllBackwards) + .Add("@from_position", SqlDbType.BigInt, long.MaxValue) + .Add("@count", SqlDbType.Int, 1); + await using var reader = await cmd.ExecuteReaderAsync(CancellationToken.None); + + var result = reader.ReadEvents(CancellationToken.None); + + var lastEvent = await result.LastAsync(); + lastEvent.GlobalPosition.Should().Be(count - 1); + } + + async Task> GenerateAndHandleCommands(int count) { + var commands = Enumerable + .Range(0, count) + .Select(_ => DomainFixture.CreateImportBooking()) + .ToList(); + + foreach (var cmd in commands) { + var result = await _service.Handle(cmd, default); + + if (result is ErrorResult error) { + throw error.Exception ?? new Exception(error.Message); + } + } + + return commands; + } + + static BookingEvents.BookingImported ToEvent(Commands.ImportBooking cmd) + => new(cmd.RoomId, cmd.Price, cmd.CheckIn, cmd.CheckOut); +} diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToAll.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToAll.cs index a90264ab..a24f021d 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToAll.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToAll.cs @@ -30,7 +30,7 @@ public async Task ShouldConsumeProducedEvents() { await Start(); await Handler.Validate(2.Seconds()); - Handler.Count.Should().Be(10); + Handler.Count.Should().Be(testEvents.Count); await Stop(); } diff --git a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToStream.cs b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToStream.cs index ea93fe7e..fa7cb18f 100644 --- a/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToStream.cs +++ b/src/SqlServer/test/Eventuous.Tests.SqlServer/Subscriptions/SubscribeToStream.cs @@ -59,12 +59,7 @@ async Task> GenerateAndProduceEvents(int count) { var streamEvents = events.Select(x => new StreamEvent(Guid.NewGuid(), x, new Metadata(), "", 0)); - await _eventStore.AppendEvents( - Stream, - ExpectedStreamVersion.Any, - streamEvents.ToArray(), - default - ); + await _eventStore.AppendEvents(Stream, ExpectedStreamVersion.Any, streamEvents.ToArray(), default); return events; } diff --git a/test/Eventuous.Sut.Domain/BookingEvents.cs b/test/Eventuous.Sut.Domain/BookingEvents.cs index 3645a4b5..9b3f67ce 100644 --- a/test/Eventuous.Sut.Domain/BookingEvents.cs +++ b/test/Eventuous.Sut.Domain/BookingEvents.cs @@ -6,19 +6,13 @@ namespace Eventuous.Sut.Domain; public static class BookingEvents { [EventType("RoomBooked")] - public record RoomBooked( - string RoomId, - LocalDate CheckIn, - LocalDate CheckOut, - float Price, - string? GuestId = null - ); + public record RoomBooked(string RoomId, LocalDate CheckIn, LocalDate CheckOut, float Price, string? GuestId = null); [EventType("PaymentRegistered")] public record BookingPaymentRegistered( - string PaymentId, - float AmountPaid - ); + string PaymentId, + float AmountPaid + ); [EventType("OutstandingAmountChanged")] public record BookingOutstandingAmountChanged(float OutstandingAmount); @@ -33,12 +27,7 @@ public record BookingOverpaid(float OverpaidAmount); public record BookingCancelled; [EventType("V1.BookingImported")] - public record BookingImported( - string RoomId, - float Price, - LocalDate CheckIn, - LocalDate CheckOut - ); + public record BookingImported(string RoomId, float Price, LocalDate CheckIn, LocalDate CheckOut); // These constants are for test purpose, use inline names in real apps public static class TypeNames { diff --git a/test/Eventuous.Sut.Subs/TestEventHandler.cs b/test/Eventuous.Sut.Subs/TestEventHandler.cs index 509fa9a9..63be527c 100644 --- a/test/Eventuous.Sut.Subs/TestEventHandler.cs +++ b/test/Eventuous.Sut.Subs/TestEventHandler.cs @@ -13,6 +13,7 @@ public record TestEvent(string Data, int Number) { public class TestEventHandler(TimeSpan? delay = null, ITestOutputHelper? output = null) : BaseEventHandler { readonly TimeSpan _delay = delay ?? TimeSpan.Zero; + readonly string _id = Guid.NewGuid().ToString("N"); public int Count { get; private set; } @@ -27,9 +28,9 @@ public IHypothesis AssertThat() { public Task Validate(TimeSpan timeout) => EnsureHypothesis.Validate(timeout); public override async ValueTask HandleEvent(IMessageConsumeContext context) { - output?.WriteLine(context.Message!.ToString()); await Task.Delay(_delay); await EnsureHypothesis.Test(context.Message!, context.CancellationToken); + output?.WriteLine($"[{_id}] Handled event {context.GlobalPosition}, count is {Count}"); Count++; return EventHandlingStatus.Success;