Skip to content

Commit ddd53d9

Browse files
committed
feat: implement batch Query with temp directory merging
1 parent 1e8b13e commit ddd53d9

File tree

2 files changed

+319
-3
lines changed

2 files changed

+319
-3
lines changed

batch_test.go

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"testing"
88

99
"github.com/ipfs/go-datastore"
10+
"github.com/ipfs/go-datastore/query"
1011
flatfs "github.com/ipfs/go-ds-flatfs"
1112
)
1213

@@ -330,3 +331,204 @@ func testBatchDiscard(dirFunc mkShardFunc, t *testing.T) {
330331
t.Errorf("batch temp directories should be cleaned up after discard, found: %v", tempBatchDirs)
331332
}
332333
}
334+
335+
// TestBatchQuery runs testBatchQuery once for every shard-function layout
// supported by the test harness.
func TestBatchQuery(t *testing.T) {
	tryAllShardFuncs(t, testBatchQuery)
}
338+
339+
func testBatchQuery(dirFunc mkShardFunc, t *testing.T) {
340+
temp, cleanup := tempdir(t)
341+
defer cleanup()
342+
defer checkTemp(t, temp)
343+
344+
fs, err := flatfs.CreateOrOpen(temp, dirFunc(2), false)
345+
if err != nil {
346+
t.Fatalf("CreateOrOpen fail: %v\n", err)
347+
}
348+
defer fs.Close()
349+
350+
ctx := context.Background()
351+
352+
// Add some data to the main datastore
353+
mainKeys := []string{"EXISTING1", "EXISTING2", "EXISTING3"}
354+
for _, k := range mainKeys {
355+
err := fs.Put(ctx, datastore.NewKey(k), []byte("main:"+k))
356+
if err != nil {
357+
t.Fatalf("Put fail: %v\n", err)
358+
}
359+
}
360+
361+
// Create a batch
362+
batch, err := fs.Batch(ctx)
363+
if err != nil {
364+
t.Fatal(err)
365+
}
366+
367+
// Add new keys to batch
368+
batchKeys := []string{"BATCH1", "BATCH2"}
369+
for _, k := range batchKeys {
370+
err := batch.Put(ctx, datastore.NewKey(k), []byte("batch:"+k))
371+
if err != nil {
372+
t.Fatal(err)
373+
}
374+
}
375+
376+
// Delete one existing key
377+
err = batch.Delete(ctx, datastore.NewKey("EXISTING2"))
378+
if err != nil {
379+
t.Fatal(err)
380+
}
381+
382+
// Update an existing key
383+
err = batch.Put(ctx, datastore.NewKey("EXISTING3"), []byte("updated:EXISTING3"))
384+
if err != nil {
385+
t.Fatal(err)
386+
}
387+
388+
// Query the batch - should see batch changes
389+
batchReader, ok := batch.(flatfs.BatchReader)
390+
if !ok {
391+
t.Fatal("batch should implement BatchReader")
392+
}
393+
394+
q := query.Query{}
395+
results, err := batchReader.Query(ctx, q)
396+
if err != nil {
397+
t.Fatal(err)
398+
}
399+
400+
entries := collectQueryResults(t, results)
401+
402+
// Should have:
403+
// - /EXISTING1 (from main)
404+
// - /EXISTING3 (updated in batch)
405+
// - /BATCH1, /BATCH2 (new in batch)
406+
// Should NOT have:
407+
// - /EXISTING2 (deleted in batch)
408+
409+
expectedKeys := map[string]string{
410+
"/EXISTING1": "main:EXISTING1",
411+
"/EXISTING3": "updated:EXISTING3",
412+
"/BATCH1": "batch:BATCH1",
413+
"/BATCH2": "batch:BATCH2",
414+
}
415+
416+
if len(entries) != len(expectedKeys) {
417+
t.Fatalf("expected %d entries, got %d", len(expectedKeys), len(entries))
418+
}
419+
420+
for _, entry := range entries {
421+
expected, ok := expectedKeys[entry.Key]
422+
if !ok {
423+
t.Errorf("unexpected key: %s", entry.Key)
424+
continue
425+
}
426+
if string(entry.Value) != expected {
427+
t.Errorf("value mismatch for key %s: expected %s, got %s", entry.Key, expected, string(entry.Value))
428+
}
429+
delete(expectedKeys, entry.Key)
430+
}
431+
432+
if len(expectedKeys) > 0 {
433+
t.Errorf("missing keys in query results: %v", expectedKeys)
434+
}
435+
436+
// Test KeysOnly query
437+
q = query.Query{KeysOnly: true}
438+
results, err = batchReader.Query(ctx, q)
439+
if err != nil {
440+
t.Fatal(err)
441+
}
442+
443+
entries = collectQueryResults(t, results)
444+
if len(entries) != 4 {
445+
t.Errorf("expected 4 keys, got %d", len(entries))
446+
}
447+
for _, entry := range entries {
448+
if entry.Value != nil {
449+
t.Error("KeysOnly query should not return values")
450+
}
451+
}
452+
453+
// Test ReturnsSizes query
454+
q = query.Query{KeysOnly: true, ReturnsSizes: true}
455+
results, err = batchReader.Query(ctx, q)
456+
if err != nil {
457+
t.Fatal(err)
458+
}
459+
460+
entries = collectQueryResults(t, results)
461+
if len(entries) != 4 {
462+
t.Errorf("expected 4 keys, got %d", len(entries))
463+
}
464+
for _, entry := range entries {
465+
if entry.Size <= 0 {
466+
t.Error("ReturnsSizes query should return sizes")
467+
}
468+
if entry.Value != nil {
469+
t.Error("KeysOnly query should not return values")
470+
}
471+
}
472+
473+
// Commit the batch
474+
err = batch.Commit(ctx)
475+
if err != nil {
476+
t.Fatal(err)
477+
}
478+
479+
// Query main datastore - should see committed changes
480+
q = query.Query{}
481+
results, err = fs.Query(ctx, q)
482+
if err != nil {
483+
t.Fatal(err)
484+
}
485+
486+
entries = collectQueryResults(t, results)
487+
if len(entries) != 4 {
488+
t.Errorf("expected 4 entries after commit, got %d", len(entries))
489+
}
490+
491+
// Verify committed data
492+
for _, entry := range entries {
493+
switch entry.Key {
494+
case "/EXISTING1":
495+
if string(entry.Value) != "main:EXISTING1" {
496+
t.Errorf("expected main:EXISTING1, got %s", string(entry.Value))
497+
}
498+
case "/EXISTING3":
499+
if string(entry.Value) != "updated:EXISTING3" {
500+
t.Errorf("expected updated:EXISTING3, got %s", string(entry.Value))
501+
}
502+
case "/BATCH1":
503+
if string(entry.Value) != "batch:BATCH1" {
504+
t.Errorf("expected batch:BATCH1, got %s", string(entry.Value))
505+
}
506+
case "/BATCH2":
507+
if string(entry.Value) != "batch:BATCH2" {
508+
t.Errorf("expected batch:BATCH2, got %s", string(entry.Value))
509+
}
510+
default:
511+
t.Errorf("unexpected key after commit: %s", entry.Key)
512+
}
513+
}
514+
515+
// Verify /EXISTING2 was deleted
516+
has, err := fs.Has(ctx, datastore.NewKey("EXISTING2"))
517+
if err != nil {
518+
t.Fatal(err)
519+
}
520+
if has {
521+
t.Error("/EXISTING2 should be deleted")
522+
}
523+
}
524+
525+
func collectQueryResults(t *testing.T, results query.Results) []query.Entry {
526+
var entries []query.Entry
527+
for result := range results.Next() {
528+
if result.Error != nil {
529+
t.Fatalf("query result error: %v", result.Error)
530+
}
531+
entries = append(entries, result.Entry)
532+
}
533+
return entries
534+
}

