Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 20 additions & 56 deletions ccip-sdk/src/aptos/logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import type { LogFilter } from '../chain.ts'
import {
CCIPAptosAddressModuleRequiredError,
CCIPAptosTransactionTypeUnexpectedError,
CCIPLogsRequiresStartError,
CCIPLogsWatchRequiresFinalityError,
CCIPLogsWatchRequiresStartError,
CCIPTopicsInvalidError,
} from '../errors/index.ts'
import type { ChainLog } from '../types.ts'
Expand Down Expand Up @@ -97,7 +97,7 @@ async function* fetchEventsForward(
): AsyncGenerator<ResEvent> {
if (opts.watch && typeof opts.endBlock === 'number' && opts.endBlock > 0)
throw new CCIPLogsWatchRequiresFinalityError(opts.endBlock)
opts.endBlock ||= 'latest'
opts.endBlock ??= 'latest'

const fetchBatch = memoize(
async (start?: number) => {
Expand All @@ -119,18 +119,24 @@ async function* fetchEventsForward(

let start
if (
(!opts.startBlock || opts.startBlock < +initialBatch[0]!.version) &&
(!opts.startTime ||
opts.startTime < (await getVersionTimestamp(provider, +initialBatch[0]!.version)))
opts.startTime != null &&
(opts.startBlock == null || opts.startBlock < +initialBatch[0]!.version) &&
opts.startTime < (await getVersionTimestamp(provider, +initialBatch[0]!.version))
) {
const i = await binarySearchFirst(0, Math.floor(end / limit) - 1, async (i) => {
const batch = await fetchBatch(end - (i + 1) * limit + 1)
const firstTimestamp = await getVersionTimestamp(provider, +batch[0]!.version)
return firstTimestamp > opts.startTime!
})
start = end - (i + 1) * limit + 1
start = Math.max(end - (i + 1) * limit + 1, 0)
} else if (
opts.startTime == null &&
opts.startBlock != null &&
opts.startBlock <= +initialBatch[0]!.version
) {
start = 0
} else {
start = end - limit + 1
start = Math.max(end - limit + 1, 0)
}

let notAfter =
Expand All @@ -155,7 +161,7 @@ async function* fetchEventsForward(
const data = await fetchBatch(start)
if (
first &&
opts.startTime &&
opts.startTime != null &&
(await getVersionTimestamp(provider, +data[0]!.version)) < opts.startTime
) {
// the first batch may have some head which is not in the range
Expand All @@ -172,10 +178,10 @@ async function* fetchEventsForward(
first = false

for (const ev of data) {
if (opts.startBlock && +ev.version < opts.startBlock) continue
if (opts.startBlock != null && +ev.version < opts.startBlock) continue
// there may be an unknown interval between yields, so we support memoized negative finality
if (
notAfter &&
notAfter != null &&
+ev.version > (typeof notAfter === 'function' ? await notAfter() : notAfter)
) {
catchedUp = true
Expand All @@ -197,41 +203,6 @@ async function* fetchEventsForward(
}
}

async function* fetchEventsBackward(
{ provider }: { provider: Aptos },
opts: LogFilter,
eventHandlerField: string,
stateAddr: string,
limit = 100,
): AsyncGenerator<ResEvent> {
let start
let cont = true
const notAfter =
typeof opts.endBlock !== 'number'
? undefined
: opts.endBlock < 0
? +(await provider.getLedgerInfo()).ledger_version + opts.endBlock
: opts.endBlock
do {
const { data } = await getAptosFullNode<object, ResEvent[]>({
aptosConfig: provider.config,
originMethod: 'getEventsByEventHandle',
path: `accounts/${stateAddr}/events/${opts.address}::${eventHandlerField}`,
params: { start, limit },
})

if (!data.length) break
else if (start === 1) cont = false
else start = Math.max(+data[0]!.sequence_number - limit, 1)

for (const ev of data.reverse()) {
if (notAfter && +ev.version > notAfter) continue
if (+ev.sequence_number <= 1) cont = false
yield ev
}
} while (cont)
}

/**
* Streams logs from the Aptos blockchain based on filter options.
* @param provider - Aptos provider instance.
Expand All @@ -246,6 +217,9 @@ export async function* streamAptosLogs(
if (!opts.address || !opts.address.includes('::')) throw new CCIPAptosAddressModuleRequiredError()
if (opts.topics?.length !== 1 || typeof opts.topics[0] !== 'string')
throw new CCIPTopicsInvalidError(opts.topics!)
const hasStart = opts.startBlock != null || opts.startTime != null
if (!hasStart) throw new CCIPLogsRequiresStartError()

let eventHandlerField = opts.topics[0]
if (!eventHandlerField.includes('/')) {
eventHandlerField = (eventToHandler as Record<string, string>)[eventHandlerField]!
Expand All @@ -257,18 +231,8 @@ export async function* streamAptosLogs(
},
})

let eventsIter
if (opts.startBlock || opts.startTime) {
eventsIter = fetchEventsForward(ctx, opts, eventHandlerField, stateAddr, limit)
} else if (opts.watch) {
throw new CCIPLogsWatchRequiresStartError()
} else {
// backwards, just paginate down to lowest sequence number
eventsIter = fetchEventsBackward(ctx, opts, eventHandlerField, stateAddr, limit)
}

let topics
for await (const ev of eventsIter) {
for await (const ev of fetchEventsForward(ctx, opts, eventHandlerField, stateAddr, limit)) {
topics ??= [ev.type.slice(ev.type.lastIndexOf('::') + 2)]
yield {
address: opts.address,
Expand Down
25 changes: 13 additions & 12 deletions ccip-sdk/src/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
CCIPArgumentInvalidError,
CCIPChainFamilyMismatchError,
CCIPExecTxRevertedError,
CCIPLogsRequiresStartError,
CCIPNotImplementedError,
CCIPTokenPoolChainConfigNotFoundError,
CCIPTransactionNotFinalizedError,
Expand Down Expand Up @@ -166,13 +167,13 @@ export const DEFAULT_API_RETRY_CONFIG: Required<ApiRetryConfig> = {
* Filter options for getLogs queries across chains.
*/
export type LogFilter = {
/** Starting block number (inclusive). */
/** Starting block number (inclusive). Required unless startTime is provided; explicit 0 is allowed. */
startBlock?: number
/** Starting Unix timestamp (inclusive). */
startTime?: number
/** Ending block number (inclusive). */
endBlock?: number | 'finalized' | 'latest'
/** Solana: optional hint txHash for end of iteration. */
/** Cursor hint for the exclusive upper end of iteration on chains that support it. */
endBefore?: string
/** watch mode: polls for new logs after fetching since start (required), until endBlock finality tag
* (e.g. endBlock=finalized polls only finalized logs); can be a promise to cancel loop
Expand Down Expand Up @@ -671,13 +672,12 @@ export abstract class Chain<F extends ChainFamily = ChainFamily> {
/**
* An async generator that yields logs based on the provided options.
* @param opts - Options object containing:
* - `startBlock`: if provided, fetch and generate logs forward starting from this block;
* otherwise, returns logs backwards in time from endBlock;
* optionally, startTime may be provided to fetch logs forward starting from this time
* - `startBlock`: fetch and generate logs forward starting from this block;
* required unless startTime is provided; explicit 0 is allowed
* - `startTime`: instead of a startBlock, a start timestamp may be provided;
* if either is provided, fetch logs forward from this starting point; otherwise, backwards
* - `endBlock`: if omitted, use latest block; can be a block number, 'latest', 'finalized' or
* negative finality block depth
* if either is provided, fetch logs forward from this starting point
* - `endBlock`: a fixed block height, finality tag or negative finality depth to stop iteration
* at; defaults to `latest`
* - `endBefore`: optional hint signature for end of iteration, instead of endBlock
* - `address`: if provided, fetch logs for this address only (may be required in some
* networks/implementations)
Expand All @@ -686,10 +686,11 @@ export abstract class Chain<F extends ChainFamily = ChainFamily> {
* some networks/implementations may not be able to filter topics other than topic0s, so one may
* want to assume those are optimization hints, instead of hard filters, and verify results
* - `page`: if provided, try to use this page/range for batches
* - `watch`: true or cancellation promise, getLogs continuously after initial fetch
* - `watch`: true or cancellation promise, stream logs after initial catch-up until endBlock
* finality tag (e.g. 'finalized'); requires endBlock to be a finality tag
* @returns An async iterable iterator of logs.
* @throws {@link CCIPLogsWatchRequiresFinalityError} if watch mode is used without a finality endBlock tag
* @throws {@link CCIPLogsWatchRequiresStartError} if watch mode is used without startBlock or startTime
* @throws {@link CCIPLogsRequiresStartError} if used without startBlock or startTime
* @throws {@link CCIPLogsAddressRequiredError} if address is required but not provided (chain-specific)
*/
abstract getLogs(opts: LogFilter): AsyncIterableIterator<ChainLog>
Expand Down Expand Up @@ -1396,7 +1397,7 @@ export abstract class Chain<F extends ChainFamily = ChainFamily> {
'page' | 'watch' | 'startBlock' | 'startTime'
>): AsyncIterableIterator<CCIPExecution> {
if (verifications && 'log' in verifications) hints.startBlock ??= verifications.log.blockNumber
const onlyLast = !hints.startTime && !hints.startBlock // backwards
if (hints.startTime == null && hints.startBlock == null) throw new CCIPLogsRequiresStartError()
for await (const log of this.getLogs({
address: offRamp,
topics: ['ExecutionStateChanged'],
Expand All @@ -1415,7 +1416,7 @@ export abstract class Chain<F extends ChainFamily = ChainFamily> {

const timestamp = log.tx?.timestamp ?? (await this.getBlockTimestamp(log.blockNumber))
yield { receipt, log, timestamp }
if (onlyLast || receipt.state === ExecutionState.Success) break
if (receipt.state === ExecutionState.Success) break
}
}

Expand Down
4 changes: 3 additions & 1 deletion ccip-sdk/src/commits.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ export async function getOnchainCommitReport(
): Promise<CCIPVerifications> {
for await (const log of dest.getLogs({
...hints,
...(!hints?.startBlock ? { startTime: requestTimestamp } : { startBlock: hints.startBlock }),
...(hints?.startBlock == null
? { startTime: requestTimestamp }
: { startBlock: hints.startBlock }),
address: offRamp,
topics: [lane.version < CCIPVersion.V1_6 ? 'ReportAccepted' : 'CommitReportAccepted'],
})) {
Expand Down
1 change: 1 addition & 0 deletions ccip-sdk/src/errors/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ export const CCIPErrorCode = {
LOG_EVENT_HANDLER_UNKNOWN: 'LOG_EVENT_HANDLER_UNKNOWN',
LOGS_WATCH_REQUIRES_FINALITY: 'LOGS_WATCH_REQUIRES_FINALITY',
LOGS_WATCH_REQUIRES_START: 'LOGS_WATCH_REQUIRES_START',
LOGS_REQUIRES_START: 'LOGS_REQUIRES_START',
LOGS_ADDRESS_REQUIRED: 'LOGS_ADDRESS_REQUIRED',
TOPICS_INVALID: 'TOPICS_INVALID',

Expand Down
1 change: 1 addition & 0 deletions ccip-sdk/src/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export { CCIPBlockNotFoundError, CCIPTransactionNotFoundError } from './speciali
// Specialized errors - Logs
export {
CCIPLogsAddressRequiredError,
CCIPLogsRequiresStartError,
CCIPLogsWatchRequiresFinalityError,
CCIPLogsWatchRequiresStartError,
} from './specialized.ts'
Expand Down
1 change: 1 addition & 0 deletions ccip-sdk/src/errors/recovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ export const DEFAULT_RECOVERY_HINTS: Partial<Record<CCIPErrorCode, string>> = {
LOGS_WATCH_REQUIRES_FINALITY:
'Logs watch requires endBlock to be a `finalized`, `latest` or finality block depth (negative).',
LOGS_WATCH_REQUIRES_START: 'Logs watch requires either startBlock or startTime (forward mode).',
LOGS_REQUIRES_START: 'Logs queries require either startBlock or startTime.',
LOGS_ADDRESS_REQUIRED: 'Provide address for logs filtering.',
TOPICS_INVALID: 'Topics must be strings for event filtering.',

Expand Down
43 changes: 30 additions & 13 deletions ccip-sdk/src/errors/specialized.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ export class CCIPLogsNotFoundError extends CCIPError {
* @example
* ```typescript
* try {
* const logs = await chain.getLogs({ topics: ['0xunknown'] })
* const logs = await chain.getLogs({ startBlock: 0, topics: ['0xunknown'] })
* } catch (error) {
* if (error instanceof CCIPLogTopicsNotFoundError) {
* console.log(`Topics not matched: ${error.context.topics}`)
Expand All @@ -1519,7 +1519,7 @@ export class CCIPLogTopicsNotFoundError extends CCIPError {
* @example
* ```typescript
* try {
* await chain.watchLogs({ endBlock: 1000 }) // Fixed endBlock not allowed
* await chain.watchLogs({ startBlock: 0, endBlock: 1000 }) // Fixed endBlock not allowed
* } catch (error) {
* if (error instanceof CCIPLogsWatchRequiresFinalityError) {
* console.log('Use "latest" or "finalized" for endBlock in watch mode')
Expand All @@ -1542,22 +1542,39 @@ export class CCIPLogsWatchRequiresFinalityError extends CCIPError {
/**
* Thrown when trying to `watch` logs but no start position provided.
*
* @deprecated Log queries now require a start position for all modes and throw
* {@link CCIPLogsRequiresStartError}; this class remains exported for compatibility.
*/
export class CCIPLogsWatchRequiresStartError extends CCIPError {
override readonly name = 'CCIPLogsWatchRequiresStartError'
/** Creates a logs watch requires start error. */
constructor(options?: CCIPErrorOptions) {
super(CCIPErrorCode.LOGS_WATCH_REQUIRES_START, `Watch mode requires startBlock or startTime`, {
...options,
isTransient: false,
})
}
}

/**
* Thrown when querying logs without an explicit start position.
*
* @example
* ```typescript
* try {
* await chain.watchLogs({}) // Missing startBlock or startTime
* await chain.getLogs({ address: '0x...', topics: ['CCIPMessageSent'] })
* } catch (error) {
* if (error instanceof CCIPLogsWatchRequiresStartError) {
* console.log('Provide startBlock or startTime for watch mode')
* if (error instanceof CCIPLogsRequiresStartError) {
* console.log('Provide startBlock or startTime')
* }
* }
* ```
*/
export class CCIPLogsWatchRequiresStartError extends CCIPError {
override readonly name = 'CCIPLogsWatchRequiresStartError'
/** Creates a logs watch requires start error. */
export class CCIPLogsRequiresStartError extends CCIPError {
override readonly name = 'CCIPLogsRequiresStartError'
/** Creates a logs requires start error. */
constructor(options?: CCIPErrorOptions) {
super(CCIPErrorCode.LOGS_WATCH_REQUIRES_START, `Watch mode requires startBlock or startTime`, {
super(CCIPErrorCode.LOGS_REQUIRES_START, `Logs query requires startBlock or startTime`, {
...options,
isTransient: false,
})
Expand All @@ -1570,7 +1587,7 @@ export class CCIPLogsWatchRequiresStartError extends CCIPError {
* @example
* ```typescript
* try {
* await chain.getLogs({ topics: [...] }) // Missing address
* await chain.getLogs({ startBlock: 0, topics: [...] }) // Missing address
* } catch (error) {
* if (error instanceof CCIPLogsAddressRequiredError) {
* console.log('Contract address is required for this chain')
Expand Down Expand Up @@ -2102,7 +2119,7 @@ export class CCIPBlockTimeNotFoundError extends CCIPError {
* @example
* ```typescript
* try {
* await chain.getLogs({ topics: [123] }) // Invalid topic type
* await chain.getLogs({ startBlock: 0, topics: [123] }) // Invalid topic type
* } catch (error) {
* if (error instanceof CCIPTopicsInvalidError) {
* console.log('Topics must be string values')
Expand Down Expand Up @@ -2722,7 +2739,7 @@ export class CCIPAptosTransactionTypeUnexpectedError extends CCIPError {
* @example
* ```typescript
* try {
* await aptosChain.getLogs({ address: '0x1' }) // Missing module
* await aptosChain.getLogs({ address: '0x1', startBlock: 0 }) // Missing module
* } catch (error) {
* if (error instanceof CCIPAptosAddressModuleRequiredError) {
* console.log('Provide address with module name')
Expand Down Expand Up @@ -2751,7 +2768,7 @@ export class CCIPAptosAddressModuleRequiredError extends CCIPError {
* @example
* ```typescript
* try {
* await aptosChain.getLogs({ topics: ['invalid'] })
* await aptosChain.getLogs({ startBlock: 0, topics: ['invalid'] })
* } catch (error) {
* if (error instanceof CCIPAptosTopicInvalidError) {
* console.log(`Invalid topic: ${error.context.topic}`)
Expand Down
4 changes: 4 additions & 0 deletions ccip-sdk/src/evm/fork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ describe('EVM Fork Tests', { skip, timeout: 180_000 }, () => {
for await (const exec of sepoliaChain.getExecutionReceipts({
offRamp,
messageId: successMsg.messageId,
sourceChainSelector: request.message.sourceChainSelector,
startTime: request.tx.timestamp,
})) {
if (exec.receipt.state === ExecutionState.Success) {
foundSuccess = true
Expand Down Expand Up @@ -538,6 +540,8 @@ describe('EVM Fork Tests', { skip, timeout: 180_000 }, () => {
for await (const exec of sepoliaChain.getExecutionReceipts({
offRamp,
messageId: failedMsg.messageId,
sourceChainSelector: request.message.sourceChainSelector,
startTime: request.tx.timestamp,
})) {
assert.notEqual(
exec.receipt.state,
Expand Down
Loading
Loading