Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/Equinox.Codec/UnionCodec.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
namespace Equinox.UnionCodec

type OAttribute = System.Runtime.InteropServices.OptionalAttribute
type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute

open Newtonsoft.Json
open System.IO
open TypeShape
Expand Down Expand Up @@ -53,7 +56,7 @@ type JsonUtf8 =
/// <param name="settings">Configuration to be used by the underlying <c>Newtonsoft.Json</c> Serializer when encoding/decoding.</param>
/// <param name="requireRecordFields">Fail encoder generation if union cases contain fields that are not F# records. Defaults to <c>false</c>.</param>
/// <param name="allowNullaryCases">Fail encoder generation if union contains nullary cases. Defaults to <c>true</c>.</param>
static member Create<'Union when 'Union :> UnionContract.IUnionContract>(settings, ?requireRecordFields, ?allowNullaryCases)
static member Create<'Union when 'Union :> UnionContract.IUnionContract>(settings, [<O;D(null)>]?requireRecordFields, [<O;D(null)>]?allowNullaryCases)
: IUnionEncoder<'Union,byte[]> =
let inner =
UnionContract.UnionContractEncoder.Create<'Union,byte[]>(
Expand Down
35 changes: 18 additions & 17 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ open System
open System.Collections.Concurrent

/// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts)
type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, ?readRetryPolicy: IRetryPolicy, ?writeRetryPolicy) =
type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, [<O; D(null)>]?readRetryPolicy: IRetryPolicy, [<O; D(null)>]?writeRetryPolicy) =
member __.Client = client
member __.TipRetryPolicy = readRetryPolicy
member __.QueryRetryPolicy = readRetryPolicy
Expand All @@ -671,13 +671,13 @@ type EqxConnection(client: Microsoft.Azure.Documents.IDocumentClient, ?readRetry
/// Defines the policies in force regarding how to a) split up calls b) limit the number of events per slice
type EqxBatchingPolicy
( // Max items to request in query response. Defaults to 10.
?defaultMaxItems : int,
[<O; D(null)>]?defaultMaxItems : int,
// Dynamic version of `defaultMaxItems`, allowing one to react to dynamic configuration changes. Default to using `defaultMaxItems`
?getDefaultMaxItems : unit -> int,
[<O; D(null)>]?getDefaultMaxItems : unit -> int,
/// Maximum number of trips to permit when slicing the work into multiple responses based on `MaxSlices`. Default: unlimited.
?maxRequests,
[<O; D(null)>]?maxRequests,
/// Maximum number of events to accumualte within the `WipBatch` before switching to a new one when adding Events. Defaults to 10.
?maxEventsPerSlice) =
[<O; D(null)>]?maxEventsPerSlice) =
let getdefaultMaxItems = defaultArg getDefaultMaxItems (fun () -> defaultArg defaultMaxItems 10)
/// Limit for Maximum number of `Batch` records in a single query batch response
member __.MaxItems = getdefaultMaxItems ()
Expand Down Expand Up @@ -847,7 +847,7 @@ type private EqxCollection(databaseId, collectionId, ?initCollection : Uri -> As
member internal __.InitializationGate = match initGuard with Some g when g.PeekIsValid() |> not -> Some g.AwaitValue | _ -> None

/// Defines a process for mapping from a Stream Name to the appropriate storage area, allowing control over segregation / co-locating of data
type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> string -> string*string*string, ?disableInitialization) =
type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> string -> string*string*string, [<O; D(null)>]?disableInitialization) =
// Index of database*collection -> Initialization Context
let collections = ConcurrentDictionary<string*string, EqxCollection>()
new (databaseId, collectionId) =
Expand All @@ -863,7 +863,7 @@ type EqxCollections(categoryAndIdToDatabaseCollectionAndStream : string -> strin
{ collectionUri = coll.CollectionUri; name = streamName },coll.InitializationGate

/// Pairs a Gateway, defining the retry policies for CosmosDb with an EqxCollections defining mappings from (category,id) to (database,collection,streamName)
type EqxStore(gateway: EqxGateway, collections: EqxCollections, ?resolverLog) =
type EqxStore(gateway: EqxGateway, collections: EqxCollections, [<O; D(null)>]?resolverLog) =
let init = gateway.CreateSyncStoredProcIfNotExists resolverLog
member __.Gateway = gateway
member __.Collections = collections
Expand All @@ -888,7 +888,7 @@ type AccessStrategy<'event,'state> =
/// Trust every event type as being an origin
| AnyKnownEventType

type EqxResolver<'event, 'state>(store : EqxStore, codec, fold, initial, ?access, ?caching) =
type EqxResolver<'event, 'state>(store : EqxStore, codec, fold, initial, [<O; D(null)>]?access, [<O; D(null)>]?caching) =
let readCacheOption =
match caching with
| None -> None
Expand Down Expand Up @@ -955,19 +955,19 @@ type EqxConnector
( requestTimeout: TimeSpan, maxRetryAttemptsOnThrottledRequests: int, maxRetryWaitTimeInSeconds: int,
log : ILogger,
/// Connection limit (default 1000)
?maxConnectionLimit,
[<O; D(null)>]?maxConnectionLimit,
/// Connection mode (default: ConnectionMode.Gateway (lowest perf, least trouble))
?mode : ConnectionMode,
[<O; D(null)>]?mode : ConnectionMode,
/// consistency mode (default: ConsistencyLevel.Session)
?defaultConsistencyLevel : ConsistencyLevel,
[<O; D(null)>]?defaultConsistencyLevel : ConsistencyLevel,

/// Retries for read requests, over and above those defined by the mandatory policies
?readRetryPolicy,
[<O; D(null)>]?readRetryPolicy,
/// Retries for write requests, over and above those defined by the mandatory policies
?writeRetryPolicy,
[<O; D(null)>]?writeRetryPolicy,
/// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster
/// NB as this will enter server and client logs, it should not contain sensitive information
?tags : (string*string) seq) =
[<O; D(null)>]?tags : (string*string) seq) =
do if log = null then nullArg "log"

