diff --git a/src/Equinox.Codec/UnionCodec.fs b/src/Equinox.Codec/UnionCodec.fs
index 4232e238a..5299afe66 100644
--- a/src/Equinox.Codec/UnionCodec.fs
+++ b/src/Equinox.Codec/UnionCodec.fs
@@ -1,5 +1,8 @@
namespace Equinox.UnionCodec
+type OAttribute = System.Runtime.InteropServices.OptionalAttribute
+type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute
+
open Newtonsoft.Json
open System.IO
open TypeShape
@@ -53,7 +56,7 @@ type JsonUtf8 =
/// Configuration to be used by the underlying Newtonsoft.Json Serializer when encoding/decoding.
/// Fail encoder generation if union cases contain fields that are not F# records. Defaults to false.
/// Fail encoder generation if union contains nullary cases. Defaults to true.
- static member Create<'Union when 'Union :> UnionContract.IUnionContract>(settings, ?requireRecordFields, ?allowNullaryCases)
+ static member Create<'Union when 'Union :> UnionContract.IUnionContract>(settings, []?requireRecordFields, []?allowNullaryCases)
: IUnionEncoder<'Union,byte[]> =
let inner =
UnionContract.UnionContractEncoder.Create<'Union,byte[]>(
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs
index 034b4c2d9..15fb807e2 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.Cosmos/Cosmos.fs
@@ -662,7 +662,7 @@ open System
open System.Collections.Concurrent
/// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts)
-type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, ?readRetryPolicy: IRetryPolicy, ?writeRetryPolicy) =
+type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, []?readRetryPolicy: IRetryPolicy, []?writeRetryPolicy) =
member __.Client = client
member __.TipRetryPolicy = readRetryPolicy
member __.QueryRetryPolicy = readRetryPolicy
@@ -671,13 +671,13 @@ type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, ?readRetry
/// Defines the policies in force regarding how to a) split up calls b) limit the number of events per slice
type EqxBatchingPolicy
( // Max items to request in query response. Defaults to 10.
- ?defaultMaxItems : int,
+ []?defaultMaxItems : int,
// Dynamic version of `defaultMaxItems`, allowing one to react to dynamic configuration changes. Default to using `defaultMaxItems`
- ?getDefaultMaxItems : unit -> int,
+ []?getDefaultMaxItems : unit -> int,
/// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices`. Default: unlimited.
- ?maxRequests,
+ []?maxRequests,
/// Maximum number of events to accumualte within the `WipBatch` before switching to a new one when adding Events. Defaults to 10.
- ?maxEventsPerSlice) =
+ []?maxEventsPerSlice) =
let getdefaultMaxItems = defaultArg getDefaultMaxItems (fun () -> defaultArg defaultMaxItems 10)
/// Limit for Maximum number of `Batch` records in a single query batch response
member __.MaxItems = getdefaultMaxItems ()
@@ -847,7 +847,7 @@ type private EqxCollection(databaseId, collectionId, ?initCollection : Uri -> As
member internal __.InitializationGate = match initGuard with Some g when g.PeekIsValid() |> not -> Some g.AwaitValue | _ -> None
/// Defines a process for mapping from a Stream Name to the appropriate storage area, allowing control over segregation / co-locating of data
-type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> string -> string*string*string, ?disableInitialization) =
+type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> string -> string*string*string, []?disableInitialization) =
// Index of database*collection -> Initialization Context
let collections = ConcurrentDictionary()
new (databaseId, collectionId) =
@@ -863,7 +863,7 @@ type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> strin
{ collectionUri = coll.CollectionUri; name = streamName },coll.InitializationGate
/// Pairs a Gateway, defining the retry policies for CosmosDb with an EqxCollections defining mappings from (category,id) to (database,collection,streamName)
-type EqxStore(gateway: EqxGateway, collections: EqxCollections, ?resolverLog) =
+type EqxStore(gateway: EqxGateway, collections: EqxCollections, []?resolverLog) =
let init = gateway.CreateSyncStoredProcIfNotExists resolverLog
member __.Gateway = gateway
member __.Collections = collections
@@ -888,7 +888,7 @@ type AccessStrategy<'event,'state> =
/// Trust every event type as being an origin
| AnyKnownEventType
-type EqxResolver<'event, 'state>(store : EqxStore, codec, fold, initial, ?access, ?caching) =
+type EqxResolver<'event, 'state>(store : EqxStore, codec, fold, initial, []?access, []?caching) =
let readCacheOption =
match caching with
| None -> None
@@ -955,19 +955,19 @@ type EqxConnector
( requestTimeout: TimeSpan, maxRetryAttemptsOnThrottledRequests: int, maxRetryWaitTimeInSeconds: int,
log : ILogger,
/// Connection limit (default 1000)
- ?maxConnectionLimit,
+ []?maxConnectionLimit,
/// Connection mode (default: ConnectionMode.Gateway (lowest perf, least trouble))
- ?mode : ConnectionMode,
+ []?mode : ConnectionMode,
/// consistency mode (default: ConsistencyLevel.Session)
- ?defaultConsistencyLevel : ConsistencyLevel,
+ []?defaultConsistencyLevel : ConsistencyLevel,
/// Retries for read requests, over and above those defined by the mandatory policies
- ?readRetryPolicy,
+ []?readRetryPolicy,
/// Retries for write requests, over and above those defined by the mandatory policies
- ?writeRetryPolicy,
+ []?writeRetryPolicy,
/// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster
/// NB as this will enter server and client logs, it should not contain sensitive information
- ?tags : (string*string) seq) =
+ []?tags : (string*string) seq) =
do if log = null then nullArg "log"
let connPolicy =
@@ -1008,6 +1008,7 @@ type EqxConnector
namespace Equinox.Cosmos.Core
+open Equinox.Store.Infrastructure
open Equinox.Cosmos
open Equinox.Cosmos.Store
open FSharp.Control
@@ -1029,12 +1030,12 @@ type EqxContext
log : Serilog.ILogger,
/// Optional maximum number of Store.Batch records to retrieve as a set (how many Events are placed therein is controlled by maxEventsPerSlice).
/// Defaults to 10
- ?defaultMaxItems,
+ []?defaultMaxItems,
/// Alternate way of specifying defaultMaxItems which facilitates reading it from a cached dynamic configuration
- ?getDefaultMaxItems,
+ []?getDefaultMaxItems,
/// Threshold defining the number of events a slice is allowed to hold before switching to a new Batch is triggered.
/// Defaults to 1
- ?maxEventsPerSlice) =
+ []?maxEventsPerSlice) =
do if log = null then nullArg "log"
let getDefaultMaxItems = match getDefaultMaxItems with Some f -> f | None -> fun () -> defaultArg defaultMaxItems 10
let maxEventsPerSlice = defaultArg maxEventsPerSlice 1
diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs
index 9061ebf94..2b89d4c13 100644
--- a/src/Equinox.EventStore/EventStore.fs
+++ b/src/Equinox.EventStore/EventStore.fs
@@ -249,13 +249,13 @@ module Token =
let currentVersion, newVersion = current.pos.streamVersion, x.pos.streamVersion
newVersion > currentVersion
-type GesConnection(readConnection, ?writeConnection, ?readRetryPolicy, ?writeRetryPolicy) =
+type GesConnection(readConnection, []?writeConnection, []?readRetryPolicy, []?writeRetryPolicy) =
member __.ReadConnection = readConnection
member __.ReadRetryPolicy = readRetryPolicy
member __.WriteConnection = defaultArg writeConnection readConnection
member __.WriteRetryPolicy = writeRetryPolicy
-type GesBatchingPolicy(getMaxBatchSize : unit -> int, ?batchCountLimit) =
+type GesBatchingPolicy(getMaxBatchSize : unit -> int, []?batchCountLimit) =
new (maxBatchSize) = GesBatchingPolicy(fun () -> maxBatchSize)
member __.BatchSize = getMaxBatchSize()
member __.MaxBatches = batchCountLimit
@@ -445,7 +445,7 @@ type CachingStrategy =
/// Prefix is used to segregate multiple folds per stream when they are stored in the cache
| SlidingWindowPrefixed of Caching.Cache * window: TimeSpan * prefix: string
-type GesResolver<'event,'state>(gateway : GesGateway, codec, fold, initial, ?access, ?caching) =
+type GesResolver<'event,'state>(gateway : GesGateway, codec, fold, initial, []?access, []?caching) =
do match access with
| Some (AccessStrategy.EventsAreState) when Option.isSome caching ->
"Equinox.EventStore does not support (and it would make things _less_ efficient even if it did)"
@@ -554,11 +554,11 @@ type ConnectionStrategy =
type GesConnector
( username, password, reqTimeout: TimeSpan, reqRetries: int,
- ?log : Logger, ?heartbeatTimeout: TimeSpan, ?concurrentOperationsLimit,
- ?readRetryPolicy, ?writeRetryPolicy,
+ []?log : Logger, []?heartbeatTimeout: TimeSpan, []?concurrentOperationsLimit,
+ []?readRetryPolicy, []?writeRetryPolicy,
/// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster
/// NB as this will enter server and client logs, it should not contain sensitive information
- ?tags : (string*string) seq) =
+ []?tags : (string*string) seq) =
let connSettings node =
ConnectionSettings.Create().SetDefaultUserCredentials(SystemData.UserCredentials(username, password))
.KeepReconnecting() // ES default: .LimitReconnectionsTo(10)
diff --git a/src/Equinox.EventStore/Infrastructure.fs b/src/Equinox.EventStore/Infrastructure.fs
index 4f53a4279..e34a18277 100644
--- a/src/Equinox.EventStore/Infrastructure.fs
+++ b/src/Equinox.EventStore/Infrastructure.fs
@@ -5,6 +5,10 @@ open FSharp.Control
open System
open System.Diagnostics
+
+type OAttribute = System.Runtime.InteropServices.OptionalAttribute
+type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute
+
#if NET461
module Seq =
let tryLast (source : seq<_>) =
diff --git a/src/Equinox/Handler.fs b/src/Equinox/Handler.fs
index ae9473b7c..c5f5c0996 100644
--- a/src/Equinox/Handler.fs
+++ b/src/Equinox/Handler.fs
@@ -1,16 +1,19 @@
namespace Equinox
+type OAttribute = System.Runtime.InteropServices.OptionalAttribute
+type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute
+
/// Maintains a rolling folded State while Accumulating Events decided upon as part of a decision flow
-type Context<'event, 'state>(fold, originState : 'state) =
+type Context<'event, 'state>(fold : 'state -> 'event seq -> 'state, originState : 'state) =
let accumulated = ResizeArray<'event>()
/// The Events that have thus far been pended via the `decide` functions `Execute`/`Decide`d during the course of this flow
- member __.Accumulated =
+ member __.Accumulated : 'event list =
accumulated |> List.ofSeq
/// The current folded State, based on the Stream's `originState` + any events that have been Accumulated during the the decision flow
- member __.State =
- __.Accumulated |> fold originState
+ member __.State : 'state =
+ accumulated |> fold originState
/// Invoke a decision function, gathering the events (if any) that it decides are necessary into the `Accumulated` sequence
member __.Execute(decide : 'state -> 'event list) : unit =
@@ -35,7 +38,10 @@ type MaxResyncsExhaustedException(count) =
inherit exn(sprintf "Retry failed after %i attempts." count)
/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
-type Handler<'event, 'state>(fold, log, stream : Store.IStream<'event, 'state>, maxAttempts : int, ?mkAttemptsExhaustedException, ?resyncPolicy) =
+type Handler<'event, 'state>
+ ( fold, log, stream : Store.IStream<'event, 'state>, maxAttempts : int,
+ []?mkAttemptsExhaustedException,
+ []?resyncPolicy) =
let contextArgs =
let mkContext state : Context<'event,'state> = Context<'event,'state>(fold, state)
let getEvents (ctx: Context<'event,'state>) = ctx.Accumulated
@@ -46,13 +52,13 @@ type Handler<'event, 'state>(fold, log, stream : Store.IStream<'event, 'state>,
let handleResyncsExceeded = defaultArg mkAttemptsExhaustedException throwMaxResyncsExhaustedException
maxAttempts,resyncPolicy,handleResyncsExceeded
- /// 0. Invoke the supplied `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome.
+ /// 0. Invoke the supplied `flow` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome.
/// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure.
- member __.Decide(flow) =
+ member __.Decide(flow : Context<'event,'state> -> 'result) : Async<'result> =
Flow.decide contextArgs resyncArgs (stream, log, flow)
- /// 0. Invoke the supplied _Async_ `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome
+ /// 0. Invoke the supplied _Async_ `flow` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome
/// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure.
- member __.DecideAsync(flowAsync) =
+ member __.DecideAsync(flowAsync : Context<'event,'state> -> Async<'result>) : Async<'result> =
Flow.decideAsync contextArgs resyncArgs (stream, log, flowAsync)
/// Project from the folded `State` without executing a decision flow as `Decide` does
member __.Query(projection : 'state -> 'view) : Async<'view> =
diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
index 14ea74bc2..5095796d7 100644
--- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
+++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
@@ -187,7 +187,7 @@ type Tests(testOutputHelper) =
#else
test <@ [EqxAct.Conflict] = capture.ExternalCalls @>
#endif
- verifyRequestChargesMax 5 // 4.02
+ verifyRequestChargesMax 7 // 6.64
capture.Clear()
}