diff --git a/libs/langchain-classic/src/storage/file_system.ts b/libs/langchain-classic/src/storage/file_system.ts
index d5b5c2ce57a3..a0074be1e85c 100644
--- a/libs/langchain-classic/src/storage/file_system.ts
+++ b/libs/langchain-classic/src/storage/file_system.ts
@@ -1,5 +1,6 @@
 import * as fs from "node:fs/promises";
 import * as path from "node:path";
+import { randomUUID } from "node:crypto";
 import { BaseStore } from "@langchain/core/stores";
 
 /**
@@ -37,6 +38,12 @@ export class LocalFileStore extends BaseStore {
 
   rootPath: string;
 
+  /**
+   * Latest pending write promise per key (removed once it settles).
+   * This ensures that concurrent writes to the same key are serialized.
+   */
+  private writeQueues = new Map<string, Promise<void>>();
+
   constructor(fields: { rootPath: string }) {
     super(fields);
     this.rootPath = fields.rootPath;
@@ -75,17 +82,36 @@ export class LocalFileStore extends BaseStore {
   }
 
   /**
-   * Writes the given key-value pairs to the file at the given path.
-   * @param fileContent An object with the key-value pairs to be written to the file.
+   * Writes the content for a single key to its file using an atomic write.
+   * This method writes to a temporary file first, then atomically renames it to the
+   * final destination. This prevents partial writes and corruption if the process
+   * crashes during the write operation.
+   * @param content The content to write to the file.
+   * @param key The key identifying the file.
    */
   private async setFileContent(content: Uint8Array, key: string) {
+    const finalPath = this.getFullPath(key);
+    const tempPath = `${finalPath}.${randomUUID()}.tmp`;
+
     try {
-      await fs.writeFile(this.getFullPath(key), content);
+      // Write to a uniquely named temporary file next to the destination
+      await fs.writeFile(tempPath, content);
+
+      // Atomically rename to final destination
+      // On most filesystems, rename is atomic - either the old file exists or the new one does
+      await fs.rename(tempPath, finalPath);
     } catch (error) {
+      // Best-effort removal of the temporary file before reporting the failure
+      try {
+        await fs.unlink(tempPath);
+      } catch {
+        // Ignore cleanup errors - file might not exist
+      }
+
       throw new Error(
-        `Error writing file at path: ${this.getFullPath(
-          key
-        )}.\nError: ${JSON.stringify(error)}`
+        `Error writing file at path: ${finalPath}.\nError: ${JSON.stringify(
+          error
+        )}`
       );
     }
   }
@@ -135,14 +161,55 @@ export class LocalFileStore extends BaseStore {
     return values;
   }
 
+  /**
+   * Queues a write operation for a specific key to ensure serialization.
+   * If there's already a pending write for this key, the new write waits for it
+   * to settle (succeed or fail) before executing.
+   * @param key The key to write to.
+   * @param value The value to write.
+   * @returns Promise that settles with the outcome of this write only.
+   */
+  private async queueWrite(key: string, value: Uint8Array): Promise<void> {
+    // Run after the in-flight write for this key (if any) has settled. A failure
+    // there was already reported to its own caller and must not cancel this write.
+    const run = () => this.setFileContent(value, key);
+    const pending = this.writeQueues.get(key) ?? Promise.resolve();
+
+    // Chain on both outcomes so this write rejects only with its own error
+    const writePromise: Promise<void> = pending.then(run, run).finally(() => {
+      // Clean up the queue entry if this is still the current promise
+      // This prevents memory leaks from accumulating promises
+      if (this.writeQueues.get(key) === writePromise) {
+        this.writeQueues.delete(key);
+      }
+    });
+
+    // Store the new promise as the current queue for this key
+    this.writeQueues.set(key, writePromise);
+
+    return writePromise;
+  }
+
   /**
    * Sets the values for the given keys in the store.
+   * This method handles concurrent writes safely by:
+   * 1. Deduplicating keys within the same batch (last value wins)
+   * 2. Serializing writes to the same key across different mset() calls
    * @param keyValuePairs Array of key-value pairs to set in the store.
    * @returns Promise that resolves when all key-value pairs have been set.
    */
   async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {
+    // Deduplicate keys within this batch - last value wins
+    const uniqueEntries = new Map<string, Uint8Array>();
+    for (const [key, value] of keyValuePairs) {
+      uniqueEntries.set(key, value);
+    }
+
+    // Queue all writes, ensuring serialization per key
     await Promise.all(
-      keyValuePairs.map(([key, value]) => this.setFileContent(value, key))
+      Array.from(uniqueEntries.entries()).map(([key, value]) =>
+        this.queueWrite(key, value)
+      )
     );
   }
 
@@ -174,6 +241,7 @@ export class LocalFileStore extends BaseStore {
   /**
    * Static method for initializing the class.
    * Preforms a check to see if the directory exists, and if not, creates it.
+   * Also cleans up any orphaned temporary files from previous crashes (NOTE(review): no such cleanup is visible in this patch - confirm).
    * @param path Path to the directory.
    * @returns Promise that resolves to an instance of the class.
    */
diff --git a/libs/langchain/src/storage/file_system.ts b/libs/langchain/src/storage/file_system.ts
index d5b5c2ce57a3..a0074be1e85c 100644
--- a/libs/langchain/src/storage/file_system.ts
+++ b/libs/langchain/src/storage/file_system.ts
@@ -1,5 +1,6 @@
 import * as fs from "node:fs/promises";
 import * as path from "node:path";
+import { randomUUID } from "node:crypto";
 import { BaseStore } from "@langchain/core/stores";
 
 /**
@@ -37,6 +38,12 @@ export class LocalFileStore extends BaseStore {
 
   rootPath: string;
 
+  /**
+   * Latest pending write promise per key (removed once it settles).
+   * This ensures that concurrent writes to the same key are serialized.
+   */
+  private writeQueues = new Map<string, Promise<void>>();
+
   constructor(fields: { rootPath: string }) {
     super(fields);
     this.rootPath = fields.rootPath;
@@ -75,17 +82,36 @@ export class LocalFileStore extends BaseStore {
   }
 
   /**
-   * Writes the given key-value pairs to the file at the given path.
-   * @param fileContent An object with the key-value pairs to be written to the file.
+   * Writes the content for a single key to its file using an atomic write.
+   * This method writes to a temporary file first, then atomically renames it to the
+   * final destination. This prevents partial writes and corruption if the process
+   * crashes during the write operation.
+   * @param content The content to write to the file.
+   * @param key The key identifying the file.
    */
   private async setFileContent(content: Uint8Array, key: string) {
+    const finalPath = this.getFullPath(key);
+    const tempPath = `${finalPath}.${randomUUID()}.tmp`;
+
     try {
-      await fs.writeFile(this.getFullPath(key), content);
+      // Write to a uniquely named temporary file next to the destination
+      await fs.writeFile(tempPath, content);
+
+      // Atomically rename to final destination
+      // On most filesystems, rename is atomic - either the old file exists or the new one does
+      await fs.rename(tempPath, finalPath);
     } catch (error) {
+      // Best-effort removal of the temporary file before reporting the failure
+      try {
+        await fs.unlink(tempPath);
+      } catch {
+        // Ignore cleanup errors - file might not exist
+      }
+
       throw new Error(
-        `Error writing file at path: ${this.getFullPath(
-          key
-        )}.\nError: ${JSON.stringify(error)}`
+        `Error writing file at path: ${finalPath}.\nError: ${JSON.stringify(
+          error
+        )}`
       );
     }
   }
@@ -135,14 +161,55 @@ export class LocalFileStore extends BaseStore {
     return values;
   }
 
+  /**
+   * Queues a write operation for a specific key to ensure serialization.
+   * If there's already a pending write for this key, the new write waits for it
+   * to settle (succeed or fail) before executing.
+   * @param key The key to write to.
+   * @param value The value to write.
+   * @returns Promise that settles with the outcome of this write only.
+   */
+  private async queueWrite(key: string, value: Uint8Array): Promise<void> {
+    // Run after the in-flight write for this key (if any) has settled. A failure
+    // there was already reported to its own caller and must not cancel this write.
+    const run = () => this.setFileContent(value, key);
+    const pending = this.writeQueues.get(key) ?? Promise.resolve();
+
+    // Chain on both outcomes so this write rejects only with its own error
+    const writePromise: Promise<void> = pending.then(run, run).finally(() => {
+      // Clean up the queue entry if this is still the current promise
+      // This prevents memory leaks from accumulating promises
+      if (this.writeQueues.get(key) === writePromise) {
+        this.writeQueues.delete(key);
+      }
+    });
+
+    // Store the new promise as the current queue for this key
+    this.writeQueues.set(key, writePromise);
+
+    return writePromise;
+  }
+
   /**
    * Sets the values for the given keys in the store.
+   * This method handles concurrent writes safely by:
+   * 1. Deduplicating keys within the same batch (last value wins)
+   * 2. Serializing writes to the same key across different mset() calls
    * @param keyValuePairs Array of key-value pairs to set in the store.
    * @returns Promise that resolves when all key-value pairs have been set.
    */
   async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {
+    // Deduplicate keys within this batch - last value wins
+    const uniqueEntries = new Map<string, Uint8Array>();
+    for (const [key, value] of keyValuePairs) {
+      uniqueEntries.set(key, value);
+    }
+
+    // Queue all writes, ensuring serialization per key
     await Promise.all(
-      keyValuePairs.map(([key, value]) => this.setFileContent(value, key))
+      Array.from(uniqueEntries.entries()).map(([key, value]) =>
+        this.queueWrite(key, value)
+      )
     );
   }
 
@@ -174,6 +241,7 @@ export class LocalFileStore extends BaseStore {
   /**
    * Static method for initializing the class.
    * Preforms a check to see if the directory exists, and if not, creates it.
+   * Also cleans up any orphaned temporary files from previous crashes (NOTE(review): no such cleanup is visible in this patch - confirm).
    * @param path Path to the directory.
    * @returns Promise that resolves to an instance of the class.
    */