/**
 * Mutable state that `Actor.pushData` shares with the patched dataset client
 * for the duration of a single push. It is carried through `AsyncLocalStorage`.
 */
interface PpeAwarePushDataContext {
    /** Name of the event the pushed items are charged as. */
    eventName: string;

    /**
     * Outcome of the charge operation(s) performed while pushing.
     * Unset until the patched `DatasetClient.pushItems` records it;
     * `pushData` reads it back afterwards and returns it to the caller.
     */
    chargeResult?: ChargeResult;
}
item.length : 1; - - if (toCharge > maxChargedCount) { - // Push as many items as we can charge for - const items = Array.isArray(item) ? item : [item]; - await dataset.pushData(items.slice(0, maxChargedCount)); + // Check if dataset is using PatchedApifyClient (which handles charging internally) + // or a different storage client (e.g., local storage in development) + const usingPatchedClient = + dataset.client === + this.apifyClient.dataset(this.config.get('defaultDatasetId')); + + if (usingPatchedClient) { + // PatchedDatasetClient will handle charging and item limiting + const context: PpeAwarePushDataContext | undefined = + eventName !== undefined ? { eventName } : undefined; + + await this.ppeAwarePushDataContext.run(context, async () => { + await dataset.pushData(item); + }); + + if (eventName !== undefined) { + if (context?.chargeResult === undefined) { + throw new Error( + 'Internal error - missing result of charge operation', + ); + } + + return context.chargeResult; + } } else { - await dataset.pushData(item); - } + // Local storage or other client - handle charging directly + const autoEventName = + eventName ?? + (dataset.id === this.config.get('defaultDatasetId') + ? 
/**
 * Combines the results of two consecutive charge operations into one.
 *
 * - The charge limit counts as reached if EITHER operation reported it.
 *   (Aggregating with `&&` would hide a limit that was only hit by a later chunk.)
 * - Charged counts are summed.
 * - `chargeableWithinLimit` is merged over the union of keys. When both results
 *   know a key, the smaller remaining count wins; a key known to only one
 *   result is kept as-is instead of turning into `NaN` or being dropped.
 */
function mergeChargeResults(
    previous: ChargeResult,
    next: ChargeResult,
): ChargeResult {
    const chargeableWithinLimit: Record<string, number> = {
        ...previous.chargeableWithinLimit,
    };

    for (const [key, value] of Object.entries(next.chargeableWithinLimit)) {
        const known = chargeableWithinLimit[key];
        chargeableWithinLimit[key] =
            known === undefined ? value : Math.min(known, value);
    }

    return {
        eventChargeLimitReached:
            previous.eventChargeLimitReached || next.eventChargeLimitReached,
        chargedCount: previous.chargedCount + next.chargedCount,
        chargeableWithinLimit,
    };
}

/**
 * `DatasetClient` that charges for pushed items and truncates the pushed batch
 * to what can still be charged within the limit.
 *
 * `actor` is the enclosing `Actor` instance captured by `newClient`.
 *
 * NOTE(review): the generic arguments below were reconstructed - the constraint is
 * presumably the same one apify-client declares on `DatasetClient`; confirm.
 */
class PatchedDatasetClient<
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- has to match the base class constraint
    Data extends Record<string | number, any> = Record<
        string | number,
        unknown
    >,
> extends DatasetClient<Data> {
    /**
     * Pushes `items`, limited to the number of items that can still be charged,
     * and charges for them.
     *
     * The event name comes from the ambient `ppeAwarePushDataContext` store when
     * there is one. Otherwise pushes to the default dataset are charged as
     * `apify-default-dataset-item`, and pushes to any other dataset are forwarded
     * untouched with no charging.
     *
     * When a context store is present, the charge result is recorded on it
     * (merged with any result recorded by an earlier call within the same store).
     *
     * @param items A single item, an array of items, or pre-serialized string(s).
     */
    override async pushItems(
        items: string | Data | string[] | Data[],
    ): Promise<void> {
        const context = actor.ppeAwarePushDataContext.getStore();
        const eventName =
            context?.eventName ??
            (this.id === actor.config.get('defaultDatasetId')
                ? 'apify-default-dataset-item'
                : undefined);

        if (eventName === undefined) {
            // No charging needed
            await super.pushItems(items);
            return;
        }

        // NOTE(review): a single string input is counted as one item here even
        // though it may hold a serialized array - confirm this is intended.
        const { limitedItems, countToCharge } =
            actor.calculatePushDataLimits(items, eventName);

        if (countToCharge > 0) {
            // A non-array input is forwarded exactly as received. Wrapping it in
            // an array would change the payload, most visibly for a string.
            await super.pushItems(
                Array.isArray(items)
                    ? (limitedItems as string[] | Data[])
                    : items,
            );
        }

        // Charging is performed even for a zero count so that the caller still
        // receives a charge result.
        const chargeResult = await actor.chargingManager.charge({
            eventName,
            count: countToCharge,
        });

        if (context) {
            // For a single invocation of Dataset.pushData, there may be more than
            // one call to DatasetClient.pushItems - aggregate the results.
            context.chargeResult =
                context.chargeResult === undefined
                    ? chargeResult
                    : mergeChargeResults(context.chargeResult, chargeResult);
        }
    }
}

/**
 * `ApifyClient` whose `dataset()` factory hands out `PatchedDatasetClient`
 * instances, so pushes made through this client go through the charging logic.
 */
class PatchedApifyClient extends ApifyClient {
    /**
     * Creates a new `PatchedDatasetClient` for the dataset with the given ID.
     * Every call constructs a fresh client object.
     */
    override dataset<
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- has to match the base class constraint
        Data extends Record<string | number, any> = Record<
            string | number,
            unknown
        >,
    >(id: string): DatasetClient<Data> {
        return new PatchedDatasetClient<Data>({
            id,
            baseUrl: this.baseUrl,
            publicBaseUrl: this.publicBaseUrl,
            apifyClient: this,
            httpClient: this.httpClient,
        });
    }
}
/**
 * Calculates how many of the given items can be pushed within the charging
 * limit of `eventName`.
 *
 * A non-array input is treated as a one-element batch. The returned
 * `limitedItems` always has exactly `countToCharge` elements, so a single item
 * that cannot be charged for yields an empty array. `countToCharge` is never
 * negative.
 *
 * @param items A single item or an array of items about to be pushed.
 * @param eventName Name of the event the items would be charged as.
 * @returns The leading items that fit within the limit, and their count.
 */
private calculatePushDataLimits<T>(
    items: T | T[],
    eventName: string,
): { limitedItems: T[]; countToCharge: number } {
    const itemsArray = Array.isArray(items) ? items : [items];
    const maxChargedCount =
        this.chargingManager.calculateMaxEventChargeCountWithinLimit(
            eventName,
        );

    // Clamp at zero: a negative limit would otherwise make `slice` count from
    // the end of the array and would produce a negative charge count.
    const countToCharge = Math.max(
        0,
        Math.min(itemsArray.length, maxChargedCount),
    );

    return {
        limitedItems: itemsArray.slice(0, countToCharge),
        countToCharge,
    };
}