diff --git a/batch_test.go b/batch_test.go new file mode 100644 index 0000000..f39acf5 --- /dev/null +++ b/batch_test.go @@ -0,0 +1,532 @@ +package flatfs_test + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/query" + flatfs "github.com/ipfs/go-ds-flatfs" +) + +func TestBatchWritesToTempUntilCommit(t *testing.T) { + tryAllShardFuncs(t, testBatchWritesToTempUntilCommit) +} + +func testBatchWritesToTempUntilCommit(dirFunc mkShardFunc, t *testing.T) { + temp, cleanup := tempdir(t) + defer cleanup() + defer checkTemp(t, temp) + + fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) + if err != nil { + t.Fatalf("New fail: %v\n", err) + } + defer fs.Close() + + // Create a batch + batch, err := fs.Batch(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Put some keys + keys := []string{"QUUX", "QAAX", "QBBX"} + for _, key := range keys { + err = batch.Put(context.Background(), datastore.NewKey(key), []byte("testdata")) + if err != nil { + t.Fatal(err) + } + } + + // Check that files don't exist in the main datastore yet + for _, key := range keys { + has, err := fs.Has(context.Background(), datastore.NewKey(key)) + if err != nil { + t.Fatal(err) + } + if has { + t.Errorf("key %s should not exist in datastore before commit", key) + } + } + + // Check that no data files exist in shard directories + checkNoDataFiles := func() bool { + found := false + filepath.Walk(temp, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if filepath.Ext(path) == ".data" { + relPath, _ := filepath.Rel(temp, path) + // Ignore if it's in .temp directory + if !isInTempDir(relPath) { + t.Errorf("found data file before commit: %s", relPath) + found = true + } + } + return nil + }) + return found + } + + if checkNoDataFiles() { + t.Fatal("data files found in main directories before commit") + } + + // Now commit + err = batch.Commit(context.Background()) + if err != nil { + t.Fatal(err) + } + + // After commit, all keys should exist + for _, key := range keys { + has, err := fs.Has(context.Background(), datastore.NewKey(key)) + if err != nil { + t.Fatal(err) + } + if !has { + t.Errorf("key %s should exist in datastore after commit", key) + } + } +} + +func isInTempDir(path string) bool { + // Check if path starts with .temp/ or contains /.temp/ + return len(path) >= 6 && (path[:6] == ".temp/" || path[:6] == ".temp\\") +} + +func TestBatchReadOperations(t *testing.T) { + tryAllShardFuncs(t, testBatchReadOperations) +} + +func testBatchReadOperations(dirFunc mkShardFunc, t *testing.T) { + temp, cleanup := tempdir(t) + defer cleanup() + defer checkTemp(t, temp) + + fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) + if err != nil { + t.Fatalf("New fail: %v\n", err) + } + defer fs.Close() + + // Put some initial data in the datastore + initialKey := datastore.NewKey("INITIAL") + initialData := []byte("initial data") + err = fs.Put(context.Background(), initialKey, initialData) + if err != nil { + t.Fatal(err) + } + + // Create a batch + batch, err := fs.Batch(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Cast to BatchReader interface + batchReader, ok := batch.(flatfs.BatchReader) + if !ok { + t.Fatal("batch does not implement BatchReader interface") + } + + // Put a new key in batch + batchKey := datastore.NewKey("BATCH") + batchData := []byte("batch data") + err = batch.Put(context.Background(), batchKey, batchData) + if err != nil { + t.Fatal(err) + } + + // 
Overwrite initial key in batch + overwriteData := []byte("overwritten data") + err = batch.Put(context.Background(), initialKey, overwriteData) + if err != nil { + t.Fatal(err) + } + + // Delete a key that will be created + deleteKey := datastore.NewKey("TODELETE") + err = fs.Put(context.Background(), deleteKey, []byte("to be deleted")) + if err != nil { + t.Fatal(err) + } + err = batch.Delete(context.Background(), deleteKey) + if err != nil { + t.Fatal(err) + } + + // Test Get operations before commit + // 1. Get from batch (new key) + data, err := batchReader.Get(context.Background(), batchKey) + if err != nil { + t.Fatal(err) + } + if string(data) != string(batchData) { + t.Errorf("expected %s, got %s", batchData, data) + } + + // 2. Get overwritten key should return new data from batch + data, err = batchReader.Get(context.Background(), initialKey) + if err != nil { + t.Fatal(err) + } + if string(data) != string(overwriteData) { + t.Errorf("expected %s, got %s", overwriteData, data) + } + + // 3. Get deleted key should return not found + _, err = batchReader.Get(context.Background(), deleteKey) + if err != datastore.ErrNotFound { + t.Errorf("expected ErrNotFound for deleted key, got %v", err) + } + + // Test Has operations before commit + // 1. Has for new key in batch + has, err := batchReader.Has(context.Background(), batchKey) + if err != nil { + t.Fatal(err) + } + if !has { + t.Error("expected batch key to exist") + } + + // 2. Has for overwritten key + has, err = batchReader.Has(context.Background(), initialKey) + if err != nil { + t.Fatal(err) + } + if !has { + t.Error("expected initial key to exist") + } + + // 3. Has for deleted key should return false + has, err = batchReader.Has(context.Background(), deleteKey) + if err != nil { + t.Fatal(err) + } + if has { + t.Error("expected deleted key to not exist") + } + + // Test GetSize operations before commit + size, err := batchReader.GetSize(context.Background(), batchKey) + if err != nil { + t.Fatal(err) + } + if size != len(batchData) { + t.Errorf("expected size %d, got %d", len(batchData), size) + } + + // GetSize for deleted key should return not found + _, err = batchReader.GetSize(context.Background(), deleteKey) + if err != datastore.ErrNotFound { + t.Errorf("expected ErrNotFound for deleted key size, got %v", err) + } + + // Main datastore should still have original data + data, err = fs.Get(context.Background(), initialKey) + if err != nil { + t.Fatal(err) + } + if string(data) != string(initialData) { + t.Errorf("main datastore should still have original data: expected %s, got %s", initialData, data) + } + + // Commit the batch + err = batch.Commit(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Verify final state in main datastore + // 1. New key should exist + data, err = fs.Get(context.Background(), batchKey) + if err != nil { + t.Fatal(err) + } + if string(data) != string(batchData) { + t.Errorf("expected %s, got %s", batchData, data) + } + + // 2. Initial key should be overwritten + data, err = fs.Get(context.Background(), initialKey) + if err != nil { + t.Fatal(err) + } + if string(data) != string(overwriteData) { + t.Errorf("expected %s, got %s", overwriteData, data) + } + + // 3. 
Deleted key should not exist + _, err = fs.Get(context.Background(), deleteKey) + if err != datastore.ErrNotFound { + t.Errorf("expected ErrNotFound for deleted key after commit, got %v", err) + } +} + +func TestBatchDiscard(t *testing.T) { + tryAllShardFuncs(t, testBatchDiscard) +} + +func testBatchDiscard(dirFunc mkShardFunc, t *testing.T) { + temp, cleanup := tempdir(t) + defer cleanup() + defer checkTemp(t, temp) + + fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) + if err != nil { + t.Fatalf("New fail: %v\n", err) + } + defer fs.Close() + + // Create a batch + batch, err := fs.Batch(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Put some keys + keys := []string{"QUUX", "QAAX", "QBBX"} + for _, key := range keys { + err = batch.Put(context.Background(), datastore.NewKey(key), []byte("testdata")) + if err != nil { + t.Fatal(err) + } + } + + // Cast to DiscardableBatch interface + discardable, ok := batch.(flatfs.DiscardableBatch) + if !ok { + t.Fatal("batch does not implement DiscardableBatch interface") + } + + // Discard the batch + err = discardable.Discard(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Check that files still don't exist in the main datastore + for _, key := range keys { + has, err := fs.Has(context.Background(), datastore.NewKey(key)) + if err != nil { + t.Fatal(err) + } + if has { + t.Errorf("key %s should not exist in datastore after discard", key) + } + } + + // Verify temp directory was cleaned up + tempBatchDirs, err := filepath.Glob(filepath.Join(temp, ".temp", "batch-*")) + if err != nil { + t.Fatal(err) + } + if len(tempBatchDirs) > 0 { + t.Errorf("batch temp directories should be cleaned up after discard, found: %v", tempBatchDirs) + } +} + +func TestBatchQuery(t *testing.T) { + tryAllShardFuncs(t, testBatchQuery) +} + +func testBatchQuery(dirFunc mkShardFunc, t *testing.T) { + temp, cleanup := tempdir(t) + defer cleanup() + defer checkTemp(t, temp) + + fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false) + if err != nil { + t.Fatalf("CreateOrOpen fail: %v\n", err) + } + defer fs.Close() + + ctx := context.Background() + + // Add some data to the main datastore + mainKeys := []string{"EXISTING1", "EXISTING2", "EXISTING3"} + for _, k := range mainKeys { + err := fs.Put(ctx, datastore.NewKey(k), []byte("main:"+k)) + if err != nil { + t.Fatalf("Put fail: %v\n", err) + } + } + + // Create a batch + batch, err := fs.Batch(ctx) + if err != nil { + t.Fatal(err) + } + + // Add new keys to batch + batchKeys := []string{"BATCH1", "BATCH2"} + for _, k := range batchKeys { + err := batch.Put(ctx, datastore.NewKey(k), []byte("batch:"+k)) + if err != nil { + t.Fatal(err) + } + } + + // Delete one existing key + err = batch.Delete(ctx, datastore.NewKey("EXISTING2")) + if err != nil { + t.Fatal(err) + } + + // Update an existing key + err = batch.Put(ctx, datastore.NewKey("EXISTING3"), []byte("updated:EXISTING3")) + if err != nil { + t.Fatal(err) + } + + // Query the batch - should see batch changes + batchReader, ok := batch.(flatfs.BatchReader) + if !ok { + t.Fatal("batch should implement BatchReader") + } + + q := query.Query{} + results, err := batchReader.Query(ctx, q) + if err != nil { + t.Fatal(err) + } + + entries := collectQueryResults(t, results) + + // Should have: + // - /EXISTING1 (from main) + // - /EXISTING3 (updated in batch) + // - /BATCH1, /BATCH2 (new in batch) + // Should NOT have: + // - /EXISTING2 (deleted in batch) + + expectedKeys := map[string]string{ + "/EXISTING1": "main:EXISTING1", + "/EXISTING3": 
"updated:EXISTING3", + "/BATCH1": "batch:BATCH1", + "/BATCH2": "batch:BATCH2", + } + + if len(entries) != len(expectedKeys) { + t.Fatalf("expected %d entries, got %d", len(expectedKeys), len(entries)) + } + + for _, entry := range entries { + expected, ok := expectedKeys[entry.Key] + if !ok { + t.Errorf("unexpected key: %s", entry.Key) + continue + } + if string(entry.Value) != expected { + t.Errorf("value mismatch for key %s: expected %s, got %s", entry.Key, expected, string(entry.Value)) + } + delete(expectedKeys, entry.Key) + } + + if len(expectedKeys) > 0 { + t.Errorf("missing keys in query results: %v", expectedKeys) + } + + // Test KeysOnly query + q = query.Query{KeysOnly: true} + results, err = batchReader.Query(ctx, q) + if err != nil { + t.Fatal(err) + } + + entries = collectQueryResults(t, results) + if len(entries) != 4 { + t.Errorf("expected 4 keys, got %d", len(entries)) + } + for _, entry := range entries { + if entry.Value != nil { + t.Error("KeysOnly query should not return values") + } + } + + // Test ReturnsSizes query + q = query.Query{KeysOnly: true, ReturnsSizes: true} + results, err = batchReader.Query(ctx, q) + if err != nil { + t.Fatal(err) + } + + entries = collectQueryResults(t, results) + if len(entries) != 4 { + t.Errorf("expected 4 keys, got %d", len(entries)) + } + for _, entry := range entries { + if entry.Size <= 0 { + t.Error("ReturnsSizes query should return sizes") + } + if entry.Value != nil { + t.Error("KeysOnly query should not return values") + } + } + + // Commit the batch + err = batch.Commit(ctx) + if err != nil { + t.Fatal(err) + } + + // Query main datastore - should see committed changes + q = query.Query{} + results, err = fs.Query(ctx, q) + if err != nil { + t.Fatal(err) + } + + entries = collectQueryResults(t, results) + if len(entries) != 4 { + t.Errorf("expected 4 entries after commit, got %d", len(entries)) + } + + // Verify committed data + for _, entry := range entries { + switch entry.Key { + case "/EXISTING1": + if string(entry.Value) != "main:EXISTING1" { + t.Errorf("expected main:EXISTING1, got %s", string(entry.Value)) + } + case "/EXISTING3": + if string(entry.Value) != "updated:EXISTING3" { + t.Errorf("expected updated:EXISTING3, got %s", string(entry.Value)) + } + case "/BATCH1": + if string(entry.Value) != "batch:BATCH1" { + t.Errorf("expected batch:BATCH1, got %s", string(entry.Value)) + } + case "/BATCH2": + if string(entry.Value) != "batch:BATCH2" { + t.Errorf("expected batch:BATCH2, got %s", string(entry.Value)) + } + default: + t.Errorf("unexpected key after commit: %s", entry.Key) + } + } + + // Verify /EXISTING2 was deleted + has, err := fs.Has(ctx, datastore.NewKey("EXISTING2")) + if err != nil { + t.Fatal(err) + } + if has { + t.Error("/EXISTING2 should be deleted") + } +} + +func collectQueryResults(t *testing.T, results query.Results) []query.Entry { + t.Helper() + entries, err := results.Rest() + if err != nil { + t.Fatalf("query result error: %v", err) + } + return entries +} diff --git a/flatfs.go b/flatfs.go index d15a35a..e90ea4d 100644 --- a/flatfs.go +++ b/flatfs.go @@ -8,10 +8,13 @@ import ( "encoding/json" "errors" "fmt" + "io/fs" + "maps" "math" "math/rand" "os" "path/filepath" + "slices" "strings" "sync" "sync/atomic" @@ -99,6 +102,8 @@ var _ datastore.Datastore = (*Datastore)(nil) var _ datastore.PersistentDatastore = (*Datastore)(nil) var _ datastore.Batching = (*Datastore)(nil) var _ datastore.Batch = (*flatfsBatch)(nil) +var _ DiscardableBatch = (*flatfsBatch)(nil) +var _ BatchReader = 
(*flatfsBatch)(nil) var ( ErrDatastoreExists = errors.New("datastore already exists") @@ -244,7 +249,7 @@ func Create(path string, fun *ShardIdV1) error { func Open(path string, syncFiles bool) (*Datastore, error) { _, err := os.Stat(path) - if os.IsNotExist(err) { + if errors.Is(err, fs.ErrNotExist) { return nil, ErrDatastoreDoesNotExist } else if err != nil { return nil, err @@ -252,7 +257,7 @@ func Open(path string, syncFiles bool) (*Datastore, error) { tempPath := filepath.Join(path, ".temp") err = os.RemoveAll(tempPath) - if err != nil && !os.IsNotExist(err) { + if err != nil && !errors.Is(err, fs.ErrNotExist) { return nil, fmt.Errorf("failed to remove temporary directory: %v", err) } @@ -533,131 +538,6 @@ func (fs *Datastore) doPut(key datastore.Key, val []byte) error { return nil } -func (fs *Datastore) putMany(data map[datastore.Key][]byte) error { - fs.shutdownLock.RLock() - defer fs.shutdownLock.RUnlock() - if fs.shutdown { - return ErrClosed - } - - type putManyOp struct { - key datastore.Key - file *os.File - dstPath string - srcPath string - } - - var ( - dirsToSync = make(map[string]struct{}, len(data)) - files = make([]putManyOp, 0, len(data)) - closed int - removed int - ) - - defer func() { - for closed < len(files) { - files[closed].file.Close() - closed++ - } - for removed < len(files) { - _ = os.Remove(files[removed].srcPath) - removed++ - } - }() - - closer := func() error { - for closed < len(files) { - fi := files[closed].file - if fs.sync { - if err := syncFile(fi); err != nil { - return err - } - } - if err := fi.Close(); err != nil { - return err - } - closed++ - } - return nil - } - - // Start by writing all the data in temp files so that we can be sure that - // all the data is on disk before renaming to the final places. - for key, value := range data { - dir, path := fs.encode(key) - if _, err := fs.makeDirNoSync(dir); err != nil { - return err - } - dirsToSync[dir] = struct{}{} - - tmp, err := fs.tempFileOnce() - - // If we have too many files open, try closing some, then try - // again repeatedly. - if isTooManyFDError(err) { - if err = closer(); err != nil { - return err - } - tmp, err = fs.tempFile() - } - - if err != nil { - return err - } - - // Do this _first_ so we close it if writing fails. - files = append(files, putManyOp{ - key: key, - file: tmp, - dstPath: path, - srcPath: tmp.Name(), - }) - - if _, err := tmp.Write(value); err != nil { - return err - } - } - - // Now we sync everything - // sync and close files - err := closer() - if err != nil { - return err - } - - // move files to their proper places - for _, pop := range files { - done, err := fs.doWriteOp(&op{ - typ: opRename, - key: pop.key, - tmp: pop.srcPath, - path: pop.dstPath, - }) - if err != nil { - return err - } else if !done { - _ = os.Remove(pop.file.Name()) - } - removed++ - } - - // now sync the dirs for those files - if fs.sync { - for dir := range dirsToSync { - if err := syncDir(dir); err != nil { - return err - } - } - - // sync top flatfs dir - if err := syncDir(fs.path); err != nil { - return err - } - } - - return nil -} - func (fs *Datastore) Get(ctx context.Context, key datastore.Key) (value []byte, err error) { // Can't exist in datastore. if !keyIsValid(key) { @@ -1128,10 +1008,6 @@ func (fs *Datastore) tempFile() (*os.File, error) { return file, err } -func (fs *Datastore) tempFileOnce() (*os.File, error) { - return tempFileOnce(fs.tempPath, "temp-") -} - // only call this on directories. 
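+// walk emits one query result on the output channel for each data file found
+// in the shard directory at path.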
func (fs *Datastore) walk(ctx context.Context, q query.Query, path string, output chan<- query.Result) error { dir, err := os.Open(path) @@ -1210,46 +1086,540 @@ func (fs *Datastore) Close() error { return nil } +// DiscardableBatch is an optional interface for batches that support discarding changes +type DiscardableBatch interface { + datastore.Batch + Discard(ctx context.Context) error +} + +// BatchReader is an optional interface for batches that support read operations +type BatchReader interface { + datastore.Batch + datastore.Read +} + +// TODO: move this to be with other consts above +const maxConcurrentPuts = 16 + +// flatfsBatch implements atomic batch operations using a temporary directory. +// +// Design principles: +// - All Put operations write to a temp directory (no sharding) +// - Writes are done asynchronously in goroutines for performance +// - Commit atomically renames all files to their sharded destinations +// - On crash/discard, temp directory is cleaned (no partial writes) +// +// Concurrency: Safe for concurrent calls to Put/Delete/Get/Has/GetSize/Query. +// Not safe to call Commit or Discard concurrently with other operations. +// +// Transaction semantics: Read operations (Get/Has/GetSize/Query) see +// uncommitted writes from the same batch, following standard database +// transaction isolation. +// +// Performance characteristics: +// - Put: O(1) with async I/O, returns immediately +// - Get/Has/GetSize: O(n) where n = number of Put operations in batch +// - Commit: O(n) file renames +// +// IMPORTANT: Batch instances should not be reused after Commit or Discard. type flatfsBatch struct { - puts map[datastore.Key][]byte + mu sync.Mutex + puts []datastore.Key // ordered list for iteration (Commit, Query) + putsSet map[datastore.Key]struct{} // O(1) lookup for Get/Has/GetSize deletes map[datastore.Key]struct{} - ds *Datastore + ds *Datastore + tempDir string + + // Async write tracking + asyncWrites sync.WaitGroup + asyncMu sync.Mutex + asyncFirstError error + asyncPutGate chan struct{} } func (fs *Datastore) Batch(_ context.Context) (datastore.Batch, error) { + // Create a unique temp directory for this batch + // Note: Temp files are not sharded (flat structure) for simplicity and speed. + // Files are only sharded when renamed to their final destination on Commit. + tempDir := filepath.Join(fs.tempPath, fmt.Sprintf("batch-%d-%d", time.Now().UnixNano(), rand.Int63())) + if err := os.Mkdir(tempDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create batch temp directory: %w", err) + } + return &flatfsBatch{ - puts: make(map[datastore.Key][]byte), - deletes: make(map[datastore.Key]struct{}), - ds: fs, + deletes: make(map[datastore.Key]struct{}), + ds: fs, + tempDir: tempDir, + asyncPutGate: make(chan struct{}, maxConcurrentPuts), }, nil } +// Put writes val for key to a temporary file asynchronously and returns immediately. +// +// CRITICAL: The caller MUST NOT modify or reuse the val byte slice after calling Put. +// The buffer is used asynchronously by a background goroutine. Violating this will +// cause data corruption. This differs from typical Go semantics where buffers can +// be reused after a function returns. +// +// If you need to reuse buffers, copy them before calling Put: +// buf := make([]byte, len(data)) +// copy(buf, data) +// batch.Put(ctx, key, buf) +// +// Error handling: If an async write fails, the error is captured and returned +// on the next Put/Delete/Commit or any read operation (fail-fast behavior). 
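+//
+// A minimal usage sketch (ds is an open *Datastore; blocks is a placeholder
+// map[datastore.Key][]byte whose values the caller does not reuse afterwards):
+//
+//	batch, err := ds.Batch(ctx)
+//	if err != nil {
+//		return err
+//	}
+//	for key, val := range blocks {
+//		if err := batch.Put(ctx, key, val); err != nil {
+//			return err // invalid key, or an earlier async write already failed
+//		}
+//	}
+//	if err := batch.Commit(ctx); err != nil {
+//		return err
+//	}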
func (bt *flatfsBatch) Put(ctx context.Context, key datastore.Key, val []byte) error { if !keyIsValid(key) { return fmt.Errorf("when putting '%q': %v", key, ErrInvalidKey) } - bt.puts[key] = val + + if err := bt.getAsyncError(); err != nil { + // If there was an error from a previous async write, return it fast + // This may be useful if, for example, we are out of disk space. + return err + } + + // Acquire semaphore slot before starting another async put. + select { + case bt.asyncPutGate <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + + bt.mu.Lock() + noslash := key.String()[1:] + fileName := noslash + extension + tempFile := filepath.Join(bt.tempDir, fileName) + + // Track this key immediately + bt.puts = append(bt.puts, key) + + // Increment wait group for async write + bt.asyncWrites.Add(1) + bt.mu.Unlock() + + // Write to temp file asynchronously in a goroutine + go func(val []byte) { + defer func() { + bt.asyncWrites.Done() + <-bt.asyncPutGate + }() + + // Ensure temp directory exists (recreate if batch was reused after commit) + tempDirPath := filepath.Dir(tempFile) + if _, err := os.Stat(tempDirPath); errors.Is(err, fs.ErrNotExist) { + if err := os.Mkdir(tempDirPath, 0755); err != nil && !os.IsExist(err) { + bt.setAsyncError(fmt.Errorf("failed to recreate temp directory: %w", err)) + return + } + } else if err != nil { + bt.setAsyncError(fmt.Errorf("failed to stat temp directory: %w", err)) + return + } + + file, err := os.Create(tempFile) + if err != nil { + bt.setAsyncError(fmt.Errorf("failed to create temp file: %w", err)) + return + } + defer file.Close() + + if _, err := file.Write(val); err != nil { + os.Remove(tempFile) + bt.setAsyncError(fmt.Errorf("failed to write to temp file: %w", err)) + return + } + + if bt.ds.sync { + if err := syncFile(file); err != nil { + os.Remove(tempFile) + bt.setAsyncError(fmt.Errorf("failed to sync temp file: %w", err)) + return + } + } + + if err := file.Close(); err != nil { + os.Remove(tempFile) + bt.setAsyncError(fmt.Errorf("failed to close temp file: %w", err)) + return + } + }(val) + return nil } +// setAsyncError saves the first error from an async write operation. +// Only the first error is captured; subsequent errors are ignored. +// This provides fail-fast behavior: once any write fails, subsequent +// operations return that error immediately. +func (bt *flatfsBatch) setAsyncError(err error) { + bt.asyncMu.Lock() + defer bt.asyncMu.Unlock() + if bt.asyncFirstError == nil { + bt.asyncFirstError = err + } +} + +// getAsyncError returns the first error from an async write operation +func (bt *flatfsBatch) getAsyncError() error { + bt.asyncMu.Lock() + defer bt.asyncMu.Unlock() + + err := bt.asyncFirstError + return err +} + func (bt *flatfsBatch) Delete(ctx context.Context, key datastore.Key) error { + bt.mu.Lock() + defer bt.mu.Unlock() + + if err := bt.getAsyncError(); err != nil { + return err + } + if keyIsValid(key) { bt.deletes[key] = struct{}{} } // otherwise, delete is a no-op anyways. return nil } +// Get retrieves a value from the batch or underlying datastore. +// +// Transaction semantics: Returns uncommitted data written to this batch via Put, +// even before Commit. This allows building IPLD structures that reference blocks +// added earlier in the same batch. +// +// Performance: O(n) where n is the number of Put operations, as it must scan +// the puts slice to check if the key exists in the batch. 
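+//
+// A minimal read-your-writes sketch (ds, key and val as in Put above):
+//
+//	batch, _ := ds.Batch(ctx)
+//	_ = batch.Put(ctx, key, val)
+//	if r, ok := batch.(BatchReader); ok {
+//		if got, err := r.Get(ctx, key); err == nil {
+//			fmt.Printf("read %d bytes before commit\n", len(got))
+//		}
+//	}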
+func (bt *flatfsBatch) Get(ctx context.Context, key datastore.Key) ([]byte, error) { + // Wait for all async writes to complete before reading + bt.asyncWrites.Wait() + if err := bt.getAsyncError(); err != nil { + return nil, err + } + bt.mu.Lock() + // Check if key is marked for deletion + if _, deleted := bt.deletes[key]; deleted { + bt.mu.Unlock() + return nil, datastore.ErrNotFound + } + + // Check if key was added in this batch + inBatch := slices.Contains(bt.puts, key) + bt.mu.Unlock() + + // If in batch, read from temp directory + if inBatch { + noslash := key.String()[1:] + tempFile := filepath.Join(bt.tempDir, noslash+extension) + data, err := readFile(tempFile) + if err != nil { + return nil, err + } + return data, nil + } + + // If not in batch, check main datastore + return bt.ds.Get(ctx, key) +} + +func (bt *flatfsBatch) Has(ctx context.Context, key datastore.Key) (bool, error) { + // Wait for all async writes to complete before checking + bt.asyncWrites.Wait() + if err := bt.getAsyncError(); err != nil { + return false, err + } + + bt.mu.Lock() + // Check if key is marked for deletion + if _, deleted := bt.deletes[key]; deleted { + bt.mu.Unlock() + return false, nil + } + + // Check if key was added in this batch + inBatch := slices.Contains(bt.puts, key) + bt.mu.Unlock() + + if inBatch { + return true, nil + } + + // If not in batch, check main datastore + return bt.ds.Has(ctx, key) +} + +func (bt *flatfsBatch) GetSize(ctx context.Context, key datastore.Key) (int, error) { + // Wait for all async writes to complete before checking size + bt.asyncWrites.Wait() + + if err := bt.getAsyncError(); err != nil { + return 0, err + } + bt.mu.Lock() + // Check if key is marked for deletion + if _, deleted := bt.deletes[key]; deleted { + bt.mu.Unlock() + return -1, datastore.ErrNotFound + } + + // Check if key was added in this batch + inBatch := slices.Contains(bt.puts, key) + bt.mu.Unlock() + + // If in batch, get size from temp directory + if inBatch { + noslash := key.String()[1:] + tempFile := filepath.Join(bt.tempDir, noslash+extension) + stat, err := os.Stat(tempFile) + if err != nil { + return -1, err + } + return int(stat.Size()), nil + } + + // If not in batch, check main datastore + return bt.ds.GetSize(ctx, key) +} + +// Query returns all entries from both the batch and underlying datastore, +// properly merging results to reflect the batch's uncommitted state. +// +// Merge logic: +// - Keys written via Put appear in results (even if not committed) +// - Keys marked for Delete do not appear in results +// - Keys Put multiple times appear only once (last write wins) +// - Main datastore results are excluded if overwritten or deleted in batch +// +// The implementation waits for all async writes to complete before querying. +func (bt *flatfsBatch) Query(ctx context.Context, q query.Query) (query.Results, error) { + // Wait for all async writes to complete before querying + bt.asyncWrites.Wait() + if err := bt.getAsyncError(); err != nil { + return nil, err + } + prefix := datastore.NewKey(q.Prefix).String() + if prefix != "/" { + // This datastore can't include keys with multiple components. + // Therefore, it's always correct to return an empty result when + // the user requests a filter by prefix. 
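+ // For example, a flatfs key such as /CIQABC has exactly one path
+ // component, so a non-root prefix such as /blocks can never match it.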
+ return query.ResultsWithEntries(q, nil), nil + } + + // Get results from main datastore + mainResults, err := bt.ds.Query(ctx, q) + if err != nil { + return nil, err + } + + // Merge with temp directory results + results := query.ResultsWithContext(q, func(qctx context.Context, output chan<- query.Result) { + bt.mu.Lock() + // Clone deletes and puts to avoid holding lock during query execution + deletedOrSent := maps.Clone(bt.deletes) + puts := slices.Clone(bt.puts) + tempDir := bt.tempDir + bt.mu.Unlock() + + // First, send results from temp directory (puts) + for _, key := range puts { + // Skip if deleted + if _, deleted := deletedOrSent[key]; deleted { + continue + } + + noslash := key.String()[1:] + tempFile := filepath.Join(tempDir, noslash+extension) + + var result query.Result + result.Key = key.String() + + if !q.KeysOnly { + value, err := readFile(tempFile) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + result.Error = err + } else { + continue // File doesn't exist, skip + } + } else { + result.Value = value + result.Size = len(value) + } + } else if q.ReturnsSizes { + stat, err := os.Stat(tempFile) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + result.Error = err + } else { + continue // File doesn't exist, skip + } + } else { + result.Size = int(stat.Size()) + } + } + + select { + case output <- result: + // Mark this key as sent by adding it to deletedOrSent map + deletedOrSent[key] = struct{}{} + case <-qctx.Done(): + return + } + } + + // Then, send results from main datastore (excluding deleted and already sent) + mainChan := mainResults.Next() + for { + select { + case result, ok := <-mainChan: + if !ok { + return + } + if result.Error != nil { + select { + case output <- result: + case <-qctx.Done(): + return + } + continue + } + + key := datastore.NewKey(result.Key) + + // Skip if deleted or already sent from temp + if _, skip := deletedOrSent[key]; skip { + continue + } + + select { + case output <- result: + case <-qctx.Done(): + return + } + case <-qctx.Done(): + return + } + } + }) + + // Apply query filters + return query.NaiveQueryApply(q, results), nil +} + +// Discard discards the batch operations without committing +func (bt *flatfsBatch) Discard(ctx context.Context) error { + // Wait for any pending async writes to complete + bt.asyncWrites.Wait() + + bt.mu.Lock() + defer bt.mu.Unlock() + + // Remove the batch temp directory and all subdirectories + _ = os.RemoveAll(bt.tempDir) + bt.puts = nil + bt.asyncFirstError = nil + bt.deletes = make(map[datastore.Key]struct{}) + + return nil +} + +// Commit atomically applies all batch operations to the datastore. +// +// Atomicity guarantee: All Put operations are moved to their final destinations +// only after being written to temp files. If the process crashes before Commit +// completes, the temp directory is cleaned on restart and no partial data remains. +// +// Order of operations: +// 1. Wait for all async Put goroutines to complete +// 2. Check for any async write errors (fail-fast) +// 3. Create all destination shard directories +// 4. Atomically rename temp files to final sharded paths +// 5. Apply all Delete operations +// 6. Sync directories (if sync is enabled) +// +// Concurrency: Uses doWriteOp to handle concurrent commits gracefully. +// If another goroutine commits the same key, the operation succeeds. 
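+//
+// A minimal commit-or-discard sketch (fill is a placeholder for the caller's
+// Put/Delete loop):
+//
+//	batch, err := ds.Batch(ctx)
+//	if err != nil {
+//		return err
+//	}
+//	if err := fill(ctx, batch); err != nil {
+//		if d, ok := batch.(DiscardableBatch); ok {
+//			_ = d.Discard(ctx) // remove the batch temp directory, commit nothing
+//		}
+//		return err
+//	}
+//	return batch.Commit(ctx)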
func (bt *flatfsBatch) Commit(ctx context.Context) error { - if err := bt.ds.putMany(bt.puts); err != nil { + // Wait for all async write operations to complete + bt.asyncWrites.Wait() + + if err := bt.getAsyncError(); err != nil { + // If there was an error from a previous async write, return it fast + // This may be useful if, for example, we are out of disk space. return err } + bt.mu.Lock() + defer bt.mu.Unlock() + + bt.ds.shutdownLock.RLock() + defer bt.ds.shutdownLock.RUnlock() + if bt.ds.shutdown { + return ErrClosed + } + + dirsToSync := make(map[string]struct{}) + + // First, ensure all destination directories exist + for _, key := range bt.puts { + dir, _ := bt.ds.encode(key) + if _, err := bt.ds.makeDirNoSync(dir); err != nil { + return fmt.Errorf("failed to create directory: %w", err) + } + dirsToSync[dir] = struct{}{} + } + + // Move all temp files to their final destinations + for _, key := range bt.puts { + noslash := key.String()[1:] + fileName := noslash + extension + tempFile := filepath.Join(bt.tempDir, fileName) + _, finalPath := bt.ds.encode(key) + + // Use the doWriteOp to handle concurrent operations properly + _, err := bt.ds.doWriteOp(&op{ + typ: opRename, + key: key, + tmp: tempFile, + path: finalPath, + }) + if err != nil { + // Clean up remaining temp files on error + _ = os.RemoveAll(bt.tempDir) + return fmt.Errorf("failed to rename temp file: %w", err) + } + // If doWriteOp returns without error, the operation succeeded + // (either by us or by a concurrent operation) + } + + // Handle deletes for k := range bt.deletes { if err := bt.ds.Delete(ctx, k); err != nil { return err } } + // Sync directories if needed + if bt.ds.sync { + for dir := range dirsToSync { + if err := syncDir(dir); err != nil { + return fmt.Errorf("failed to sync directory: %w", err) + } + } + // Sync root directory + if err := syncDir(bt.ds.path); err != nil { + return fmt.Errorf("failed to sync root directory: %w", err) + } + } + + // Reset state after successful commit so batch can be reused + bt.puts = nil + bt.deletes = make(map[datastore.Key]struct{}) + + // Clean up the batch temp directory after successful commit + os.RemoveAll(bt.tempDir) + return nil }
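A minimal end-to-end sketch of the merged read view described in the batch Query doc comment, assuming an open *flatfs.Datastore named ds and the packages used in batch_test.go (github.com/ipfs/go-datastore, its query subpackage, and github.com/ipfs/go-ds-flatfs); error handling is abbreviated:

	func mergedView(ctx context.Context, ds *flatfs.Datastore) error {
		batch, err := ds.Batch(ctx)
		if err != nil {
			return err
		}
		// Stage one write and one delete; nothing touches the shard dirs yet.
		if err := batch.Put(ctx, datastore.NewKey("NEWKEY"), []byte("new")); err != nil {
			return err
		}
		if err := batch.Delete(ctx, datastore.NewKey("OLDKEY")); err != nil {
			return err
		}
		if r, ok := batch.(flatfs.BatchReader); ok {
			res, err := r.Query(ctx, query.Query{})
			if err != nil {
				return err
			}
			entries, err := res.Rest()
			if err != nil {
				return err
			}
			// entries include /NEWKEY (uncommitted) and omit /OLDKEY; all other
			// keys come from the main datastore unchanged.
			_ = entries
		}
		return batch.Commit(ctx)
	}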