diff --git a/src/Equinox.Codec/UnionCodec.fs b/src/Equinox.Codec/UnionCodec.fs index 4232e238a..5299afe66 100644 --- a/src/Equinox.Codec/UnionCodec.fs +++ b/src/Equinox.Codec/UnionCodec.fs @@ -1,5 +1,8 @@ namespace Equinox.UnionCodec +type OAttribute = System.Runtime.InteropServices.OptionalAttribute +type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute + open Newtonsoft.Json open System.IO open TypeShape @@ -53,7 +56,7 @@ type JsonUtf8 = /// Configuration to be used by the underlying Newtonsoft.Json Serializer when encoding/decoding. /// Fail encoder generation if union cases contain fields that are not F# records. Defaults to false. /// Fail encoder generation if union contains nullary cases. Defaults to true. - static member Create<'Union when 'Union :> UnionContract.IUnionContract>(settings, ?requireRecordFields, ?allowNullaryCases) + static member Create<'Union when 'Union :> UnionContract.IUnionContract>(settings, []?requireRecordFields, []?allowNullaryCases) : IUnionEncoder<'Union,byte[]> = let inner = UnionContract.UnionContractEncoder.Create<'Union,byte[]>( diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 034b4c2d9..15fb807e2 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -662,7 +662,7 @@ open System open System.Collections.Concurrent /// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts) -type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, ?readRetryPolicy: IRetryPolicy, ?writeRetryPolicy) = +type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, []?readRetryPolicy: IRetryPolicy, []?writeRetryPolicy) = member __.Client = client member __.TipRetryPolicy = readRetryPolicy member __.QueryRetryPolicy = readRetryPolicy @@ -671,13 +671,13 @@ type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, ?readRetry /// Defines the policies in force regarding how to a) split up calls b) limit the number of events per slice type EqxBatchingPolicy ( // Max items to request in query response. Defaults to 10. - ?defaultMaxItems : int, + []?defaultMaxItems : int, // Dynamic version of `defaultMaxItems`, allowing one to react to dynamic configuration changes. Default to using `defaultMaxItems` - ?getDefaultMaxItems : unit -> int, + []?getDefaultMaxItems : unit -> int, /// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices`. Default: unlimited. - ?maxRequests, + []?maxRequests, /// Maximum number of events to accumualte within the `WipBatch` before switching to a new one when adding Events. Defaults to 10. - ?maxEventsPerSlice) = + []?maxEventsPerSlice) = let getdefaultMaxItems = defaultArg getDefaultMaxItems (fun () -> defaultArg defaultMaxItems 10) /// Limit for Maximum number of `Batch` records in a single query batch response member __.MaxItems = getdefaultMaxItems () @@ -847,7 +847,7 @@ type private EqxCollection(databaseId, collectionId, ?initCollection : Uri -> As member internal __.InitializationGate = match initGuard with Some g when g.PeekIsValid() |> not -> Some g.AwaitValue | _ -> None /// Defines a process for mapping from a Stream Name to the appropriate storage area, allowing control over segregation / co-locating of data -type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> string -> string*string*string, ?disableInitialization) = +type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> string -> string*string*string, []?disableInitialization) = // Index of database*collection -> Initialization Context let collections = ConcurrentDictionary() new (databaseId, collectionId) = @@ -863,7 +863,7 @@ type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> strin { collectionUri = coll.CollectionUri; name = streamName },coll.InitializationGate /// Pairs a Gateway, defining the retry policies for CosmosDb with an EqxCollections defining mappings from (category,id) to (database,collection,streamName) -type EqxStore(gateway: EqxGateway, collections: EqxCollections, ?resolverLog) = +type EqxStore(gateway: EqxGateway, collections: EqxCollections, []?resolverLog) = let init = gateway.CreateSyncStoredProcIfNotExists resolverLog member __.Gateway = gateway member __.Collections = collections @@ -888,7 +888,7 @@ type AccessStrategy<'event,'state> = /// Trust every event type as being an origin | AnyKnownEventType -type EqxResolver<'event, 'state>(store : EqxStore, codec, fold, initial, ?access, ?caching) = +type EqxResolver<'event, 'state>(store : EqxStore, codec, fold, initial, []?access, []?caching) = let readCacheOption = match caching with | None -> None @@ -955,19 +955,19 @@ type EqxConnector ( requestTimeout: TimeSpan, maxRetryAttemptsOnThrottledRequests: int, maxRetryWaitTimeInSeconds: int, log : ILogger, /// Connection limit (default 1000) - ?maxConnectionLimit, + []?maxConnectionLimit, /// Connection mode (default: ConnectionMode.Gateway (lowest perf, least trouble)) - ?mode : ConnectionMode, + []?mode : ConnectionMode, /// consistency mode (default: ConsistencyLevel.Session) - ?defaultConsistencyLevel : ConsistencyLevel, + []?defaultConsistencyLevel : ConsistencyLevel, /// Retries for read requests, over and above those defined by the mandatory policies - ?readRetryPolicy, + []?readRetryPolicy, /// Retries for write requests, over and above those defined by the mandatory policies - ?writeRetryPolicy, + []?writeRetryPolicy, /// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster /// NB as this will enter server and client logs, it should not contain sensitive information - ?tags : (string*string) seq) = + []?tags : (string*string) seq) = do if log = null then nullArg "log" let connPolicy = @@ -1008,6 +1008,7 @@ type EqxConnector namespace Equinox.Cosmos.Core +open Equinox.Store.Infrastructure open Equinox.Cosmos open Equinox.Cosmos.Store open FSharp.Control @@ -1029,12 +1030,12 @@ type EqxContext log : Serilog.ILogger, /// Optional maximum number of Store.Batch records to retrieve as a set (how many Events are placed therein is controlled by maxEventsPerSlice). /// Defaults to 10 - ?defaultMaxItems, + []?defaultMaxItems, /// Alternate way of specifying defaultMaxItems which facilitates reading it from a cached dynamic configuration - ?getDefaultMaxItems, + []?getDefaultMaxItems, /// Threshold defining the number of events a slice is allowed to hold before switching to a new Batch is triggered. /// Defaults to 1 - ?maxEventsPerSlice) = + []?maxEventsPerSlice) = do if log = null then nullArg "log" let getDefaultMaxItems = match getDefaultMaxItems with Some f -> f | None -> fun () -> defaultArg defaultMaxItems 10 let maxEventsPerSlice = defaultArg maxEventsPerSlice 1 diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 9061ebf94..2b89d4c13 100644 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -249,13 +249,13 @@ module Token = let currentVersion, newVersion = current.pos.streamVersion, x.pos.streamVersion newVersion > currentVersion -type GesConnection(readConnection, ?writeConnection, ?readRetryPolicy, ?writeRetryPolicy) = +type GesConnection(readConnection, []?writeConnection, []?readRetryPolicy, []?writeRetryPolicy) = member __.ReadConnection = readConnection member __.ReadRetryPolicy = readRetryPolicy member __.WriteConnection = defaultArg writeConnection readConnection member __.WriteRetryPolicy = writeRetryPolicy -type GesBatchingPolicy(getMaxBatchSize : unit -> int, ?batchCountLimit) = +type GesBatchingPolicy(getMaxBatchSize : unit -> int, []?batchCountLimit) = new (maxBatchSize) = GesBatchingPolicy(fun () -> maxBatchSize) member __.BatchSize = getMaxBatchSize() member __.MaxBatches = batchCountLimit @@ -445,7 +445,7 @@ type CachingStrategy = /// Prefix is used to segregate multiple folds per stream when they are stored in the cache | SlidingWindowPrefixed of Caching.Cache * window: TimeSpan * prefix: string -type GesResolver<'event,'state>(gateway : GesGateway, codec, fold, initial, ?access, ?caching) = +type GesResolver<'event,'state>(gateway : GesGateway, codec, fold, initial, []?access, []?caching) = do match access with | Some (AccessStrategy.EventsAreState) when Option.isSome caching -> "Equinox.EventStore does not support (and it would make things _less_ efficient even if it did)" @@ -554,11 +554,11 @@ type ConnectionStrategy = type GesConnector ( username, password, reqTimeout: TimeSpan, reqRetries: int, - ?log : Logger, ?heartbeatTimeout: TimeSpan, ?concurrentOperationsLimit, - ?readRetryPolicy, ?writeRetryPolicy, + []?log : Logger, []?heartbeatTimeout: TimeSpan, []?concurrentOperationsLimit, + []?readRetryPolicy, []?writeRetryPolicy, /// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster /// NB as this will enter server and client logs, it should not contain sensitive information - ?tags : (string*string) seq) = + []?tags : (string*string) seq) = let connSettings node = ConnectionSettings.Create().SetDefaultUserCredentials(SystemData.UserCredentials(username, password)) .KeepReconnecting() // ES default: .LimitReconnectionsTo(10) diff --git a/src/Equinox.EventStore/Infrastructure.fs b/src/Equinox.EventStore/Infrastructure.fs index 4f53a4279..e34a18277 100644 --- a/src/Equinox.EventStore/Infrastructure.fs +++ b/src/Equinox.EventStore/Infrastructure.fs @@ -5,6 +5,10 @@ open FSharp.Control open System open System.Diagnostics + +type OAttribute = System.Runtime.InteropServices.OptionalAttribute +type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute + #if NET461 module Seq = let tryLast (source : seq<_>) = diff --git a/src/Equinox/Handler.fs b/src/Equinox/Handler.fs index ae9473b7c..c5f5c0996 100644 --- a/src/Equinox/Handler.fs +++ b/src/Equinox/Handler.fs @@ -1,16 +1,19 @@ namespace Equinox +type OAttribute = System.Runtime.InteropServices.OptionalAttribute +type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute + /// Maintains a rolling folded State while Accumulating Events decided upon as part of a decision flow -type Context<'event, 'state>(fold, originState : 'state) = +type Context<'event, 'state>(fold : 'state -> 'event seq -> 'state, originState : 'state) = let accumulated = ResizeArray<'event>() /// The Events that have thus far been pended via the `decide` functions `Execute`/`Decide`d during the course of this flow - member __.Accumulated = + member __.Accumulated : 'event list = accumulated |> List.ofSeq /// The current folded State, based on the Stream's `originState` + any events that have been Accumulated during the the decision flow - member __.State = - __.Accumulated |> fold originState + member __.State : 'state = + accumulated |> fold originState /// Invoke a decision function, gathering the events (if any) that it decides are necessary into the `Accumulated` sequence member __.Execute(decide : 'state -> 'event list) : unit = @@ -35,7 +38,10 @@ type MaxResyncsExhaustedException(count) = inherit exn(sprintf "Retry failed after %i attempts." count) /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic -type Handler<'event, 'state>(fold, log, stream : Store.IStream<'event, 'state>, maxAttempts : int, ?mkAttemptsExhaustedException, ?resyncPolicy) = +type Handler<'event, 'state> + ( fold, log, stream : Store.IStream<'event, 'state>, maxAttempts : int, + []?mkAttemptsExhaustedException, + []?resyncPolicy) = let contextArgs = let mkContext state : Context<'event,'state> = Context<'event,'state>(fold, state) let getEvents (ctx: Context<'event,'state>) = ctx.Accumulated @@ -46,13 +52,13 @@ type Handler<'event, 'state>(fold, log, stream : Store.IStream<'event, 'state>, let handleResyncsExceeded = defaultArg mkAttemptsExhaustedException throwMaxResyncsExhaustedException maxAttempts,resyncPolicy,handleResyncsExceeded - /// 0. Invoke the supplied `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome. + /// 0. Invoke the supplied `flow` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome. /// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure. - member __.Decide(flow) = + member __.Decide(flow : Context<'event,'state> -> 'result) : Async<'result> = Flow.decide contextArgs resyncArgs (stream, log, flow) - /// 0. Invoke the supplied _Async_ `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome + /// 0. Invoke the supplied _Async_ `flow` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome /// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure. - member __.DecideAsync(flowAsync) = + member __.DecideAsync(flowAsync : Context<'event,'state> -> Async<'result>) : Async<'result> = Flow.decideAsync contextArgs resyncArgs (stream, log, flowAsync) /// Project from the folded `State` without executing a decision flow as `Decide` does member __.Query(projection : 'state -> 'view) : Async<'view> = diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index 14ea74bc2..5095796d7 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -187,7 +187,7 @@ type Tests(testOutputHelper) = #else test <@ [EqxAct.Conflict] = capture.ExternalCalls @> #endif - verifyRequestChargesMax 5 // 4.02 + verifyRequestChargesMax 7 // 6.64 capture.Clear() }