Granular sync rule parsing #432
🦋 Changeset detected
Latest commit: 8602a75
The changes in this PR will be included in the next version bump. This PR includes changesets to release 18 packages.
@simolus3 It would be good if you could specifically check the changes to sync streams.
I think I'm starting to understand the concept of this, and I think this is a helpful refactoring. I'll start with some very high level questions and comments, and I'll take a look at the sync streams port of this afterwards.
First, a conceptual question: As discussed offline, the idea is that `BucketDataSourceDefinition` and `BucketParameterLookupSourceDefinitions` are hoisted into a top-level structure which allows diffing them easily. Assuming that we can actually diff them, am I right to assume that the overall flow once this feature is complete would be something like this:
- State A: We have two streams (say `SELECT * FROM a` and `SELECT * FROM b WHERE foo`).
  - In the hydration state, `getBucketSourceState(a) == 1#a` and `getBucketSourceState(b) == 1#b`.
- State B: We have two streams (say `SELECT * FROM a` and `SELECT * FROM b WHERE NOT foo`).
  - We realize that the `BucketDataSourceDefinition` for stream a is identical, and doesn't need reprocessing.
  - This is reflected by `getBucketSourceState(a) == 1#a` while `getBucketSourceState(b) == 2#b`.
  - Because the querier looks up this prefix from the hydrated state, it finds the right buckets for both streams.

Is that correct?
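To make the State A to State B transition concrete, here is a minimal sketch. Everything in it is hypothetical: `StreamDefinition`, `definitionKey`, `nextPrefixes`, and the `version#name` prefix format are illustration only, not the actual sync-rules API. It assumes that two definitions with equal keys generate identical bucket data.

```typescript
// Hypothetical shapes for illustration only; not the actual sync-rules API.
interface StreamDefinition {
  name: string;
  // Assumption: equal keys mean the definitions produce identical bucket data.
  definitionKey: string;
}

interface HydratedEntry {
  definitionKey: string;
  prefix: string; // for example "1#a"
}

type PrefixState = Map<string, HydratedEntry>;

// Computes the bucket name prefix for every stream in the new version,
// keeping the old prefix when the definition is unchanged.
function nextPrefixes(previous: PrefixState, streams: StreamDefinition[], version: number): PrefixState {
  const next: PrefixState = new Map();
  for (const stream of streams) {
    const old = previous.get(stream.name);
    if (old !== undefined && old.definitionKey === stream.definitionKey) {
      next.set(stream.name, old); // unchanged: keep pointing at the existing buckets
    } else {
      next.set(stream.name, {
        definitionKey: stream.definitionKey,
        prefix: `${version}#${stream.name}`
      });
    }
  }
  return next;
}

const stateA = nextPrefixes(
  new Map(),
  [
    { name: "a", definitionKey: "SELECT * FROM a" },
    { name: "b", definitionKey: "SELECT * FROM b WHERE foo" }
  ],
  1
);

const stateB = nextPrefixes(
  stateA,
  [
    { name: "a", definitionKey: "SELECT * FROM a" },
    { name: "b", definitionKey: "SELECT * FROM b WHERE NOT foo" }
  ],
  2
);
```

With these inputs, stream a keeps the prefix `1#a` in State B, while stream b moves to `2#b`.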
To scaffold an API of diffing these definitions (even though a proper implementation will be another hard problem), I wonder if we should start with a trivial diff that always considers two definitions to be incompatible if they're not the same JS object. Maybe the API for that would look like this?
```ts
interface BucketDataSourceDefinition {
  // Whether this definition takes the same input rows and is guaranteed to generate the
  // exact same evaluated row for an input row as the other definition.
  // Our only implementation for now: `return other === this;`
  isIdenticalTo(other: BucketDataSourceDefinition): boolean;

  // Or maybe we instead want to expose compatibility like this?
  // Our only implementation for now: a random UUID generated on instance creation.
  identityHashCode(): string;

  // ...
}
```

Having this in place would be a reminder that we need to eventually implement this. Since it's partial equality, it would also allow us to do it piece-by-piece: Maybe we eventually add an implementation comparing input texts and later move to a more accurate semantic AST diff.
Either way, fixing the interface that determines this is a necessary first step, and it would let us think about how to react to this (maybe SqlSyncRules can already remove duplicates on creation).
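As a rough sketch of what the trivial implementation and a duplicate-removal step could look like: the class and helper names below are made up for illustration, and a counter stands in for the random UUID so that the example has no dependencies.

```typescript
// Hypothetical sketch; the class and helper names are not from the codebase.
let nextIdentity = 0;

class IdentityOnlyDefinition {
  // Stand-in for "a random uuid generated on instance creation".
  private readonly identity = `def-${nextIdentity++}`;

  constructor(readonly sql: string) {}

  // Trivial partial equality: only the very same object is considered identical.
  isIdenticalTo(other: IdentityOnlyDefinition): boolean {
    return other === this;
  }

  identityHashCode(): string {
    return this.identity;
  }
}

// Keeps the first definition for each identity hash and drops later duplicates.
function removeDuplicates(definitions: IdentityOnlyDefinition[]): IdentityOnlyDefinition[] {
  const seen = new Map<string, IdentityOnlyDefinition>();
  for (const definition of definitions) {
    const key = definition.identityHashCode();
    if (!seen.has(key)) {
      seen.set(key, definition);
    }
  }
  return Array.from(seen.values());
}

const shared = new IdentityOnlyDefinition("SELECT * FROM a");
const sameTextOtherObject = new IdentityOnlyDefinition("SELECT * FROM a");
const unique = removeDuplicates([shared, shared, sameTextOtherObject]);
```

Here only the repeated reference to `shared` is collapsed. `sameTextOtherObject` survives even though its SQL is the same, which is exactly the conservative behaviour of the trivial diff.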
Yes, that's correct. But it's about more than just diffing: It's about knowing when to re-use the existing persisted data for a source versus removing it, and being able to combine multiple SyncRules into one replication stream, without duplicating work when there is overlap.
I don't think we gain much by having the scaffolding if the definitions will never be considered equal. I also think we can get by with a pretty simple equality check to get started with: Check if the source SQL matches. It isn't quite optimal: a change to which request parameter is used in a query should not need to affect the storage at all, but we can implement more granular checks for that later. Just comparing the SQL should already help to cover changes where you're only adding one new stream, for example. Either way, there is still a lot of work to do on the storage and replication layer before we can actually do incremental reprocessing. This PR was already getting big, so I thought it's better to delay that implementation for the next PR.
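For reference, a minimal sketch of the "compare the source SQL" check. The `DataSourceSketch` shape and both function names are hypothetical, and the comparison is plain text equality with no normalization.

```typescript
// Hypothetical shape: a data source identified by the SQL text of its data queries.
interface DataSourceSketch {
  name: string;
  dataQueries: string[];
}

// Plain text equality of all data queries, in order. No normalization is attempted,
// so even a whitespace-only change counts as a difference.
function hasSameSourceSql(a: DataSourceSketch, b: DataSourceSketch): boolean {
  return (
    a.dataQueries.length === b.dataQueries.length &&
    a.dataQueries.every((sql, index) => sql === b.dataQueries[index])
  );
}

interface ReprocessingPlan {
  reusable: DataSourceSketch[];
  needsReprocessing: DataSourceSketch[];
}

// Splits the new sources into those whose SQL already exists in the previous
// version and those that have to be replicated from scratch.
function planReprocessing(previous: DataSourceSketch[], next: DataSourceSketch[]): ReprocessingPlan {
  const plan: ReprocessingPlan = { reusable: [], needsReprocessing: [] };
  for (const source of next) {
    const unchanged = previous.some((old) => hasSameSourceSql(old, source));
    (unchanged ? plan.reusable : plan.needsReprocessing).push(source);
  }
  return plan;
}

const deployedSources: DataSourceSketch[] = [
  { name: "a", dataQueries: ["SELECT * FROM a"] },
  { name: "b", dataQueries: ["SELECT * FROM b WHERE foo"] }
];

const updatedSources: DataSourceSketch[] = [
  { name: "a", dataQueries: ["SELECT * FROM a"] },
  { name: "b", dataQueries: ["SELECT * FROM b WHERE foo"] },
  { name: "c", dataQueries: ["SELECT * FROM c"] }
];

const plan = planReprocessing(deployedSources, updatedSources);
```

In this example only the newly added stream c lands in `needsReprocessing`, which is the "only adding one new stream" case.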
The sync-rules refactoring and integration into the service looks great to me, I only have a few minor comments.
We can re-add it if we actually need it later.
This is an internal refactoring in the sync-rules package, and should have no visible impact on the service on its own. This makes no change to storage format or functionality, but does bring us closer to being able to implement Incremental reprocessing.
This changes the sync rule structure to split:
- `SqlSyncRules` class is the sync rules "definition", as parsed from the sync rules yaml file.
- `HydratedSyncRules` class is used to represent the combination of the parsed sync rules, with any relevant state in bucket storage. In a way, this is similar to how `VersionedSyncRules` was used before. This now uses `HydrationState` as a more flexible representation of the old `BucketIdTransformer`.

Right now, the only relevant state is the sync rules "version", used in bucket names if `versioned_bucket_ids` is enabled. This avoids the need to pass in the `bucketIdTransformer` in specific functions, instead keeping it as state in `HydratedSyncRules`.

The second big change is to split out some functionality from `BucketSource`:
- `BucketDataSource`: Represents data queries, more specifically the transform of replicated data -> bucket data.
- `ParameterIndexLookupCreator`: Represents table index lookups for parameter queries, more specifically the transform of replicated data -> parameter lookup data.

The `BucketSource` itself still exists, handling stream subscriptions and queries.

Now, along with the split, the `BucketDataSource`s and `ParameterIndexLookupCreator`s are also pulled up into a flat structure in the top-level. What this means is that when a new version of sync rules is deployed, each of the `BucketDataSource` and `ParameterIndexLookupCreator` instances can be compared with the ones in the old version of sync rules. If there is overlap, we can re-use the existing data, instead of re-replicating that from scratch. That is the core of the Incremental reprocessing project.

Some specifics in how data is persisted and linked for each:
1. `BucketDataSource` represents the combination of data queries in a bucket definition. If any of those queries changes, we need a new set of buckets. Each unique `BucketDataSource` needs a unique bucket name prefix (the part before the bucket parameters).
2. `ParameterIndexLookupCreator` represents a single parameter query that references a source table.
   a. Each unique `ParameterIndexLookupCreator` needs a unique identifier in the `bucket_parameters` lookup table, but it does not have to correlate with the bucket name.
   b. `ParameterIndexLookupCreator` does not need any reference to the bucket name prefix. The same `ParameterIndexLookupCreator` can be re-used in different bucket definitions in different sync rules versions, and even in different bucket definitions in the same sync rules version.
   c. In theory, if the query is the same, the same `ParameterIndexLookupCreator` can be used across multiple bucket definitions.
3. `BucketParameterQuerierSource` (internal concept) does not have any persisted data.
   a. At runtime, the hydrated `BucketParameterQuerierSource` needs a reference to the `ParameterIndexLookupCreator`(s) to evaluate table lookups (if relevant). In theory, this can be different in different sync rules versions.
   b. At runtime, the hydrated `BucketParameterQuerierSource` needs a reference to the bucket name prefixes for the related `BucketDataSource`. This can be different in different sync rules versions that use the same `BucketParameterQuerierSource`.

Note that even though we can now split out a sync rules definition into these different parts, the plan is not to persist the definitions for these parts separately. We would however store mappings of definition -> bucket name prefix or parameter lookup source name, so that these could be re-used across sync rules versions.
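A rough sketch of what such mappings could look like, using an in-memory stand-in for the persisted state. `SourceNameMappings`, its method names, the `lookup_N` naming scheme, and the SQL strings are all assumptions made for illustration; none of them are taken from the actual storage implementation.

```typescript
// Hypothetical in-memory stand-in for the persisted mappings; all names are illustrative.
class SourceNameMappings {
  private readonly bucketPrefixes = new Map<string, string>();
  private readonly lookupNames = new Map<string, string>();
  private nextLookupId = 1;

  // Returns the existing prefix for a known data source definition, or allocates
  // one tied to the sync rules version that first introduced it.
  bucketPrefixFor(definitionKey: string, name: string, version: number): string {
    let prefix = this.bucketPrefixes.get(definitionKey);
    if (prefix === undefined) {
      prefix = `${version}#${name}`;
      this.bucketPrefixes.set(definitionKey, prefix);
    }
    return prefix;
  }

  // Lookup names are allocated independently of any bucket name, so the same
  // parameter query can serve several bucket definitions and versions.
  lookupNameFor(parameterQuerySql: string): string {
    let name = this.lookupNames.get(parameterQuerySql);
    if (name === undefined) {
      name = `lookup_${this.nextLookupId++}`;
      this.lookupNames.set(parameterQuerySql, name);
    }
    return name;
  }
}

const mappings = new SourceNameMappings();
const dataKey = "SELECT * FROM todos WHERE list_id = bucket.list_id";
const parameterSql = "SELECT id AS list_id FROM lists WHERE owner_id = request.user_id()";

// Version 1 allocates both names; version 2 finds the same definitions and reuses them.
const prefixInV1 = mappings.bucketPrefixFor(dataKey, "by_list", 1);
const prefixInV2 = mappings.bucketPrefixFor(dataKey, "by_list", 2);
const lookupForFirstDefinition = mappings.lookupNameFor(parameterSql);
const lookupForSecondDefinition = mappings.lookupNameFor(parameterSql);
```

The point of keeping two separate maps is that the lookup name never embeds a bucket prefix, matching point 2 above.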
Further work required in this PR:
- Implement logic for comparing source entities, and for merging entities from multiple sync rules into one. (Later PR)

Bucket definitions

```mermaid
erDiagram
    BucketSource ||--|| BucketDefinitionDataSource : dataSource
    BucketSource ||--o{ SqlParameterQuery : parameterLookupSources
    BucketSource ||--o{ SqlParameterQuery : parameterQuerierSources
    BucketSource ||--o{ StaticSqlParameterQuery : parameterQuerierSources
    SqlParameterQuery }o--|| BucketDefinitionDataSource : querierDataSource
    StaticSqlParameterQuery }o--|| BucketDefinitionDataSource : querierDataSource
    BucketDefinitionDataSource ||--o{ SqlDataQuery : contains
    BucketSource["SqlBucketDescriptor (BucketSource)"] {
        string name
        SYNC_RULE type
        true subscribedToByDefault
    }
    BucketDefinitionDataSource["BucketDefinitionDataSource (BucketDefinitionDataSource)"] {
        string uniqueName "BucketSource.name"
        string[] bucketParameters "debug use only"
    }
    SqlDataQuery {
        TablePattern table
        string sql
    }
    SqlParameterQuery["SqlParameterQuery (ParameterIndexLookupCreator)"] {
        TablePattern sourceTable
        string sql
        ParameterLookupScope defaultLookupScope
    }
    StaticSqlParameterQuery["StaticSqlParameterQuery"] {
        string sql
    }
```

Sync streams
Each sync stream can have multiple variants, where each variant has a specific bucket parameter "shape" (more details here that I'm ignoring for now).
Each variant can have multiple subqueries, each of which references a single source table.
A variant can also use request parameters directly.
The ParameterIndexLookupCreator is tied to a specific subquery, so that it's one per source table. In code there may be multiple per subquery, but in reality it would only be one per subquery at the moment.
The BucketDataSource is tied to a variant - there is effectively one set of buckets for each variant.
The diagram here is slightly simplified from reality.
```mermaid
erDiagram
    BucketSource ||--o{ StreamVariant : variants
    StreamVariant ||--|| SyncStreamDataSource : dataSources
    StreamVariant ||--|| SyncStreamParameterQuerierSource : parameterQuerierSources
    SyncStreamParameterQuerierSource }o--|| SyncStreamDataSource : querierDataSource
    StreamVariant ||--o{ SubqueryEvaluator : subqueries
    SubqueryEvaluator ||--o{ SubqueryParameterLookupSource : lookupSources
    SyncStreamParameterQuerierSource ||--o{ SubqueryParameterLookupSource : "implicit lookup relationship via subqueries"
    BucketSource["SyncStream (BucketSource)"] {
        string name
        SYNC_STREAM type
        boolean subscribedToByDefault
    }
    SyncStreamDataSource["SyncStreamDataSource (BucketDataSource)"] {
        TablePattern table
        string uniqueName "BucketSource.name|variant.id"
    }
    StreamVariant {
    }
    SubqueryEvaluator {
        TablePattern table
    }
    SyncStreamParameterQuerierSource["createParameterQuerierSource (internal)"] {
    }
    SubqueryParameterLookupSource["SubqueryParameterLookupSource"] {
        TablePattern parameterTable
        ParameterLookupScope defaultLookupScope
    }
```
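The same relationships can be sketched as plain types, together with a helper that pulls the per-variant data sources and per-subquery lookups up into flat lists. The type and function names are hypothetical and mirror the simplified diagram rather than the real classes. The `name|variant.id` format follows the `uniqueName` annotation in the diagram, and the string type for the variant id is an assumption.

```typescript
// Hypothetical types mirroring the simplified diagram; not the actual classes.
interface SubquerySketch {
  table: string; // each subquery references a single source table
}

interface VariantSketch {
  id: string; // assumption: the real id type may differ
  subqueries: SubquerySketch[];
}

interface StreamSketch {
  name: string;
  variants: VariantSketch[];
}

interface FlatSources {
  dataSourceNames: string[]; // one entry per variant
  lookupTables: string[]; // one entry per subquery
}

// Flattens a stream into the top-level lists that could later be compared
// against the lists from a previous sync rules version.
function flattenStream(stream: StreamSketch): FlatSources {
  const dataSourceNames: string[] = [];
  const lookupTables: string[] = [];
  for (const variant of stream.variants) {
    // One data source, and therefore one set of buckets, per variant.
    dataSourceNames.push(`${stream.name}|${variant.id}`);
    for (const subquery of variant.subqueries) {
      // One parameter lookup per subquery (at the moment).
      lookupTables.push(subquery.table);
    }
  }
  return { dataSourceNames, lookupTables };
}

const flattened = flattenStream({
  name: "todos",
  variants: [
    { id: "0", subqueries: [{ table: "lists" }] },
    { id: "1", subqueries: [] } // a variant that only uses request parameters
  ]
});
```

For this input the result has two data source names, `todos|0` and `todos|1`, and a single lookup table, `lists`.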