diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 08f0bb5d38..ee38aac5bf 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -15,8 +15,8 @@ class TestDynamicDsService extends DynamicDsService { let datasourceParams: DatasourceParams[] = initData; @@ -40,7 +41,7 @@ const mockMetadata = (initData: DatasourceParams[] = []) => { describe('DynamicDsService', () => { let service: TestDynamicDsService; const project = { - templates: [{name: 'Test'}], + templates: [{name: 'Test'}, {name: 'Other'}], } as any as ISubqueryProject; beforeEach(() => { @@ -70,6 +71,69 @@ describe('DynamicDsService', () => { ]); }); + it('can destroy a dynamic datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + // Destroy specific datasource by index + await service.destroyDynamicDatasource('Test', 50, 0); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); + expect(updatedParams[1]).toEqual(testParam2); + + const datasources = (service as any)._datasources; + expect(datasources[0].endBlock).toBe(50); + }); + + it('throws error when destroying non-existent datasource', async () => { + const meta = mockMetadata([testParam1]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('NonExistent', 50, 0)).rejects.toThrow( + 'Datasource at index 0 has template name "Test", not "NonExistent"' + ); + }); + + it('throws error when destroying already destroyed datasource', async () => { + const destroyedParam = {...testParam1, endBlock: 30}; + const meta = mockMetadata([destroyedParam]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'Dynamic datasource at index 0 is already destroyed' + ); + }); + + it('allows creating new datasource after destroying existing one', async () => { + const meta = mockMetadata([testParam1]); + await service.init(meta); + + expect((service as any)._datasourceParams).toEqual([testParam1]); + + // Destroy by index + await service.destroyDynamicDatasource('Test', 50, 0); + + const paramsAfterDestroy = (service as any)._datasourceParams; + expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 50}); + + const newParam = {templateName: 'Test', startBlock: 60}; + await service.createDynamicDatasource(newParam); + + const finalParams: DatasourceParams[] = (service as any)._datasourceParams; + const destroyedCount = finalParams.filter((p: DatasourceParams) => p.endBlock !== undefined).length; + const activeCount = finalParams.filter((p: DatasourceParams) => p.endBlock === undefined).length; + + expect(destroyedCount).toBeGreaterThanOrEqual(1); + expect(activeCount).toBeGreaterThanOrEqual(1); + + const destroyedParam = finalParams.find((p: DatasourceParams) => p.startBlock === 1 && p.endBlock === 50); + expect(destroyedParam).toBeDefined(); + + const newParamFound = finalParams.find((p: DatasourceParams) => p.startBlock === 60 && !p.endBlock); + expect(newParamFound).toBeDefined(); + }); + it('resets dynamic datasources', async () => { const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); await service.init(meta); @@ -83,6 +147,26 @@ describe('DynamicDsService', () => { ]); }); + it('handles reset after datasource destruction correctly', async () => { + const params = [testParam1, testParam2, testParam3, testParam4]; + const meta = mockMetadata(params); + await service.init(meta); + + // Destroy only the first datasource by index + await service.destroyDynamicDatasource('Test', 25, 0); + + const paramsAfterDestroy = (service as any)._datasourceParams; + expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 25}); + + // Reset to block 2 (should keep testParam1 and testParam2) + await service.resetDynamicDatasource(2, null as any); + + const paramsAfterReset = (service as any)._datasourceParams; + expect(paramsAfterReset).toHaveLength(2); + expect(paramsAfterReset[0]).toEqual({...testParam1, endBlock: 25}); + expect(paramsAfterReset[1]).toEqual(testParam2); + }); + it('getDynamicDatasources with force reloads from metadata', async () => { const meta = mockMetadata([testParam1, testParam2]); await service.init(meta); @@ -107,6 +191,30 @@ describe('DynamicDsService', () => { ]); }); + it('loads destroyed datasources with endBlock correctly', async () => { + const destroyedParam = {...testParam1, endBlock: 100}; + const meta = mockMetadata([destroyedParam, testParam2]); + await service.init(meta); + + const datasources = await service.getDynamicDatasources(); + expect(datasources).toHaveLength(2); + expect((datasources[0] as any).endBlock).toBe(100); + expect((datasources[1] as any).endBlock).toBeUndefined(); + }); + + it('updates metadata correctly when destroying datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + // Destroy first datasource by index + await service.destroyDynamicDatasource('Test', 75, 0); + + const metadataParams = await meta.find('dynamicDatasources'); + expect(metadataParams).toBeDefined(); + expect(metadataParams![0]).toEqual({...testParam1, endBlock: 75}); + expect(metadataParams![1]).toEqual(testParam2); + }); + it('can find a template and cannot mutate the template', () => { const template1 = service.getTemplate('Test', 1); const template2 = service.getTemplate('Test', 2); @@ -119,4 +227,301 @@ describe('DynamicDsService', () => { expect(project.templates![0]).toEqual({name: 'Test'}); }); + + it('can create template with endBlock', () => { + const template = service.getTemplate('Test', 1, 100); + + expect(template.startBlock).toBe(1); + expect((template as any).endBlock).toBe(100); + expect((template as any).name).toBeUndefined(); + }); + + it('handles multiple templates with same name during destruction', async () => { + const param1 = {templateName: 'Test', startBlock: 1}; + const param2 = {templateName: 'Test', startBlock: 5}; + const param3 = {templateName: 'Other', startBlock: 3}; + + const meta = mockMetadata([param1, param2, param3]); + await service.init(meta); + + // Should destroy the first matching one by index + await service.destroyDynamicDatasource('Test', 10, 0); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...param1, endBlock: 10}); + expect(updatedParams[1]).toEqual(param2); // Not destroyed + expect(updatedParams[2]).toEqual(param3); // Not destroyed + }); + + it('throws error when service not initialized for destruction', async () => { + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'DynamicDsService has not been initialized' + ); + }); + + describe('getDynamicDatasourcesByTemplate', () => { + it('returns list of active datasources for a template', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + const testDatasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(testDatasources).toHaveLength(3); + expect(testDatasources[0]).toEqual({ + index: 0, + templateName: 'Test', + startBlock: 1, + endBlock: undefined, + args: undefined, + }); + expect(testDatasources[1]).toEqual({ + index: 1, + templateName: 'Test', + startBlock: 2, + endBlock: undefined, + args: undefined, + }); + expect(testDatasources[2]).toEqual({ + index: 2, + templateName: 'Test', + startBlock: 3, + endBlock: undefined, + args: undefined, + }); + }); + + it('excludes destroyed datasources from list', async () => { + const destroyedParam = {...testParam1, endBlock: 50}; + const meta = mockMetadata([destroyedParam, testParam2, testParam3]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toHaveLength(2); + expect(datasources[0].index).toBe(1); // Global index + expect(datasources[0].startBlock).toBe(2); + expect(datasources[1].index).toBe(2); // Global index + expect(datasources[1].startBlock).toBe(3); + }); + + it('returns empty array when no datasources match template', async () => { + const meta = mockMetadata([testParamOther]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toEqual([]); + }); + + it('includes args in datasource info when present', async () => { + const paramWithArgs = {...testParam1, args: {address: '0x123', tokenId: 1}}; + const meta = mockMetadata([paramWithArgs]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toHaveLength(1); + expect(datasources[0].args).toEqual({address: '0x123', tokenId: 1}); + }); + + it('throws error when service not initialized', () => { + expect(() => service.getDynamicDatasourcesByTemplate('Test')).toThrow( + 'DynamicDsService has not been initialized' + ); + }); + }); + + describe('destroyDynamicDatasource with index', () => { + it('destroys specific datasource by index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50, 1); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual(testParam1); // Not destroyed + expect(updatedParams[1]).toEqual({...testParam2, endBlock: 50}); // Destroyed + expect(updatedParams[2]).toEqual(testParam3); // Not destroyed + expect(updatedParams[3]).toEqual(testParamOther); // Not destroyed + }); + + it('throws error when index is out of bounds', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 5)).rejects.toThrow( + 'Index 5 is out of bounds. There are 2 datasource(s) in total' + ); + }); + + it('throws error when index is negative', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, -1)).rejects.toThrow( + 'Index -1 is out of bounds. There are 2 datasource(s) in total' + ); + }); + + it('throws error when trying to destroy already destroyed datasource', async () => { + const destroyedParam = {...testParam1, endBlock: 30}; + const meta = mockMetadata([destroyedParam]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'Dynamic datasource at index 0 is already destroyed' + ); + }); + + it('correctly handles global index after some datasources are destroyed', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); + await service.init(meta); + + // Destroy the first one using global index 0 + await service.destroyDynamicDatasource('Test', 40, 0); + + // Now only 3 active datasources for 'Test' template, with global indices 1, 2, 3 + const activeDatasources = service.getDynamicDatasourcesByTemplate('Test'); + expect(activeDatasources).toHaveLength(3); + expect(activeDatasources[0].index).toBe(1); // Global index + expect(activeDatasources[0].startBlock).toBe(2); + expect(activeDatasources[1].index).toBe(2); // Global index + expect(activeDatasources[1].startBlock).toBe(3); + expect(activeDatasources[2].index).toBe(3); // Global index + expect(activeDatasources[2].startBlock).toBe(4); + + // Destroy using global index 2 (testParam3) + await service.destroyDynamicDatasource('Test', 60, 2); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 40}); + expect(updatedParams[1]).toEqual(testParam2); // Still active + expect(updatedParams[2]).toEqual({...testParam3, endBlock: 60}); + expect(updatedParams[3]).toEqual(testParam4); // Still active + }); + + it('updates datasources in memory correctly when destroying by index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 100, 1); + + const datasources = (service as any)._datasources; + expect(datasources[0].endBlock).toBeUndefined(); + expect(datasources[1].endBlock).toBe(100); + expect(datasources[2].endBlock).toBeUndefined(); + }); + + it('allows destroying datasources from different templates independently', async () => { + const meta = mockMetadata([testParam1, testParam2, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50, 0); + await service.destroyDynamicDatasource('Other', 60, 2); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); + expect(updatedParams[1]).toEqual(testParam2); // Not destroyed + expect(updatedParams[2]).toEqual({...testParamOther, endBlock: 60}); + }); + + it('throws error when template name does not match global index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParamOther]); + await service.init(meta); + + // Try to destroy 'Test' template with index 2, which is 'Other' template + await expect(service.destroyDynamicDatasource('Test', 50, 2)).rejects.toThrow( + 'Datasource at index 2 has template name "Other", not "Test"' + ); + }); + + it('sets endBlock correctly allowing in-place removal during block processing', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + // Destroy datasource at index 1 at block 50 + await service.destroyDynamicDatasource('Test', 50, 1); + + // Verify the datasource has endBlock set + const dsParam = service.getDatasourceParamByIndex(1); + expect(dsParam).toBeDefined(); + expect(dsParam?.endBlock).toBe(50); + expect(dsParam?.startBlock).toBe(2); + expect(dsParam?.templateName).toBe('Test'); + + // Verify the internal _datasources array also has endBlock set + const datasources = (service as any)._datasources; + expect(datasources[1]).toBeDefined(); + expect((datasources[1] as any).endBlock).toBe(50); + }); + + it('destroyed datasource is filtered out in subsequent block processing', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + const blockHeight = 100; + const datasources = (service as any)._datasources; + + // Simulate filtering datasources for block 100 (all should be included initially) + const filterDataSources = (blockHeight: number, dataSources: BaseDataSource[]) => { + return dataSources.filter( + (ds) => + ds.startBlock !== undefined && + ds.startBlock <= blockHeight && + ((ds as any).endBlock ?? Number.MAX_SAFE_INTEGER) > blockHeight + ); + }; + + // Initial state: all 3 datasources should be active + let filteredDs = filterDataSources(blockHeight, datasources); + expect(filteredDs.length).toBe(3); + + // Simulate processing: DS2 destroys DS3 during block 100 + await service.destroyDynamicDatasource('Test', blockHeight, 2); + + // Re-filter datasources + filteredDs = filterDataSources(blockHeight, datasources); + + // After destruction, only DS1 and DS2 should remain + expect(filteredDs.length).toBe(2); + expect(filteredDs[0].startBlock).toBe(1); // DS1 + expect(filteredDs[1].startBlock).toBe(2); // DS2 + + // Verify DS3 was destroyed + expect((datasources[2] as any).endBlock).toBe(blockHeight); + }); + + it('demonstrates traditional for loop pattern works with array reassignment', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + const blockHeight = 100; + let datasources = (service as any)._datasources; + + const processed: number[] = []; + + for (let i = 0; i < datasources.length; i++) { + const ds = datasources[i]; + processed.push(ds.startBlock); + + // When processing DS2, destroy DS3 and re-filter + if (ds.startBlock === 2) { + await service.destroyDynamicDatasource('Test', blockHeight, 2); + + // Re-filter datasources using filter and reassignment + datasources = datasources.filter( + (d: any) => + d.startBlock !== undefined && + d.startBlock <= blockHeight && + (d.endBlock ?? Number.MAX_SAFE_INTEGER) > blockHeight + ); + } + } + expect(processed).toEqual([1, 2]); + + // Verify DS3 has endBlock set + const allDs = (service as any)._datasources; + expect((allDs[2] as any).endBlock).toBe(blockHeight); + }); + }); }); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 708ab5f3bb..8d468cb545 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import {Inject, Injectable} from '@nestjs/common'; -import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource} from '@subql/types-core'; +import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource, DynamicDatasourceInfo} from '@subql/types-core'; import {Transaction} from '@subql/x-sequelize'; import {cloneDeep} from 'lodash'; import {IBlockchainService} from '../blockchain.service'; @@ -19,12 +19,16 @@ export interface DatasourceParams { templateName: string; args?: Record; startBlock: number; + endBlock?: number; } export interface IDynamicDsService { dynamicDatasources: DS[]; createDynamicDatasource(params: DatasourceParams): Promise; + destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise; getDynamicDatasources(forceReload?: boolean): Promise; + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[]; + getDatasourceParamByIndex(index: number): DatasourceParams | undefined; } @Injectable() @@ -91,6 +95,92 @@ export class DynamicDsService ({params, globalIndex})) + .filter(({params}) => params.templateName === templateName && params.endBlock === undefined); + + return matchingDatasources.map(({globalIndex, params}) => ({ + index: globalIndex, + templateName: params.templateName, + startBlock: params.startBlock, + endBlock: params.endBlock, + args: params.args, + })); + } + + /** + * Get datasource parameters by global index. + * + * @param index - Global index in the internal datasource parameters array + * @returns DatasourceParams if found, undefined otherwise + */ + getDatasourceParamByIndex(index: number): DatasourceParams | undefined { + return this._datasourceParams?.[index]; + } + + async destroyDynamicDatasource( + templateName: string, + currentBlockHeight: number, + index: number, + tx?: Transaction + ): Promise { + if (!this._datasources || !this._datasourceParams) { + throw new Error('DynamicDsService has not been initialized'); + } + + // Get the datasource at the global index + const dsParam = this._datasourceParams[index]; + + // Validate datasource exists + if (!dsParam) { + throw new Error( + `Index ${index} is out of bounds. There are ${this._datasourceParams.length} datasource(s) in total` + ); + } + + // Validate it matches the template name and is not already destroyed + if (dsParam.templateName !== templateName) { + throw new Error( + `Datasource at index ${index} has template name "${dsParam.templateName}", not "${templateName}"` + ); + } + + if (dsParam.endBlock !== undefined) { + throw new Error(`Dynamic datasource at index ${index} is already destroyed`); + } + + // Update the datasource params + const updatedParams = {...dsParam, endBlock: currentBlockHeight}; + this._datasourceParams[index] = updatedParams; + + // Update the datasource object if it exists + // Note: _datasources and _datasourceParams arrays should always be in sync. + // If the index is valid for params, it must also be valid for datasources. + const datasource = this._datasources[index]; + if (!datasource) { + throw new Error(`Datasources array out of sync with params at index ${index}`); + } + // Set endBlock on the datasource object + datasource.endBlock = currentBlockHeight; + + await this.metadata.set(METADATA_KEY, this._datasourceParams, tx); + + logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); + } + // Not force only seems to be used for project changes async getDynamicDatasources(forceReload?: boolean): Promise { // Workers should not cache this result in order to keep in sync @@ -117,19 +207,19 @@ export class DynamicDsService t.name === templateName); if (!t) { throw new Error(`Unable to find matching template in project for name: "${templateName}"`); } const {name, ...template} = cloneDeep(t); - return {...template, startBlock} as DS; + return {...template, startBlock, endBlock} as DS; } private async getDatasource(params: DatasourceParams): Promise { - const dsObj = this.getTemplate(params.templateName, params.startBlock); + const dsObj = this.getTemplate(params.templateName, params.startBlock, params.endBlock); try { await this.blockchainService.updateDynamicDs(params, dsObj); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index c484eeab0f..2547fd2c0d 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -89,7 +89,7 @@ export abstract class BaseIndexerManager< const blockHeight = block.getHeader().blockHeight; monitorWrite(`- BlockHash: ${block.getHeader().blockHash}`); - const filteredDataSources = this.filterDataSources(blockHeight, dataSources); + let filteredDataSources = this.filterDataSources(blockHeight, dataSources); this.assertDataSources(filteredDataSources, blockHeight); @@ -116,6 +116,19 @@ export abstract class BaseIndexerManager< dynamicDsCreated = true; }, 'createDynamicDatasource'); + // Inject function to get dynamic datasources by template into vm + vm.freeze((templateName: string) => { + return this.dynamicDsService.getDynamicDatasourcesByTemplate(templateName); + }, 'getDynamicDatasources'); + + // Inject function to destroy ds into vm + vm.freeze(async (templateName: string, index: number) => { + await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); + + // Re-filter datasources to exclude the destroyed one + filteredDataSources = this.filterDataSources(blockHeight, filteredDataSources); + }, 'destroyDynamicDatasource'); + return vm; }); } @@ -139,7 +152,7 @@ export abstract class BaseIndexerManager< (ds) => ds.startBlock !== undefined && ds.startBlock <= nextProcessingHeight && - (ds.endBlock ?? Number.MAX_SAFE_INTEGER) >= nextProcessingHeight + (ds.endBlock ?? Number.MAX_SAFE_INTEGER) > nextProcessingHeight ); // perform filter for custom ds diff --git a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts index 3c6e456a08..a8c33206fb 100644 --- a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts @@ -3,16 +3,23 @@ import {isMainThread} from 'node:worker_threads'; import {Injectable} from '@nestjs/common'; +import {DynamicDatasourceInfo} from '@subql/types-core'; import {DatasourceParams, IDynamicDsService} from '../dynamic-ds.service'; export type HostDynamicDS = { dynamicDsCreateDynamicDatasource: (params: DatasourceParams) => Promise; + dynamicDsDestroyDynamicDatasource: (templateName: string, currentBlockHeight: number, index: number) => Promise; dynamicDsGetDynamicDatasources: () => Promise; + dynamicDsGetDynamicDatasourcesByTemplate: (templateName: string) => DynamicDatasourceInfo[]; + dynamicDsGetDatasourceParamByIndex: (index: number) => DatasourceParams | undefined; }; export const hostDynamicDsKeys: (keyof HostDynamicDS)[] = [ 'dynamicDsCreateDynamicDatasource', + 'dynamicDsDestroyDynamicDatasource', 'dynamicDsGetDynamicDatasources', + 'dynamicDsGetDynamicDatasourcesByTemplate', + 'dynamicDsGetDatasourceParamByIndex', ]; @Injectable() @@ -32,14 +39,29 @@ export class WorkerDynamicDsService implements IDynamicDsService { return this.host.dynamicDsCreateDynamicDatasource(JSON.parse(JSON.stringify(params))); } + async destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise { + return this.host.dynamicDsDestroyDynamicDatasource(templateName, currentBlockHeight, index); + } + async getDynamicDatasources(): Promise { return this.host.dynamicDsGetDynamicDatasources(); } + + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[] { + return this.host.dynamicDsGetDynamicDatasourcesByTemplate(templateName); + } + + getDatasourceParamByIndex(index: number): DatasourceParams | undefined { + return this.host.dynamicDsGetDatasourceParamByIndex(index); + } } export function dynamicDsHostFunctions(dynamicDsService: IDynamicDsService): HostDynamicDS { return { dynamicDsCreateDynamicDatasource: dynamicDsService.createDynamicDatasource.bind(dynamicDsService), + dynamicDsDestroyDynamicDatasource: dynamicDsService.destroyDynamicDatasource.bind(dynamicDsService), dynamicDsGetDynamicDatasources: dynamicDsService.getDynamicDatasources.bind(dynamicDsService), + dynamicDsGetDynamicDatasourcesByTemplate: dynamicDsService.getDynamicDatasourcesByTemplate.bind(dynamicDsService), + dynamicDsGetDatasourceParamByIndex: dynamicDsService.getDatasourceParamByIndex.bind(dynamicDsService), }; } diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index f505fb3ec0..2bd8d39095 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -97,9 +97,12 @@ export class IndexerManager extends BaseIndexerManager< dataSources: SubstrateProjectDs[], getVM: (d: SubstrateProjectDs) => Promise, ): Promise { + // Extract block height for checking destroyed datasources + const blockHeight = blockContent.block.block.header.number.toNumber(); + if (isFullBlock(blockContent)) { const { block, events, extrinsics } = blockContent; - await this.indexContent(SubstrateHandlerKind.Block)( + await this.indexContent(SubstrateHandlerKind.Block, blockHeight)( block, dataSources, getVM, @@ -131,7 +134,7 @@ export class IndexerManager extends BaseIndexerManager< // Run initialization events for (const event of groupedEvents.init) { - await this.indexContent(SubstrateHandlerKind.Event)( + await this.indexContent(SubstrateHandlerKind.Event, blockHeight)( event, dataSources, getVM, @@ -139,7 +142,7 @@ export class IndexerManager extends BaseIndexerManager< } for (const extrinsic of extrinsics) { - await this.indexContent(SubstrateHandlerKind.Call)( + await this.indexContent(SubstrateHandlerKind.Call, blockHeight)( extrinsic, dataSources, getVM, @@ -151,7 +154,7 @@ export class IndexerManager extends BaseIndexerManager< ); for (const event of extrinsicEvents) { - await this.indexContent(SubstrateHandlerKind.Event)( + await this.indexContent(SubstrateHandlerKind.Event, blockHeight)( event, dataSources, getVM, @@ -161,7 +164,7 @@ export class IndexerManager extends BaseIndexerManager< // Run finalization events for (const event of groupedEvents.finalize) { - await this.indexContent(SubstrateHandlerKind.Event)( + await this.indexContent(SubstrateHandlerKind.Event, blockHeight)( event, dataSources, getVM, @@ -169,7 +172,7 @@ export class IndexerManager extends BaseIndexerManager< } } else { for (const event of blockContent.events) { - await this.indexContent(SubstrateHandlerKind.Event)( + await this.indexContent(SubstrateHandlerKind.Event, blockHeight)( event, dataSources, getVM, @@ -180,6 +183,7 @@ export class IndexerManager extends BaseIndexerManager< private indexContent( kind: SubstrateHandlerKind, + blockHeight: number, ): ( content: | SubstrateBlock @@ -190,7 +194,9 @@ export class IndexerManager extends BaseIndexerManager< getVM: (d: SubstrateProjectDs) => Promise, ) => Promise { return async (content, dataSources, getVM) => { - for (const ds of dataSources) { + // When a datasource is destroyed, its removed from the array and the loop exits early + for (let i = 0; i < dataSources.length; i++) { + const ds = dataSources[i]; await this.indexData(kind, content, ds, getVM); } }; diff --git a/packages/types-core/src/global.ts b/packages/types-core/src/global.ts index cbaf1be173..654999c971 100644 --- a/packages/types-core/src/global.ts +++ b/packages/types-core/src/global.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import type Pino from 'pino'; -import {Cache, DynamicDatasourceCreator} from './interfaces'; +import {Cache, DynamicDatasourceCreator, DynamicDatasourceDestructor, DynamicDatasourceGetter} from './interfaces'; import {Store} from './store'; // base global @@ -12,4 +12,6 @@ declare global { const cache: Cache; const chainId: string; const createDynamicDatasource: DynamicDatasourceCreator; + const destroyDynamicDatasource: DynamicDatasourceDestructor; + const getDynamicDatasources: DynamicDatasourceGetter; } diff --git a/packages/types-core/src/interfaces.ts b/packages/types-core/src/interfaces.ts index bde78e77ac..7930692ad8 100644 --- a/packages/types-core/src/interfaces.ts +++ b/packages/types-core/src/interfaces.ts @@ -2,6 +2,28 @@ // SPDX-License-Identifier: GPL-3.0 export type DynamicDatasourceCreator = (name: string, args: Record) => Promise; +export type DynamicDatasourceDestructor = (name: string, index: number) => Promise; + +/** + * Information about a dynamic datasource instance. + */ +export interface DynamicDatasourceInfo { + /** + * Global index of the datasource in the internal storage array. + * Use this value when calling destroyDynamicDatasource(). + */ + index: number; + /** Template name this datasource was created from */ + templateName: string; + /** Block height where this datasource starts processing */ + startBlock: number; + /** Block height where this datasource stops processing (if destroyed) */ + endBlock?: number; + /** Arguments passed when creating this datasource */ + args?: Record; +} + +export type DynamicDatasourceGetter = (templateName: string) => DynamicDatasourceInfo[]; export interface Cache = Record> { set(key: keyof T, value: T[keyof T]): Promise;