Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e2b300b
Added store_id to validator_internal_config
pasindutennage-da Nov 17, 2025
a99ca68
Reset table
pasindutennage-da Nov 18, 2025
c66009b
Added new db update schema
pasindutennage-da Nov 18, 2025
04dc84f
Moved StoreDescriptor from DbMultiDomainAcsStore
pasindutennage-da Nov 18, 2025
fdd8e3f
Moved traits from DbMultiDomainAcsStore
pasindutennage-da Nov 18, 2025
9e4a962
Moved initialization logic for store descriptor
pasindutennage-da Nov 18, 2025
abc4bb3
Added stuff from previous PR
pasindutennage-da Nov 18, 2025
4f3bb0f
Added init logic for ValidatorInternalConfig
pasindutennage-da Nov 18, 2025
262ad84
Added store_id to queries
pasindutennage-da Nov 18, 2025
fe199e9
Test passing
pasindutennage-da Nov 18, 2025
cb9d121
Removed .iml
pasindutennage-da Nov 18, 2025
f318d9a
Updated .gitignore
pasindutennage-da Nov 18, 2025
dff1129
Addressed comments, added a new test
pasindutennage-da Nov 19, 2025
6cb410e
Merge branch 'main' into pasindutennage/fix-db-internal-config-fix-st…
pasindutennage-da Nov 19, 2025
d8b9163
Missing imports
pasindutennage-da Nov 19, 2025
442e085
Merge conflicts resolved
pasindutennage-da Nov 19, 2025
650ffd5
Removed DSO Party from validator internal store store descriptor
pasindutennage-da Nov 19, 2025
6fbb09f
Updated primary key of config sql
pasindutennage-da Nov 19, 2025
38f50e6
Fixed implicits
pasindutennage-da Nov 19, 2025
16b8d00
Renamed
pasindutennage-da Nov 19, 2025
3bf6e12
Code refactored
pasindutennage-da Nov 19, 2025
b3c4038
Merge branch 'main' into pasindutennage/fix-db-internal-config-fix-st…
pasindutennage-da Nov 19, 2025
385a207
Nits
pasindutennage-da Nov 19, 2025
d59c6e7
Adding missed files from merge
pasindutennage-da Nov 19, 2025
9afa622
Nits
pasindutennage-da Nov 19, 2025
7c232d0
Nits
pasindutennage-da Nov 19, 2025
2a8e67b
Added participant ID to private config descriptor
pasindutennage-da Nov 19, 2025
2ecb55f
Removed participant ID from descriptor
pasindutennage-da Nov 20, 2025
dd23d0b
Added back participant
pasindutennage-da Nov 20, 2025
676d69b
Added Db Init
pasindutennage-da Nov 20, 2025
790598e
Code refactored
pasindutennage-da Nov 21, 2025
3c4b3f1
Code refactored
pasindutennage-da Nov 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.daml
*.dar
!daml/dars/*
*.iml
*.log
*.clog
.DS_Store
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Migration: scope validator_internal_config rows to a store by adding a
-- store_id column and re-keying the table on (store_id, config_key).

-- NOTE(review): store_id is added as nullable and promoted to NOT NULL below
-- without a backfill, so this migration only succeeds if the table is empty
-- when it runs -- TODO confirm (an earlier change appears to reset the table).
ALTER TABLE validator_internal_config
ADD COLUMN store_id integer;

-- Drop the old uniqueness constraint on config_key alone; it is replaced
-- below by a composite primary key that includes store_id.
ALTER TABLE validator_internal_config
DROP CONSTRAINT uc_validator_internal_config;

-- Requires every existing row to have a non-null store_id (see note above).
ALTER TABLE validator_internal_config
ALTER COLUMN store_id SET NOT NULL;

-- Reuse the old constraint name for the new composite primary key so any
-- tooling referencing the name keeps working.
ALTER TABLE validator_internal_config
ADD CONSTRAINT uc_validator_internal_config
PRIMARY KEY (store_id, config_key);

-- Every config row must reference a registered store descriptor.
ALTER TABLE validator_internal_config
ADD CONSTRAINT fk_store_id
FOREIGN KEY (store_id)
REFERENCES store_descriptors(id);
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ abstract class DbTxLogAppStore[TXE](
acsTableName: String,
txLogTableName: String,
interfaceViewsTableNameOpt: Option[String],
acsStoreDescriptor: DbMultiDomainAcsStore.StoreDescriptor,
txLogStoreDescriptor: DbMultiDomainAcsStore.StoreDescriptor,
acsStoreDescriptor: StoreDescriptor,
txLogStoreDescriptor: StoreDescriptor,
domainMigrationInfo: DomainMigrationInfo,
ingestionConfig: IngestionConfig,
)(implicit
Expand Down Expand Up @@ -59,7 +59,7 @@ abstract class DbAppStore(
storage: DbStorage,
acsTableName: String,
interfaceViewsTableNameOpt: Option[String],
acsStoreDescriptor: DbMultiDomainAcsStore.StoreDescriptor,
acsStoreDescriptor: StoreDescriptor,
domainMigrationInfo: DomainMigrationInfo,
ingestionConfig: IngestionConfig,
)(implicit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import com.digitalasset.canton.discard.Implicits.DiscardOps
import com.digitalasset.canton.lifecycle.{CloseContext, FutureUnlessShutdown}
import com.digitalasset.canton.logging.{NamedLoggerFactory, NamedLogging}
import com.digitalasset.canton.resource.DbStorage
import com.digitalasset.canton.topology.{ParticipantId, PartyId, SynchronizerId}
import com.digitalasset.canton.topology.SynchronizerId
import com.digitalasset.canton.tracing.TraceContext
import com.digitalasset.canton.util.ShowUtil.showPretty

Expand All @@ -58,7 +58,6 @@ import org.lfdecentralizedtrust.splice.store.db.AcsQueries.{
SelectFromAcsTableWithStateResult,
}
import org.lfdecentralizedtrust.splice.store.db.AcsTables.ContractStateRowData
import org.lfdecentralizedtrust.splice.store.db.DbMultiDomainAcsStore.StoreDescriptor
import com.daml.nonempty.NonEmpty
import com.digitalasset.canton.data.CantonTimestamp
import com.daml.metrics.api.MetricHandle.LabeledMetricsFactory
Expand Down Expand Up @@ -877,50 +876,13 @@ final class DbMultiDomainAcsStore[TXE](
override lazy val ingestionSink: IngestionSink = new MultiDomainAcsStore.IngestionSink {
override def ingestionFilter: IngestionFilter = contractFilter.ingestionFilter

private sealed trait InitializeDescriptorResult[StoreId]
private case class StoreHasData[StoreId](
storeId: StoreId,
lastIngestedOffset: Long,
) extends InitializeDescriptorResult[StoreId]
private case class StoreHasNoData[StoreId](
storeId: StoreId
) extends InitializeDescriptorResult[StoreId]
private case class StoreNotUsed[StoreId]() extends InitializeDescriptorResult[StoreId]

private[this] def initializeDescriptor(
descriptor: StoreDescriptor
)(implicit
traceContext: TraceContext
): Future[InitializeDescriptorResult[Int]] = {
// Notes:
// - Postgres JSONB does not preserve white space, does not preserve the order of object keys, and does not keep duplicate object keys
// - Postgres JSONB columns have a maximum size of 255MB
// - We are using noSpacesSortKeys to insert a canonical serialization of the JSON object, even though this is not necessary for Postgres
// - 'ON CONFLICT DO NOTHING RETURNING ...' does not return anything if the row already exists, that's why we are using two separate queries
val descriptorStr = String256M.tryCreate(descriptor.toJson.noSpacesSortKeys)
for {
_ <- storage
.update(
sql"""
insert into store_descriptors (descriptor)
values (${descriptorStr}::jsonb)
on conflict do nothing
""".asUpdate,
"initializeDescriptor.1",
)

newStoreId <- storage
.querySingle(
sql"""
select id
from store_descriptors
where descriptor = ${descriptorStr}::jsonb
""".as[Int].headOption,
"initializeDescriptor.2",
)
.getOrRaise(
new RuntimeException(s"No row for $descriptor found, which was just inserted!")
)
newStoreId <- StoreDescriptorStore.getStoreIdForDescriptor(descriptor, storage)

_ <- storage
.update(
Expand All @@ -931,6 +893,7 @@ final class DbMultiDomainAcsStore[TXE](
""".asUpdate,
"initializeDescriptor.3",
)

lastIngestedOffset <- storage
.querySingle(
sql"""
Expand Down Expand Up @@ -2304,39 +2267,6 @@ object DbMultiDomainAcsStore {
)
}

/** Identifies an instance of a store.
*
* @param version The version of the store.
* Bumping this number will cause the store to forget all previously ingested data
* and start from a clean state.
* Bump this number whenever you make breaking changes in the ingestion filter or
* TxLog parser, or if you want to reset the store after fixing a bug that lead to
* data corruption.
* @param name The name of the store, usually the simple name of the corresponding scala class.
* @param party The party that owns the store (i.e., the party that subscribes
* to the update stream that feeds the store).
* @param participant The participant that serves the update stream that feeds this store.
* @param key A set of named values that are used to filter the update stream or
* can otherwise be used to distinguish between different instances of the store.
*/
case class StoreDescriptor(
version: Int,
name: String,
party: PartyId,
participant: ParticipantId,
key: Map[String, String],
) {
def toJson: io.circe.Json = {
Json.obj(
"version" -> Json.fromInt(version),
"name" -> Json.fromString(name),
"party" -> Json.fromString(party.toProtoPrimitive),
"participant" -> Json.fromString(participant.toProtoPrimitive),
"key" -> Json.obj(key.map { case (k, v) => k -> Json.fromString(v) }.toSeq*),
)
}
}

