Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305)
- `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338)
- `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish)
- `CosmosStore.CosmosClientFactory`: Moved to Core [#430](https://github.com/jet/equinox/pull/430)
- `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
- `Tool`/`samples/`: switched to use `Equinox.EventStoreDb` [#196](https://github.com/jet/equinox/pull/196)

Expand Down
11 changes: 4 additions & 7 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1840,7 +1840,6 @@ following key benefits:
### Example Code

```fsharp

open Equinox.CosmosStore.Core
// open MyCodecs.Json // example of using specific codec which can yield UTF-8
// byte arrays from a type using `Json.toBytes` via Fleece
Expand All @@ -1852,22 +1851,20 @@ type EventData with

// Load connection string from your Key Vault (example here is the CosmosDB
// simulator's well known key)
// see https://github.com/jet/equinox-provisioning-cosmosdb
// see https://github.com/jet/equinox#provisioning-cosmosdb
let connectionString: string =
"AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"

// Forward to Log (you can use `Log.Logger` and/or `Log.ForContext` if your app
// uses Serilog already)
let outputLog = LoggerConfiguration().WriteTo.NLog().CreateLogger()
// Forward to Log (use `Log.Logger` if your app already uses Serilog)
let outputLog = LoggerConfiguration().WriteTo.Console().CreateLogger()
// Serilog has a `ForContext<T>()`, but if you are using a `module` for the
// wiring, you might create a tagged logger like this:
let gatewayLog =
outputLog.ForContext(Serilog.Core.Constants.SourceContextPropertyName, "Equinox")

let discovery = Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION")
let connector: Equinox.CosmosStore.CosmosStoreConnector =
CosmosStoreConnector(
discovery,
Equinox.CosmosStore.Discovery.ConnectionString connectionString,
requestTimeout = TimeSpan.FromSeconds 5.,
maxRetryAttemptsOnRateLimitedRequests = 1,
maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds 3.)
Expand Down
20 changes: 12 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -665,21 +665,25 @@ For more complete instructions, follow https://developers.eventstore.com/server/

#### Using Azure Cosmos DB Service

dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER
# Same for a Archive Container for integration testing of the archive store fallback mechanism
$env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive"
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE
```bash
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER
# Same for a Archive Container for integration testing of the archive store fallback mechanism
$env:EQUINOX_COSMOS_CONTAINER_ARCHIVE="equinox-test-archive"
dotnet run --project tools/Equinox.Tool -- init -ru 400 `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER_ARCHIVE
```

#### Using Cosmos Emulator on an Intel Mac

NOTE There's [no Apple Silicon emulator available as yet](https://github.com/Azure/azure-cosmos-db-emulator-docker/issues/54#issuecomment-1399067365).

NOTE Have not tested with the Windows Emulator, but it should work with analogous steps.

docker compose up equinox-cosmos -d
bash docker-compose-cosmos.sh
```bash
docker compose up equinox-cosmos -d
bash docker-compose-cosmos.sh
```

### Provisioning SqlStreamStore

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ services:
- AZURE_COSMOS_EMULATOR_IP_ADDRESS_OVERRIDE=127.0.0.1
ports:
- "8081:8081" # so docker-cosmos-init.sh can get the cert and/or humans can use https://localhost:8081/_explorer/index.html
- "10250-10256:10250-10256" # tests connect using Direct mode
- "10250-10255:10250-10255" # tests connect using Direct mode

equinox-mssql:
container_name: equinox-mssql
Expand Down
122 changes: 51 additions & 71 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,35 @@ module ConnectionString =
| true, (:? string as s) when not (String.IsNullOrEmpty s) -> s
| _ -> invalidOp "Connection string does not contain an \"AccountEndpoint\""

[<RequireQualifiedAccess; NoComparison>]
type DiscoveryMode =
| AccountUriAndKey of accountUri: string * key: string
| ConnectionString of connectionString: string
member x.Endpoint = x |> function
| DiscoveryMode.AccountUriAndKey (u, _k) -> u
| DiscoveryMode.ConnectionString (ConnectionString.AccountEndpoint e) -> e

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosClientFactory(options) =
static member CreateDefaultOptions(requestTimeout: TimeSpan, maxRetryAttemptsOnRateLimitedRequests: int, maxRetryWaitTimeOnRateLimitedRequests: TimeSpan) =
CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
RequestTimeout = requestTimeout,
Serializer = CosmosJsonSerializer(JsonSerializerOptions()))
/// CosmosClientOptions for this CosmosClientFactory as configured (NOTE while the Options object is not immutable, it should not have setters called on it)
member val Options = options
/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>CreateAndInitializeAsync</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member x.CreateUninitialized(discovery: DiscoveryMode) = discovery |> function
| DiscoveryMode.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(uri, key, x.Options)
| DiscoveryMode.ConnectionString cs -> new CosmosClient(cs, x.Options)
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitializeAsync(discovery: DiscoveryMode, containers, ct) = discovery |> function
| DiscoveryMode.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(uri, key, containers, x.Options, ct)
| DiscoveryMode.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct)

namespace Equinox.CosmosStore

open Equinox.Core
Expand All @@ -1120,63 +1149,9 @@ type Discovery =
| AccountUriAndKey of accountUri: Uri * key: string
/// Cosmos SDK Connection String
| ConnectionString of connectionString: string
member x.Endpoint: Uri = x |> function
| Discovery.AccountUriAndKey (u, _k) -> u
| Discovery.ConnectionString (ConnectionString.AccountEndpoint e) -> Uri e

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
[<System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)>]
type CosmosClientFactory
( // Timeout to apply to individual reads/write round-trips going to CosmosDB. CosmosDB Default: 1m.
requestTimeout: TimeSpan,
// Maximum number of times to attempt when failure reason is a 429 from CosmosDB, signifying RU limits have been breached. CosmosDB default: 9
maxRetryAttemptsOnRateLimitedRequests: int,
// Maximum number of seconds to wait (especially if a higher wait delay is suggested by CosmosDB in the 429 response). CosmosDB default: 30s
maxRetryWaitTimeOnRateLimitedRequests: TimeSpan,
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// Connection limit for Gateway Mode. CosmosDB default: 50
[<O; D null>] ?gatewayModeMaxConnectionLimit,
// consistency mode (default: ConsistencyLevel.Session)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
// Inhibits certificate verification when set to <c>true</c>, i.e. for working with the CosmosDB Emulator (default <c>false</c>)
[<O; D null>] ?bypassCertificateValidation: bool) =

/// CosmosClientOptions for this CosmosClientFactory as configured
member val Options =
let co = CosmosClientOptions(
MaxRetryAttemptsOnRateLimitedRequests = maxRetryAttemptsOnRateLimitedRequests,
MaxRetryWaitTimeOnRateLimitedRequests = maxRetryWaitTimeOnRateLimitedRequests,
RequestTimeout = requestTimeout,
Serializer = CosmosJsonSerializer(System.Text.Json.JsonSerializerOptions()))
match mode with
| None | Some ConnectionMode.Direct -> co.ConnectionMode <- ConnectionMode.Direct
| Some ConnectionMode.Gateway | Some _ (* enum total match :( *) -> co.ConnectionMode <- ConnectionMode.Gateway // only supports Https
match gatewayModeMaxConnectionLimit with
| Some _ when co.ConnectionMode = ConnectionMode.Direct -> invalidArg "gatewayModeMaxConnectionLimit" "Not admissible in Direct mode"
| x -> if co.ConnectionMode = ConnectionMode.Gateway then co.GatewayModeMaxConnectionLimit <- defaultArg x 50
match defaultConsistencyLevel with
| Some x -> co.ConsistencyLevel <- x
| None -> ()
// https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1ef6e399f114a0fd580272d4cdca86b9f8732cf3/Microsoft.Azure.Cosmos.Samples/Usage/HttpClientFactory/Program.cs#L96
if bypassCertificateValidation = Some true && co.ConnectionMode = ConnectionMode.Gateway then
let cb = System.Net.Http.HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
let ch = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = cb)
co.HttpClientFactory <- fun () -> new System.Net.Http.HttpClient(ch)
co

/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>CreateAndInitializeAsync</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member x.CreateUninitialized(discovery: Discovery) = discovery |> function
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(string uri, key, x.Options)
| Discovery.ConnectionString cs -> new CosmosClient(cs, x.Options)

/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitializeAsync(discovery: Discovery, containers, ct) = discovery |> function
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct)
| Discovery.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct)
member x.ToDiscoveryMode() = x |> function
| Discovery.AccountUriAndKey (u, k) -> DiscoveryMode.AccountUriAndKey (string u, k)
| Discovery.ConnectionString c -> DiscoveryMode.ConnectionString c

/// Manages establishing a CosmosClient, which is used by CosmosStoreClient to read from the underlying Cosmos DB Container.
type CosmosStoreConnector
Expand All @@ -1191,32 +1166,37 @@ type CosmosStoreConnector
// Connection mode (default: ConnectionMode.Direct (best performance, same as Microsoft.Azure.Cosmos SDK default)
// NOTE: default for Equinox.Cosmos.Connector (i.e. V2) was Gateway (worst performance, least trouble, Microsoft.Azure.DocumentDb SDK default)
[<O; D null>] ?mode: ConnectionMode,
// Connection limit for Gateway Mode. CosmosDB default: 50
[<O; D null>] ?gatewayModeMaxConnectionLimit,
// consistency mode (default: ConsistencyLevel.Session)
// consistency mode (default: use configuration specified for Database)
[<O; D null>] ?defaultConsistencyLevel: ConsistencyLevel,
// Inhibits certificate verification when set to <c>true</c>, i.e. for working with the CosmosDB Emulator (default <c>false</c>)
[<O; D null>] ?bypassCertificateValidation: bool) =

// Inhibits certificate verification when set to `true`. Default: false.
[<O; D null>] ?bypassCertificateValidation: bool,
[<O; D null>] ?customize: Action<CosmosClientOptions>) =
let discoveryMode = discovery.ToDiscoveryMode()
let factory =
CosmosClientFactory
( requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests, ?mode = mode,
?gatewayModeMaxConnectionLimit = gatewayModeMaxConnectionLimit, ?defaultConsistencyLevel = defaultConsistencyLevel,
?bypassCertificateValidation = bypassCertificateValidation)
let o = CosmosClientFactory.CreateDefaultOptions(requestTimeout, maxRetryAttemptsOnRateLimitedRequests, maxRetryWaitTimeOnRateLimitedRequests)
mode |> Option.iter (fun x -> o.ConnectionMode <- x)
defaultConsistencyLevel |> Option.iter (fun x -> o.ConsistencyLevel <- x)
// https://github.com/Azure/azure-cosmos-dotnet-v3/blob/1ef6e399f114a0fd580272d4cdca86b9f8732cf3/Microsoft.Azure.Cosmos.Samples/Usage/HttpClientFactory/Program.cs#L96
if defaultArg bypassCertificateValidation false then
let cb = System.Net.Http.HttpClientHandler.DangerousAcceptAnyServerCertificateValidator
let ch = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = cb)
o.HttpClientFactory <- fun () -> new System.Net.Http.HttpClient(ch)
customize |> Option.iter (fun c -> c.Invoke o)
CosmosClientFactory o

/// The <c>CosmosClientOptions</c> used when connecting to CosmosDB
member _.Options = factory.Options

/// The Endpoint Uri for the target CosmosDB
member _.Endpoint = discovery.Endpoint
member val Endpoint = discoveryMode.Endpoint |> Uri

/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>Connect</c> and/or <c>CreateAndInitialize</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member _.CreateUninitialized() = factory.CreateUninitialized(discovery)
member _.CreateUninitialized() = factory.CreateUninitialized(discoveryMode)

/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member _.CreateAndInitializeAsync(containers, ct): Task<CosmosClient> = factory.CreateAndInitializeAsync(discovery, containers, ct)
member _.CreateAndInitializeAsync(containers, ct): Task<CosmosClient> = factory.CreateAndInitializeAsync(discoveryMode, containers, ct)
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitialize(databaseAndContainerIds: struct (string * string)[]) =
Async.call (fun ct -> x.CreateAndInitializeAsync(databaseAndContainerIds, ct))
Expand All @@ -1226,7 +1206,7 @@ type CosmosStoreConnector

/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member _.ConnectAsync(containers, ct): Task<CosmosStoreClient> = task {
let! cosmosClient = factory.CreateAndInitializeAsync(discovery, containers, ct)
let! cosmosClient = factory.CreateAndInitializeAsync(discoveryMode, containers, ct)
return CosmosStoreClient(cosmosClient) }
/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.Connect(databaseAndContainerIds: struct (string * string)[]) =
Expand Down
8 changes: 3 additions & 5 deletions src/Equinox.EventStoreDb/EventStoreDb.fs
Original file line number Diff line number Diff line change
Expand Up @@ -449,23 +449,21 @@ type ConnectionStrategy =
type EventStoreConnector
( reqTimeout: TimeSpan,
[<O; D null>] ?readRetryPolicy, [<O; D null>] ?writeRetryPolicy, [<O; D null>] ?tags,
[<O; D null>] ?customize: EventStoreClientSettings -> unit) =
[<O; D null>] ?customize: Action<EventStoreClientSettings>) =

member _.Connect
( // Name should be sufficient to uniquely identify this connection within a single app instance's logs
name, discovery: Discovery, ?clusterNodePreference): EventStoreClient =
let settings =
match discovery with
| Discovery.ConnectionString s -> EventStoreClientSettings.Create(s)
if name = null then nullArg "name"
let name = String.concat ";" <| seq {
name
string clusterNodePreference
match tags with None -> () | Some tags -> for key, value in tags do sprintf "%s=%s" key value }
let sanitizedName = name.Replace('\'','_').Replace(':','_') // ES internally uses `:` and `'` as separators in log messages and ... people regex logs
let settings = discovery |> function Discovery.ConnectionString s -> EventStoreClientSettings.Create s
settings.ConnectionName <- sanitizedName
match clusterNodePreference with None -> () | Some np -> settings.ConnectivitySettings.NodePreference <- np
match customize with None -> () | Some f -> f settings
match customize with None -> () | Some f -> f.Invoke settings
settings.DefaultDeadline <- reqTimeout
// TODO implement reqRetries
new EventStoreClient(settings)
Expand Down
4 changes: 2 additions & 2 deletions tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ let private databaseId = tryRead "EQUINOX_COSMOS_DATABASE" |> Option.defaultValu
let private containerId = tryRead "EQUINOX_COSMOS_CONTAINER" |> Option.defaultValue "equinox-test"
let private archiveContainerId = tryRead "EQUINOX_COSMOS_CONTAINER_ARCHIVE" |> Option.defaultValue "equinox-test-archive"

// see https://github.com/jet/equinox-provisioning-cosmosdb for details of what's expected in terms of provisioned containers etc
// see https://github.com/jet/equinox#provisioning-cosmosdb for details of what's expected in terms of provisioned containers etc
let discoverConnection () =
match tryRead "EQUINOX_COSMOS_CONNECTION" with
| None -> "localDocDbSim", Discovery.AccountUriAndKey(Uri "https://localhost:8081", "C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==")
Expand All @@ -90,7 +90,7 @@ let createConnector (log: Serilog.ILogger) =
let name, discovery = discoverConnection ()
let connector = CosmosStoreConnector(discovery, requestTimeout = TimeSpan.FromSeconds 3.,
maxRetryAttemptsOnRateLimitedRequests = 2, maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromMinutes 1.)
log.Information("CosmosStore {name} {endpoint}", name, discovery.Endpoint)
log.Information("CosmosStore {name} {endpoint}", name, connector.Endpoint)
connector

[<Xunit.CollectionDefinition "DocStore">]
Expand Down
Loading