diff --git a/frac/active.go b/frac/active.go index 9a877af7..e5484b08 100644 --- a/frac/active.go +++ b/frac/active.go @@ -192,20 +192,22 @@ func (f *Active) replayWalFile(ctx context.Context) error { } corruptions = entry.Corruptions - if uint64(entry.Offset) > next { - next += step - progress := float64(uint64(entry.Offset)) / float64(f.info.MetaOnDisk) * 100 - logger.Info("replaying batch, meta", - zap.String("name", f.info.Name()), - zap.Int64("from", entry.Offset), - zap.Int64("to", entry.Offset+entry.Size), - zap.Uint64("target", f.info.MetaOnDisk), - util.ZapFloat64WithPrec("progress_percentage", progress, 2), - ) - } + if entry.ContainsData { + if uint64(entry.Offset) > next { + next += step + progress := float64(uint64(entry.Offset)) / float64(f.info.MetaOnDisk) * 100 + logger.Info("replaying batch, meta", + zap.String("name", f.info.Name()), + zap.Int64("from", entry.Offset), + zap.Int64("to", entry.Offset+entry.Size), + zap.Uint64("target", f.info.MetaOnDisk), + util.ZapFloat64WithPrec("progress_percentage", progress, 2), + ) + } - wg.Add(1) - f.indexer.Index(f, entry.Data, &wg, sw) + wg.Add(1) + f.indexer.Index(f, entry.Data, &wg, sw) + } } wg.Wait() diff --git a/storage/wal_reader.go b/storage/wal_reader.go index 0b9ebc8e..c02885d8 100644 --- a/storage/wal_reader.go +++ b/storage/wal_reader.go @@ -14,11 +14,12 @@ import ( ) type WalRecord struct { - Data WalBlock - Offset int64 - Size int64 - Corruptions uint64 - Err error + ContainsData bool + Data WalBlock + Offset int64 + Size int64 + Corruptions uint64 + Err error } type WalReader struct { @@ -88,11 +89,12 @@ func (r *WalReader) Entries() iter.Seq[WalRecord] { } if errors.Is(err, io.EOF) || n < WalBlockHeaderLen { - // log corruption only if at least 1 was read if n > 0 { + // we read at least one byte - start corruption if not already started startCorruptionTracking(offset) - logCorruptionEnd(offset + int64(n)) } + logCorruptionEnd(offset + int64(n)) + yield(WalRecord{Offset: offset, Corruptions: corruptions}) return } @@ -123,6 +125,7 @@ func (r *WalReader) Entries() iter.Seq[WalRecord] { if errors.Is(err, io.EOF) || int64(n) < blockLen { startCorruptionTracking(offset) logCorruptionEnd(offset + int64(n)) + yield(WalRecord{Offset: offset, Corruptions: corruptions}) return } @@ -137,10 +140,11 @@ func (r *WalReader) Entries() iter.Seq[WalRecord] { logCorruptionEnd(offset) entry := WalRecord{ - Data: mb, - Offset: offset, - Size: blockLen, - Corruptions: corruptions, + ContainsData: true, + Data: mb, + Offset: offset, + Size: blockLen, + Corruptions: corruptions, } if !yield(entry) { diff --git a/storage/wal_writer_test.go b/storage/wal_writer_test.go index b5cbb062..ff462d10 100644 --- a/storage/wal_writer_test.go +++ b/storage/wal_writer_test.go @@ -238,6 +238,9 @@ func TestWalWriter_ConcurrentWrites(t *testing.T) { offset := WalBlockAlignment idx := 0 for entry := range reader.Entries() { + if !entry.ContainsData { + continue + } assert.Equal(t, offset, entry.Offset, "block %d offset mismatch", idx) assert.Equal(t, all[idx].offset, entry.Offset, "block %d offset mismatch", idx) assert.Equal(t, all[idx].payload, entry.Data.Payload(), "block %d payload mismatch", idx) @@ -286,6 +289,9 @@ func TestWalWriterWriteAndRead(t *testing.T) { assert.NoError(t, err) count := 0 for entry := range reader.Entries() { + if !entry.ContainsData { + continue + } assert.Equal(t, offsets[count], entry.Offset, "block %d offset mismatch", count) assert.Equal(t, payloads[count], entry.Data.Payload(), "block %d payload mismatch", count) assert.Equal(t, WalBlockMagic, entry.Data.Magic(), "block %d should have WalBlock magic", count) @@ -306,8 +312,10 @@ func TestWalReaderIteratorEmptyFile(t *testing.T) { assert.NoError(t, err) count := 0 - for range reader.Entries() { - count++ + for entry := range reader.Entries() { + if entry.ContainsData { + count++ + } } assert.Equal(t, 0, count) } @@ -398,6 +406,9 @@ func TestWalReaderIterator(t *testing.T) { var readPayloads [][]byte var readOffsets []int64 for entry := range reader.Entries() { + if !entry.ContainsData { + continue + } readPayloads = append(readPayloads, entry.Data.Payload()) readOffsets = append(readOffsets, entry.Offset) } @@ -446,6 +457,9 @@ func TestWalReaderSkipsCorruptedBlocks(t *testing.T) { var readPayloads [][]byte for entry := range reader.Entries() { + if !entry.ContainsData { + continue + } readPayloads = append(readPayloads, entry.Data.Payload()) t.Logf("read block at offset %d: %q", entry.Offset, entry.Data.Payload()) } @@ -490,7 +504,9 @@ func TestWalReaderSkipsCorruptedPayload(t *testing.T) { var readPayloads [][]byte for entry := range reader.Entries() { - readPayloads = append(readPayloads, entry.Data.Payload()) + if entry.ContainsData { + readPayloads = append(readPayloads, entry.Data.Payload()) + } } assert.Equal(t, 2, len(readPayloads), "should recover 2 out of 3 blocks") @@ -505,7 +521,7 @@ func TestWalReaderSingleByteCorruption(t *testing.T) { numRuns = 100 numBlocks = 100 minPayloadLen = 10 - maxPayloadLen = int(10 * units.KiB) + maxPayloadLen = 256 ) totalLostBlocks := 0 @@ -556,9 +572,13 @@ func TestWalReaderSingleByteCorruption(t *testing.T) { assert.NoError(t, err) readBlocks := 0 + corruptions := uint64(0) for entry := range reader.Entries() { - assert.NoError(t, entry.Err) + corruptions = entry.Corruptions + if !entry.ContainsData { + continue + } assert.True(t, entry.Data.IsCorrect()) expected := blocks[int(entry.Data.Payload()[0])] @@ -569,6 +589,7 @@ func TestWalReaderSingleByteCorruption(t *testing.T) { lostCount := numBlocks - readBlocks totalLostBlocks += lostCount + assert.Equal(t, corruptions, uint64(lostCount), "lost blocks count must match corruptions count") if lostCount > 1 { assert.Fail(t, "lost %d blocks", lostCount) } @@ -634,7 +655,9 @@ func TestWalReaderTruncation(t *testing.T) { readBlocks := 0 for entry := range reader.Entries() { - assert.NoError(t, entry.Err) + if !entry.ContainsData { + continue + } assert.True(t, entry.Data.IsCorrect()) // verify block index matches expected sequence @@ -722,6 +745,9 @@ func TestWalReaderSectorLoss(t *testing.T) { for entry := range reader.Entries() { assert.NoError(t, entry.Err) + if !entry.ContainsData { + continue + } assert.True(t, entry.Data.IsCorrect()) // validate payload content