let connPolicy =
Expand Down Expand Up @@ -1008,6 +1008,7 @@ type EqxConnector

namespace Equinox.Cosmos.Core

open Equinox.Store.Infrastructure
open Equinox.Cosmos
open Equinox.Cosmos.Store
open FSharp.Control
Expand All @@ -1029,12 +1030,12 @@ type EqxContext
log : Serilog.ILogger,
/// Optional maximum number of Store.Batch records to retrieve as a set (how many Events are placed therein is controlled by maxEventsPerSlice).
/// Defaults to 10
?defaultMaxItems,
[<O; D(null)>]?defaultMaxItems,
/// Alternate way of specifying defaultMaxItems which facilitates reading it from a cached dynamic configuration
?getDefaultMaxItems,
[<O; D(null)>]?getDefaultMaxItems,
/// Threshold defining the number of events a slice is allowed to hold before switching to a new Batch is triggered.
/// Defaults to 1
?maxEventsPerSlice) =
[<O; D(null)>]?maxEventsPerSlice) =
do if log = null then nullArg "log"
let getDefaultMaxItems = match getDefaultMaxItems with Some f -> f | None -> fun () -> defaultArg defaultMaxItems 10
let maxEventsPerSlice = defaultArg maxEventsPerSlice 1
Expand Down
12 changes: 6 additions & 6 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ module Token =
let currentVersion, newVersion = current.pos.streamVersion, x.pos.streamVersion
newVersion > currentVersion

type GesConnection(readConnection, ?writeConnection, ?readRetryPolicy, ?writeRetryPolicy) =
type GesConnection(readConnection, [<O; D(null)>]?writeConnection, [<O; D(null)>]?readRetryPolicy, [<O; D(null)>]?writeRetryPolicy) =
member __.ReadConnection = readConnection
member __.ReadRetryPolicy = readRetryPolicy
member __.WriteConnection = defaultArg writeConnection readConnection
member __.WriteRetryPolicy = writeRetryPolicy

type GesBatchingPolicy(getMaxBatchSize : unit -> int, ?batchCountLimit) =
type GesBatchingPolicy(getMaxBatchSize : unit -> int, [<O; D(null)>]?batchCountLimit) =
new (maxBatchSize) = GesBatchingPolicy(fun () -> maxBatchSize)
member __.BatchSize = getMaxBatchSize()
member __.MaxBatches = batchCountLimit
Expand Down Expand Up @@ -445,7 +445,7 @@ type CachingStrategy =
/// Prefix is used to segregate multiple folds per stream when they are stored in the cache
| SlidingWindowPrefixed of Caching.Cache * window: TimeSpan * prefix: string

type GesResolver<'event,'state>(gateway : GesGateway, codec, fold, initial, ?access, ?caching) =
type GesResolver<'event,'state>(gateway : GesGateway, codec, fold, initial, [<O; D(null)>]?access, [<O; D(null)>]?caching) =
do match access with
| Some (AccessStrategy.EventsAreState) when Option.isSome caching ->
"Equinox.EventStore does not support (and it would make things _less_ efficient even if it did)"
Expand Down Expand Up @@ -554,11 +554,11 @@ type ConnectionStrategy =

