Skip to content

Commit c1fd10d

Browse files
committed
Split out parameter lookup sources from parameter querier sources.
1 parent 3f378db commit c1fd10d

File tree

10 files changed

+157
-104
lines changed

10 files changed

+157
-104
lines changed

packages/sync-rules/src/BucketSource.ts

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,22 +44,43 @@ export interface BucketDataSourceDefinition {
4444
debugRepresentation(): any;
4545
}
4646

47-
export interface BucketParameterSourceDefinition {
47+
/**
48+
* A parameter lookup source defines how to extract parameter lookup values from parameter queries.
49+
*
50+
* This is only relevant for parameter queries that query tables.
51+
*/
52+
export interface BucketParameterLookupSourceDefinition {
4853
readonly name: string;
4954
readonly type: BucketSourceType;
50-
readonly subscribedToByDefault: boolean;
5155
/**
5256
* For debug use only.
5357
*/
5458
readonly bucketParameters: string[];
5559

5660
getSourceTables(): Set<TablePattern>;
57-
createParameterSource(params: CreateSourceParams): BucketParameterSource;
61+
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource;
5862

5963
/** Whether the table possibly affects the buckets resolved by this source. */
6064
tableSyncsParameters(table: SourceTableInterface): boolean;
6165
}
6266

67+
/**
68+
* Parameter querier source definitions define how to bucket parameter queries are evaluated.
69+
*
70+
* This may use request data only, or it may use parameter lookup data persisted by a BucketParameterLookupSourceDefinition.
71+
*/
72+
export interface BucketParameterQuerierSourceDefinition {
73+
readonly name: string;
74+
readonly type: BucketSourceType;
75+
readonly subscribedToByDefault: boolean;
76+
/**
77+
* For debug use only.
78+
*/
79+
readonly bucketParameters: string[];
80+
81+
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource;
82+
}
83+
6384
/**
6485
* An interface declaring
6586
*
@@ -80,18 +101,8 @@ export interface BucketDataSource {
80101
evaluateRow(options: EvaluateRowOptions): EvaluationResult[];
81102
}
82103

83-
/**
84-
* An interface declaring
85-
*
86-
* - which buckets the sync service should create when processing change streams from the database.
87-
* - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns).
88-
* - which buckets a given connection has access to.
89-
*
90-
* There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
91-
* definitions that only consist of a single query.
92-
*/
93-
export interface BucketParameterSource {
94-
readonly definition: BucketParameterSourceDefinition;
104+
export interface BucketParameterLookupSource {
105+
readonly definition: BucketParameterLookupSourceDefinition;
95106
/**
96107
* Given a row in a source table that affects sync parameters, returns a structure to index which buckets rows should
97108
* be associated with.
@@ -100,6 +111,10 @@ export interface BucketParameterSource {
100111
* system to find buckets.
101112
*/
102113
evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[];
114+
}
115+
116+
export interface BucketParameterQuerierSource {
117+
readonly definition: BucketParameterQuerierSourceDefinition;
103118

104119
/**
105120
* Reports {@link BucketParameterQuerier}s resolving buckets that a specific stream request should have access to.

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import {
22
BucketDataSource,
33
BucketDataSourceDefinition,
4-
BucketParameterSourceDefinition,
4+
BucketParameterLookupSourceDefinition,
5+
BucketParameterQuerierSourceDefinition,
56
BucketSourceType,
67
CreateSourceParams
78
} from './BucketSource.js';
@@ -111,10 +112,14 @@ export class SqlBucketDescriptor implements BucketDataSourceDefinition {
111112
};
112113
}
113114

114-
getParameterSourceDefinitions(): BucketParameterSourceDefinition[] {
115+
getParameterQuerierSourceDefinitions(): BucketParameterQuerierSourceDefinition[] {
115116
return [...this.parameterQueries, ...this.globalParameterQueries];
116117
}
117118

119+
getParameterLookupSourceDefinitions(): BucketParameterLookupSourceDefinition[] {
120+
return [...this.parameterQueries];
121+
}
122+
118123
getSourceTables(): Set<TablePattern> {
119124
let result = new Set<TablePattern>();
120125
for (let query of this.parameterQueries) {

packages/sync-rules/src/SqlParameterQuery.ts

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@ import {
3838
import { filterJsonRow, getBucketId, isJsonValue, isSelectStatement, normalizeParameterValue } from './utils.js';
3939
import { DetectRequestParameters } from './validators.js';
4040
import {
41-
BucketParameterSource,
42-
BucketParameterSourceDefinition,
41+
BucketParameterLookupSource,
42+
BucketParameterLookupSourceDefinition,
43+
BucketParameterQuerierSource,
44+
BucketParameterQuerierSourceDefinition,
4345
BucketSourceType,
4446
CreateSourceParams
4547
} from './BucketSource.js';
@@ -68,7 +70,9 @@ export interface SqlParameterQueryOptions {
6870
* SELECT id as user_id FROM users WHERE users.user_id = token_parameters.user_id
6971
* SELECT id as user_id, token_parameters.is_admin as is_admin FROM users WHERE users.user_id = token_parameters.user_id
7072
*/
71-
export class SqlParameterQuery implements BucketParameterSourceDefinition {
73+
export class SqlParameterQuery
74+
implements BucketParameterLookupSourceDefinition, BucketParameterQuerierSourceDefinition
75+
{
7276
static fromSql(
7377
descriptorName: string,
7478
sql: string,
@@ -330,7 +334,18 @@ export class SqlParameterQuery implements BucketParameterSourceDefinition {
330334
return new Set([this.sourceTable]);
331335
}
332336

333-
createParameterSource(params: CreateSourceParams): BucketParameterSource {
337+
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource {
338+
return {
339+
definition: this,
340+
341+
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions) => {
342+
const q = this.getBucketParameterQuerier(options.globalParameters, ['default'], params.bucketIdTransformer);
343+
result.queriers.push(q);
344+
}
345+
};
346+
}
347+
348+
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource {
334349
return {
335350
definition: this,
336351

@@ -340,10 +355,6 @@ export class SqlParameterQuery implements BucketParameterSourceDefinition {
340355
} else {
341356
return [];
342357
}
343-
},
344-
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions) => {
345-
const q = this.getBucketParameterQuerier(options.globalParameters, ['default'], params.bucketIdTransformer);
346-
result.queriers.push(q);
347358
}
348359
};
349360
}

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
import { isScalar, LineCounter, parseDocument, Scalar, YAMLMap, YAMLSeq } from 'yaml';
22
import { isValidPriority } from './BucketDescription.js';
33
import { BucketParameterQuerier, QuerierError } from './BucketParameterQuerier.js';
4-
import { BucketDataSourceDefinition, BucketParameterSourceDefinition } from './BucketSource.js';
4+
import {
5+
BucketDataSourceDefinition,
6+
BucketParameterLookupSourceDefinition,
7+
BucketParameterQuerierSourceDefinition
8+
} from './BucketSource.js';
59
import { CompatibilityContext, CompatibilityEdition, CompatibilityOption } from './compatibility.js';
610
import { SqlRuleError, SyncRulesErrors, YamlError } from './errors.js';
711
import { SqlEventDescriptor } from './events/SqlEventDescriptor.js';
@@ -79,7 +83,8 @@ export interface GetBucketParameterQuerierResult {
7983

8084
export class SqlSyncRules {
8185
bucketDataSources: BucketDataSourceDefinition[] = [];
82-
bucketParameterSources: BucketParameterSourceDefinition[] = [];
86+
bucketParameterLookupSources: BucketParameterLookupSourceDefinition[] = [];
87+
bucketParameterQuerierSources: BucketParameterQuerierSourceDefinition[] = [];
8388

8489
eventDescriptors: SqlEventDescriptor[] = [];
8590
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
@@ -232,7 +237,8 @@ export class SqlSyncRules {
232237
});
233238
}
234239
rules.bucketDataSources.push(descriptor);
235-
rules.bucketParameterSources.push(...descriptor.getParameterSourceDefinitions());
240+
rules.bucketParameterLookupSources.push(...descriptor.getParameterLookupSourceDefinitions());
241+
rules.bucketParameterQuerierSources.push(...descriptor.getParameterQuerierSourceDefinitions());
236242
}
237243

238244
for (const entry of streamMap?.items ?? []) {
@@ -258,7 +264,8 @@ export class SqlSyncRules {
258264
rules.withScalar(data, (q) => {
259265
const [parsed, errors] = syncStreamFromSql(key, q, queryOptions);
260266
rules.bucketDataSources.push(parsed);
261-
rules.bucketParameterSources.push(parsed);
267+
rules.bucketParameterLookupSources.push(parsed);
268+
rules.bucketParameterQuerierSources.push(parsed);
262269
return {
263270
parsed: true,
264271
errors
@@ -388,7 +395,12 @@ export class SqlSyncRules {
388395
return new HydratedSyncRules({
389396
definition: this,
390397
bucketDataSources: this.bucketDataSources.map((d) => d.createDataSource({ bucketIdTransformer })),
391-
bucketParameterSources: this.bucketParameterSources.map((d) => d.createParameterSource({ bucketIdTransformer })),
398+
bucketParameterQuerierSources: this.bucketParameterQuerierSources.map((d) =>
399+
d.createParameterQuerierSource({ bucketIdTransformer })
400+
),
401+
bucketParameterLookupSources: this.bucketParameterLookupSources.map((d) =>
402+
d.createParameterLookupSource({ bucketIdTransformer })
403+
),
392404
eventDescriptors: this.eventDescriptors,
393405
compatibility: this.compatibility
394406
});
@@ -408,7 +420,7 @@ export class SqlSyncRules {
408420
sourceTables.set(key, r);
409421
}
410422
}
411-
for (const bucket of this.bucketParameterSources) {
423+
for (const bucket of this.bucketParameterLookupSources) {
412424
for (const r of bucket.getSourceTables()) {
413425
const key = `${r.connectionTag}.${r.schema}.${r.tablePattern}`;
414426
sourceTables.set(key, r);
@@ -446,7 +458,7 @@ export class SqlSyncRules {
446458
}
447459

448460
tableSyncsParameters(table: SourceTableInterface): boolean {
449-
return this.bucketParameterSources.some((b) => b.tableSyncsParameters(table));
461+
return this.bucketParameterLookupSources.some((b) => b.tableSyncsParameters(table));
450462
}
451463

452464
debugGetOutputTables() {

packages/sync-rules/src/StaticSqlParameterQuery.ts

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ import {
1515
import { getBucketId, isJsonValue } from './utils.js';
1616
import { DetectRequestParameters } from './validators.js';
1717
import {
18-
BucketParameterSource,
19-
BucketParameterSourceDefinition,
18+
BucketParameterLookupSource,
19+
BucketParameterQuerierSource,
20+
BucketParameterQuerierSourceDefinition,
2021
BucketSourceType,
2122
CreateSourceParams
2223
} from './BucketSource.js';
@@ -42,7 +43,7 @@ export interface StaticSqlParameterQueryOptions {
4243
* SELECT token_parameters.user_id
4344
* SELECT token_parameters.user_id as user_id WHERE token_parameters.is_admin
4445
*/
45-
export class StaticSqlParameterQuery implements BucketParameterSourceDefinition {
46+
export class StaticSqlParameterQuery implements BucketParameterQuerierSourceDefinition {
4647
static fromSql(
4748
descriptorName: string,
4849
sql: string,
@@ -188,14 +189,10 @@ export class StaticSqlParameterQuery implements BucketParameterSourceDefinition
188189
return false;
189190
}
190191

191-
createParameterSource(params: CreateSourceParams): BucketParameterSource {
192+
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource {
192193
return {
193194
definition: this,
194195

195-
evaluateParameterRow: (sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] => {
196-
return [];
197-
},
198-
199196
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions) => {
200197
const staticBuckets = this.getStaticBucketDescriptions(
201198
options.globalParameters,

packages/sync-rules/src/SyncRules.ts

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { BucketDataSource, BucketParameterSource } from './BucketSource.js';
1+
import { BucketDataSource, BucketParameterLookupSource, BucketParameterQuerierSource } from './BucketSource.js';
22
import {
33
BucketParameterQuerier,
44
CompatibilityContext,
@@ -25,8 +25,9 @@ import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, Sqlite
2525
* specifically affects bucket names.
2626
*/
2727
export class HydratedSyncRules {
28-
bucketDataSources: BucketDataSource[] = [];
29-
bucketParameterSources: BucketParameterSource[] = [];
28+
bucketDataSources: BucketDataSource[];
29+
bucketParameterQuerierSources: BucketParameterQuerierSource[];
30+
bucketParameterLookupSources: BucketParameterLookupSource[];
3031

3132
eventDescriptors: SqlEventDescriptor[] = [];
3233
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
@@ -36,12 +37,14 @@ export class HydratedSyncRules {
3637
constructor(params: {
3738
definition: SqlSyncRules;
3839
bucketDataSources: BucketDataSource[];
39-
bucketParameterSources: BucketParameterSource[];
40+
bucketParameterQuerierSources: BucketParameterQuerierSource[];
41+
bucketParameterLookupSources: BucketParameterLookupSource[];
4042
eventDescriptors?: SqlEventDescriptor[];
4143
compatibility?: CompatibilityContext;
4244
}) {
4345
this.bucketDataSources = params.bucketDataSources;
44-
this.bucketParameterSources = params.bucketParameterSources;
46+
this.bucketParameterQuerierSources = params.bucketParameterQuerierSources;
47+
this.bucketParameterLookupSources = params.bucketParameterLookupSources;
4548
this.definition = params.definition;
4649
if (params.eventDescriptors) {
4750
this.eventDescriptors = params.eventDescriptors;
@@ -114,7 +117,7 @@ export class HydratedSyncRules {
114117
row: SqliteRow
115118
): { results: EvaluatedParameters[]; errors: EvaluationError[] } {
116119
let rawResults: EvaluatedParametersResult[] = [];
117-
for (let source of this.bucketParameterSources) {
120+
for (let source of this.bucketParameterLookupSources) {
118121
rawResults.push(...source.evaluateParameterRow(table, row));
119122
}
120123

@@ -128,7 +131,7 @@ export class HydratedSyncRules {
128131
const errors: QuerierError[] = [];
129132
const pending = { queriers, errors };
130133

131-
for (const source of this.bucketParameterSources) {
134+
for (const source of this.bucketParameterQuerierSources) {
132135
if (
133136
(source.definition.subscribedToByDefault && options.hasDefaultStreams) ||
134137
source.definition.name in options.streams

packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
11
import { FromCall, SelectFromStatement } from 'pgsql-ast-parser';
2+
import { BucketDescription, BucketPriority, DEFAULT_BUCKET_PRIORITY, ResolvedBucket } from './BucketDescription.js';
3+
import {
4+
BucketParameterQuerierSource,
5+
BucketParameterQuerierSourceDefinition,
6+
BucketSourceType,
7+
CreateSourceParams
8+
} from './BucketSource.js';
29
import { SqlRuleError } from './errors.js';
10+
import { BucketParameterQuerier, GetQuerierOptions, PendingQueriers } from './index.js';
11+
import { SourceTableInterface } from './SourceTableInterface.js';
312
import { AvailableTable, SqlTools } from './sql_filters.js';
4-
import { checkUnsupportedFeatures, isClauseError, isParameterValueClause, sqliteBool } from './sql_support.js';
13+
import { checkUnsupportedFeatures, isClauseError, sqliteBool } from './sql_support.js';
14+
import { TablePattern } from './TablePattern.js';
515
import { generateTableValuedFunctions, TableValuedFunction } from './TableValuedFunctions.js';
616
import {
717
BucketIdTransformer,
8-
EvaluatedParametersResult,
918
ParameterValueClause,
1019
ParameterValueSet,
1120
QueryParseOptions,
@@ -14,17 +23,7 @@ import {
1423
SqliteRow
1524
} from './types.js';
1625
import { getBucketId, isJsonValue } from './utils.js';
17-
import { BucketDescription, BucketPriority, DEFAULT_BUCKET_PRIORITY, ResolvedBucket } from './BucketDescription.js';
1826
import { DetectRequestParameters } from './validators.js';
19-
import { TablePattern } from './TablePattern.js';
20-
import {
21-
BucketParameterSource,
22-
BucketParameterSourceDefinition,
23-
BucketSourceType,
24-
CreateSourceParams
25-
} from './BucketSource.js';
26-
import { SourceTableInterface } from './SourceTableInterface.js';
27-
import { BucketParameterQuerier, GetQuerierOptions, mergeBucketParameterQueriers, PendingQueriers } from './index.js';
2827

2928
export interface TableValuedFunctionSqlParameterQueryOptions {
3029
sql: string;
@@ -51,7 +50,7 @@ export interface TableValuedFunctionSqlParameterQueryOptions {
5150
*
5251
* This can currently not be combined with parameter table queries or multiple table-valued functions.
5352
*/
54-
export class TableValuedFunctionSqlParameterQuery implements BucketParameterSourceDefinition {
53+
export class TableValuedFunctionSqlParameterQuery implements BucketParameterQuerierSourceDefinition {
5554
static fromSql(
5655
descriptorName: string,
5756
sql: string,
@@ -232,14 +231,10 @@ export class TableValuedFunctionSqlParameterQuery implements BucketParameterSour
232231
return false;
233232
}
234233

235-
createParameterSource(params: CreateSourceParams): BucketParameterSource {
234+
createParameterQuerierSource(params: CreateSourceParams): BucketParameterQuerierSource {
236235
return {
237236
definition: this,
238237

239-
evaluateParameterRow: (sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] => {
240-
return [];
241-
},
242-
243238
pushBucketParameterQueriers: (result: PendingQueriers, options: GetQuerierOptions) => {
244239
const staticBuckets = this.getStaticBucketDescriptions(
245240
options.globalParameters,

0 commit comments

Comments
 (0)