sealed trait BatchStep
case class UpdateCheckpoint(checkpoint: TreeUpdateOrOffsetCheckpoint.Checkpoint) extends BatchStep
case class IngestReassignment(update: ReassignmentUpdate, synchronizerId: SynchronizerId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.store.db

import com.digitalasset.canton.config.CantonRequireTypes.String256M
import com.digitalasset.canton.lifecycle.CloseContext
import com.digitalasset.canton.resource.DbStorage
import com.digitalasset.canton.topology.{ParticipantId, PartyId}
import com.digitalasset.canton.tracing.TraceContext
import io.circe.Json
import org.lfdecentralizedtrust.splice.store.StoreErrors
import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
import com.digitalasset.canton.lifecycle.FutureUnlessShutdown

/** Identifies an instance of a store.
  *
  * @param version The version of the store.
  *                Bumping this number will cause the store to forget all previously ingested data
  *                and start from a clean state.
  *                Bump this number whenever you make breaking changes in the ingestion filter or
  *                TxLog parser, or if you want to reset the store after fixing a bug that led to
  *                data corruption.
  * @param name The name of the store, usually the simple name of the corresponding scala class.
  * @param party The party that owns the store (i.e., the party that subscribes
  *              to the update stream that feeds the store).
  * @param participant The participant that serves the update stream that feeds this store.
  * @param key A set of named values that are used to filter the update stream or
  *            can otherwise be used to distinguish between different instances of the store.
  */
case class StoreDescriptor(
    version: Int,
    name: String,
    party: PartyId,
    participant: ParticipantId,
    key: Map[String, String],
) {

  /** JSON encoding of this descriptor; the keys of `key` become the fields of
    * a nested "key" object. Callers serialize this canonically (sorted keys)
    * when using it as a database lookup value.
    */
  def toJson: io.circe.Json = {
    val keyObject = Json.obj(key.toSeq.map { case (k, v) => k -> Json.fromString(v) }*)
    Json.obj(
      "version" -> Json.fromInt(version),
      "name" -> Json.fromString(name),
      "party" -> Json.fromString(party.toProtoPrimitive),
      "participant" -> Json.fromString(participant.toProtoPrimitive),
      "key" -> keyObject,
    )
  }
}

/** Result of initializing the descriptor row for a store.
  *
  * @tparam StoreId the type of the store's database identifier. Covariant,
  *                 since the id only appears in output position on the cases.
  */
sealed trait InitializeDescriptorResult[+StoreId]

/** The store exists and has ingested up to `lastIngestedOffset`. */
final case class StoreHasData[StoreId](
    storeId: StoreId,
    lastIngestedOffset: Long,
) extends InitializeDescriptorResult[StoreId]

/** The store exists but has not recorded a last ingested offset yet. */
final case class StoreHasNoData[StoreId](
    storeId: StoreId
) extends InitializeDescriptorResult[StoreId]

/** No store instance is in use for this descriptor. */
final case class StoreNotUsed[StoreId]() extends InitializeDescriptorResult[StoreId]

object StoreDescriptorStore extends StoreErrors {

  /** Looks up the database id for `descriptor`, inserting a new
    * `store_descriptors` row first if the descriptor is not registered yet.
    *
    * The insert uses ON CONFLICT DO NOTHING, so repeated and concurrent calls
    * for the same descriptor all resolve to the same id.
    */
  def getStoreIdForDescriptor(
      descriptor: StoreDescriptor,
      storage: DbStorage,
  )(implicit
      traceContext: TraceContext,
      executionContext: scala.concurrent.ExecutionContext,
      closeContext: CloseContext,
  ): FutureUnlessShutdown[Int] = {
    // Notes:
    // - Postgres JSONB does not preserve white space, does not preserve the order of object keys, and does not keep duplicate object keys
    // - Postgres JSONB columns have a maximum size of 255MB
    // - We are using noSpacesSortKeys to insert a canonical serialization of the JSON object, even though this is not necessary for Postgres
    // - 'ON CONFLICT DO NOTHING RETURNING ...' does not return anything if the row already exists, that's why we are using two separate queries
    val canonicalDescriptor = String256M.tryCreate(descriptor.toJson.noSpacesSortKeys)
    val insertIfAbsent = storage.update(
      sql"""
        insert into store_descriptors (descriptor)
        values (${canonicalDescriptor}::jsonb)
        on conflict do nothing
      """.asUpdate,
      "initializeDescriptor.1",
    )
    insertIfAbsent.flatMap { _ =>
      storage
        .querySingle(
          sql"""
            select id
            from store_descriptors
            where descriptor = ${canonicalDescriptor}::jsonb
          """.as[Int].headOption,
          "initializeDescriptor.2",
        )
        .getOrRaise(
          new RuntimeException(s"No row for $descriptor found, which was just inserted!")
        )
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.lfdecentralizedtrust.splice.store.db.{
DbMultiDomainAcsStore,
IndexColumnValue,
SplicePostgresTest,
StoreDescriptor,
}
import slick.jdbc.JdbcProfile
import slick.jdbc.canton.ActionBasedSQLInterpolation.Implicits.actionBasedSQLInterpolationCanton
Expand Down Expand Up @@ -494,7 +495,7 @@ class TxLogBackfillingStoreTest
private val sync2: SynchronizerId = SynchronizerId.tryFromString("synchronizer2::synchronizer")

private def storeDescriptor(id: Int, participantId: ParticipantId) =
DbMultiDomainAcsStore.StoreDescriptor(
StoreDescriptor(
version = 1,
name = "DbMultiDomainAcsStoreTest",
party = dsoParty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ class DbMultiDomainAcsStoreTest
}

private def storeDescriptor(id: Int, participantId: ParticipantId) =
DbMultiDomainAcsStore.StoreDescriptor(
StoreDescriptor(
version = 1,
name = "DbMultiDomainAcsStoreTest",
party = dsoParty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import org.lfdecentralizedtrust.splice.scan.store.{
VoteRequestTxLogEntry,
}
import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.ContractCompanion
import org.lfdecentralizedtrust.splice.store.db.StoreDescriptor
import org.lfdecentralizedtrust.splice.store.db.{
AcsQueries,
AcsTables,
Expand Down Expand Up @@ -77,7 +78,6 @@ import io.grpc.Status
import org.lfdecentralizedtrust.splice.config.IngestionConfig
import org.lfdecentralizedtrust.splice.store.UpdateHistoryQueries.UpdateHistoryQueries
import org.lfdecentralizedtrust.splice.store.db.AcsQueries.AcsStoreId
import org.lfdecentralizedtrust.splice.store.db.DbMultiDomainAcsStore.StoreDescriptor
import org.lfdecentralizedtrust.splice.store.db.TxLogQueries.TxLogStoreId

import java.time.Instant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.lfdecentralizedtrust.splice.environment.RetryProvider
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
import org.lfdecentralizedtrust.splice.splitwell.config.SplitwellSynchronizerConfig
import org.lfdecentralizedtrust.splice.splitwell.store.SplitwellStore
import org.lfdecentralizedtrust.splice.store.db.DbMultiDomainAcsStore.StoreDescriptor
import org.lfdecentralizedtrust.splice.store.db.StoreDescriptor
import org.lfdecentralizedtrust.splice.store.{LimitHelpers, MultiDomainAcsStore}
import org.lfdecentralizedtrust.splice.store.db.{
AcsInterfaceViewRowData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.lfdecentralizedtrust.splice.environment.RetryProvider
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.{ContractCompanion, QueryResult}
import org.lfdecentralizedtrust.splice.store.db.AcsQueries.{AcsStoreId, SelectFromAcsTableResult}
import org.lfdecentralizedtrust.splice.store.db.DbMultiDomainAcsStore.StoreDescriptor
import org.lfdecentralizedtrust.splice.store.db.StoreDescriptor
import org.lfdecentralizedtrust.splice.store.db.{AcsQueries, AcsTables, DbAppStore}
import org.lfdecentralizedtrust.splice.store.{
DbVotesAcsStoreQueryBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.lfdecentralizedtrust.splice.codegen.java.splice.validatoronboarding.{
import org.lfdecentralizedtrust.splice.environment.RetryProvider
import org.lfdecentralizedtrust.splice.migration.DomainMigrationInfo
import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.QueryResult
import org.lfdecentralizedtrust.splice.store.db.DbMultiDomainAcsStore.StoreDescriptor
import org.lfdecentralizedtrust.splice.store.db.StoreDescriptor
import org.lfdecentralizedtrust.splice.store.db.{AcsQueries, AcsTables, DbAppStore}
import org.lfdecentralizedtrust.splice.store.{MultiDomainAcsStore, StoreErrors}
import org.lfdecentralizedtrust.splice.sv.store.{SvStore, SvSvStore}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class ValidatorApp(
}
// Prevet early to make sure we have the required packages even
// before the automation kicks in.
_ <- appInitStep("Vet packages") {
key <- appInitStep("Vet packages") {
for {
amuletRules <- scanConnection.getAmuletRules()
globalSynchronizerId: SynchronizerId <- scanConnection.getAmuletRulesDomain()(
Expand All @@ -314,7 +314,24 @@ class ValidatorApp(
synchronizerId =>
packageVetting.vetCurrentPackages(synchronizerId, amuletRules)
}
} yield ()

participantId <- participantAdminConnection.getParticipantId()
idValidatorParty = ParticipantPartyMigrator.toPartyId(
config.validatorPartyHint
.getOrElse(
BaseLedgerConnection.sanitizeUserIdToPartyString(config.ledgerApiUser)
),
participantId,
)
dsoParty <- appInitStep("Get DSO party id") {
scanConnection.getDsoPartyIdWithRetries()
}

key = ValidatorStore.Key(
validatorParty = idValidatorParty,
dsoParty = dsoParty,
)
} yield (key)
}
_ <- (config.migrateValidatorParty, config.participantBootstrappingDump) match {
case (
Expand All @@ -326,7 +343,11 @@ class ValidatorApp(
BaseLedgerConnection.sanitizeUserIdToPartyString(config.ledgerApiUser)
)
val configProvider = new ValidatorConfigProvider(
ValidatorInternalStore(storage, loggerFactory),
ValidatorInternalStore(
key,
storage,
loggerFactory,
),
loggerFactory,
)
val participantPartyMigrator = new ParticipantPartyMigrator(
Expand Down Expand Up @@ -820,7 +841,7 @@ class ValidatorApp(
loggerFactory,
)
configProvider = new ValidatorConfigProvider(
ValidatorInternalStore(storage, loggerFactory),
ValidatorInternalStore(key, storage, loggerFactory),
loggerFactory,
)
walletManagerOpt =
Expand Down
Loading
Loading