Skip to content

Commit ddcb8a0

Browse files
committed
Use separate sources per sync stream variant.
1 parent ec5851a commit ddcb8a0

File tree

12 files changed

+142
-115
lines changed

12 files changed

+142
-115
lines changed

packages/sync-rules/src/BaseSqlDataQuery.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ export class BaseSqlDataQuery {
182182

183183
const tables = { [this.table.nameInSchema]: this.addSpecialParameters(table, row) };
184184
const resolvedBucketIds = bucketIds(tables);
185+
if (resolvedBucketIds.length == 0) {
186+
// Short-circuit: No need to transform the row if there are no matching buckets.
187+
return [];
188+
}
185189

186190
const data = this.transformRow(tables);
187191
let id = data.id;

packages/sync-rules/src/BucketSource.ts

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ export interface CreateSourceParams {
1616
bucketIdTransformer: BucketIdTransformer;
1717
}
1818

19+
/**
20+
* A BucketSource is a _logical_ bucket or sync stream definition. It is primarily used to group together
21+
* related BucketDataSource, BucketParameterLookupSource and BucketParameterQuerierSource definitions,
22+
* for the purpose of subscribing to specific streams. It does not directly define the implementation
23+
* or replication process.
24+
*/
1925
export interface BucketSource {
2026
readonly name: string;
2127
readonly type: BucketSourceType;
@@ -116,8 +122,6 @@ export interface BucketParameterQuerierSourceDefinition {
116122
* definitions that only consist of a single query.
117123
*/
118124
export interface BucketDataSource {
119-
readonly definition: BucketDataSourceDefinition;
120-
121125
/**
122126
* Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
123127
* data for rows to add to buckets.
@@ -126,7 +130,6 @@ export interface BucketDataSource {
126130
}
127131

128132
export interface BucketParameterLookupSource {
129-
readonly definition: BucketParameterLookupSourceDefinition;
130133
/**
131134
* Given a row in a source table that affects sync parameters, returns a structure to index which buckets rows should
132135
* be associated with.
@@ -138,8 +141,6 @@ export interface BucketParameterLookupSource {
138141
}
139142

140143
export interface BucketParameterQuerierSource {
141-
readonly definition: BucketParameterQuerierSourceDefinition;
142-
143144
/**
144145
* Reports {@link BucketParameterQuerier}s resolving buckets that a specific stream request should have access to.
145146
*
@@ -149,9 +150,70 @@ export interface BucketParameterQuerierSource {
149150
pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions): void;
150151
}
151152

153+
export interface DebugMergedSource
154+
extends BucketDataSource,
155+
BucketParameterLookupSource,
156+
BucketParameterQuerierSource {}
157+
152158
export enum BucketSourceType {
153159
SYNC_RULE,
154160
SYNC_STREAM
155161
}
156162

157163
export type ResultSetDescription = { name: string; columns: ColumnDefinition[] };
164+
165+
export function mergeDataSources(sources: BucketDataSource[]): BucketDataSource {
166+
return {
167+
evaluateRow(options: EvaluateRowOptions): EvaluationResult[] {
168+
let results: EvaluationResult[] = [];
169+
for (let source of sources) {
170+
results.push(...source.evaluateRow(options));
171+
}
172+
return results;
173+
}
174+
};
175+
}
176+
177+
export function mergeParameterLookupSources(sources: BucketParameterLookupSource[]): BucketParameterLookupSource {
178+
return {
179+
evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] {
180+
let results: EvaluatedParametersResult[] = [];
181+
for (let source of sources) {
182+
results.push(...source.evaluateParameterRow(sourceTable, row));
183+
}
184+
return results;
185+
}
186+
};
187+
}
188+
189+
export function mergeParameterQuerierSources(sources: BucketParameterQuerierSource[]): BucketParameterQuerierSource {
190+
return {
191+
pushBucketParameterQueriers(result: PendingQueriers, options: GetQuerierOptions): void {
192+
for (let source of sources) {
193+
source.pushBucketParameterQueriers(result, options);
194+
}
195+
}
196+
};
197+
}
198+
199+
/**
200+
* For production purposes, we typically need to operate on the different sources separately. However, for debugging,
201+
* it is useful to have a single merged source that can evaluate everything.
202+
*/
203+
export function debugHydratedMergedSource(bucketSource: BucketSource, params?: CreateSourceParams): DebugMergedSource {
204+
const resolvedParams = params ?? { bucketIdTransformer: (id: string) => id };
205+
const dataSource = mergeDataSources(
206+
bucketSource.dataSources.map((source) => source.createDataSource(resolvedParams))
207+
);
208+
const parameterLookupSource = mergeParameterLookupSources(
209+
bucketSource.parameterLookupSources.map((source) => source.createParameterLookupSource(resolvedParams))
210+
);
211+
const parameterQuerierSource = mergeParameterQuerierSources(
212+
bucketSource.parameterQuerierSources.map((source) => source.createParameterQuerierSource(resolvedParams))
213+
);
214+
return {
215+
evaluateParameterRow: parameterLookupSource.evaluateParameterRow.bind(parameterLookupSource),
216+
evaluateRow: dataSource.evaluateRow.bind(dataSource),
217+
pushBucketParameterQueriers: parameterQuerierSource.pushBucketParameterQueriers.bind(parameterQuerierSource)
218+
};
219+
}

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ import {
33
BucketDataSourceDefinition,
44
BucketSource,
55
BucketSourceType,
6-
CreateSourceParams
6+
CreateSourceParams,
7+
DebugMergedSource,
8+
mergeParameterLookupSources,
9+
mergeParameterQuerierSources
710
} from './BucketSource.js';
811
import { ColumnDefinition } from './ExpressionType.js';
912
import { IdSequence } from './IdSequence.js';
@@ -106,7 +109,6 @@ export class SqlBucketDescriptor implements BucketDataSourceDefinition, BucketSo
106109

107110
createDataSource(params: CreateSourceParams): BucketDataSource {
108111
return {
109-
definition: this,
110112
evaluateRow: (options) => {
111113
let results: EvaluationResult[] = [];
112114
for (let query of this.dataQueries) {

packages/sync-rules/src/SqlParameterQuery.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,6 @@ export class SqlParameterQuery
326326

327327
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource {
328328
return {
329-
definition: this,
330-
331329
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions) => {
332330
const q = this.getBucketParameterQuerier(options.globalParameters, ['default'], params.bucketIdTransformer);
333331
result.queriers.push(q);
@@ -337,8 +335,6 @@ export class SqlParameterQuery
337335

338336
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource {
339337
return {
340-
definition: this,
341-
342338
evaluateParameterRow: (sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] => {
343339
if (this.tableSyncsParameters(sourceTable)) {
344340
return this.evaluateParameterRow(row);

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,10 +399,8 @@ export class SqlSyncRules {
399399
: (id: string) => id;
400400
return new HydratedSyncRules({
401401
definition: this,
402+
createParams: { bucketIdTransformer },
402403
bucketDataSources: this.bucketDataSources.map((d) => d.createDataSource({ bucketIdTransformer })),
403-
bucketParameterQuerierSources: this.bucketParameterQuerierSources.map((d) =>
404-
d.createParameterQuerierSource({ bucketIdTransformer })
405-
),
406404
bucketParameterLookupSources: this.bucketParameterLookupSources.map((d) =>
407405
d.createParameterLookupSource({ bucketIdTransformer })
408406
),

packages/sync-rules/src/StaticSqlParameterQuery.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,6 @@ export class StaticSqlParameterQuery implements BucketParameterQuerierSourceDefi
180180

181181
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource {
182182
return {
183-
definition: this,
184-
185183
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions) => {
186184
const staticBuckets = this.getStaticBucketDescriptions(
187185
options.globalParameters,

packages/sync-rules/src/SyncRules.ts

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
BucketParameterLookupSource,
44
BucketParameterQuerierSource,
55
BucketParameterQuerierSourceDefinition,
6+
CreateSourceParams,
67
HydratedBucketSource
78
} from './BucketSource.js';
89
import {
@@ -33,7 +34,6 @@ import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, Sqlite
3334
export class HydratedSyncRules {
3435
bucketSources: HydratedBucketSource[] = [];
3536
bucketDataSources: BucketDataSource[];
36-
bucketParameterQuerierSources: BucketParameterQuerierSource[];
3737
bucketParameterLookupSources: BucketParameterLookupSource[];
3838

3939
eventDescriptors: SqlEventDescriptor[] = [];
@@ -43,14 +43,13 @@ export class HydratedSyncRules {
4343

4444
constructor(params: {
4545
definition: SqlSyncRules;
46+
createParams: CreateSourceParams;
4647
bucketDataSources: BucketDataSource[];
47-
bucketParameterQuerierSources: BucketParameterQuerierSource[];
4848
bucketParameterLookupSources: BucketParameterLookupSource[];
4949
eventDescriptors?: SqlEventDescriptor[];
5050
compatibility?: CompatibilityContext;
5151
}) {
5252
this.bucketDataSources = params.bucketDataSources;
53-
this.bucketParameterQuerierSources = params.bucketParameterQuerierSources;
5453
this.bucketParameterLookupSources = params.bucketParameterLookupSources;
5554
this.definition = params.definition;
5655

@@ -61,21 +60,13 @@ export class HydratedSyncRules {
6160
this.compatibility = params.compatibility;
6261
}
6362

64-
let querierMap = new Map<BucketParameterQuerierSourceDefinition, HydratedBucketSource>();
6563
for (let definition of this.definition.bucketSources) {
6664
const hydratedBucketSource: HydratedBucketSource = { definition: definition, parameterQuerierSources: [] };
6765
this.bucketSources.push(hydratedBucketSource);
6866
for (let querier of definition.parameterQuerierSources) {
69-
querierMap.set(querier, hydratedBucketSource);
67+
hydratedBucketSource.parameterQuerierSources.push(querier.createParameterQuerierSource(params.createParams));
7068
}
7169
}
72-
for (let querier of params.bucketParameterQuerierSources) {
73-
const bucketSource = querierMap.get(querier.definition);
74-
if (bucketSource == null) {
75-
throw new Error('Cannot find BucketSource for BucketParameterQuerierSource');
76-
}
77-
bucketSource.parameterQuerierSources.push(querier);
78-
}
7970
}
8071

8172
// These methods do not depend on hydration, so we can just forward them to the definition.

packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,6 @@ export class TableValuedFunctionSqlParameterQuery implements BucketParameterQuer
225225

226226
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource {
227227
return {
228-
definition: this,
229-
230228
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions) => {
231229
const staticBuckets = this.getStaticBucketDescriptions(
232230
options.globalParameters,

packages/sync-rules/src/streams/from_sql.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,13 @@ class SyncStreamCompiler {
100100
let filter = this.whereClauseToFilters(tools, query.where);
101101
filter = filter.toDisjunctiveNormalForm(tools);
102102

103+
const variants = filter.isValid(tools) ? filter.compileVariants(this.descriptorName) : [];
103104
const stream = new SyncStream(
104105
this.descriptorName,
105-
new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable))
106+
new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable)),
107+
variants
106108
);
107109
stream.subscribedToByDefault = this.options.auto_subscribe ?? false;
108-
if (filter.isValid(tools)) {
109-
stream.variants = filter.compileVariants(this.descriptorName);
110-
}
111110

112111
this.errors.push(...tools.errors);
113112
if (this.parameterDetector.usesStreamParameters && stream.subscribedToByDefault) {

0 commit comments

Comments
 (0)