Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 175 additions & 19 deletions packages/apify/src/actor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createPrivateKey } from 'node:crypto';

Check failure on line 1 in packages/apify/src/actor.ts

View workflow job for this annotation

GitHub Actions / Lint

Run autofix to sort these imports!

Check failure on line 1 in packages/apify/src/actor.ts

View workflow job for this annotation

GitHub Actions / Lint

File has too many classes (3). Maximum allowed is 1

import type {
ConfigurationOptions,
Expand Down Expand Up @@ -33,7 +33,11 @@
Webhook,
WebhookEventType,
} from 'apify-client';
import { ActorRun as ClientActorRun, ApifyClient } from 'apify-client';
import {
ActorRun as ClientActorRun,
ApifyClient,
DatasetClient,
} from 'apify-client';
import ow from 'ow';

import {
Expand All @@ -59,6 +63,7 @@
import type { ProxyConfigurationOptions } from './proxy_configuration.js';
import { ProxyConfiguration } from './proxy_configuration.js';
import { checkCrawleeVersion, getSystemInfo } from './utils.js';
import { AsyncLocalStorage } from 'node:async_hooks';

export interface InitOptions {
storage?: StorageClient;
Expand Down Expand Up @@ -341,6 +346,11 @@
ERROR_UNKNOWN: 92,
};

/**
 * Per-call context stored in the actor's `AsyncLocalStorage` while a
 * pay-per-event aware `pushData` call is in progress. The patched dataset
 * client reads it inside `pushItems` to learn which event to charge, and
 * writes the charging outcome back into it.
 */
interface PpeAwarePushDataContext {
/** Name of the event that pushed items are charged as. */
eventName: string;
/**
 * Result of the charge operation(s) performed during the push. It is unset
 * until the first `pushItems` call completes; when one push triggers several
 * `pushItems` calls, their results are aggregated here.
 */
chargeResult?: ChargeResult;
}

/**
* The `Actor` class serves as an alternative approach to the static helpers exported from the package. It allows passing configuration
* that will be used on the instance methods. Environment variables will have precedence over this configuration.
Expand Down Expand Up @@ -387,6 +397,10 @@

private chargingManager: ChargingManager;

private ppeAwarePushDataContext = new AsyncLocalStorage<
PpeAwarePushDataContext | undefined
>();

constructor(options: ConfigurationOptions = {}) {
// use the default configuration object if nothing is overridden (it falls back to env vars)
this.config =
Expand Down Expand Up @@ -1099,26 +1113,54 @@

const dataset = await this.openDataset();

const maxChargedCount =
eventName !== undefined
? this.chargingManager.calculateMaxEventChargeCountWithinLimit(
eventName,
)
: Infinity;
const toCharge = Array.isArray(item) ? item.length : 1;

if (toCharge > maxChargedCount) {
// Push as many items as we can charge for
const items = Array.isArray(item) ? item : [item];
await dataset.pushData(items.slice(0, maxChargedCount));
// Check if dataset is using PatchedApifyClient (which handles charging internally)
// or a different storage client (e.g., local storage in development)
const usingPatchedClient =
dataset.client ===
this.apifyClient.dataset(this.config.get('defaultDatasetId'));

if (usingPatchedClient) {
// PatchedDatasetClient will handle charging and item limiting
const context: PpeAwarePushDataContext | undefined =
eventName !== undefined ? { eventName } : undefined;

await this.ppeAwarePushDataContext.run(context, async () => {
await dataset.pushData(item);
});

if (eventName !== undefined) {
if (context?.chargeResult === undefined) {
throw new Error(
'Internal error - missing result of charge operation',
);
}

return context.chargeResult;
}
} else {
await dataset.pushData(item);
}
// Local storage or other client - handle charging directly
const autoEventName =
eventName ??
(dataset.id === this.config.get('defaultDatasetId')
? 'apify-default-dataset-item'
: undefined);

if (autoEventName === undefined) {
// No charging needed
await dataset.pushData(item);
return;

Check failure on line 1151 in packages/apify/src/actor.ts

View workflow job for this annotation

GitHub Actions / Lint

Async method 'pushData' expected a return value
}

const { limitedItems, countToCharge } =
this.calculatePushDataLimits(item, autoEventName);

if (countToCharge > 0) {
await dataset.pushData(limitedItems);
}

if (eventName) {
return await this.chargingManager.charge({
eventName,
count: Math.min(toCharge, maxChargedCount),
eventName: autoEventName,
count: countToCharge,
});
}
}
Expand Down Expand Up @@ -1545,11 +1587,94 @@
* @ignore
*/
newClient(options: ApifyClientOptions = {}): ApifyClient {
const actor = this;

Check failure on line 1590 in packages/apify/src/actor.ts

View workflow job for this annotation

GitHub Actions / Lint

Unexpected aliasing of 'this' to local variable

const { storageDir, ...storageClientOptions } = this.config.get(
'storageClientOptions',
) as Dictionary;
const { apifyVersion, crawleeVersion } = getSystemInfo();
return new ApifyClient({

class PatchedDatasetClient<
Data extends Record<string | number, any> = Record<

Check failure on line 1598 in packages/apify/src/actor.ts

View workflow job for this annotation

GitHub Actions / Lint

'Data' is already declared in the upper scope on line 359 column 20
string | number,
unknown
>,
> extends DatasetClient<Data> {
override async pushItems(
items: string | Data | string[] | Data[],
): Promise<void> {
const context = actor.ppeAwarePushDataContext.getStore();
const eventName =
context?.eventName ??
(this.id === actor.config.get('defaultDatasetId')
? 'apify-default-dataset-item'
: undefined);

if (eventName === undefined) {
// No charging needed
await super.pushItems(items);
return;
}

const { limitedItems, countToCharge } =
actor.calculatePushDataLimits(items, eventName);

if (countToCharge > 0) {
await super.pushItems(limitedItems as string[] | Data[]);
}

const chargeResult = await actor.chargingManager.charge({
eventName,
count: countToCharge,
});

if (context) {
// For a single invocation of Dataset.pushData, there may be more than one call to DatasetClient.pushItems - we need to aggregate the `ChargeResult` objects
if (context.chargeResult === undefined) {
context.chargeResult = chargeResult;
} else {
context.chargeResult = {
eventChargeLimitReached:
context.chargeResult.eventChargeLimitReached &&
chargeResult.eventChargeLimitReached,
chargedCount:
context.chargeResult.chargedCount +
chargeResult.chargedCount,
chargeableWithinLimit: Object.fromEntries(
Object.entries(
context.chargeResult.chargeableWithinLimit,
).map(([key, oldValue]) => [
key,
Math.min(
oldValue,
chargeResult.chargeableWithinLimit[key],
),
]),
),
};
}
}
}
}

class PatchedApifyClient extends ApifyClient {
override dataset<
Data extends Record<string | number, any> = Record<

Check failure on line 1662 in packages/apify/src/actor.ts

View workflow job for this annotation

GitHub Actions / Lint

'Data' is already declared in the upper scope on line 359 column 20
string | number,
unknown
>,
>(id: string): DatasetClient<Data> {
return new PatchedDatasetClient<Data>({
id,
baseUrl: this.baseUrl,
publicBaseUrl: this.publicBaseUrl,
apifyClient: this,
httpClient: this.httpClient,
});
}
}

const client = new PatchedApifyClient({
baseUrl: this.config.get('apiBaseUrl'),
publicBaseUrl: this.config.get('apiPublicBaseUrl'),
token: this.config.get('token'),
Expand All @@ -1560,6 +1685,8 @@
...storageClientOptions,
...options, // allow overriding the instance configuration
});

return client;
}

/**
Expand Down Expand Up @@ -2315,6 +2442,35 @@
return this._instance;
}

/**
 * Works out how many of the given items can be pushed without exceeding the
 * charging limit of the given event.
 *
 * A single (non-array) item is treated as a one-element batch, so the result
 * has the same shape for both input forms and `limitedItems.length` always
 * equals `countToCharge`.
 *
 * @param items A single item or an array of items intended to be pushed.
 * @param eventName Name of the event each pushed item is charged as.
 * @returns `limitedItems` - the leading items that fit within the limit
 *   (an empty array when nothing can be charged), and `countToCharge` - the
 *   number of those items.
 */
private calculatePushDataLimits<T>(
    items: T | T[],
    eventName: string,
): { limitedItems: T[]; countToCharge: number } {
    const itemsArray = Array.isArray(items) ? items : [items];
    const maxChargedCount =
        this.chargingManager.calculateMaxEventChargeCountWithinLimit(
            eventName,
        );

    // Clamp at zero: a negative limit (should the charging manager ever report
    // one) would otherwise turn into a negative `slice` end index and a
    // negative charge count.
    const countToCharge = Math.max(
        0,
        Math.min(itemsArray.length, maxChargedCount),
    );

    return {
        // Slicing also covers the single-item case: the item is kept only when
        // at least one event can still be charged.
        limitedItems: itemsArray.slice(0, countToCharge),
        countToCharge,
    };
}

private async _openStorage<T extends IStorage>(
storageClass: Constructor<T>,
id?: string,
Expand Down
4 changes: 2 additions & 2 deletions test/apify/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { IncomingMessage } from 'node:http';

import type { Request } from '@crawlee/core';
import { createRequestDebugInfo } from '@crawlee/utils';
import { Actor } from 'apify';
import { Actor, ApifyClient } from 'apify';
import semver from 'semver';

import { APIFY_ENV_VARS } from '@apify/consts';
Expand All @@ -25,7 +25,7 @@ describe('Actor.newClient()', () => {
process.env[APIFY_ENV_VARS.TOKEN] = 'token';
const client = Actor.newClient();

expect(client.constructor.name).toBe('ApifyClient');
expect(client).toBeInstanceOf(ApifyClient);
expect(client.token).toBe('token');
expect(client.baseUrl).toBe('http://www.example.com:1234/path/v2');
});
Expand Down
Loading