129 changes: 116 additions & 13 deletions libs/langchain-classic/src/storage/file_system.ts
@@ -37,6 +37,8 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {

rootPath: string;

private keyLocks: Map<string, Promise<void>> = new Map();

constructor(fields: { rootPath: string }) {
super(fields);
this.rootPath = fields.rootPath;
@@ -79,15 +81,18 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
* @param fileContent An object with the key-value pairs to be written to the file.
*/
private async setFileContent(content: Uint8Array, key: string) {
try {
await fs.writeFile(this.getFullPath(key), content);
} catch (error) {
throw new Error(
`Error writing file at path: ${this.getFullPath(
key
)}.\nError: ${JSON.stringify(error)}`
);
}
await this.withKeyLock(key, async () => {
const fullPath = this.getFullPath(key);
try {
await this.writeFileAtomically(content, fullPath);
} catch (error) {
throw new Error(
`Error writing file at path: ${fullPath}.\nError: ${JSON.stringify(
error
)}`
);
}
});
}

/**
@@ -137,12 +142,20 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {

/**
* Sets the values for the given keys in the store.
* The last value for duplicate keys will be used.
* @param keyValuePairs Array of key-value pairs to set in the store.
* @returns Promise that resolves when all key-value pairs have been set.
*/
async mset(keyValuePairs: [string, Uint8Array][]): Promise<void> {
const deduped = new Map<string, Uint8Array>();
for (const [key, value] of keyValuePairs) {
deduped.set(key, value);
}

await Promise.all(
keyValuePairs.map(([key, value]) => this.setFileContent(value, key))
Array.from(deduped.entries(), ([key, value]) =>
this.setFileContent(value, key)
)
);
}

@@ -152,7 +165,19 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
* @returns Promise that resolves when all keys have been deleted.
*/
async mdelete(keys: string[]): Promise<void> {
await Promise.all(keys.map((key) => fs.unlink(this.getFullPath(key))));
await Promise.all(
keys.map((key) =>
this.withKeyLock(key, async () => {
try {
await fs.unlink(this.getFullPath(key));
} catch (error) {
if (!error || (error as { code?: string }).code !== "ENOENT") {
throw error;
}
}
})
)
);
}

/**
@@ -162,8 +187,10 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
* @returns AsyncGenerator that yields keys from the store.
*/
async *yieldKeys(prefix?: string): AsyncGenerator<string> {
const allFiles = await fs.readdir(this.rootPath);
const allKeys = allFiles.map((file) => file.replace(".txt", ""));
const allFiles: string[] = await fs.readdir(this.rootPath);
const allKeys = allFiles
.filter((file) => file.endsWith(".txt"))
.map((file) => file.replace(/\.txt$/, ""));
for (const key of allKeys) {
if (prefix === undefined || key.startsWith(prefix)) {
yield key;
@@ -194,6 +221,82 @@ export class LocalFileStore extends BaseStore<string, Uint8Array> {
}
}

// Clean up orphaned temp files left by interrupted atomic writes.
try {
const entries = await fs.readdir(rootPath);
await Promise.all(
entries
.filter((file) => file.endsWith(".tmp"))
.map((tempFile) =>
fs.unlink(path.join(rootPath, tempFile)).catch(() => {})
)
);
} catch {
// Ignore cleanup errors.
}

return new this({ rootPath });
}

/**
* Ensures calls for the same key run sequentially by chaining promises.
* @param key Key to serialize operations for.
* @param fn Async work to execute while the lock is held.
* @returns Promise resolving with the callback result once the lock releases.
*/
private async withKeyLock<T>(key: string, fn: () => Promise<T>): Promise<T> {
const previous = this.keyLocks.get(key) ?? Promise.resolve();
const waitForPrevious = previous.catch(() => {});

let resolveCurrent: (() => void) | undefined;
const current = new Promise<void>((resolve) => {
resolveCurrent = resolve;
});

const tail = waitForPrevious.then(() => current);
this.keyLocks.set(key, tail);

await waitForPrevious;
try {
return await fn();
} finally {
resolveCurrent?.();
if (this.keyLocks.get(key) === tail) {
this.keyLocks.delete(key);
}
}
}

/**
* Writes data to a temporary file before atomically renaming it into place.
* @param content Serialized value to persist.
* @param fullPath Destination path for the stored key.
*/
private async writeFileAtomically(content: Uint8Array, fullPath: string) {
const directory = path.dirname(fullPath);
await fs.mkdir(directory, { recursive: true });

const tempPath = `${fullPath}.${Date.now()}-${Math.random()
.toString(16)
.slice(2)}.tmp`;

try {
await fs.writeFile(tempPath, content);

try {
await fs.rename(tempPath, fullPath);
} catch (renameError) {
const code = (renameError as { code?: string }).code;
if (renameError && (code === "EPERM" || code === "EACCES")) {
await fs.writeFile(fullPath, content);
await fs.unlink(tempPath).catch(() => {});
} else {
throw renameError;
}
}
} catch (error) {
await fs.unlink(tempPath).catch(() => {});
throw error;
}
}
Comment on lines +270 to +301
Member

What's the purpose of the temp files if we're sequencing files using withKeyLock?

Author

This is an added guard to prevent partial file writes from causing data corruption if the process terminates before the buffer is fully flushed for a particular file.

We write to the temp file first because that write might be interrupted. If it is, only the temp file is corrupted, and it gets cleaned up. Once the fs buffer is fully flushed and we know the data is in a good state, we overwrite the previous file with the rename, which cannot be interrupted part-way: it either succeeds or it doesn't.
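
For reference, here is a minimal sketch of the write-to-temp-then-rename pattern described above. It is illustrative only (the helper name atomicWrite and the temp-file suffix are made up for the example), not the exact implementation in this PR:

import { promises as fs } from "node:fs";
import path from "node:path";

// Write to a sibling temp file first, then rename it into place.
// If the process dies mid-write, only the temp file is left behind;
// the destination is never observed in a half-written state.
async function atomicWrite(fullPath: string, content: Uint8Array): Promise<void> {
  const tempPath = path.join(
    path.dirname(fullPath),
    `${path.basename(fullPath)}.${process.pid}.tmp`
  );
  try {
    // Interruptible, but an interruption only corrupts the temp file.
    await fs.writeFile(tempPath, content);
    // Atomic replace when source and destination are on the same filesystem.
    await fs.rename(tempPath, fullPath);
  } catch (error) {
    // Best-effort cleanup so orphaned temp files don't accumulate.
    await fs.unlink(tempPath).catch(() => {});
    throw error;
  }
}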

Member

Is that a common occurrence? What if the rename operation fails?

Author

It's hard to say if it's common, but I can imagine relatively likely scenarios, and the problem tends to scale with file size: larger files mean longer write times, increasing the chance of a failure mid-write.

A good example might be:

  1. A developer force-terminates the process while LocalFileStore is halfway through writing a file to the file system.
  2. The incomplete write results in malformed JSON because the content is not terminated correctly.
  3. Reading the .txt file then throws an invalid JSON exception.

The rename operation is considerably more reliable than the writeFile because it's a single operation. If we have already written data to the filesystem with writeFile, it's very likely the rename will work correctly, as it requires the same OS filesystem permissions.

For Windows I think it would only fail if there was an open file handle, and on Linux it would fail if it doesn't have access to the directory. In both cases I think a failure is expected.

}
103 changes: 102 additions & 1 deletion libs/langchain-classic/src/storage/tests/file_system.test.ts
@@ -2,10 +2,24 @@ import fs from "node:fs";
import path from "node:path";
import os from "node:os";

import { describe, expect, test } from "vitest";
import { describe, expect, test, vi } from "vitest";

import { LocalFileStore } from "../file_system.js";

function createDeferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
let reject!: (reason?: unknown) => void;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
return {
promise,
resolve: (value: T | PromiseLike<T>) => resolve(value),
reject: (reason?: unknown) => reject(reason),
};
}

describe("LocalFileStore", () => {
const keys = ["key1", "key2"];
const tempDir = fs.mkdtempSync(
@@ -32,6 +46,93 @@
]);
});

test("LocalFileStore uses last value for duplicate keys in mset", async () => {
const encoder = new TextEncoder();
const decoder = new TextDecoder();
const store = await LocalFileStore.fromPath(tempDir);
const key = "duplicate-key";
await store.mset([
[key, encoder.encode("first")],
[key, encoder.encode("second")],
]);
const [value] = await store.mget([key]);
expect(value).toBeDefined();
expect(decoder.decode(value!)).toBe("second");
await store.mdelete([key]);
});

test("LocalFileStore queues writes for the same key while a lock is held", async () => {
const encoder = new TextEncoder();
const decoder = new TextDecoder();
const store = await LocalFileStore.fromPath(tempDir);
const key = "locked-key";

const prototype = Object.getPrototypeOf(store) as {
writeFileAtomically: (
this: LocalFileStore,
content: Uint8Array,
fullPath: string
) => Promise<void>;
};
type WriteFileArgs = [Uint8Array, string];
const originalWriteFileAtomically = prototype.writeFileAtomically;
const firstWriteGate = createDeferred<void>();
const writeFileSpy = vi
.spyOn(prototype, "writeFileAtomically")
.mockImplementationOnce(async function (this: LocalFileStore, ...args: WriteFileArgs) {
await firstWriteGate.promise;
return originalWriteFileAtomically.apply(this, args);
})
.mockImplementation(function (this: LocalFileStore, ...args: WriteFileArgs) {
return originalWriteFileAtomically.apply(this, args);
});

try {
const firstWrite = store.mset([[key, encoder.encode("first")]]);

await expect.poll(() => writeFileSpy.mock.calls.length).toBe(1);

const secondWrite = store.mset([[key, encoder.encode("second")]]);

await new Promise((resolve) => setTimeout(resolve, 25));

expect(writeFileSpy.mock.calls.length).toBe(1);

firstWriteGate.resolve();

await Promise.all([firstWrite, secondWrite]);

expect(writeFileSpy.mock.calls.length).toBe(2);

const [value] = await store.mget([key]);
expect(value).toBeDefined();
expect(decoder.decode(value!)).toBe("second");

const { keyLocks } = store as unknown as {
keyLocks: Map<string, Promise<void>>;
};
expect(keyLocks.size).toBe(0);
} finally {
writeFileSpy.mockRestore();
await store.mdelete([key]);
}
});

test("LocalFileStore removes orphaned temp files during initialization", async () => {
const cleanupDir = fs.mkdtempSync(
path.join(os.tmpdir(), "file_system_classic_cleanup")
);
const orphanFile = path.join(cleanupDir, "orphan.tmp");
fs.writeFileSync(orphanFile, "stale");

await LocalFileStore.fromPath(cleanupDir);

const remaining = await fs.promises.readdir(cleanupDir);
expect(remaining).not.toContain("orphan.tmp");

await fs.promises.rm(cleanupDir, { recursive: true, force: true });
});

test("LocalFileStore can delete values", async () => {
const encoder = new TextEncoder();
const store = await LocalFileStore.fromPath(tempDir);