Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,20 +192,22 @@ func (f *Active) replayWalFile(ctx context.Context) error {
}
corruptions = entry.Corruptions

if uint64(entry.Offset) > next {
next += step
progress := float64(uint64(entry.Offset)) / float64(f.info.MetaOnDisk) * 100
logger.Info("replaying batch, meta",
zap.String("name", f.info.Name()),
zap.Int64("from", entry.Offset),
zap.Int64("to", entry.Offset+entry.Size),
zap.Uint64("target", f.info.MetaOnDisk),
util.ZapFloat64WithPrec("progress_percentage", progress, 2),
)
}
if entry.ContainsData {
if uint64(entry.Offset) > next {
next += step
progress := float64(uint64(entry.Offset)) / float64(f.info.MetaOnDisk) * 100
logger.Info("replaying batch, meta",
zap.String("name", f.info.Name()),
zap.Int64("from", entry.Offset),
zap.Int64("to", entry.Offset+entry.Size),
zap.Uint64("target", f.info.MetaOnDisk),
util.ZapFloat64WithPrec("progress_percentage", progress, 2),
)
}

wg.Add(1)
f.indexer.Index(f, entry.Data, &wg, sw)
wg.Add(1)
f.indexer.Index(f, entry.Data, &wg, sw)
}
}

wg.Wait()
Expand Down
26 changes: 15 additions & 11 deletions storage/wal_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

type WalRecord struct {
Data WalBlock
Offset int64
Size int64
Corruptions uint64
Err error
ContainsData bool
Data WalBlock
Offset int64
Size int64
Corruptions uint64
Err error
}

type WalReader struct {
Expand Down Expand Up @@ -88,11 +89,12 @@ func (r *WalReader) Entries() iter.Seq[WalRecord] {
}

if errors.Is(err, io.EOF) || n < WalBlockHeaderLen {
// log corruption only if at least 1 was read
if n > 0 {
// we read at least one byte - start corruption if not already started
startCorruptionTracking(offset)
logCorruptionEnd(offset + int64(n))
}
logCorruptionEnd(offset + int64(n))
yield(WalRecord{Offset: offset, Corruptions: corruptions})
return
}

Expand Down Expand Up @@ -123,6 +125,7 @@ func (r *WalReader) Entries() iter.Seq[WalRecord] {
if errors.Is(err, io.EOF) || int64(n) < blockLen {
startCorruptionTracking(offset)
logCorruptionEnd(offset + int64(n))
yield(WalRecord{Offset: offset, Corruptions: corruptions})
return
}

Expand All @@ -137,10 +140,11 @@ func (r *WalReader) Entries() iter.Seq[WalRecord] {
logCorruptionEnd(offset)

entry := WalRecord{
Data: mb,
Offset: offset,
Size: blockLen,
Corruptions: corruptions,
ContainsData: true,
Data: mb,
Offset: offset,
Size: blockLen,
Corruptions: corruptions,
}

if !yield(entry) {
Expand Down
38 changes: 32 additions & 6 deletions storage/wal_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ func TestWalWriter_ConcurrentWrites(t *testing.T) {
offset := WalBlockAlignment
idx := 0
for entry := range reader.Entries() {
if !entry.ContainsData {
continue
}
assert.Equal(t, offset, entry.Offset, "block %d offset mismatch", idx)
assert.Equal(t, all[idx].offset, entry.Offset, "block %d offset mismatch", idx)
assert.Equal(t, all[idx].payload, entry.Data.Payload(), "block %d payload mismatch", idx)
Expand Down Expand Up @@ -286,6 +289,9 @@ func TestWalWriterWriteAndRead(t *testing.T) {
assert.NoError(t, err)
count := 0
for entry := range reader.Entries() {
if !entry.ContainsData {
continue
}
assert.Equal(t, offsets[count], entry.Offset, "block %d offset mismatch", count)
assert.Equal(t, payloads[count], entry.Data.Payload(), "block %d payload mismatch", count)
assert.Equal(t, WalBlockMagic, entry.Data.Magic(), "block %d should have WalBlock magic", count)
Expand All @@ -306,8 +312,10 @@ func TestWalReaderIteratorEmptyFile(t *testing.T) {
assert.NoError(t, err)

count := 0
for range reader.Entries() {
count++
for entry := range reader.Entries() {
if entry.ContainsData {
count++
}
}
assert.Equal(t, 0, count)
}
Expand Down Expand Up @@ -398,6 +406,9 @@ func TestWalReaderIterator(t *testing.T) {
var readPayloads [][]byte
var readOffsets []int64
for entry := range reader.Entries() {
if !entry.ContainsData {
continue
}
readPayloads = append(readPayloads, entry.Data.Payload())
readOffsets = append(readOffsets, entry.Offset)
}
Expand Down Expand Up @@ -446,6 +457,9 @@ func TestWalReaderSkipsCorruptedBlocks(t *testing.T) {

var readPayloads [][]byte
for entry := range reader.Entries() {
if !entry.ContainsData {
continue
}
readPayloads = append(readPayloads, entry.Data.Payload())
t.Logf("read block at offset %d: %q", entry.Offset, entry.Data.Payload())
}
Expand Down Expand Up @@ -490,7 +504,9 @@ func TestWalReaderSkipsCorruptedPayload(t *testing.T) {

var readPayloads [][]byte
for entry := range reader.Entries() {
readPayloads = append(readPayloads, entry.Data.Payload())
if entry.ContainsData {
readPayloads = append(readPayloads, entry.Data.Payload())
}
}

assert.Equal(t, 2, len(readPayloads), "should recover 2 out of 3 blocks")
Expand All @@ -505,7 +521,7 @@ func TestWalReaderSingleByteCorruption(t *testing.T) {
numRuns = 100
numBlocks = 100
minPayloadLen = 10
maxPayloadLen = int(10 * units.KiB)
maxPayloadLen = 256
)

totalLostBlocks := 0
Expand Down Expand Up @@ -556,9 +572,13 @@ func TestWalReaderSingleByteCorruption(t *testing.T) {
assert.NoError(t, err)

readBlocks := 0
corruptions := uint64(0)

for entry := range reader.Entries() {
assert.NoError(t, entry.Err)
corruptions = entry.Corruptions
if !entry.ContainsData {
continue
}
assert.True(t, entry.Data.IsCorrect())

expected := blocks[int(entry.Data.Payload()[0])]
Expand All @@ -569,6 +589,7 @@ func TestWalReaderSingleByteCorruption(t *testing.T) {
lostCount := numBlocks - readBlocks
totalLostBlocks += lostCount

assert.Equal(t, corruptions, uint64(lostCount), "lost blocks count must match corruptions count")
if lostCount > 1 {
assert.Fail(t, "lost %d blocks", lostCount)
}
Expand Down Expand Up @@ -634,7 +655,9 @@ func TestWalReaderTruncation(t *testing.T) {
readBlocks := 0

for entry := range reader.Entries() {
assert.NoError(t, entry.Err)
if !entry.ContainsData {
continue
}
assert.True(t, entry.Data.IsCorrect())

// verify block index matches expected sequence
Expand Down Expand Up @@ -722,6 +745,9 @@ func TestWalReaderSectorLoss(t *testing.T) {

for entry := range reader.Entries() {
assert.NoError(t, entry.Err)
if !entry.ContainsData {
continue
}
assert.True(t, entry.Data.IsCorrect())

// validate payload content
Expand Down
Loading