diff --git a/CHANGELOG.md b/CHANGELOG.md index edc893cb9..93582449e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Added - `eqxShipping`: Unit and integration tests [#70](https://github.com/jet/dotnet-templates/pull/70) +- `eqxFc`: Fulfilment-Center inspired example utilizing Process Manager patterns with basic `Equinox.MemoryStore` and `Equinox.EventStore` tests [#40](https://github.com/jet/dotnet-templates/pulls/40) ### Changed ### Removed diff --git a/README.md b/README.md index 6996b39b4..ec4653cd4 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,8 @@ These templates focus solely on Consistent Processing using Equinox Stores: - [`eqxweb`](equinox-web/README.md) - Boilerplate for an ASP .NET Core 2 Web App, with an associated storage-independent Domain project using [Equinox](https://github.com/jet/equinox). - [`eqxwebcs`](equinox-web-csharp/README.md) - Boilerplate for an ASP .NET Core 2 Web App, with an associated storage-independent Domain project using [Equinox](https://github.com/jet/equinox), _ported to C#_. - [`eqxtestbed`](equinox-testbed/README.md) - Host that allows running back-to-back benchmarks when prototyping models using [Equinox](https://github.com/jet/equinox), using different stores and/or store configuration parameters. + +- [`eqxFc`](equinox-fc/README.md) - Samples showcasing various modeling and testing techniques such as (FsCheck-based) unit tests and use of `MemoryStore` for integration tests. ## [Propulsion](https://github.com/jet/propulsion) related diff --git a/dotnet-templates.sln b/dotnet-templates.sln index 950b0b6b6..ab160a065 100644 --- a/dotnet-templates.sln +++ b/dotnet-templates.sln @@ -97,6 +97,19 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "equinox-shi EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Watchdog.Integration", "equinox-shipping\Watchdog.Integration\Watchdog.Integration.fsproj", "{83BA87C3-6288-40F4-BC4F-EC3A54586CDF}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "eqxFc", "eqxFc", "{4946576F-1558-49ED-A272-6F4D92FB0031}" +ProjectSection(SolutionItems) = preProject + equinox-fc\.template.config\template.json = equinox-fc\.template.config\template.json + equinox-fc\README.md = equinox-fc\README.md + equinox-fc\Fc.sln = equinox-fc\Fc.sln +EndProjectSection +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "equinox-fc\Domain\Domain.fsproj", "{B3CFC965-6AB9-47E8-AA47-548A8D8A2E2C}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "equinox-fc\Domain.Tests\Domain.Tests.fsproj", "{49890A45-D6C2-4EF6-87AD-39960E03E254}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Web", "equinox-fc\Web\Web.fsproj", "{A6AA8FAA-0D2C-423D-B166-F4B3AF10C442}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -157,6 +170,18 @@ Global {9AFF6138-B63B-4EBF-B86B-4F626E1F1ADF}.Debug|Any CPU.Build.0 = Debug|Any CPU {9AFF6138-B63B-4EBF-B86B-4F626E1F1ADF}.Release|Any CPU.ActiveCfg = Release|Any CPU {9AFF6138-B63B-4EBF-B86B-4F626E1F1ADF}.Release|Any CPU.Build.0 = Release|Any CPU + {B3CFC965-6AB9-47E8-AA47-548A8D8A2E2C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B3CFC965-6AB9-47E8-AA47-548A8D8A2E2C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B3CFC965-6AB9-47E8-AA47-548A8D8A2E2C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B3CFC965-6AB9-47E8-AA47-548A8D8A2E2C}.Release|Any CPU.Build.0 = Release|Any CPU + {49890A45-D6C2-4EF6-87AD-39960E03E254}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {49890A45-D6C2-4EF6-87AD-39960E03E254}.Debug|Any CPU.Build.0 = Debug|Any CPU + {49890A45-D6C2-4EF6-87AD-39960E03E254}.Release|Any CPU.ActiveCfg = Release|Any CPU + {49890A45-D6C2-4EF6-87AD-39960E03E254}.Release|Any CPU.Build.0 = Release|Any CPU + {A6AA8FAA-0D2C-423D-B166-F4B3AF10C442}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A6AA8FAA-0D2C-423D-B166-F4B3AF10C442}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A6AA8FAA-0D2C-423D-B166-F4B3AF10C442}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A6AA8FAA-0D2C-423D-B166-F4B3AF10C442}.Release|Any CPU.Build.0 = Release|Any CPU {26BFE6BC-5887-4E40-8CFD-F15332F5A104}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {26BFE6BC-5887-4E40-8CFD-F15332F5A104}.Debug|Any CPU.Build.0 = Debug|Any CPU {26BFE6BC-5887-4E40-8CFD-F15332F5A104}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -169,6 +194,18 @@ Global {83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Debug|Any CPU.Build.0 = Debug|Any CPU {83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Release|Any CPU.ActiveCfg = Release|Any CPU {83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Release|Any CPU.Build.0 = Release|Any CPU + {8C92B728-85A5-4231-863A-E4236E46CC36}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8C92B728-85A5-4231-863A-E4236E46CC36}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8C92B728-85A5-4231-863A-E4236E46CC36}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8C92B728-85A5-4231-863A-E4236E46CC36}.Release|Any CPU.Build.0 = Release|Any CPU + {46B8B7C9-3334-4C13-A339-57571C14F2B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {46B8B7C9-3334-4C13-A339-57571C14F2B9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {46B8B7C9-3334-4C13-A339-57571C14F2B9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {46B8B7C9-3334-4C13-A339-57571C14F2B9}.Release|Any CPU.Build.0 = Release|Any CPU + {6CB10946-AC56-4DE7-AB65-F6B13B86C703}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6CB10946-AC56-4DE7-AB65-F6B13B86C703}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6CB10946-AC56-4DE7-AB65-F6B13B86C703}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6CB10946-AC56-4DE7-AB65-F6B13B86C703}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {F66A5BFE-7C81-44DC-97DE-3FD8C83B8F06} = {B72FFAAE-7801-41B2-86F5-FD90E97A30F7} @@ -182,9 +219,14 @@ Global {D7ACBDF8-5F24-420F-9657-20096CE08B49} = {818D28A6-E6AB-4416-BDA6-1577C5D54447} {B6389F9E-A8E4-4BD7-B4C0-703B1A69BEA1} = {E7434881-8655-4C22-82CD-91ADB5123A73} {36C2D70A-F292-4481-8ADA-5066A80F92B2} = {1F3C9245-F973-43A3-97C9-5E527B93060C} + {B3CFC965-6AB9-47E8-AA47-548A8D8A2E2C} = {4946576F-1558-49ED-A272-6F4D92FB0031} + {49890A45-D6C2-4EF6-87AD-39960E03E254} = {4946576F-1558-49ED-A272-6F4D92FB0031} {7B96FCF8-0BB5-4494-A143-628882A6E50A} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} {9AFF6138-B63B-4EBF-B86B-4F626E1F1ADF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} + {6CB10946-AC56-4DE7-AB65-F6B13B86C703} = {4946576F-1558-49ED-A272-6F4D92FB0031} {5A45EF21-576B-4B40-86BD-F5960ECD66BF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} {83BA87C3-6288-40F4-BC4F-EC3A54586CDF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} + {46B8B7C9-3334-4C13-A339-57571C14F2B9} = {4946576F-1558-49ED-A272-6F4D92FB0031} + {A6AA8FAA-0D2C-423D-B166-F4B3AF10C442} = {4946576F-1558-49ED-A272-6F4D92FB0031} EndGlobalSection EndGlobal diff --git a/equinox-fc/.template.config/template.json b/equinox-fc/.template.config/template.json new file mode 100644 index 000000000..135b57883 --- /dev/null +++ b/equinox-fc/.template.config/template.json @@ -0,0 +1,18 @@ +{ + "$schema": "http://json.schemastore.org/template", + "author": "@jet @bartelink", + "classifications": [ + "Equinox", + "Event Sourcing", + "Fc", + "Propulsion" + ], + "tags": { + "language": "F#" + }, + "identity": "Equinox.Fc", + "name": "Equinox Fc Example", + "shortName": "eqxFc", + "sourceName": "Fc", + "preferNameDirectory": true +} \ No newline at end of file diff --git a/equinox-fc/Domain.Tests/Domain.Tests.fsproj b/equinox-fc/Domain.Tests/Domain.Tests.fsproj new file mode 100644 index 000000000..edf669ae5 --- /dev/null +++ b/equinox-fc/Domain.Tests/Domain.Tests.fsproj @@ -0,0 +1,32 @@ + + + + netcoreapp2.1 + 5 + false + true + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/equinox-fc/Domain.Tests/Fixtures.fs b/equinox-fc/Domain.Tests/Fixtures.fs new file mode 100644 index 000000000..58dbe1e3e --- /dev/null +++ b/equinox-fc/Domain.Tests/Fixtures.fs @@ -0,0 +1,53 @@ +[] +module Fc.Domain.Tests.Fixtures + +open Serilog +open System + +module EnvVar = + + let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj + +module EventStore = + + open Equinox.EventStore + let connect () = + match EnvVar.tryGet "EQUINOX_ES_HOST", EnvVar.tryGet "EQUINOX_ES_USERNAME", EnvVar.tryGet "EQUINOX_ES_PASSWORD" with + | Some h, Some u, Some p -> + let appName = "Domain.Tests" + let discovery = Discovery.GossipDns h + let connector = Connector(u, p, TimeSpan.FromSeconds 5., 5, Logger.SerilogNormal Serilog.Log.Logger) + let connection = connector.Establish(appName, discovery, ConnectionStrategy.ClusterSingle NodePreference.Master) |> Async.RunSynchronously + let context = Context(connection, BatchingPolicy(500)) + let cache = Equinox.Cache (appName, 10) + context, cache + | h, u, p -> + failwithf "Host, Username and Password EQUINOX_ES_* Environment variables are required (%b,%b,%b)" + (Option.isSome h) (Option.isSome u) (Option.isSome p) + +module TestOutputLogger = + + /// Adapts the XUnit ITestOutputHelper to be a Serilog Sink + type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) = + let template = "{Timestamp:HH:mm:ss.fff zzz} [{Level:u3}] {Message} {Properties}{NewLine}{Exception}" + let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter(template, null); + let writeSerilogEvent logEvent = + use writer = new System.IO.StringWriter() + formatter.Format(logEvent, writer) + let messageLine = string writer + testOutput.WriteLine messageLine + System.Diagnostics.Debug.Write messageLine + interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent + + let create output = + let logger = TestOutputAdapter output + LoggerConfiguration().Destructure.FSharpTypes().WriteTo.Sink(logger).CreateLogger() + +(* Generic FsCheck helpers *) + +let (|Id|) (x : Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag +let inline mkId () = Guid.NewGuid() |> (|Id|) +let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|) +let (|IdsAtLeastOne|) (Ids xs, Id x) = [| yield x; yield! xs |] +let (|AtLeastOne|) (x, xs) = x::xs + diff --git a/equinox-fc/Domain.Tests/LocationEpochTests.fs b/equinox-fc/Domain.Tests/LocationEpochTests.fs new file mode 100644 index 000000000..9de8ec45d --- /dev/null +++ b/equinox-fc/Domain.Tests/LocationEpochTests.fs @@ -0,0 +1,55 @@ +module Fc.Domain.Tests.LocationEpochTests + +open FsCheck.Xunit +open Swensen.Unquote + +open Fc.Domain.Location.Epoch + +let decide transactionId delta _balance = + match delta with + | 0 -> (), [] + | delta when delta < 0 -> (), [Events.Removed {| delta = -delta; id = transactionId |}] + | delta -> (), [Events.Added {| delta = delta; id = transactionId |}] + +let verifyDeltaEvent transactionId delta events = + let dEvents = events |> List.filter (function Events.Added _ | Events.Removed _ -> true | _ -> false) + test <@ decide transactionId delta (Unchecked.defaultof<_>) = ((), dEvents) @> + +let [] properties transactionId carriedForward delta1 closeImmediately delta2 close = + + (* Starting with an empty stream, we'll need to supply the balance carried forward, optionally we apply a delta and potentially close *) + + let initialShouldClose _state = closeImmediately + let res, events = + sync (Some carriedForward) (decide transactionId delta1) initialShouldClose Fold.initial + let cfEvents events = events |> List.choose (function Events.CarriedForward e -> Some e | _ -> None) + let closeEvents events = events |> List.filter (function Events.Closed -> true | _ -> false) + let state1 = Fold.fold Fold.initial events + let expectedBalance = carriedForward.initial + delta1 + // Only expect closing if it was requested + let expectImmediateClose = closeImmediately + let (Fold.Current bal) = res.history + test <@ Option.isSome res.result + && expectedBalance = bal @> + test <@ carriedForward = List.head (cfEvents events) + && (not expectImmediateClose || 1 = Seq.length (closeEvents events)) @> + verifyDeltaEvent transactionId delta1 events + + (* After initializing, validate we don't need to supply a carriedForward, and don't produce a CarriedForward event *) + + let shouldClose _state = close + let { isOpen = isOpen; result = worked; history = (Fold.Current bal) }, events = + sync None (decide transactionId delta2) shouldClose state1 + let expectedBalance = if expectImmediateClose then expectedBalance else expectedBalance + delta2 + test <@ [] = cfEvents events + && (expectImmediateClose || not close || 1 = Seq.length (closeEvents events)) @> + test <@ (expectImmediateClose || close || isOpen) + && expectedBalance = bal @> + if not expectImmediateClose then + test <@ Option.isSome worked @> + verifyDeltaEvent transactionId delta2 events + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None, event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> diff --git a/equinox-fc/Domain.Tests/LocationSeriesTests.fs b/equinox-fc/Domain.Tests/LocationSeriesTests.fs new file mode 100644 index 000000000..fb59a5e85 --- /dev/null +++ b/equinox-fc/Domain.Tests/LocationSeriesTests.fs @@ -0,0 +1,45 @@ +module Fc.Domain.Tests.LocationSeriesTests + +open FsCheck.Xunit +open FSharp.UMX +open Swensen.Unquote + +open Fc.Domain.Location.Series + +let [] properties c1 c2 = + let events = interpretAdvanceIngestionEpoch c1 Fold.initial + let state1 = Fold.fold Fold.initial events + let epoch0 = %0 + match c1, events, state1 with + // Started events are not written for < 0 + | n, [], activeEpoch when n < epoch0 -> + test <@ None = activeEpoch @> + // Any >=0 value should trigger a Started event, initially + | n, [Events.Started { epoch = ee }], Some activatedEpoch -> + test <@ n >= epoch0 && n = ee && n = activatedEpoch @> + // Nothing else should yield events + | _, l, _ -> + test <@ List.isEmpty l @> + + let events = interpretAdvanceIngestionEpoch c2 state1 + let state2 = Fold.fold state1 events + match state1, c2, events, state2 with + // Started events are not written for < 0 + | None, n, [], activeEpoch when n < epoch0 -> + test <@ None = activeEpoch @> + // Any >= 0 epochId should trigger a Started event if first command didnt do anything + | None, n, [Events.Started { epoch = ee }], Some activatedEpoch -> + let eEpoch = %ee + test <@ n >= epoch0 && n = eEpoch && n = activatedEpoch @> + // Any higher epochId should trigger a Started event (gaps are fine - we are only tying to reduce walks) + | Some s1, n, [Events.Started { epoch = ee }], Some activatedEpoch -> + let eEpoch = %ee + test <@ n > s1 && n = eEpoch && n > epoch0 && n = activatedEpoch @> + // Nothing else should yield events + | _, _, l, _ -> + test <@ List.isEmpty l @> + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None, event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> diff --git a/equinox-fc/Domain.Tests/LocationTests.fs b/equinox-fc/Domain.Tests/LocationTests.fs new file mode 100644 index 000000000..4e4a67645 --- /dev/null +++ b/equinox-fc/Domain.Tests/LocationTests.fs @@ -0,0 +1,75 @@ +module Fc.Domain.Tests.LocationTests + +open FsCheck.Xunit +open FSharp.UMX +open Swensen.Unquote +open System + +open Fc.Domain +open Fc.Domain.Location + +/// Helpers to match `module Cosmos/EventStore` wrapping inside the impl +module Location = + + module MemoryStore = + + open Equinox.MemoryStore + + module Series = + + let resolve store = Resolver(store, Series.Events.codec, Series.Fold.fold, Series.Fold.initial).Resolve + + module Epoch = + + let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Fold.fold, Epoch.Fold.initial).Resolve + + let create (zeroBalance, toBalanceCarriedForward, shouldClose) store = + let maxAttempts = Int32.MaxValue + let series = Series.create (fun (id, _opt) -> Series.resolve store id) maxAttempts + let epochs = Epoch.create (Epoch.resolve store) maxAttempts + create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) + +let run (service : Service) (IdsAtLeastOne locations, deltas : _[], transactionId) = Async.RunSynchronously <| async { + let runId = mkId () // Need to make making state in store unique when replaying or shrinking + let locations = locations |> Array.map (fun x -> % (sprintf "%O/%O" x runId)) + + let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache + + (* Apply random deltas *) + + let adjust delta (state : Epoch.Fold.State) = + let (Epoch.Fold.Balance bal) = state + let value = max -bal delta + if value = 0 then 0, [] + elif value < 0 then value, [Epoch.Events.Removed {| delta = -value; id = transactionId |}] + else value, [Epoch.Events.Added {| delta = value; id = transactionId |}] + let! appliedDeltas = seq { for loc, x in updates -> async { let! eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel + let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l, xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq + + (* Verify loading yields identical state *) + + let! balances = seq { for loc in locations -> async { let! bal = service.Execute(loc,(fun (Epoch.Fold.Balance bal) -> bal, [])) in return loc,bal } } |> Async.Parallel + test <@ expectedBalances = Set.ofSeq balances @> } + +let [] ``MemoryStore properties`` epochLen args = + let store = Equinox.MemoryStore.VolatileStore() + + let epochLen, idsWindow = max 1 epochLen, 5 + let zero, cf, sc = Epoch.zeroBalance, Epoch.toBalanceCarriedForward idsWindow, Epoch.shouldClose epochLen + + let service = Location.MemoryStore.create (zero, cf, sc) store + run service args + +type EventStore(testOutput) = + + let log = TestOutputLogger.create testOutput + do Serilog.Log.Logger <- log + + let context, cache = EventStore.connect () + + let [] properties epochLen args = + let epochLen, idsWindow = max 1 epochLen, 5 + let zero, cf, sc = Epoch.zeroBalance, Epoch.toBalanceCarriedForward idsWindow, Epoch.shouldClose epochLen + + let service = Location.EventStore.create (zero, cf, sc) (context, cache, 50) + run service args diff --git a/equinox-fc/Domain/Domain.fsproj b/equinox-fc/Domain/Domain.fsproj new file mode 100644 index 000000000..3d0c2a748 --- /dev/null +++ b/equinox-fc/Domain/Domain.fsproj @@ -0,0 +1,24 @@ + + + + netstandard2.0 + 5 + true + + + + + + + + + + + + + + + + + + diff --git a/equinox-fc/Domain/Infrastructure.fs b/equinox-fc/Domain/Infrastructure.fs new file mode 100644 index 000000000..febe0e6b8 --- /dev/null +++ b/equinox-fc/Domain/Infrastructure.fs @@ -0,0 +1,29 @@ +namespace global + +open FSharp.UMX // see https://github.com/fsprojects/FSharp.UMX - % operator and ability to apply units of measure to Guids+strings + +type LocationId = string +and [] locationId +module LocationId = + let parse (value : string) : LocationId = %value + let toString (value : LocationId) : string = %value + +type LocationEpochId = int +and [] locationEpochId +module LocationEpochId = + let parse (value : int) : LocationEpochId = %value + let next (value : LocationEpochId) : LocationEpochId = % (%value + 1) + let toString (value : LocationEpochId) : string = string %value + +type InventoryId = string +and [] inventoryId +module InventoryId = + let parse (value : string) : InventoryId = %value + let toString (value : InventoryId) : string = %value + +type InventoryTransactionId = string +and [] inventoryTransactionId +module InventoryTransactionId = + let parse (value : string) : InventoryTransactionId = %value + let (|Parse|) = parse + let toString (value : InventoryTransactionId) : string = %value diff --git a/equinox-fc/Domain/Inventory.fs b/equinox-fc/Domain/Inventory.fs new file mode 100644 index 000000000..6d6578df4 --- /dev/null +++ b/equinox-fc/Domain/Inventory.fs @@ -0,0 +1,63 @@ +namespace Fc.Domain.Inventory + +open Equinox.Core // we use Equinox's AsyncCacheCell helper below + +type internal IdsCache<'Id>() = + let all = System.Collections.Concurrent.ConcurrentDictionary<'Id, unit>() // Bounded only by relatively low number of physical pick tickets IRL + static member Create init = let x = IdsCache() in x.Add init; x + member __.Add ids = for x in ids do all.[x] <- () + member __.Contains id = all.ContainsKey id + +/// Ingests items into a log of items, making a best effort at deduplicating as it writes +/// Prior to first add, reads recent ids, in order to minimize the number of duplicated Ids we ingest +type Service internal (inventoryId, epochs : Epoch.Service) = + + static let log = Serilog.Log.ForContext() + + // We want max one request in flight to establish the pre-existing Events from which the TransactionIds cache will be seeded + let previousIds : AsyncCacheCell> = + let read = async { let! r = epochs.TryIngest(inventoryId, Seq.empty) in return r.transactionIds } + AsyncCacheCell read + + // TransactionIds cache - used to maintain a list of transactions that have already been ingested in order to avoid db round-trips + let previousIds : AsyncCacheCell> = AsyncCacheCell <| async { + let! previousIds = previousIds.AwaitValue() + return IdsCache.Create(previousIds) } + + let tryIngest events = async { + let! previousIds = previousIds.AwaitValue() + + let rec aux totalIngested items = async { + let SeqPartition f = Seq.toArray >> Array.partition f + let dup, fresh = items |> SeqPartition (Epoch.Events.chooseInventoryTransactionId >> Option.exists previousIds.Contains) + let fullCount = List.length items + let dropping = fullCount - Array.length fresh + if dropping <> 0 then log.Information("Ignoring {count}/{fullCount} duplicate ids: {ids}", dropping, fullCount, dup) + if Array.isEmpty fresh then + return totalIngested + else + let! res = epochs.TryIngest(inventoryId, fresh) + log.Information("Added {count} items to {inventoryId:l}", res.added, inventoryId) + // The adding is potentially redundant; we don't care + previousIds.Add res.transactionIds + let totalIngestedTransactions = totalIngested + res.added + return totalIngestedTransactions } + return! aux 0 events + } + + /// Upon startup, we initialize the TransactionIds cache with recent epochs; we want to kick that process off before our first ingest + member __.Initialize() = previousIds.AwaitValue() |> Async.Ignore + + /// Feeds the events into the sequence of transactions. Returns the number actually added [excluding duplicates] + member __.Ingest(events : Epoch.Events.Event list) : Async = tryIngest events + +module internal Helpers = + + let create inventoryId epochs = + Service(inventoryId, epochs) + +module EventStore = + + let create inventoryId (context, cache) = + let epochs = Epoch.EventStore.create (context, cache) + Helpers.create inventoryId epochs diff --git a/equinox-fc/Domain/InventoryEpoch.fs b/equinox-fc/Domain/InventoryEpoch.fs new file mode 100644 index 000000000..eff0cb5a8 --- /dev/null +++ b/equinox-fc/Domain/InventoryEpoch.fs @@ -0,0 +1,74 @@ +/// Manages the ingestion (and deduplication based on a TransactionId) of events reflecting transfers or stock adjustments +/// that have been effected across a given set of Inventory +/// See Inventory.Service for surface level API which manages the ingestion +module Fc.Domain.Inventory.Epoch + +let [] Category = "InventoryEpoch" +let streamName inventoryId = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; "0"] + +// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type TransactionRef = { transactionId : InventoryTransactionId } + + type Event = + | Adjusted of TransactionRef + | Transferred of TransactionRef + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + /// Used for deduplicating input events + let chooseInventoryTransactionId = function + | Adjusted { transactionId = id } | Transferred { transactionId = id } -> Some id + +module Fold = + + type State = { closed : bool; ids : Set } + let initial = { closed = false; ids = Set.empty } + let evolve state = function + | (Events.Adjusted e | Events.Transferred e) -> { state with ids = Set.add e.transactionId state.ids } + let fold : State -> Events.Event seq -> State = Seq.fold evolve + +type Result = + { /// Count of items added to this epoch. May be less than requested due to removal of duplicates and/or rejected items + added : int + /// identifiers for all items in this epoch + transactionIds : Set } + +let decideSync events (state : Fold.State) : Result * Events.Event list = + let isFresh = function + | Events.Adjusted { transactionId = id } + | Events.Transferred { transactionId = id } -> (not << state.ids.Contains) id + let news = events |> Seq.filter isFresh |> List.ofSeq + let newCount = List.length news + let events = [ if newCount <> 0 then yield! news ] + let state' = Fold.fold state events + { added = newCount; transactionIds = state'.ids }, events + +type Service internal (resolve : InventoryId -> Equinox.Stream) = + + /// Attempt ingestion of `events` into the cited Epoch. + /// - None will be accepted if the Epoch is `closed` + /// - The `capacity` function will be passed a non-closed `state` in order to determine number of items that can be admitted prior to closing + /// - If the computed capacity result is >= the number of items being submitted (which may be 0), the Epoch will be marked Closed + /// NOTE the result may include rejected items (which the caller is expected to feed into a successor epoch) + member __.TryIngest(inventoryId, events) : Async = + let stream = resolve inventoryId + stream.Transact(decideSync events) + +let create resolve = + let resolve ids = + let stream = resolve (streamName ids) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts=2) + Service(resolve) + +module EventStore = + + open Equinox.EventStore + + // No use of RollingSnapshots as we're intentionally making an epoch short enough to simply read any time + let create (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy) + create resolver.Resolve diff --git a/equinox-fc/Domain/Location.fs b/equinox-fc/Domain/Location.fs new file mode 100644 index 000000000..dddfd29fb --- /dev/null +++ b/equinox-fc/Domain/Location.fs @@ -0,0 +1,48 @@ +namespace Fc.Domain.Location + +[] +type Wip<'R> = + | Pending of decide : (Epoch.Fold.State -> 'R * Epoch.Events.Event list) + | Complete of 'R + +/// Manages Reads and Writes for a Series of Epochs, with a running total being carried forward to the next Epoch when it's marked Closed +type Service internal (zeroBalance, toBalanceCarriedForward, shouldClose, series : Series.Service, epochs : Epoch.Service) = + + let execute locationId originEpochId = + let rec aux epochId balanceToCarryForward wip = async { + let decide state = match wip with Complete r -> r, [] | Pending decide -> decide state + match! epochs.Sync(locationId, epochId, balanceToCarryForward, decide, shouldClose) with + | { result = Some res; isOpen = true } -> + if originEpochId <> epochId then + do! series.AdvanceIngestionEpoch(locationId, epochId) + return res + | { history = history; result = Some res } -> + let successorEpochId = LocationEpochId.next epochId + let cf = toBalanceCarriedForward history + return! aux successorEpochId (Some cf) (Complete res) + | { history = history } -> + let successorEpochId = LocationEpochId.next epochId + let cf = toBalanceCarriedForward history + return! aux successorEpochId (Some cf) wip } + aux + + member __.Execute(locationId, decide) = async { + let! activeEpoch = series.TryReadIngestionEpoch locationId + let originEpochId, epochId, balanceCarriedForward = + match activeEpoch with + | None -> LocationEpochId.parse -1, LocationEpochId.parse 0, Some zeroBalance + | Some activeEpochId -> activeEpochId, activeEpochId, None + return! execute locationId originEpochId epochId balanceCarriedForward (Pending decide)} + +[] +module Helpers = + + let create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) = + Service(zeroBalance, toBalanceCarriedForward, shouldClose, series, epochs) + +module EventStore = + + let create (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) = + let series = Series.EventStore.create (context, cache, maxAttempts) + let epochs = Epoch.EventStore.create (context, cache, maxAttempts) + create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) diff --git a/equinox-fc/Domain/LocationEpoch.fs b/equinox-fc/Domain/LocationEpoch.fs new file mode 100644 index 000000000..1b0422e05 --- /dev/null +++ b/equinox-fc/Domain/LocationEpoch.fs @@ -0,0 +1,165 @@ +/// Manages Stock adjustments and deltas for a given Location +/// Provides for controlled opening and closing of an epoch, carrying forward incoming balances when a given Epoch reaches a 'full' state +/// See Location.Service for the logic that allows competing readers/writers to co-operate in bringing this about +module Fc.Domain.Location.Epoch + +let [] Category = "LocationEpoch" +let streamName (locationId, epochId) = FsCodec.StreamName.compose Category [LocationId.toString locationId; LocationEpochId.toString epochId] + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type CarriedForward = { initial : int; recentTransactions : InventoryTransactionId[] } + type Event = + | CarriedForward of CarriedForward + | Added of {| delta : int; id : InventoryTransactionId |} + | Removed of {| delta : int; id : InventoryTransactionId |} + | Reset of {| value : int; id : InventoryTransactionId |} + | Closed + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + +module Fold = + + type State = + | Initial + | Open of Record list // reverse order, i.e. most recent first + | Closed of Record list // trimmed + and Record = + | Init of Events.CarriedForward + | Step of Step + and Step = { balance : Balance; id : InventoryTransactionId } + and Balance = int + let initial = Initial + let (|Current|) = function + | (Init { initial = bal } | Step { balance = bal }) :: _ -> bal + | [] -> failwith "Cannot transact when no CarriedForward" + let (|History|) = function Initial -> [] | (Open history | Closed history) -> history + let (|Balance|) (History h) = (|Current|) h + let evolve state event = + match event, state with + | Events.CarriedForward e, Initial -> Open [Init e] + | Events.Added e, Open (Current cur as log) -> Open (Step { id = e.id ; balance = cur + e.delta } :: log) + | Events.Removed e, Open (Current cur as log) -> Open (Step { id = e.id ; balance = cur - e.delta } :: log) + | Events.Reset e, Open log -> Open (Step { id = e.id ; balance = e.value } :: log) + | Events.Closed, Open log -> Closed log + | Events.CarriedForward _, (Open _ | Closed _ as x) -> failwithf "CarriedForward : Unexpected %A" x + | (Events.Added _ | Events.Removed _ | Events.Reset _ | Events.Closed) as e, (Initial | Closed _ as s) -> + failwithf "Unexpected %A when %A" e s + let fold = Seq.fold evolve + +let zeroBalance : Events.CarriedForward = + { initial = 0 + recentTransactions = [||] } +let shouldClose maxEvents deltas = + List.length deltas > maxEvents +let toBalanceCarriedForward idsRetentionWindow (history : Fold.Record list) : Events.CarriedForward = + let steps = + history + |> Seq.choose (function Fold.Record.Step s -> Some s | Fold.Record.Init _ -> None) + |> Seq.rev + |> Seq.cache + { initial = let final = Seq.head steps in final.balance + recentTransactions = seq { for x in steps -> x.id } |> Seq.truncate idsRetentionWindow |> Array.ofSeq } + +/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events +type private Accumulator() = + let acc = ResizeArray() + member __.Ingest state : 'res * Events.Event list -> 'res * Fold.State = function + | res, [] -> res, state + | res, [e] -> acc.Add e; res, Fold.evolve state e + | res, xs -> acc.AddRange xs; res, Fold.fold state (Seq.ofList xs) + member __.Accumulated = List.ofSeq acc + +type Result<'t> = { history : Fold.Record list; result : 't option; isOpen : bool } + +let sync (carriedForward : Events.CarriedForward option) + (decide : Fold.State -> 't * Events.Event list) + shouldClose + state + : Result<'t> * Events.Event list = + + let acc = Accumulator() + // 1. Guarantee a CarriedForward event at the start of any Epoch's event stream + let (), state = + acc.Ingest state <| + match state with + | Fold.Initial -> (), [Events.CarriedForward (Option.get carriedForward )] + | Fold.Open _ | Fold.Closed _ -> (), [] + // 2. Transact (unless we determine we're in Closed state) + let result, state = + acc.Ingest state <| + match state with + | Fold.Initial -> failwith "We've just guaranteed not Initial" + | Fold.Open _ -> let r, es = decide state in Some r, es + | Fold.Closed _ -> None, [] + // 3. Finally (iff we're `Open`, have run a `decide` and `shouldClose`), we generate a Closed event + let (history, isOpen), _ = + acc.Ingest state <| + match state with + | Fold.Initial -> failwith "Can't be Initial" + | Fold.Open history -> + if shouldClose history then (history, false), [Events.Closed] + else (history, true), [] + | Fold.Closed history -> (history, false), [] + { history = history; result = result; isOpen = isOpen }, acc.Accumulated + +type DupCheckResult = NotDuplicate | IdempotentInsert of Fold.Balance | DupCarriedForward +let private tryFindDup transactionId (history : Fold.Record list) = + let tryMatch : Fold.Record -> Fold.Balance option option = function + | Fold.Step { balance = bal; id = id } when id = transactionId -> Some (Some bal) + | Fold.Init { recentTransactions = prevs } when prevs |> Array.contains transactionId -> Some None + | _ -> None + match history |> Seq.tryPick tryMatch with + | None -> NotDuplicate + | Some None -> DupCarriedForward + | Some (Some bal) -> IdempotentInsert bal + +type Command = + | Reset of value : int + | Add of delta : int + | Remove of delta : int + +type Result = Denied | Accepted of Fold.Balance | DupFromPreviousEpoch + +let decide transactionId command (state: Fold.State) = + match state with + | Fold.Closed _ | Fold.Initial -> failwithf "Cannot apply in state %A" state + | Fold.Open (Fold.Current cur as history) -> + + match tryFindDup transactionId history with + | IdempotentInsert bal -> Accepted bal, [] + | DupCarriedForward -> DupFromPreviousEpoch, [] + | NotDuplicate -> + + let accepted, events = + match command with + | Reset value -> true, [Events.Reset {| value = value; id = transactionId |}] + | Add delta -> true, [Events.Added {| delta = delta; id = transactionId |}] + | Remove delta when delta > cur -> false, [] + | Remove delta -> true, [Events.Removed {| delta = delta; id = transactionId |}] + match Fold.fold state events with + | Fold.Open (Fold.Current cur) -> (if accepted then Accepted cur else Denied), events + | s -> failwithf "Unexpected state %A" s + +type Service internal (resolve : LocationId * LocationEpochId -> Equinox.Stream) = + + member __.Sync<'R>(locationId, epochId, prevEpochBalanceCarriedForward, decide, shouldClose) : Async> = + let stream = resolve (locationId, epochId) + stream.Transact(sync prevEpochBalanceCarriedForward decide shouldClose) + +let create resolve maxAttempts = + let resolve locationId = + let stream = resolve (streamName locationId) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts=maxAttempts) + Service (resolve) + +module EventStore = + + open Equinox.EventStore + + let create (context, cache, maxAttempts) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy) + create resolver.Resolve maxAttempts diff --git a/equinox-fc/Domain/LocationSeries.fs b/equinox-fc/Domain/LocationSeries.fs new file mode 100644 index 000000000..d58274aa8 --- /dev/null +++ b/equinox-fc/Domain/LocationSeries.fs @@ -0,0 +1,56 @@ +/// Manages the active epoch for a given Location +module Fc.Domain.Location.Series + +let [] Category = "LocationSeries" +let streamName locationId = FsCodec.StreamName.create Category (LocationId.toString locationId) + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type Started = { epoch : LocationEpochId } + type Event = + | Started of Started + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + +module Fold = + + type State = LocationEpochId option + let initial : State = None + let private evolve _state = function + | Events.Started e -> Some e.epoch + let fold = Seq.fold evolve + +let interpretAdvanceIngestionEpoch (epochId : LocationEpochId) (state : Fold.State) = + if epochId < LocationEpochId.parse 0 then [] else + + [if state |> Option.forall (fun s -> s < epochId) then yield Events.Started { epoch = epochId }] + +type Service internal (resolve : LocationId -> Equinox.Stream) = + + member __.TryReadIngestionEpoch(locationId) : Async = + let stream = resolve locationId + stream.Query id + + member __.AdvanceIngestionEpoch(locationId, epochId) : Async = + let stream = resolve locationId + stream.Transact(interpretAdvanceIngestionEpoch epochId) + +let create resolve maxAttempts = + let opt = Equinox.ResolveOption.AllowStale + let resolve locationId = + let stream = resolve (streamName locationId, opt) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts=maxAttempts) + Service(resolve) + +module EventStore = + + open Equinox.EventStore + + let accessStrategy = AccessStrategy.LatestKnownEvent + let create (context, cache, maxAttempts) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) + let resolve (id, opt) = resolver.Resolve(id, opt) + create resolve maxAttempts diff --git a/equinox-fc/Domain/StockProcessManager.fs b/equinox-fc/Domain/StockProcessManager.fs new file mode 100644 index 000000000..4722c3c29 --- /dev/null +++ b/equinox-fc/Domain/StockProcessManager.fs @@ -0,0 +1,57 @@ +module Fc.Domain.StockProcessManager + +type Service(transactions : StockTransaction.Service, locations : Location.Service, inventory : Inventory.Service) = + + let execute transactionId = + let f = Location.Epoch.decide transactionId + let rec aux update = async { + let! action = transactions.Apply(transactionId, update) + let aux event = aux (Some event) + match action with + | StockTransaction.Adjust (loc, bal) -> + match! locations.Execute(loc, f (Location.Epoch.Reset bal)) with + | Location.Epoch.Accepted _ -> return! aux StockTransaction.Events.Adjusted + | Location.Epoch.Denied -> return failwith "Cannot Deny Reset" + | Location.Epoch.DupFromPreviousEpoch -> return failwith "TODO walk back to previous epoch" + | StockTransaction.Remove (loc, delta) -> + match! locations.Execute(loc, f (Location.Epoch.Remove delta)) with + | Location.Epoch.Accepted bal -> return! aux (StockTransaction.Events.Removed { balance = bal }) + | Location.Epoch.Denied -> return! aux StockTransaction.Events.Failed + | Location.Epoch.DupFromPreviousEpoch -> return failwith "TODO walk back to previous epoch" + | StockTransaction.Add (loc, delta) -> + match! locations.Execute(loc, f (Location.Epoch.Add delta)) with + | Location.Epoch.Accepted bal -> return! aux (StockTransaction.Events.Added { balance = bal }) + | Location.Epoch.Denied -> return failwith "Cannot Deny Add" + | Location.Epoch.DupFromPreviousEpoch -> return failwith "TODO walk back to previous epoch" + | StockTransaction.Log (StockTransaction.Adjusted _) -> + let! _count = inventory.Ingest([Inventory.Epoch.Events.Adjusted { transactionId = transactionId }]) + return! aux StockTransaction.Events.Logged + | StockTransaction.Log (StockTransaction.Transferred _) -> + let! _count = inventory.Ingest([Inventory.Epoch.Events.Transferred { transactionId = transactionId }]) + return! aux StockTransaction.Events.Logged + | StockTransaction.Finish success -> + return success + } + aux + let run transactionId req = execute transactionId (Some req) + + member __.Adjust(transactionId, location, quantity) = + run transactionId (StockTransaction.Events.AdjustmentRequested { location = location; quantity = quantity }) + + member __.TryTransfer(transactionId, source, destination, quantity) = + run transactionId (StockTransaction.Events.TransferRequested { source = source; destination = destination; quantity = quantity }) + + /// Used by Watchdog to force conclusion of a transaction whose progress has stalled + member __.Drive(transactionId) = async { + let! _ = execute transactionId None in () } + +module EventStore = + + let create (context, cache) inventoryId (epochLen, idsWindow, maxAttempts) = + let transactions, locations, inventory = + let transactions = StockTransaction.EventStore.create (context, cache) + let zero, cf, sc = Location.Epoch.zeroBalance, Location.Epoch.toBalanceCarriedForward idsWindow, Location.Epoch.shouldClose epochLen + let locations = Location.EventStore.create (zero, cf, sc) (context, cache, maxAttempts) + let inventory = Inventory.EventStore.create inventoryId (context, cache) + transactions, locations, inventory + Service(transactions, locations, inventory) diff --git a/equinox-fc/Domain/StockTransaction.fs b/equinox-fc/Domain/StockTransaction.fs new file mode 100644 index 000000000..f21d6513f --- /dev/null +++ b/equinox-fc/Domain/StockTransaction.fs @@ -0,0 +1,154 @@ +/// Process Manager used to: +/// - Coordinate competing attempts to transfer quantities from stock; if balance is 3 one of contesting requests to remove 2 or 3 items must reach `Failed` +/// - maintain rolling balance of stock levels per Location +/// - while recording any transfers or adjustment in an overall Inventory record +/// The Process is driven by two collaborating actors: +/// 1) The 'happy path', where a given actor is executing the known steps of the command flow +/// In the normal case, such an actor will bring the flow to a terminal state (Completed or Failed) +/// 2) A watchdog-projector, which reacts to observed events in this Category by stepping in to complete in-flight requests that have stalled +/// This represents the case where a 'happy path' actor died, or experienced another impediment on the path. +module Fc.Domain.StockTransaction + +let [] Category = "InventoryTransaction" +let streamName transactionId = FsCodec.StreamName.create Category (InventoryTransactionId.toString transactionId) + +// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type AdjustmentRequested = { location : LocationId; quantity : int } + type TransferRequested = { source : LocationId; destination : LocationId; quantity : int } + type Removed = { balance : int } + type Added = { balance : int } + + type Event = + (* Stock Adjustment Flow *) + | AdjustmentRequested of AdjustmentRequested + | Adjusted + + (* Stock Transfer Flow *) + | TransferRequested of TransferRequested + | Failed // terminal + | Removed of Removed + | Added of Added + + (* Successful completion *) + | Logged + | Completed + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + +type Action = + | Adjust of LocationId * int + | Remove of LocationId * int + | Add of LocationId * int + | Log of LoggingState + | Finish of success : bool +and LoggingState = + | Adjusted of Events.AdjustmentRequested + | Transferred of Added +and Added = { request : Events.TransferRequested; removed : Events.Removed; added : Events.Added } + +module Fold = + + type State = + | Initial + | Running of RunningState + | Logging of LoggingState + | Completed of TerminalState + and RunningState = + | Adjust of Events.AdjustmentRequested + | Transfer of TransferState + and TransferState = + | Requested of Events.TransferRequested + | Adding of Removed + and TerminalState = + | Adjusted of Events.AdjustmentRequested + | Transferred of Added + | TransferFailed of Events.TransferRequested + and Removed = { request : Events.TransferRequested; removed : Events.Removed } + let initial = Initial + let evolve state event = + match state, event with + (* Adjustment Process *) + | Initial, Events.AdjustmentRequested r -> + Running (Adjust r) + | Running (Adjust r), Events.Adjusted -> + Logging (LoggingState.Adjusted r) + + (* Transfer Process *) + | Initial, Events.TransferRequested e -> + Running (Transfer (Requested e)) + + | Running (Transfer (Requested s)), Events.Failed -> + Completed (TransferFailed s) + + | Running (Transfer (Requested s)), Events.Removed e -> + Running (Transfer (Adding { request = s; removed = e })) + | Running (Transfer (Adding s)), Events.Added e -> + Logging (LoggingState.Transferred { request = s.request; removed = s.removed; added = e }) + + (* Log result *) + | Logging (LoggingState.Adjusted s), Events.Logged -> + Completed (Adjusted s) + | Logging (LoggingState.Transferred s), Events.Logged -> + Completed (Transferred s) + + (* Any disallowed state changes represent gaps in the model, so we fail fast *) + | state, event -> failwithf "Unexpected %A when %A" event state + let fold : State -> Events.Event seq -> State = Seq.fold evolve + + /// Validates an event actually represents an acceptable, non-redundant state transition + let filter event state = + match state, event with + | Initial, Events.AdjustmentRequested _ + | Initial, Events.TransferRequested _ + | Running (Adjust _), Events.Adjusted + | Running (Transfer (Requested _)), Events.Failed + | Running (Transfer (Requested _)), Events.Removed _ + | Running (Transfer (Adding _)), Events.Added _ + | Logging _, Events.Logged -> + [event] + | _ -> [] + + /// Determines the next action (if any) to be carried out in this workflow + let nextAction : State -> Action = function + | Initial -> failwith "Cannot interpret Initial state" + | Running (Adjust r) -> Action.Adjust (r.location, r.quantity) + | Running (Transfer (Requested r)) -> Action.Remove (r.source, r.quantity) + | Running (Transfer (Adding r)) -> Action.Add (r.request.destination, r.request.quantity) + | Logging s -> Action.Log s + | Completed (TransferFailed _) -> Finish false + | Completed (Transferred _ | Adjusted _) -> Finish true + +/// Given an event from the Process's timeline, yields the State, in order that it can be completed +let decide update (state : Fold.State) : Action * Events.Event list = + let events = + match update with + | None -> [] + | Some update -> Fold.filter update state + let state' = Fold.fold state events + Fold.nextAction state', events + +type Service internal (resolve : InventoryTransactionId -> Equinox.Stream) = + + member __.Apply(transactionId, update) : Async = + let stream = resolve transactionId + stream.Transact(decide update) + +let create resolve = + // there will generally be a single actor touching it at a given time, so we don't need to do a load (which would be more expensive than normal given the `accessStrategy`) before we sync + let opt = Equinox.AllowStale + let resolve inventoryTransactionId = + let stream = resolve (streamName inventoryTransactionId, opt) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts=2) + Service(resolve) + +module EventStore = + + open Equinox.EventStore + + let create (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy) + create <| fun (id, opt) -> resolver.Resolve(id, opt) diff --git a/equinox-fc/Fc.sln b/equinox-fc/Fc.sln new file mode 100644 index 000000000..5df8389a2 --- /dev/null +++ b/equinox-fc/Fc.sln @@ -0,0 +1,67 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.26124.0 +MinimumVisualStudioVersion = 15.0.26124.0 +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "Domain\Domain.fsproj", "{4722DF4D-DBE1-4030-9C55-6BA3A4147322}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "Domain.Tests\Domain.Tests.fsproj", "{AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".project", ".project", "{FEADBAC9-0CA2-440B-AF9B-184FED9B362E}" +ProjectSection(SolutionItems) = preProject + README.md = README.md +EndProjectSection +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Web", "Web\Web.fsproj", "{B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 + Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Debug|x64.ActiveCfg = Debug|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Debug|x64.Build.0 = Debug|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Debug|x86.ActiveCfg = Debug|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Debug|x86.Build.0 = Debug|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Release|Any CPU.Build.0 = Release|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Release|x64.ActiveCfg = Release|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Release|x64.Build.0 = Release|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Release|x86.ActiveCfg = Release|Any CPU + {4722DF4D-DBE1-4030-9C55-6BA3A4147322}.Release|x86.Build.0 = Release|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Debug|x64.ActiveCfg = Debug|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Debug|x64.Build.0 = Debug|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Debug|x86.ActiveCfg = Debug|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Debug|x86.Build.0 = Debug|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Release|Any CPU.Build.0 = Release|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Release|x64.ActiveCfg = Release|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Release|x64.Build.0 = Release|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Release|x86.ActiveCfg = Release|Any CPU + {AA589DC7-D0B7-4F95-BC29-6B89AF4E9280}.Release|x86.Build.0 = Release|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Debug|x64.ActiveCfg = Debug|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Debug|x64.Build.0 = Debug|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Debug|x86.ActiveCfg = Debug|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Debug|x86.Build.0 = Debug|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Release|Any CPU.Build.0 = Release|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Release|x64.ActiveCfg = Release|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Release|x64.Build.0 = Release|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Release|x86.ActiveCfg = Release|Any CPU + {B57DFA65-DC4F-4673-A5DC-7B00A08EFFF0}.Release|x86.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/equinox-fc/README.md b/equinox-fc/README.md new file mode 100644 index 000000000..350598cff --- /dev/null +++ b/equinox-fc/README.md @@ -0,0 +1,26 @@ +# Equinox FC Sample + +This project was generated using: + + dotnet new -i Equinox.Templates # just once, to install in the local templates store + + dotnet new eqxfc # use --help to see options regarding storage subsystem configuration etc + +# `Location*` + +The `Location`* `module`s illustrates a way to approach the modelling of a long-running state by having writers adhere to a common protocol when writing: + +- the `LocationEpoch` category represents a span of time, which is guaranteed to have a `CarriedForward` event representing the opening balance, and/or the balance carried forward when the preceding epoch was marked `Closed` +- the `LocationSeries` category bears the verified active epoch (competing readers/writers read this optimistically on a cached basis; in the event that they're behind, they'll compete to log the successful commencement of a new epoch, which cannot happen before the `CarriedForward` event for the successor epoch has been committed) + +- `Domain.Tests` includes (`FsCheck`) Property-based unit tests, and an integration tests that demonstrates parallel writes (that trigger Optimistic Concurrency Control-based conflict resolution, including against `MemoryStore`) + +## Notes + +- Referencing an `Equinox.*` Store module from the `Domain` project is not mandatory; it's common to defer all wiring and configuration of the elements in `module Cosmos`, `module EventStore` etc. and instead maintain that alongside the Composition Root, outside of the `Domain` project + +- While using an `AccessStrategy` such as `Snapshot` may in some cases be relevant too, in the general case, using the `Equinox.Cache`, combined with having a compact Fold `State` and a sufficiently constrained maximum number/size of events means the state can be established within a predictable latency range. + +- Representing a long-running state in this fashion is no panacea; in modeling a system, the ideal is to have streams that have a naturally constrained number of events over their lifetime. + +- [Original PR](https://github.com/jet/dotnet-templates/pull/40) \ No newline at end of file diff --git a/equinox-fc/Web/Program.fs b/equinox-fc/Web/Program.fs new file mode 100644 index 000000000..aa9562926 --- /dev/null +++ b/equinox-fc/Web/Program.fs @@ -0,0 +1,180 @@ +module Fc.Web.Program + +open Microsoft.AspNetCore +open Microsoft.AspNetCore.Builder +open Microsoft.AspNetCore.Hosting +open Microsoft.AspNetCore.Mvc +open Microsoft.Extensions.DependencyInjection +open Microsoft.Extensions.Hosting +open Serilog +open System + +module EnvVar = + + let tryGet varName : string option = Environment.GetEnvironmentVariable varName |> Option.ofObj + let set varName value : unit = Environment.SetEnvironmentVariable(varName, value) + +module Configuration = + + let private initEnvVar var key loadF = + if None = EnvVar.tryGet var then + printfn "Setting %s from %A" var key + EnvVar.set var (loadF key) + + let initialize () = + // e.g. initEnvVar "EQUINOX_COSMOS_CONTAINER" "CONSUL KEY" readFromConsul + () // TODO add any custom logic preprocessing commandline arguments and/or gathering custom defaults from external sources, etc + +module Args = + + exception MissingArg of string + let private getEnvVarForArgumentOrThrow varName argName = + match EnvVar.tryGet varName with + | None -> raise (MissingArg(sprintf "Please provide a %s, either as an argument or via the %s environment variable" argName varName)) + | Some x -> x + let private defaultWithEnvVar varName argName = function + | None -> getEnvVarForArgumentOrThrow varName argName + | Some x -> x + let isEnvVarTrue varName = EnvVar.tryGet varName |> Option.exists (fun s -> String.Equals(s, bool.TrueString, StringComparison.OrdinalIgnoreCase)) + open Argu + open Equinox.EventStore + [] + type Parameters = + | [] Verbose + + | [] Es of ParseResults + interface IArgParserTemplate with + member a.Usage = + match a with + | Verbose -> "request Verbose Logging. Default: off." + | Es _ -> "specify EventStore input parameters." + and Arguments(a : ParseResults) = + member __.Verbose = a.Contains Parameters.Verbose + member __.StatsInterval = TimeSpan.FromMinutes 1. + + member val Source : EsArguments = + match a.TryGetSubCommand() with + | Some (Es es) -> (EsArguments es) + | _ -> raise (MissingArg "Must specify one of cosmos or es for Src") + and [] EsParameters = + | [] Verbose + | [] Timeout of float + | [] Retries of int + | [] HeartbeatTimeout of float + | [] Tcp + | [] Host of string + | [] Port of int + | [] Username of string + | [] Password of string + interface IArgParserTemplate with + member a.Usage = a |> function + | Verbose -> "Include low level Store logging." + | Tcp -> "Request connecting direct to a TCP/IP endpoint. Default: Use Clustered mode with Gossip-driven discovery (unless environment variable EQUINOX_ES_TCP specifies 'true')." + | Host _ -> "TCP mode: specify a hostname to connect to directly. Clustered mode: use Gossip protocol against all A records returned from DNS query. (optional if environment variable EQUINOX_ES_HOST specified)" + | Port _ -> "specify a custom port. Uses value of environment variable EQUINOX_ES_PORT if specified. Defaults for Cluster and Direct TCP/IP mode are 30778 and 1113 respectively." + | Username _ -> "specify a username. (optional if environment variable EQUINOX_ES_USERNAME specified)" + | Password _ -> "specify a Password. (optional if environment variable EQUINOX_ES_PASSWORD specified)" + | Timeout _ -> "specify operation timeout in seconds. Default: 20." + | Retries _ -> "specify operation retries. Default: 3." + | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds. Default: 1.5." + and EsArguments(a : ParseResults) = + member __.Discovery = + match __.Tcp, __.Port with + | false, None -> Discovery.GossipDns __.Host + | false, Some p -> Discovery.GossipDnsCustomPort (__.Host, p) + | true, None -> Discovery.Uri (UriBuilder("tcp", __.Host, 1113).Uri) + | true, Some p -> Discovery.Uri (UriBuilder("tcp", __.Host, p).Uri) + member __.Tcp = a.Contains Tcp || isEnvVarTrue "EQUINOX_ES_TCP" + member __.Port = match a.TryGetResult Port with Some x -> Some x | None -> EnvVar.tryGet "EQUINOX_ES_PORT" |> Option.map int + member __.Host = a.TryGetResult Host |> defaultWithEnvVar "EQUINOX_ES_HOST" "Host" + member __.User = a.TryGetResult Username |> defaultWithEnvVar "EQUINOX_ES_USERNAME" "Username" + member __.Password = a.TryGetResult Password |> defaultWithEnvVar "EQUINOX_ES_PASSWORD" "Password" + member __.Retries = a.GetResult(EsParameters.Retries, 3) + member __.Timeout = a.GetResult(EsParameters.Timeout, 20.) |> TimeSpan.FromSeconds + member __.Heartbeat = a.GetResult(HeartbeatTimeout, 1.5) |> TimeSpan.FromSeconds + member x.Connect(log: ILogger, storeLog: ILogger, appName, connectionStrategy) = + let s (x : TimeSpan) = x.TotalSeconds + let discovery = x.Discovery + log.ForContext("host", x.Host).ForContext("port", x.Port) + .Information("EventStore {discovery} heartbeat: {heartbeat}s Timeout: {timeout}s Retries {retries}", + discovery, s x.Heartbeat, s x.Timeout, x.Retries) + let log=if storeLog.IsEnabled Serilog.Events.LogEventLevel.Debug then Logger.SerilogVerbose storeLog else Logger.SerilogNormal storeLog + let tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string] + Connector(x.User, x.Password, x.Timeout, x.Retries, log=log, heartbeatTimeout=x.Heartbeat, tags=tags) + .Establish(appName, discovery, connectionStrategy) |> Async.RunSynchronously + + /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args + let parse argv = + let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name + let parser = ArgumentParser.Create(programName = programName) + parser.ParseCommandLine argv |> Arguments + +module Logging = + + let initialize verbose = + Log.Logger <- + LoggerConfiguration() + .Destructure.FSharpTypes() + .Enrich.FromLogContext() + .MinimumLevel.Override("Microsoft.AspNetCore", Serilog.Events.LogEventLevel.Warning) + |> fun c -> if verbose then c.MinimumLevel.Debug() else c + |> fun c -> let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {partitionKeyRangeId,2} {Message:lj} {NewLine}{Exception}" + c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t) + |> fun c -> c.CreateLogger() + +let [] AppName = "Fc.Web" + +/// Defines the Hosting configuration, including registration of the store and backend services +type Startup() = + + // This method gets called by the runtime. Use this method to add services to the container. + member __.ConfigureServices(services: IServiceCollection) : unit = + services + .AddMvc() + .SetCompatibilityVersion(CompatibilityVersion.Latest) + .AddNewtonsoftJson() // until FsCodec.SystemTextJson is available + |> ignore + + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. + member __.Configure(app: IApplicationBuilder, env: IHostEnvironment) : unit = + if env.IsDevelopment() then app.UseDeveloperExceptionPage() |> ignore + else app.UseHsts() |> ignore + + app.UseHttpsRedirection() + .UseRouting() + .UseSerilogRequestLogging() // see https://nblumhardt.com/2019/10/serilog-in-aspnetcore-3/ + .UseEndpoints(fun endpoints -> endpoints.MapControllers() |> ignore) + |> ignore + +let build (args : Args.Arguments) = + let cache = Equinox.Cache(AppName, sizeMb=10) + let create = + let es = args.Source + let connection = es.Connect(Log.Logger, Log.Logger, AppName, Equinox.EventStore.ConnectionStrategy.ClusterSingle Equinox.EventStore.NodePreference.Master) + let context = Equinox.EventStore.Context(connection, Equinox.EventStore.BatchingPolicy(maxBatchSize=500)) + Fc.Domain.StockProcessManager.EventStore.create (context, cache) + let inventoryId = InventoryId.parse "FC000" + create inventoryId (1000, 5, 3) + +let run argv args = + let processManager = build args + WebHost + .CreateDefaultBuilder(argv) + .UseSerilog() + .ConfigureServices(fun svc -> svc.AddSingleton(processManager) |> ignore) + .UseStartup() + .Build() + .Run() + +[] +let main argv = + try let args = Args.parse argv + try Logging.initialize args.Verbose + try Configuration.initialize () + run argv args + 0 + with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2 + finally Log.CloseAndFlush() + with Args.MissingArg msg -> eprintfn "%s" msg; 1 + | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 + | e -> eprintf "Exception %s" e.Message; 1 diff --git a/equinox-fc/Web/Web.fsproj b/equinox-fc/Web/Web.fsproj new file mode 100644 index 000000000..85146417a --- /dev/null +++ b/equinox-fc/Web/Web.fsproj @@ -0,0 +1,26 @@ + + + + Exe + netcoreapp3.1 + 5 + + + + + + + + + + + + + + + + + + + +