Skip to content

Commit 028c1d9

Browse files
committed
Bring back BucketSource.
1 parent c1fd10d commit 028c1d9

File tree

6 files changed

+125
-48
lines changed

6 files changed

+125
-48
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
BucketDataSourceDefinition,
33
BucketDescription,
44
BucketPriority,
5+
BucketSource,
56
HydratedSyncRules,
67
RequestedStream,
78
RequestJwtPayload,
@@ -248,15 +249,15 @@ export class BucketChecksumState {
248249
const streamNameToIndex = new Map<string, number>();
249250
this.streamNameToIndex = streamNameToIndex;
250251

251-
for (const source of this.parameterState.syncRules.bucketDataSources) {
252-
if (this.parameterState.isSubscribedToStream(source.definition)) {
253-
streamNameToIndex.set(source.definition.name, subscriptions.length);
252+
for (const source of this.parameterState.syncRules.definition.bucketSources) {
253+
if (this.parameterState.isSubscribedToStream(source)) {
254+
streamNameToIndex.set(source.name, subscriptions.length);
254255

255256
subscriptions.push({
256-
name: source.definition.name,
257-
is_default: source.definition.subscribedToByDefault,
257+
name: source.name,
258+
is_default: source.subscribedToByDefault,
258259
errors:
259-
this.parameterState.streamErrors[source.definition.name]?.map((e) => ({
260+
this.parameterState.streamErrors[source.name]?.map((e) => ({
260261
subscription: e.subscription?.opaque_id ?? 'default',
261262
message: e.message
262263
})) ?? []
@@ -484,7 +485,7 @@ export class BucketParameterState {
484485
};
485486
}
486487

487-
isSubscribedToStream(desc: BucketDataSourceDefinition): boolean {
488+
isSubscribedToStream(desc: BucketSource): boolean {
488489
return (desc.subscribedToByDefault && this.includeDefaultStreams) || this.subscribedStreamNames.has(desc.name);
489490
}
490491

packages/sync-rules/src/BucketSource.ts

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,43 @@ export interface CreateSourceParams {
1616
bucketIdTransformer: BucketIdTransformer;
1717
}
1818

19+
export interface BucketSource {
20+
readonly name: string;
21+
readonly type: BucketSourceType;
22+
readonly subscribedToByDefault: boolean;
23+
24+
/**
25+
* BucketDataSource describing the data in this bucket/stream definition.
26+
*
27+
* The same data source could in theory be present in multiple stream definitions.
28+
*/
29+
readonly dataSource: BucketDataSourceDefinition;
30+
31+
/**
32+
* BucketParameterQuerierSource describing the parameter queries / stream subqueries in this bucket/stream definition.
33+
*
34+
* The same source could in theory be present in multiple stream definitions.
35+
*/
36+
readonly parameterQuerierSources: BucketParameterQuerierSourceDefinition[];
37+
38+
/**
39+
* BucketParameterLookupSource describing the parameter tables used in this bucket/stream definition.
40+
*
41+
* The same source could in theory be present in multiple stream definitions.
42+
*/
43+
readonly parameterLookupSources: BucketParameterLookupSourceDefinition[];
44+
}
45+
46+
export interface HydratedBucketSource {
47+
readonly definition: BucketSource;
48+
49+
readonly parameterQuerierSources: BucketParameterQuerierSource[];
50+
}
51+
1952
/**
2053
* Encodes a static definition of a bucket source, as parsed from sync rules or stream definitions.
2154
*/
2255
export interface BucketDataSourceDefinition {
23-
readonly name: string;
24-
readonly type: BucketSourceType;
25-
readonly subscribedToByDefault: boolean;
2656
/**
2757
* For debug use only.
2858
*/
@@ -50,8 +80,6 @@ export interface BucketDataSourceDefinition {
5080
* This is only relevant for parameter queries that query tables.
5181
*/
5282
export interface BucketParameterLookupSourceDefinition {
53-
readonly name: string;
54-
readonly type: BucketSourceType;
5583
/**
5684
* For debug use only.
5785
*/
@@ -70,9 +98,6 @@ export interface BucketParameterLookupSourceDefinition {
7098
* This may use request data only, or it may use parameter lookup data persisted by a BucketParameterLookupSourceDefinition.
7199
*/
72100
export interface BucketParameterQuerierSourceDefinition {
73-
readonly name: string;
74-
readonly type: BucketSourceType;
75-
readonly subscribedToByDefault: boolean;
76101
/**
77102
* For debug use only.
78103
*/

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import {
44
BucketParameterLookupSourceDefinition,
55
BucketParameterQuerierSourceDefinition,
66
BucketSourceType,
7-
CreateSourceParams
7+
CreateSourceParams,
8+
BucketSource
89
} from './BucketSource.js';
910
import { ColumnDefinition } from './ExpressionType.js';
1011
import { IdSequence } from './IdSequence.js';
@@ -28,10 +29,21 @@ export interface QueryParseResult {
2829
errors: SqlRuleError[];
2930
}
3031

31-
export class SqlBucketDescriptor implements BucketDataSourceDefinition {
32+
export class SqlBucketDescriptor implements BucketDataSourceDefinition, BucketSource {
3233
name: string;
3334
private bucketParametersInternal: string[] | null = null;
3435

36+
public readonly subscribedToByDefault: boolean = true;
37+
38+
/**
39+
* source table -> queries
40+
*/
41+
dataQueries: SqlDataQuery[] = [];
42+
parameterQueries: SqlParameterQuery[] = [];
43+
globalParameterQueries: (StaticSqlParameterQuery | TableValuedFunctionSqlParameterQuery)[] = [];
44+
45+
parameterIdSequence = new IdSequence();
46+
3547
constructor(name: string) {
3648
this.name = name;
3749
}
@@ -40,22 +52,21 @@ export class SqlBucketDescriptor implements BucketDataSourceDefinition {
4052
return BucketSourceType.SYNC_RULE;
4153
}
4254

43-
public get subscribedToByDefault(): boolean {
44-
return true;
45-
}
46-
4755
public get bucketParameters(): string[] {
4856
return this.bucketParametersInternal ?? [];
4957
}
5058

51-
/**
52-
* source table -> queries
53-
*/
54-
dataQueries: SqlDataQuery[] = [];
55-
parameterQueries: SqlParameterQuery[] = [];
56-
globalParameterQueries: (StaticSqlParameterQuery | TableValuedFunctionSqlParameterQuery)[] = [];
59+
get dataSource() {
60+
return this;
61+
}
5762

58-
parameterIdSequence = new IdSequence();
63+
get parameterLookupSources() {
64+
return this.parameterQueries;
65+
}
66+
67+
get parameterQuerierSources() {
68+
return [...this.parameterQueries, ...this.globalParameterQueries];
69+
}
5970

6071
addDataQuery(sql: string, options: SyncRulesOptions, compatibility: CompatibilityContext): QueryParseResult {
6172
if (this.bucketParametersInternal == null) {
@@ -112,14 +123,6 @@ export class SqlBucketDescriptor implements BucketDataSourceDefinition {
112123
};
113124
}
114125

115-
getParameterQuerierSourceDefinitions(): BucketParameterQuerierSourceDefinition[] {
116-
return [...this.parameterQueries, ...this.globalParameterQueries];
117-
}
118-
119-
getParameterLookupSourceDefinitions(): BucketParameterLookupSourceDefinition[] {
120-
return [...this.parameterQueries];
121-
}
122-
123126
getSourceTables(): Set<TablePattern> {
124127
let result = new Set<TablePattern>();
125128
for (let query of this.parameterQueries) {

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { BucketParameterQuerier, QuerierError } from './BucketParameterQuerier.j
44
import {
55
BucketDataSourceDefinition,
66
BucketParameterLookupSourceDefinition,
7-
BucketParameterQuerierSourceDefinition
7+
BucketParameterQuerierSourceDefinition,
8+
BucketSource
89
} from './BucketSource.js';
910
import { CompatibilityContext, CompatibilityEdition, CompatibilityOption } from './compatibility.js';
1011
import { SqlRuleError, SyncRulesErrors, YamlError } from './errors.js';
@@ -85,6 +86,7 @@ export class SqlSyncRules {
8586
bucketDataSources: BucketDataSourceDefinition[] = [];
8687
bucketParameterLookupSources: BucketParameterLookupSourceDefinition[] = [];
8788
bucketParameterQuerierSources: BucketParameterQuerierSourceDefinition[] = [];
89+
bucketSources: BucketSource[] = [];
8890

8991
eventDescriptors: SqlEventDescriptor[] = [];
9092
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
@@ -236,9 +238,11 @@ export class SqlSyncRules {
236238
return descriptor.addDataQuery(q, queryOptions, compatibility);
237239
});
238240
}
239-
rules.bucketDataSources.push(descriptor);
240-
rules.bucketParameterLookupSources.push(...descriptor.getParameterLookupSourceDefinitions());
241-
rules.bucketParameterQuerierSources.push(...descriptor.getParameterQuerierSourceDefinitions());
241+
242+
rules.bucketSources.push(descriptor);
243+
rules.bucketDataSources.push(descriptor.dataSource);
244+
rules.bucketParameterLookupSources.push(...descriptor.parameterLookupSources);
245+
rules.bucketParameterQuerierSources.push(...descriptor.parameterQuerierSources);
242246
}
243247

244248
for (const entry of streamMap?.items ?? []) {
@@ -263,9 +267,10 @@ export class SqlSyncRules {
263267
if (data instanceof Scalar) {
264268
rules.withScalar(data, (q) => {
265269
const [parsed, errors] = syncStreamFromSql(key, q, queryOptions);
266-
rules.bucketDataSources.push(parsed);
267-
rules.bucketParameterLookupSources.push(parsed);
268-
rules.bucketParameterQuerierSources.push(parsed);
270+
rules.bucketSources.push(parsed);
271+
rules.bucketDataSources.push(parsed.dataSource);
272+
rules.bucketParameterLookupSources.push(...parsed.parameterLookupSources);
273+
rules.bucketParameterQuerierSources.push(...parsed.parameterQuerierSources);
269274
return {
270275
parsed: true,
271276
errors

packages/sync-rules/src/SyncRules.ts

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { BucketDataSource, BucketParameterLookupSource, BucketParameterQuerierSource } from './BucketSource.js';
1+
import {
2+
BucketDataSource,
3+
BucketParameterLookupSource,
4+
BucketParameterQuerierSource,
5+
BucketParameterQuerierSourceDefinition,
6+
HydratedBucketSource
7+
} from './BucketSource.js';
28
import {
39
BucketParameterQuerier,
410
CompatibilityContext,
@@ -25,14 +31,15 @@ import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, Sqlite
2531
* specifically affects bucket names.
2632
*/
2733
export class HydratedSyncRules {
34+
bucketSources: HydratedBucketSource[] = [];
2835
bucketDataSources: BucketDataSource[];
2936
bucketParameterQuerierSources: BucketParameterQuerierSource[];
3037
bucketParameterLookupSources: BucketParameterLookupSource[];
3138

3239
eventDescriptors: SqlEventDescriptor[] = [];
3340
compatibility: CompatibilityContext = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
3441

35-
private definition: SqlSyncRules;
42+
readonly definition: SqlSyncRules;
3643

3744
constructor(params: {
3845
definition: SqlSyncRules;
@@ -46,12 +53,29 @@ export class HydratedSyncRules {
4653
this.bucketParameterQuerierSources = params.bucketParameterQuerierSources;
4754
this.bucketParameterLookupSources = params.bucketParameterLookupSources;
4855
this.definition = params.definition;
56+
4957
if (params.eventDescriptors) {
5058
this.eventDescriptors = params.eventDescriptors;
5159
}
5260
if (params.compatibility) {
5361
this.compatibility = params.compatibility;
5462
}
63+
64+
let querierMap = new Map<BucketParameterQuerierSourceDefinition, HydratedBucketSource>();
65+
for (let definition of this.definition.bucketSources) {
66+
const hydratedBucketSource: HydratedBucketSource = { definition: definition, parameterQuerierSources: [] };
67+
this.bucketSources.push(hydratedBucketSource);
68+
for (let querier of definition.parameterQuerierSources) {
69+
querierMap.set(querier, hydratedBucketSource);
70+
}
71+
}
72+
for (let querier of params.bucketParameterQuerierSources) {
73+
const bucketSource = querierMap.get(querier.definition);
74+
if (bucketSource == null) {
75+
throw new Error('Cannot find BucketSource for BucketParameterQuerierSource');
76+
}
77+
bucketSource.parameterQuerierSources.push(querier);
78+
}
5579
}
5680

5781
// These methods do not depend on hydration, so we can just forward them to the definition.
@@ -131,12 +155,14 @@ export class HydratedSyncRules {
131155
const errors: QuerierError[] = [];
132156
const pending = { queriers, errors };
133157

134-
for (const source of this.bucketParameterQuerierSources) {
158+
for (const source of this.bucketSources) {
135159
if (
136160
(source.definition.subscribedToByDefault && options.hasDefaultStreams) ||
137161
source.definition.name in options.streams
138162
) {
139-
source.pushBucketParameterQueriers(pending, options);
163+
for (let querier of source.parameterQuerierSources) {
164+
querier.pushBucketParameterQueriers(pending, options);
165+
}
140166
}
141167
}
142168

packages/sync-rules/src/streams/stream.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
BucketParameterLookupSourceDefinition,
99
BucketParameterQuerierSource,
1010
BucketParameterQuerierSourceDefinition,
11+
BucketSource,
1112
BucketSourceType,
1213
CreateSourceParams
1314
} from '../BucketSource.js';
@@ -27,7 +28,11 @@ import {
2728
import { StreamVariant } from './variant.js';
2829

2930
export class SyncStream
30-
implements BucketDataSourceDefinition, BucketParameterLookupSourceDefinition, BucketParameterQuerierSourceDefinition
31+
implements
32+
BucketDataSourceDefinition,
33+
BucketParameterLookupSourceDefinition,
34+
BucketParameterQuerierSourceDefinition,
35+
BucketSource
3136
{
3237
name: string;
3338
subscribedToByDefault: boolean;
@@ -53,6 +58,18 @@ export class SyncStream
5358
return this.data.bucketParameters;
5459
}
5560

61+
public get dataSource() {
62+
return this;
63+
}
64+
65+
public get parameterLookupSources() {
66+
return [this];
67+
}
68+
69+
public get parameterQuerierSources() {
70+
return [this];
71+
}
72+
5673
createDataSource(params: CreateSourceParams): BucketDataSource {
5774
return {
5875
definition: this,

0 commit comments

Comments
 (0)