Skip to content

Commit ec5851a

Browse files
committed
Support multiple data sources.
1 parent 0049d54 commit ec5851a

File tree

6 files changed

+27
-20
lines changed

6 files changed

+27
-20
lines changed

packages/sync-rules/src/BucketSource.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,12 @@ export interface BucketSource {
2525
* BucketDataSource describing the data in this bucket/stream definition.
2626
*
2727
* The same data source could in theory be present in multiple stream definitions.
28+
*
29+
* Sources must _only_ be split into multiple ones if they will result in different buckets being created.
30+
* Specifically, bucket definitions would always have a single data source, while stream definitions may have
31+
* one per variant.
2832
*/
29-
readonly dataSource: BucketDataSourceDefinition;
33+
readonly dataSources: BucketDataSourceDefinition[];
3034

3135
/**
3236
* BucketParameterQuerierSource describing the parameter queries / stream subqueries in this bucket/stream definition.

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ export class SqlBucketDescriptor implements BucketDataSourceDefinition, BucketSo
5454
return this.bucketParametersInternal ?? [];
5555
}
5656

57-
get dataSource() {
58-
return this;
57+
get dataSources() {
58+
return [this];
5959
}
6060

6161
get parameterLookupSources() {

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ export class SqlSyncRules {
240240
}
241241

242242
rules.bucketSources.push(descriptor);
243-
rules.bucketDataSources.push(descriptor.dataSource);
243+
rules.bucketDataSources.push(...descriptor.dataSources);
244244
rules.bucketParameterLookupSources.push(...descriptor.parameterLookupSources);
245245
rules.bucketParameterQuerierSources.push(...descriptor.parameterQuerierSources);
246246
}
@@ -268,7 +268,7 @@ export class SqlSyncRules {
268268
rules.withScalar(data, (q) => {
269269
const [parsed, errors] = syncStreamFromSql(key, q, queryOptions);
270270
rules.bucketSources.push(parsed);
271-
rules.bucketDataSources.push(parsed.dataSource);
271+
rules.bucketDataSources.push(...parsed.dataSources);
272272
rules.bucketParameterLookupSources.push(...parsed.parameterLookupSources);
273273
rules.bucketParameterQuerierSources.push(...parsed.parameterQuerierSources);
274274
return {

packages/sync-rules/src/streams/stream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class SyncStream implements BucketSource {
3434
variants: StreamVariant[];
3535
data: BaseSqlDataQuery;
3636

37-
public readonly dataSource: BucketDataSourceDefinition;
37+
public readonly dataSources: BucketDataSourceDefinition[];
3838
public readonly parameterLookupSources: BucketParameterLookupSourceDefinition[];
3939
public readonly parameterQuerierSources: BucketParameterQuerierSourceDefinition[];
4040

@@ -45,7 +45,7 @@ export class SyncStream implements BucketSource {
4545
this.variants = [];
4646
this.data = data;
4747

48-
this.dataSource = new SyncStreamDataSource(this, data);
48+
this.dataSources = [new SyncStreamDataSource(this, data)];
4949
this.parameterQuerierSources = [new SyncStreamParameterQuerierSource(this)];
5050
this.parameterLookupSources = [new SyncStreamParameterLookupSource(this)];
5151
}

packages/sync-rules/test/src/streams.test.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ describe('streams', () => {
3838
expect(desc.variants).toHaveLength(1);
3939
expect(evaluateBucketIds(desc, COMMENTS, { id: 'foo' })).toStrictEqual(['1#stream|0[]']);
4040
expect(
41-
desc.dataSource
41+
desc.dataSources[0]
4242
.createDataSource({ bucketIdTransformer })
4343
.evaluateRow({ sourceTable: USERS, record: { id: 'foo' } })
4444
).toHaveLength(0);
@@ -747,7 +747,7 @@ describe('streams', () => {
747747
);
748748
const row = { id: 'id', account_id: 'account_id' };
749749

750-
expect(stream.dataSource.tableSyncsData(accountMember)).toBeTruthy();
750+
expect(stream.dataSources[0].tableSyncsData(accountMember)).toBeTruthy();
751751
expect(stream.parameterLookupSources[0].tableSyncsParameters(accountMember)).toBeTruthy();
752752

753753
// Ensure lookup steps work.
@@ -779,7 +779,7 @@ describe('streams', () => {
779779

780780
// And that the data alias is respected for generated schemas.
781781
const outputSchema = {};
782-
stream.dataSource.resolveResultSets(schema, outputSchema);
782+
stream.dataSources[0].resolveResultSets(schema, outputSchema);
783783
expect(Object.keys(outputSchema)).toStrictEqual(['outer']);
784784
});
785785

@@ -937,16 +937,20 @@ const options: StreamParseOptions = {
937937
const bucketIdTransformer = SqlSyncRules.versionedBucketIdTransformer('1');
938938

939939
function evaluateBucketIds(stream: SyncStream, sourceTable: SourceTableInterface, record: SqliteRow) {
940-
return stream.dataSource
941-
.createDataSource({ bucketIdTransformer })
942-
.evaluateRow({ sourceTable, record })
943-
.map((r) => {
944-
if ('error' in r) {
945-
throw new Error(`Unexpected error evaluating row: ${r.error}`);
946-
}
940+
return stream.dataSources
941+
.map((s) =>
942+
s
943+
.createDataSource({ bucketIdTransformer })
944+
.evaluateRow({ sourceTable, record })
945+
.map((r) => {
946+
if ('error' in r) {
947+
throw new Error(`Unexpected error evaluating row: ${r.error}`);
948+
}
947949

948-
return r.bucket;
949-
});
950+
return r.bucket;
951+
})
952+
)
953+
.flat();
950954
}
951955

952956
async function createQueriers(

packages/sync-rules/test/src/sync_rules.test.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ bucket_definitions:
108108
);
109109
const hydrated = rules.hydrate({ bucketIdTransformer });
110110
const parameterLookupSource = rules.bucketParameterLookupSources[0];
111-
expect(parameterLookupSource.bucketParameters).toEqual([]);
112111
expect(hydrated.evaluateParameterRow(USERS, { id: 'user1', is_admin: 1 })).toEqual([
113112
{
114113
bucketParameters: [{}],

0 commit comments

Comments
 (0)