@@ -4,7 +4,17 @@ import { DEFAULT_HYDRATION_STATE, HydrationState, ParameterLookupScope } from '.
44import { SourceTableInterface } from './SourceTableInterface.js' ;
55import { GetQuerierOptions } from './SqlSyncRules.js' ;
66import { TablePattern } from './TablePattern.js' ;
7- import { EvaluatedParametersResult , EvaluateRowOptions , EvaluationResult , SourceSchema , SqliteRow } from './types.js' ;
7+ import {
8+ EvaluatedParametersResult ,
9+ EvaluatedRow ,
10+ EvaluateRowOptions ,
11+ EvaluationResult ,
12+ isEvaluationError ,
13+ SourceEvaluationResult ,
14+ SourceSchema ,
15+ SqliteRow
16+ } from './types.js' ;
17+ import { buildBucketName } from './utils.js' ;
818
919export interface CreateSourceParams {
1020 hydrationState : HydrationState ;
@@ -30,7 +40,7 @@ export interface BucketSource {
3040 * Specifically, bucket definitions would always have a single data source, while stream definitions may have
3141 * one per variant.
3242 */
33- readonly dataSources : BucketDataSourceDefinition [ ] ;
43+ readonly dataSources : BucketDataSource [ ] ;
3444
3545 /**
3646 * BucketParameterQuerierSource describing the parameter queries / stream subqueries in this bucket/stream definition.
@@ -57,8 +67,11 @@ export interface HydratedBucketSource {
5767
5868/**
5969 * Encodes a static definition of a bucket source, as parsed from sync rules or stream definitions.
70+ *
71+ * This does not require any "hydration" itself: All results are independent of bucket names.
72+ * The higher-level HydratedSyncRules will use a HydrationState to generate bucket names.
6073 */
61- export interface BucketDataSourceDefinition {
74+ export interface BucketDataSource {
6275 /**
6376 * Bucket prefix if no transformations are defined.
6477 *
@@ -70,10 +83,16 @@ export interface BucketDataSourceDefinition {
7083 * For debug use only.
7184 */
7285 readonly bucketParameters : string [ ] ;
86+
7387 getSourceTables ( ) : Set < TablePattern > ;
74- createDataSource ( params : CreateSourceParams ) : BucketDataSource ;
7588 tableSyncsData ( table : SourceTableInterface ) : boolean ;
7689
90+ /**
91+ * Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
92+ * data for rows to add to buckets.
93+ */
94+ evaluateRow ( options : EvaluateRowOptions ) : SourceEvaluationResult [ ] ;
95+
7796 /**
7897 * Given a static schema, infer all logical tables and associated columns that appear in buckets defined by this
7998 * source.
@@ -121,29 +140,11 @@ export interface BucketParameterQuerierSourceDefinition {
121140 *
122141 * Note that queriers do not persist data themselves; they only resolve which buckets to load based on request parameters.
123142 */
124- readonly querierDataSource : BucketDataSourceDefinition ;
143+ readonly querierDataSource : BucketDataSource ;
125144
126145 createParameterQuerierSource ( params : CreateSourceParams ) : BucketParameterQuerierSource ;
127146}
128147
129- /**
130- * An interface declaring
131- *
132- * - which buckets the sync service should create when processing change streams from the database.
133- * - how data in source tables maps to data in buckets (e.g. when we're not selecting all columns).
134- * - which buckets a given connection has access to.
135- *
136- * There are two ways to define bucket sources: Via sync rules made up of parameter and data queries, and via stream
137- * definitions that only consist of a single query.
138- */
139- export interface BucketDataSource {
140- /**
141- * Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
142- * data for rows to add to buckets.
143- */
144- evaluateRow ( options : EvaluateRowOptions ) : EvaluationResult [ ] ;
145- }
146-
147148export interface BucketParameterLookupSource {
148149 /**
149150 * Given a row in a source table that affects sync parameters, returns a structure to index which buckets rows should
@@ -165,10 +166,9 @@ export interface BucketParameterQuerierSource {
165166 pushBucketParameterQueriers ( result : PendingQueriers , options : GetQuerierOptions ) : void ;
166167}
167168
168- export interface DebugMergedSource
169- extends BucketDataSource ,
170- BucketParameterLookupSource ,
171- BucketParameterQuerierSource { }
169+ export interface DebugMergedSource extends BucketParameterLookupSource , BucketParameterQuerierSource {
170+ evaluateRow ( options : EvaluateRowOptions ) : EvaluationResult [ ] ;
171+ }
172172
173173export enum BucketSourceType {
174174 SYNC_RULE ,
@@ -177,14 +177,31 @@ export enum BucketSourceType {
177177
178178export type ResultSetDescription = { name : string ; columns : ColumnDefinition [ ] } ;
179179
180- export function mergeDataSources ( sources : BucketDataSource [ ] ) : BucketDataSource {
180+ export function hydrateEvaluateRow (
181+ hydrationState : HydrationState ,
182+ source : BucketDataSource
183+ ) : ( options : EvaluateRowOptions ) => EvaluationResult [ ] {
184+ const scope = hydrationState . getBucketSourceScope ( source ) ;
185+ return ( options : EvaluateRowOptions ) : EvaluationResult [ ] => {
186+ return source . evaluateRow ( options ) . map ( ( result ) => {
187+ if ( isEvaluationError ( result ) ) {
188+ return result ;
189+ }
190+ return {
191+ bucket : buildBucketName ( scope , result . serializedBucketParameters ) ,
192+ id : result . id ,
193+ table : result . table ,
194+ data : result . data
195+ } satisfies EvaluatedRow ;
196+ } ) ;
197+ } ;
198+ }
199+
200+ export function mergeDataSources ( hydrationState : HydrationState , sources : BucketDataSource [ ] ) {
201+ const withScope = sources . map ( ( source ) => hydrateEvaluateRow ( hydrationState , source ) ) ;
181202 return {
182203 evaluateRow ( options : EvaluateRowOptions ) : EvaluationResult [ ] {
183- let results : EvaluationResult [ ] = [ ] ;
184- for ( let source of sources ) {
185- results . push ( ...source . evaluateRow ( options ) ) ;
186- }
187- return results ;
204+ return withScope . flatMap ( ( source ) => source ( options ) ) ;
188205 }
189206 } ;
190207}
@@ -218,9 +235,7 @@ export function mergeParameterQuerierSources(sources: BucketParameterQuerierSour
218235export function debugHydratedMergedSource ( bucketSource : BucketSource , params ?: CreateSourceParams ) : DebugMergedSource {
219236 const hydrationState = params ?. hydrationState ?? DEFAULT_HYDRATION_STATE ;
220237 const resolvedParams = { hydrationState } ;
221- const dataSource = mergeDataSources (
222- bucketSource . dataSources . map ( ( source ) => source . createDataSource ( resolvedParams ) )
223- ) ;
238+ const dataSource = mergeDataSources ( hydrationState , bucketSource . dataSources ) ;
224239 const parameterLookupSource = mergeParameterLookupSources (
225240 bucketSource . parameterLookupSources . map ( ( source ) => source . createParameterLookupSource ( resolvedParams ) )
226241 ) ;
0 commit comments