type GesConnector
( username, password, reqTimeout: TimeSpan, reqRetries: int,
?log : Logger, ?heartbeatTimeout: TimeSpan, ?concurrentOperationsLimit,
?readRetryPolicy, ?writeRetryPolicy,
[<O; D(null)>]?log : Logger, [<O; D(null)>]?heartbeatTimeout: TimeSpan, [<O; D(null)>]?concurrentOperationsLimit,
[<O; D(null)>]?readRetryPolicy, [<O; D(null)>]?writeRetryPolicy,
/// Additional strings identifying the context of this connection; should provide enough context to disambiguate all potential connections to a cluster
/// NB as this will enter server and client logs, it should not contain sensitive information
?tags : (string*string) seq) =
[<O; D(null)>]?tags : (string*string) seq) =
let connSettings node =
ConnectionSettings.Create().SetDefaultUserCredentials(SystemData.UserCredentials(username, password))
.KeepReconnecting() // ES default: .LimitReconnectionsTo(10)
Expand Down
4 changes: 4 additions & 0 deletions src/Equinox.EventStore/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ open FSharp.Control
open System
open System.Diagnostics


type OAttribute = System.Runtime.InteropServices.OptionalAttribute
type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute

#if NET461
module Seq =
let tryLast (source : seq<_>) =
Expand Down
24 changes: 15 additions & 9 deletions src/Equinox/Handler.fs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
namespace Equinox

type OAttribute = System.Runtime.InteropServices.OptionalAttribute
type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute

/// Maintains a rolling folded State while Accumulating Events decided upon as part of a decision flow
type Context<'event, 'state>(fold, originState : 'state) =
type Context<'event, 'state>(fold : 'state -> 'event seq -> 'state, originState : 'state) =
let accumulated = ResizeArray<'event>()

/// The Events that have thus far been pended via the `decide` functions `Execute`/`Decide`d during the course of this flow
member __.Accumulated =
member __.Accumulated : 'event list =
accumulated |> List.ofSeq

/// The current folded State, based on the Stream's `originState` + any events that have been Accumulated during the the decision flow
member __.State =
__.Accumulated |> fold originState
member __.State : 'state =
accumulated |> fold originState

/// Invoke a decision function, gathering the events (if any) that it decides are necessary into the `Accumulated` sequence
member __.Execute(decide : 'state -> 'event list) : unit =
Expand All @@ -35,7 +38,10 @@ type MaxResyncsExhaustedException(count) =
inherit exn(sprintf "Retry failed after %i attempts." count)

/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
type Handler<'event, 'state>(fold, log, stream : Store.IStream<'event, 'state>, maxAttempts : int, ?mkAttemptsExhaustedException, ?resyncPolicy) =
type Handler<'event, 'state>
( fold, log, stream : Store.IStream<'event, 'state>, maxAttempts : int,
[<O;D(null)>]?mkAttemptsExhaustedException,
[<O;D(null)>]?resyncPolicy) =
let contextArgs =
let mkContext state : Context<'event,'state> = Context<'event,'state>(fold, state)
let getEvents (ctx: Context<'event,'state>) = ctx.Accumulated
Expand All @@ -46,13 +52,13 @@ type Handler<'event, 'state>(fold, log, stream : Store.IStream<'event, 'state>,
let handleResyncsExceeded = defaultArg mkAttemptsExhaustedException throwMaxResyncsExhaustedException
maxAttempts,resyncPolicy,handleResyncsExceeded

/// 0. Invoke the supplied `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome.
/// 0. Invoke the supplied `flow` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome.
/// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure.
member __.Decide(flow) =
member __.Decide(flow : Context<'event,'state> -> 'result) : Async<'result> =
Flow.decide contextArgs resyncArgs (stream, log, flow)
/// 0. Invoke the supplied _Async_ `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome
/// 0. Invoke the supplied _Async_ `flow` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome
/// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure.
member __.DecideAsync(flowAsync) =
member __.DecideAsync(flowAsync : Context<'event,'state> -> Async<'result>) : Async<'result> =
Flow.decideAsync contextArgs resyncArgs (stream, log, flowAsync)
/// Project from the folded `State` without executing a decision flow as `Decide` does
member __.Query(projection : 'state -> 'view) : Async<'view> =
Expand Down
2 changes: 1 addition & 1 deletion tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type Tests(testOutputHelper) =
#else
test <@ [EqxAct.Conflict] = capture.ExternalCalls @>
#endif
verifyRequestChargesMax 5 // 4.02
verifyRequestChargesMax 7 // 6.64
capture.Clear()
}

Expand Down