Skip to content

Commit 60290ae

Browse files
committed
Move SubqueryParameterLookupSource deeper to be specific to variants.
1 parent 3bddef9 commit 60290ae

File tree

7 files changed

+139
-87
lines changed

7 files changed

+139
-87
lines changed

packages/sync-rules/src/BucketSource.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ export interface HydratedBucketSource {
6565
* Encodes a static definition of a bucket source, as parsed from sync rules or stream definitions.
6666
*/
6767
export interface BucketDataSourceDefinition {
68+
/**
69+
* Bucket prefix if no transformations are defined.
70+
*
71+
* Transformations may use this as a base, or may generate an entirely different prefix.
72+
*/
73+
readonly defaultBucketPrefix: string;
74+
6875
/**
6976
* For debug use only.
7077
*/
@@ -90,6 +97,14 @@ export interface BucketDataSourceDefinition {
9097
* This is only relevant for parameter queries that query tables.
9198
*/
9299
export interface BucketParameterLookupSourceDefinition {
100+
/**
101+
* lookupName + queryId is used to uniquely identify parameter queries for parameter storage.
102+
*
103+
* This defines the default values if no transformations are applied.
104+
*/
105+
defaultLookupName: string;
106+
defaultQueryId: string;
107+
93108
getSourceTables(): Set<TablePattern>;
94109
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource;
95110

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ export class BucketDefinitionDataSource implements BucketDataSourceDefinition {
146146
return this.descriptor.bucketParameters;
147147
}
148148

149+
public get defaultBucketPrefix(): string {
150+
return this.descriptor.name;
151+
}
152+
149153
createDataSource(params: CreateSourceParams): BucketDataSource {
150154
return {
151155
evaluateRow: (options) => {

packages/sync-rules/src/SqlParameterQuery.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,14 @@ export class SqlParameterQuery
316316
this.errors = options.errors ?? [];
317317
}
318318

319+
public get defaultLookupName(): string {
320+
return this.descriptorName;
321+
}
322+
323+
public get defaultQueryId(): string {
324+
return this.queryId;
325+
}
326+
319327
tableSyncsParameters(table: SourceTableInterface): boolean {
320328
return this.sourceTable.matches(table);
321329
}

packages/sync-rules/src/streams/filter.ts

Lines changed: 90 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
import { isParameterValueClause, isRowValueClause, SQLITE_TRUE, sqliteBool } from '../sql_support.js';
22
import { TablePattern } from '../TablePattern.js';
3-
import { ParameterMatchClause, ParameterValueClause, RowValueClause, SqliteJsonValue } from '../types.js';
3+
import {
4+
EvaluatedParametersResult,
5+
ParameterMatchClause,
6+
ParameterValueClause,
7+
RowValueClause,
8+
SqliteJsonValue,
9+
SqliteRow
10+
} from '../types.js';
411
import { isJsonValue, normalizeParameterValue } from '../utils.js';
512
import { SqlTools } from '../sql_filters.js';
613
import { checkJsonArray, OPERATOR_NOT } from '../sql_functions.js';
@@ -10,6 +17,12 @@ import { StreamVariant } from './variant.js';
1017
import { SubqueryEvaluator } from './parameter.js';
1118
import { cartesianProduct } from './utils.js';
1219
import { NodeLocation } from 'pgsql-ast-parser';
20+
import {
21+
BucketParameterLookupSource,
22+
BucketParameterLookupSourceDefinition,
23+
CreateSourceParams
24+
} from '../BucketSource.js';
25+
import { SourceTableInterface } from '../SourceTableInterface.js';
1326

1427
/**
1528
* An intermediate representation of a `WHERE` clause for stream queries.
@@ -253,19 +266,10 @@ export class Subquery {
253266

254267
const evaluator: SubqueryEvaluator = {
255268
parameterTable: this.table,
256-
lookupsForParameterRow(sourceTable, row) {
257-
const value = column.evaluate({ [sourceTable.name]: row });
258-
if (!isJsonValue(value)) {
259-
return null;
260-
}
261-
262-
const lookups: ParameterLookup[] = [];
263-
for (const [variant, id] of innerVariants) {
264-
for (const instantiation of variant.instantiationsForRow({ sourceTable, record: row })) {
265-
lookups.push(ParameterLookup.normalized(context.streamName, id, instantiation));
266-
}
267-
}
268-
return { value, lookups };
269+
lookupSources(streamName) {
270+
return innerVariants.map(([variant, id]) => {
271+
return new SubqueryParameterLookupSource(evaluator, column, variant, id, streamName);
272+
});
269273
},
270274
lookupsForRequest(parameters) {
271275
const lookups: ParameterLookup[] = [];
@@ -510,3 +514,75 @@ export class EvaluateSimpleCondition extends FilterOperator {
510514
);
511515
}
512516
}
517+
518+
export class SubqueryParameterLookupSource implements BucketParameterLookupSourceDefinition {
519+
constructor(
520+
private subquery: SubqueryEvaluator,
521+
private column: RowValueClause,
522+
private innerVariant: StreamVariant,
523+
public readonly defaultQueryId: string,
524+
private streamName: string
525+
) {}
526+
527+
get defaultLookupName() {
528+
return this.streamName;
529+
}
530+
531+
getSourceTables(): Set<TablePattern> {
532+
let result = new Set<TablePattern>();
533+
result.add(this.subquery.parameterTable);
534+
return result;
535+
}
536+
537+
/**
538+
* Creates lookup indices for dynamically-resolved parameters.
539+
*
540+
* Resolving dynamic parameters is a two-step process: First, for tables referenced in subqueries, we create an index
541+
* to resolve which request parameters would match rows in subqueries. Then, when resolving bucket ids for a request,
542+
* we compute subquery results by looking up results in that index.
543+
*
544+
* This implements the first step of that process.
545+
*
546+
* @param result The array into which evaluation results should be written to.
547+
* @param sourceTable A table we depend on in a subquery.
548+
* @param row Row data to index.
549+
*/
550+
evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[] {
551+
if (this.subquery.parameterTable.matches(sourceTable)) {
552+
// Theoretically we're doing duplicate work by doing this for each innerVariant in a subquery.
553+
// In practice, we don't have more than one innerVariant per subquery right now, so this is fine.
554+
const value = this.column.evaluate({ [sourceTable.name]: row });
555+
if (!isJsonValue(value)) {
556+
return [];
557+
}
558+
559+
const lookups: ParameterLookup[] = [];
560+
for (const instantiation of this.innerVariant.instantiationsForRow({ sourceTable, record: row })) {
561+
// TODO: dynamic lookup name and query id
562+
lookups.push(ParameterLookup.normalized(this.defaultLookupName, this.defaultQueryId, instantiation));
563+
}
564+
565+
// The row of the subquery. Since we only support subqueries with a single column, we unconditionally name the
566+
// column `result` for simplicity.
567+
const resultRow = { result: value };
568+
569+
return lookups.map((l) => ({
570+
lookup: l,
571+
bucketParameters: [resultRow]
572+
}));
573+
}
574+
return [];
575+
}
576+
577+
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource {
578+
return {
579+
evaluateParameterRow: (sourceTable, row) => {
580+
return this.evaluateParameterRow(sourceTable, row);
581+
}
582+
};
583+
}
584+
585+
tableSyncsParameters(table: SourceTableInterface): boolean {
586+
return this.subquery.parameterTable.matches(table);
587+
}
588+
}

packages/sync-rules/src/streams/parameter.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { ParameterLookup } from '../BucketParameterQuerier.js';
2+
import { BucketParameterLookupSource, BucketParameterLookupSourceDefinition } from '../BucketSource.js';
23
import { SourceTableInterface } from '../SourceTableInterface.js';
34
import { TablePattern } from '../TablePattern.js';
45
import {
@@ -38,8 +39,9 @@ export interface BucketParameter {
3839
export interface SubqueryEvaluator {
3940
parameterTable: TablePattern;
4041

41-
lookupsForParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): SubqueryLookups | null;
4242
lookupsForRequest(params: RequestParameters): ParameterLookup[];
43+
44+
lookupSources(streamName: string): BucketParameterLookupSourceDefinition[];
4345
}
4446

4547
export interface SubqueryLookups {

packages/sync-rules/src/streams/stream.ts

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export class SyncStream implements BucketSource {
4747

4848
this.dataSources = variants.map((variant) => new SyncStreamDataSource(this, data, variant));
4949
this.parameterQuerierSources = variants.map((variant) => new SyncStreamParameterQuerierSource(this, variant));
50-
this.parameterLookupSources = variants.map((variant) => new SyncStreamParameterLookupSource(this, variant));
50+
this.parameterLookupSources = variants.flatMap((variant) => variant.lookupSources(name));
5151
}
5252

5353
public get type(): BucketSourceType {
@@ -81,6 +81,10 @@ export class SyncStreamDataSource implements BucketDataSourceDefinition {
8181
return [];
8282
}
8383

84+
public get defaultBucketPrefix(): string {
85+
return this.variant.defaultBucketPrefix(this.stream.name);
86+
}
87+
8488
getSourceTables(): Set<TablePattern> {
8589
return new Set<TablePattern>([this.data.sourceTable]);
8690
}
@@ -199,39 +203,3 @@ export class SyncStreamParameterQuerierSource implements BucketParameterQuerierS
199203
}
200204
}
201205
}
202-
203-
export class SyncStreamParameterLookupSource implements BucketParameterLookupSourceDefinition {
204-
constructor(
205-
private stream: SyncStream,
206-
private variant: StreamVariant
207-
) {}
208-
209-
getSourceTables(): Set<TablePattern> {
210-
let result = new Set<TablePattern>();
211-
for (const subquery of this.variant.subqueries) {
212-
result.add(subquery.parameterTable);
213-
}
214-
215-
return result;
216-
}
217-
218-
createParameterLookupSource(params: CreateSourceParams): BucketParameterLookupSource {
219-
return {
220-
evaluateParameterRow: (sourceTable, row) => {
221-
const result: EvaluatedParametersResult[] = [];
222-
this.variant.pushParameterRowEvaluation(result, sourceTable, row);
223-
return result;
224-
}
225-
};
226-
}
227-
228-
tableSyncsParameters(table: SourceTableInterface): boolean {
229-
for (const subquery of this.variant.subqueries) {
230-
if (subquery.parameterTable.matches(table)) {
231-
return true;
232-
}
233-
}
234-
235-
return false;
236-
}
237-
}

packages/sync-rules/src/streams/variant.ts

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { BucketInclusionReason, ResolvedBucket } from '../BucketDescription.js';
22
import { BucketParameterQuerier, ParameterLookup } from '../BucketParameterQuerier.js';
3+
import {
4+
BucketParameterLookupSource,
5+
BucketParameterLookupSourceDefinition,
6+
CreateSourceParams
7+
} from '../BucketSource.js';
38
import { SourceTableInterface } from '../SourceTableInterface.js';
9+
import { TablePattern } from '../TablePattern.js';
410
import {
511
BucketIdTransformer,
612
EvaluatedParametersResult,
@@ -65,6 +71,14 @@ export class StreamVariant {
6571
this.requestFilters = [];
6672
}
6773

74+
defaultBucketPrefix(streamName: string): string {
75+
return `${streamName}|${this.id}`;
76+
}
77+
78+
lookupSources(streamName: string): BucketParameterLookupSourceDefinition[] {
79+
return this.subqueries.flatMap((subquery) => subquery.lookupSources(streamName));
80+
}
81+
6882
/**
6983
* Given a row in the table this stream selects from, returns all ids of buckets to which that row belongs to.
7084
*/
@@ -216,41 +230,6 @@ export class StreamVariant {
216230
);
217231
}
218232

219-
/**
220-
* Creates lookup indices for dynamically-resolved parameters.
221-
*
222-
* Resolving dynamic parameters is a two-step process: First, for tables referenced in subqueries, we create an index
223-
* to resolve which request parameters would match rows in subqueries. Then, when resolving bucket ids for a request,
224-
* we compute subquery results by looking up results in that index.
225-
*
226-
* This implements the first step of that process.
227-
*
228-
* @param result The array into which evaluation results should be written to.
229-
* @param sourceTable A table we depend on in a subquery.
230-
* @param row Row data to index.
231-
*/
232-
pushParameterRowEvaluation(result: EvaluatedParametersResult[], sourceTable: SourceTableInterface, row: SqliteRow) {
233-
for (const subquery of this.subqueries) {
234-
if (subquery.parameterTable.matches(sourceTable)) {
235-
const lookups = subquery.lookupsForParameterRow(sourceTable, row);
236-
if (lookups == null) {
237-
continue;
238-
}
239-
240-
// The row of the subquery. Since we only support subqueries with a single column, we unconditionally name the
241-
// column `result` for simplicity.
242-
const resultRow = { result: lookups.value };
243-
244-
result.push(
245-
...lookups.lookups.map((l) => ({
246-
lookup: l,
247-
bucketParameters: [resultRow]
248-
}))
249-
);
250-
}
251-
}
252-
}
253-
254233
debugRepresentation(): any {
255234
return {
256235
id: this.id,

0 commit comments

Comments
 (0)