flatfs.go

Lines changed: 117 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,9 +1267,123 @@ func (bt *flatfsBatch) GetSize(ctx context.Context, key datastore.Key) (int, err
12671267
}
12681268

12691269
func (bt *flatfsBatch) Query(ctx context.Context, q query.Query) (query.Results, error) {
1270-
// For simplicity, batch queries just delegate to the main datastore
1271-
// A full implementation would merge results from temp and main datastore
1272-
return bt.ds.Query(ctx, q)
1270+
prefix := datastore.NewKey(q.Prefix).String()
1271+
if prefix != "/" {
1272+
// This datastore can't include keys with multiple components.
1273+
// Therefore, it's always correct to return an empty result when
1274+
// the user requests a filter by prefix.
1275+
return query.ResultsWithEntries(q, nil), nil
1276+
}
1277+
1278+
// Get results from main datastore
1279+
mainResults, err := bt.ds.Query(ctx, q)
1280+
if err != nil {
1281+
return nil, err
1282+
}
1283+
1284+
// Merge with temp directory results
1285+
results := query.ResultsWithContext(q, func(qctx context.Context, output chan<- query.Result) {
1286+
bt.mu.Lock()
1287+
deletes := make(map[datastore.Key]struct{})
1288+
for k := range bt.deletes {
1289+
deletes[k] = struct{}{}
1290+
}
1291+
puts := make([]datastore.Key, len(bt.puts))
1292+
copy(puts, bt.puts)
1293+
tempDir := bt.tempDir
1294+
bt.mu.Unlock()
1295+
1296+
// Track which keys we've already sent from temp
1297+
sentKeys := make(map[string]struct{})
1298+
1299+
// First, send results from temp directory (puts)
1300+
for _, key := range puts {
1301+
// Skip if deleted
1302+
if _, deleted := deletes[key]; deleted {
1303+
continue
1304+
}
1305+
1306+
noslash := key.String()[1:]
1307+
tempFile := filepath.Join(tempDir, noslash+extension)
1308+
1309+
var result query.Result
1310+
result.Key = key.String()
1311+
1312+
if !q.KeysOnly {
1313+
value, err := readFile(tempFile)
1314+
if err != nil {
1315+
if !os.IsNotExist(err) {
1316+
result.Error = err
1317+
} else {
1318+
continue // File doesn't exist, skip
1319+
}
1320+
} else {
1321+
result.Value = value
1322+
result.Size = len(value)
1323+
}
1324+
} else if q.ReturnsSizes {
1325+
stat, err := os.Stat(tempFile)
1326+
if err != nil {
1327+
if !os.IsNotExist(err) {
1328+
result.Error = err
1329+
} else {
1330+
continue // File doesn't exist, skip
1331+
}
1332+
} else {
1333+
result.Size = int(stat.Size())
1334+
}
1335+
}
1336+
1337+
select {
1338+
case output <- result:
1339+
sentKeys[key.String()] = struct{}{}
1340+
case <-qctx.Done():
1341+
return
1342+
}
1343+
}
1344+
1345+
// Then, send results from main datastore (excluding deleted and already sent)
1346+
mainChan := mainResults.Next()
1347+
for {
1348+
select {
1349+
case result, ok := <-mainChan:
1350+
if !ok {
1351+
return
1352+
}
1353+
if result.Error != nil {
1354+
select {
1355+
case output <- result:
1356+
case <-qctx.Done():
1357+
return
1358+
}
1359+
continue
1360+
}
1361+
1362+
key := datastore.NewKey(result.Key)
1363+
1364+
// Skip if deleted
1365+
if _, deleted := deletes[key]; deleted {
1366+
continue
1367+
}
1368+
1369+
// Skip if already sent from temp
1370+
if _, sent := sentKeys[key.String()]; sent {
1371+
continue
1372+
}
1373+
1374+
select {
1375+
case output <- result:
1376+
case <-qctx.Done():
1377+
return
1378+
}
1379+
case <-qctx.Done():
1380+
return
1381+
}
1382+
}
1383+
})
1384+
1385+
// Apply query filters
1386+
return query.NaiveQueryApply(q, results), nil
12731387
}
12741388

12751389
// Discard discards the batch operations without committing

0 commit comments

Comments
 (0)