From 25af9b4bf62159df49752803863712e1fda5642b Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Tue, 5 May 2026 16:00:00 +0500 Subject: [PATCH 1/3] perf(skip masks): skip masks as roaring bitmaps in fetch --- frac/active_index.go | 15 ++---- frac/fraction_test.go | 4 ++ frac/sealed_index.go | 17 ++---- fracmanager/fracmanager_test.go | 4 ++ fracmanager/fraction_provider.go | 2 + go.mod | 3 ++ go.sum | 7 +++ skipmaskmanager/encoding.go | 46 ++++++++-------- skipmaskmanager/iterator_asc.go | 9 ++-- skipmaskmanager/iterator_asc_test.go | 3 +- skipmaskmanager/iterator_desc.go | 9 ++-- skipmaskmanager/iterator_desc_test.go | 3 +- skipmaskmanager/loader.go | 72 ++++++++++++++++++------- skipmaskmanager/loader_test.go | 19 +++++-- skipmaskmanager/merged_iterator.go | 2 +- skipmaskmanager/merged_iterator_test.go | 6 +-- skipmaskmanager/skip_mask_manager.go | 45 ++++++++++++++-- 17 files changed, 178 insertions(+), 88 deletions(-) diff --git a/frac/active_index.go b/frac/active_index.go index e91eddf9..752c2ae1 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -282,12 +282,12 @@ func (di *activeFetchIndex) GetDocPos(ids []seq.ID, noSkipMasks bool) ([]seq.Doc } minLID, maxLID := uint32(0), uint32(math.MaxUint32) - skipLIDsIterator, has, err := di.skipMaskProvider.GetIDsIteratorByFrac(di.fracName, minLID, maxLID, false) + skipLIDsBitmap, err := di.skipMaskProvider.GetIDsBitmapByFrac(di.fracName, minLID, maxLID) if err != nil { return nil, err } - if !has { + if skipLIDsBitmap == nil { return docsPos, nil } @@ -298,17 +298,8 @@ func (di *activeFetchIndex) GetDocPos(ids []seq.ID, noSkipMasks bool) ([]seq.Doc } } - skipLIDs := make(map[uint32]struct{}) - for { - lid := skipLIDsIterator.Next() - if lid.IsNull() { - break - } - skipLIDs[lid.Unpack()] = struct{}{} - } - for i, lid := range allLids { - if _, ok := skipLIDs[lid]; ok { + if skipLIDsBitmap.Contains(uint32(lid)) { docsPos[i] = seq.DocPosNotFound } } diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 200f8348..d63a9406 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/RoaringBitmap/roaring" "github.com/alecthomas/units" "github.com/johannesboyne/gofakes3" "github.com/johannesboyne/gofakes3/backend/s3mem" @@ -37,6 +38,9 @@ type testSkipMaskProvider struct{} func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { return node.NewStatic([]uint32{}, false), false, nil } +func (testSkipMaskProvider) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) { + return nil, nil +} func (testSkipMaskProvider) RemoveFrac(_ string) {} type FractionTestSuite struct { diff --git a/frac/sealed_index.go b/frac/sealed_index.go index 76542177..80e7a3a1 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -5,6 +5,7 @@ import ( "fmt" "math" + "github.com/RoaringBitmap/roaring" "go.uber.org/zap" "github.com/ozontech/seq-db/frac/common" @@ -24,6 +25,7 @@ import ( type skipMaskProvider interface { GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) RemoveFrac(fracName string) } @@ -298,26 +300,17 @@ func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID, noSkipMasks bool) ([]seq.Doc minLID, maxLID = uint32(minVal), uint32(maxVal) } - skipLIDsIterator, has, err := fi.skipMaskProvider.GetIDsIteratorByFrac(fi.fracName, minLID, maxLID, false) + skipLIDsBitmap, err := fi.skipMaskProvider.GetIDsBitmapByFrac(fi.fracName, minLID, maxLID) if err != nil { return nil, err } - if !has { + if skipLIDsBitmap == nil { return fi.getDocPosByLIDs(allLids), nil } - skipLIDs := make(map[uint32]struct{}) - for { - lid := skipLIDsIterator.Next() - if lid.IsNull() { - break - } - skipLIDs[lid.Unpack()] = struct{}{} - } - for i, lid := range allLids { - if _, ok := skipLIDs[uint32(lid)]; ok { + if skipLIDsBitmap.Contains(uint32(lid)) { allLids[i] = 0 } } diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 663dedec..687329a2 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -3,6 +3,7 @@ package fracmanager import ( "testing" + "github.com/RoaringBitmap/roaring" "github.com/alecthomas/units" "github.com/stretchr/testify/assert" @@ -17,6 +18,9 @@ type testSkipMaskProvider struct{} func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { return node.NewStatic([]uint32{}, reverse), false, nil } +func (testSkipMaskProvider) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) { + return nil, nil +} func (testSkipMaskProvider) RefreshFrac(_ frac.Fraction) {} func (testSkipMaskProvider) RemoveFrac(_ string) {} diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 66e6477b..5bcd92d9 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -7,6 +7,7 @@ import ( "path/filepath" "time" + "github.com/RoaringBitmap/roaring" "github.com/oklog/ulid/v2" "github.com/ozontech/seq-db/frac" @@ -22,6 +23,7 @@ const fileBasePattern = "seq-db-" type skipMaskProvider interface { GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) RefreshFrac(frac frac.Fraction) RemoveFrac(fracName string) } diff --git a/go.mod b/go.mod index 7efc9e75..7ea44701 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25 require ( contrib.go.opencensus.io/exporter/jaeger v0.2.1 github.com/KimMachineGun/automemlimit v0.7.5 + github.com/RoaringBitmap/roaring v1.9.4 github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b github.com/aws/aws-sdk-go-v2 v1.41.1 github.com/aws/aws-sdk-go-v2/config v1.31.13 @@ -61,11 +62,13 @@ require ( github.com/aws/aws-sdk-go-v2/service/sts v1.41.2 // indirect github.com/aws/smithy-go v1.24.0 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.12.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/pprof v0.0.0-20250422154841-e1f9c1950416 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect diff --git a/go.sum b/go.sum index 07cca964..2820a54c 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/KimMachineGun/automemlimit v0.7.5 h1:RkbaC0MwhjL1ZuBKunGDjE/ggwAX43DwZrJqVwyveTk= github.com/KimMachineGun/automemlimit v0.7.5/go.mod h1:QZxpHaGOQoYvFhv/r4u3U0JTC2ZcOwbSr11UZF46UBM= +github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= +github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0= @@ -71,6 +73,8 @@ github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk= github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee h1:BnPxIde0gjtTnc9Er7cxvBk8DHLWhEux0SxayC8dP6I= github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M= github.com/cactus/go-statsd-client v3.1.1+incompatible/go.mod h1:cMRcwZDklk7hXp+Law83urTHUiHMzCev/r4JMYr/zU0= @@ -196,6 +200,8 @@ github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1 github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s= @@ -241,6 +247,7 @@ github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpE github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= diff --git a/skipmaskmanager/encoding.go b/skipmaskmanager/encoding.go index c0bd9c81..c096f8ab 100644 --- a/skipmaskmanager/encoding.go +++ b/skipmaskmanager/encoding.go @@ -207,7 +207,9 @@ func unmarshalSkipMask(dst *SkipMaskBinOut, src []byte) (_ []byte, err error) { return nil, fmt.Errorf("invalid skip mask binary version: %d", version) } - dst.LIDs, src, err = unmarshalLIDsBlocks(dst.LIDs, src) + src, err = unmarshalLIDsBlocks(src, func(lid uint32) { + dst.LIDs = append(dst.LIDs, lid) + }) if err != nil { return src, err } @@ -218,7 +220,7 @@ func unmarshalSkipMask(dst *SkipMaskBinOut, src []byte) (_ []byte, err error) { // unmarshalLIDsBlocks reads all LIDs blocks from the source data. // First reads the number of blocks, then parses each block header, // and finally decodes each block's data. -func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { +func unmarshalLIDsBlocks(src []byte, add func(uint32)) ([]byte, error) { numberOfBlocks := binary.LittleEndian.Uint32(src) src = src[sizeOfUint32:] @@ -229,34 +231,34 @@ func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { header := lidsBlockHeader{} src, err = header.unmarshal(src) if err != nil { - return dst, src, fmt.Errorf("can't unmarshal lids header: %s", err) + return src, fmt.Errorf("can't unmarshal lids header: %s", err) } headers = append(headers, header) } for i := range numberOfBlocks { - dst, src, err = unmarshalLIDsBlock(dst, src, headers[i]) + src, err = unmarshalLIDsBlock(src, headers[i], add) if err != nil { - return dst, src, err + return src, err } } if len(src) > 0 { - return dst, src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks") + return src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks") } - return dst, src, nil + return src, nil } // unmarshalLIDsBlock decodes a single LIDs block based on its header. // Handles both compressed (zstd) and uncompressed codec types. -func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uint32, []byte, error) { +func unmarshalLIDsBlock(src []byte, header lidsBlockHeader, add func(uint32)) ([]byte, error) { if len(src) == 0 { - return dst, src, fmt.Errorf("empty LIDs block") + return src, fmt.Errorf("empty LIDs block") } if header.Size == 0 || int(header.Size) > len(src) { - return nil, src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size) + return src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size) } block := src[:header.Size] @@ -270,39 +272,39 @@ func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uin defer lidsBlockBufPool.Put(b) b.B, err = zstd.Decompress(block, b.B) if err != nil { - return dst, src, fmt.Errorf("can't decompress ids block: %s", err) + return src, fmt.Errorf("can't decompress ids block: %s", err) } - dst, err = unmarshalLIDsDelta(dst, b.B, header) + err = unmarshalLIDsDelta(b.B, header, add) if err != nil { - return dst, src, err + return src, err } - return dst, src, nil + return src, nil case lidsCodecDelta: - dst, err = unmarshalLIDsDelta(dst, block, header) + err = unmarshalLIDsDelta(block, header, add) if err != nil { - return dst, src, err + return src, err } - return dst, src, nil + return src, nil default: - return dst, src, fmt.Errorf("unknown ids codec: %d", header.Codec) + return src, fmt.Errorf("unknown ids codec: %d", header.Codec) } } -func unmarshalLIDsDelta(dst []uint32, block []byte, header lidsBlockHeader) ([]uint32, error) { +func unmarshalLIDsDelta(block []byte, header lidsBlockHeader, add func(uint32)) error { prevLID := uint32(0) for range header.Length { v, n := binary.Varint(block) block = block[n:] lid := prevLID + uint32(v) prevLID = lid - dst = append(dst, lid) + add(lid) } if len(block) > 0 { - return dst, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + return fmt.Errorf("unexpected tail when unmarshaling LIDs block") } - return dst, nil + return nil } // getCompressLevel returns the appropriate zstd compression level based on data size. diff --git a/skipmaskmanager/iterator_asc.go b/skipmaskmanager/iterator_asc.go index a50b6dcb..94dbfeba 100644 --- a/skipmaskmanager/iterator_asc.go +++ b/skipmaskmanager/iterator_asc.go @@ -10,7 +10,7 @@ import ( type IteratorAsc Iterator func (it *IteratorAsc) String() string { - return "HIDE_FLAG_ITERATOR_ASC" + return "SKIP_MASK_ITERATOR_ASC" } func (it *IteratorAsc) Next() node.LID { @@ -57,12 +57,15 @@ func (it *IteratorAsc) loadNextLIDsBlock() { return } - block, err := it.loader.loadBlock(it.blockIndex) + lids := make([]uint32, 0, it.loader.headers[it.blockIndex].Length) + err := it.loader.loadBlock(it.blockIndex, func(lid uint32) { + lids = append(lids, lid) + }) if err != nil { logger.Panic("error loading LIDs block", zap.Error(err)) } - it.lids = block + it.lids = lids it.needTryNextBlock() } diff --git a/skipmaskmanager/iterator_asc_test.go b/skipmaskmanager/iterator_asc_test.go index 72ab47ff..3f77bb53 100644 --- a/skipmaskmanager/iterator_asc_test.go +++ b/skipmaskmanager/iterator_asc_test.go @@ -58,8 +58,7 @@ func TestIteratorAsc(t *testing.T) { err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) - loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) - require.NoError(t, err) + loader := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) iterator := (*IteratorAsc)(NewIterator(loader, tc.minLID, tc.maxLID)) resLIDs := make([]uint32, 0, len(tc.expected)) diff --git a/skipmaskmanager/iterator_desc.go b/skipmaskmanager/iterator_desc.go index c207d671..fc04ae03 100644 --- a/skipmaskmanager/iterator_desc.go +++ b/skipmaskmanager/iterator_desc.go @@ -10,7 +10,7 @@ import ( type IteratorDesc Iterator func (it *IteratorDesc) String() string { - return "HIDE_FLAG_ITERATOR_DESC" + return "SKIP_MASK_ITERATOR_DESC" } func (it *IteratorDesc) Next() node.LID { @@ -55,12 +55,15 @@ func (it *IteratorDesc) loadNextLIDsBlock() { return } - block, err := it.loader.loadBlock(it.blockIndex) + lids := make([]uint32, 0, it.loader.headers[it.blockIndex].Length) + err := it.loader.loadBlock(it.blockIndex, func(lid uint32) { + lids = append(lids, lid) + }) if err != nil { logger.Panic("error loading LIDs block", zap.Error(err)) } - it.lids = block + it.lids = lids it.needTryNextBlock() } diff --git a/skipmaskmanager/iterator_desc_test.go b/skipmaskmanager/iterator_desc_test.go index 749ced7d..063e1b0d 100644 --- a/skipmaskmanager/iterator_desc_test.go +++ b/skipmaskmanager/iterator_desc_test.go @@ -53,8 +53,7 @@ func TestIteratorDesc(t *testing.T) { err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) - loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) - require.NoError(t, err) + loader := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) iterator := (*IteratorDesc)(NewIterator(loader, tc.minLID, tc.maxLID)) resLIDs := make([]uint32, 0, len(tc.expected)) diff --git a/skipmaskmanager/loader.go b/skipmaskmanager/loader.go index 0a0717b3..90e02ada 100644 --- a/skipmaskmanager/loader.go +++ b/skipmaskmanager/loader.go @@ -5,7 +5,9 @@ import ( "fmt" "io" "os" + "sync" + "github.com/RoaringBitmap/roaring" "go.uber.org/zap" "github.com/ozontech/seq-db/cache" @@ -20,12 +22,12 @@ type loader struct { cashKey uint32 } -func newLoader(filePath string, headersCache *cache.Cache[[]lidsBlockHeader]) (*loader, error) { +func newLoader(filePath string, headersCache *cache.Cache[[]lidsBlockHeader]) *loader { return &loader{ filePath: filePath, headersCache: headersCache, cashKey: hashFilePath(filePath), - }, nil + } } func (l *loader) getFile() (*os.File, error) { @@ -99,22 +101,18 @@ func (l *loader) loadHeaders() ([]lidsBlockHeader, error) { return headers, nil } -func (l *loader) loadBlock(index int) ([]uint32, error) { - if l.headers == nil { - headers, err := l.getHeaders() - if err != nil { - return nil, err - } - l.headers = headers +func (l *loader) loadBlock(index int, add func(uint32)) error { + if err := l.ensureHeaders(); err != nil { + return err } if len(l.headers) < index+1 { - return nil, fmt.Errorf("can't load block: headers len=%d, index=%d", len(l.headers), index) + return fmt.Errorf("can't load block: headers len=%d, index=%d", len(l.headers), index) } file, err := l.getFile() if err != nil { - return nil, err + return err } header := l.headers[index] @@ -122,23 +120,61 @@ func (l *loader) loadBlock(index int) ([]uint32, error) { blockBuf := make([]byte, header.Size) n, err := file.ReadAt(blockBuf, int64(header.Offset)) if err != nil { - return nil, err + return err } if n != len(blockBuf) { - return nil, fmt.Errorf("can't read lids block, read=%d, requested=%d", n, len(blockBuf)) + return fmt.Errorf("can't read lids block, read=%d, requested=%d", n, len(blockBuf)) } - lids := make([]uint32, 0, header.Length) - lids, blockBuf, err = unmarshalLIDsBlock(lids, blockBuf, header) + blockBuf, err = unmarshalLIDsBlock(blockBuf, header, add) if err != nil { - return nil, err + return err } if len(blockBuf) > 0 { - return nil, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + return fmt.Errorf("unexpected tail when unmarshaling LIDs block") + } + + return nil +} + +func (l *loader) loadToBitmap(bitmap *roaring.Bitmap, mu *sync.Mutex, minLID, maxLID uint32) error { + if err := l.ensureHeaders(); err != nil { + return err + } + + for i, header := range l.headers { + if header.MaxLID < minLID || header.MinLID > maxLID { + continue + } + + err := l.loadBlock(i, func(lid uint32) { + mu.Lock() + bitmap.Add(lid) + mu.Unlock() + }) + if err != nil { + return err + } + } + + if err := l.release(); err != nil { + return err + } + + return nil +} + +func (l *loader) ensureHeaders() error { + if l.headers == nil { + headers, err := l.getHeaders() + if err != nil { + return err + } + l.headers = headers } - return lids, nil + return nil } func (l *loader) release() error { diff --git a/skipmaskmanager/loader_test.go b/skipmaskmanager/loader_test.go index eb49a472..3c383d0e 100644 --- a/skipmaskmanager/loader_test.go +++ b/skipmaskmanager/loader_test.go @@ -1,10 +1,13 @@ package skipmaskmanager import ( + "math" "os" "path/filepath" + "sync" "testing" + "github.com/RoaringBitmap/roaring" "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/cache" @@ -23,17 +26,25 @@ func TestLoader(t *testing.T) { err := os.WriteFile(filePath, rawSkipMask, 0o644) require.NoError(t, err) - loader, err := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) - require.NoError(t, err) + loader := newLoader(filePath, cache.NewCache[[]lidsBlockHeader](nil, nil)) + // test load to []uint32 resLIDs := make([]uint32, 0, len(multipleBlocksLIDs)) const numberOfBlocks = 4 for i := range numberOfBlocks { - block, err := loader.loadBlock(i) + err := loader.loadBlock(i, func(lid uint32) { + resLIDs = append(resLIDs, lid) + }) require.NoError(t, err) - resLIDs = append(resLIDs, block...) } require.Equal(t, lidsToUint32s(multipleBlocksLIDs), resLIDs) + // test load to bitmap + bitmap := roaring.New() + mu := &sync.Mutex{} + err = loader.loadToBitmap(bitmap, mu, 0, math.MaxUint32) + require.NoError(t, err) + require.Equal(t, lidsToUint32s(multipleBlocksLIDs), bitmap.ToArray()) + require.NoError(t, loader.release()) } diff --git a/skipmaskmanager/merged_iterator.go b/skipmaskmanager/merged_iterator.go index e949c12f..7f444fd0 100644 --- a/skipmaskmanager/merged_iterator.go +++ b/skipmaskmanager/merged_iterator.go @@ -22,7 +22,7 @@ func NewNMergedIterators(iterators []node.Node) node.Node { type EmptyIterator struct{} func (it *EmptyIterator) String() string { - return "EMPTY_HIDE_FLAG_ITERATOR" + return "EMPTY_SKIP_MASK_ITERATOR" } func (it *EmptyIterator) Next() node.LID { diff --git a/skipmaskmanager/merged_iterator_test.go b/skipmaskmanager/merged_iterator_test.go index fe0af2af..ff4eae4c 100644 --- a/skipmaskmanager/merged_iterator_test.go +++ b/skipmaskmanager/merged_iterator_test.go @@ -1,7 +1,6 @@ package skipmaskmanager import ( - "fmt" "testing" "github.com/stretchr/testify/require" @@ -46,7 +45,6 @@ func TestMergedIteratorReverse(t *testing.T) { resLIDs = append(resLIDs, lid.Unpack()) } - fmt.Println(resLIDs) require.Equal(t, []uint32{45, 33, 22, 15, 9, 8, 7, 5, 3, 2, 1}, resLIDs) } @@ -55,7 +53,7 @@ type testIteratorDesc struct { } func (it *testIteratorDesc) String() string { - return "TEST_HIDE_FLAG_ITERATOR_DESC" + return "TEST_SKIP_MASK_ITERATOR_DESC" } func (it *testIteratorDesc) Next() node.LID { @@ -77,7 +75,7 @@ type testIteratorAsc struct { } func (it *testIteratorAsc) String() string { - return "TEST_HIDE_FLAG_ITERATOR_ASC" + return "TEST_SKIP_MASK_ITERATOR_ASC" } func (it *testIteratorAsc) Next() node.LID { diff --git a/skipmaskmanager/skip_mask_manager.go b/skipmaskmanager/skip_mask_manager.go index 79f6680d..f74b3642 100644 --- a/skipmaskmanager/skip_mask_manager.go +++ b/skipmaskmanager/skip_mask_manager.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/RoaringBitmap/roaring" "go.uber.org/zap" "github.com/ozontech/seq-db/cache" @@ -213,11 +214,7 @@ func (smm *SkipMaskManager) GetIDsIteratorByFrac( iterators := make([]node.Node, 0, len(fracFiles)) for _, f := range fracFiles { - loader, err := newLoader(f, smm.headersCache) - if err != nil { - logger.Error("can't open skip mask file", zap.String("path", f), zap.Error(err)) - return nil, has, err - } + loader := newLoader(f, smm.headersCache) if reverse { iterators = append(iterators, (*IteratorAsc)(NewIterator(loader, minLID, maxLID))) } else { @@ -228,6 +225,44 @@ func (smm *SkipMaskManager) GetIDsIteratorByFrac( return NewNMergedIterators(iterators), has, nil } +// GetSkipMaskAsRoaringBitmap returns skip masks as roaring bitmap. +// Currently used in fetch resuests +func (smm *SkipMaskManager) GetIDsBitmapByFrac( + fracName string, + minLID, maxLID uint32, +) (*roaring.Bitmap, error) { + smm.fracsMu.RLock() + defer smm.fracsMu.RUnlock() + + fracFiles, has := smm.fracs[fracName] + if !has { + return nil, nil + } + + bitmap := roaring.New() + mu := &sync.Mutex{} + wg := &sync.WaitGroup{} + var loaderErr error + + for _, f := range fracFiles { + wg.Go(func() { + loader := newLoader(f, smm.headersCache) + if err := loader.loadToBitmap(bitmap, mu, minLID, maxLID); err != nil { + logger.Error("can't load skip mask to bitmap", zap.String("path", f), zap.Error(err)) + loaderErr = err + } + }) + } + + wg.Wait() + + if loaderErr != nil { + return nil, loaderErr + } + + return bitmap, nil +} + // RefreshFrac recomputes skip mask files for a fraction after it has been sealed. // This is called when an active fraction becomes sealed. // The method: From cc90039d9b931d1ee7155307ffb2a69dedfbba1f Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Fri, 29 May 2026 14:43:31 +0500 Subject: [PATCH 2/3] chore(skipmaskmanager): create separate bitmaps and merge them --- skipmaskmanager/loader.go | 5 +---- skipmaskmanager/loader_test.go | 4 +--- skipmaskmanager/skip_mask_manager.go | 13 +++++++------ 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/skipmaskmanager/loader.go b/skipmaskmanager/loader.go index 90e02ada..4fdc680e 100644 --- a/skipmaskmanager/loader.go +++ b/skipmaskmanager/loader.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "os" - "sync" "github.com/RoaringBitmap/roaring" "go.uber.org/zap" @@ -138,7 +137,7 @@ func (l *loader) loadBlock(index int, add func(uint32)) error { return nil } -func (l *loader) loadToBitmap(bitmap *roaring.Bitmap, mu *sync.Mutex, minLID, maxLID uint32) error { +func (l *loader) loadToBitmap(bitmap *roaring.Bitmap, minLID, maxLID uint32) error { if err := l.ensureHeaders(); err != nil { return err } @@ -149,9 +148,7 @@ func (l *loader) loadToBitmap(bitmap *roaring.Bitmap, mu *sync.Mutex, minLID, ma } err := l.loadBlock(i, func(lid uint32) { - mu.Lock() bitmap.Add(lid) - mu.Unlock() }) if err != nil { return err diff --git a/skipmaskmanager/loader_test.go b/skipmaskmanager/loader_test.go index 3c383d0e..237ebde9 100644 --- a/skipmaskmanager/loader_test.go +++ b/skipmaskmanager/loader_test.go @@ -4,7 +4,6 @@ import ( "math" "os" "path/filepath" - "sync" "testing" "github.com/RoaringBitmap/roaring" @@ -41,8 +40,7 @@ func TestLoader(t *testing.T) { // test load to bitmap bitmap := roaring.New() - mu := &sync.Mutex{} - err = loader.loadToBitmap(bitmap, mu, 0, math.MaxUint32) + err = loader.loadToBitmap(bitmap, 0, math.MaxUint32) require.NoError(t, err) require.Equal(t, lidsToUint32s(multipleBlocksLIDs), bitmap.ToArray()) diff --git a/skipmaskmanager/skip_mask_manager.go b/skipmaskmanager/skip_mask_manager.go index f74b3642..43d02d5c 100644 --- a/skipmaskmanager/skip_mask_manager.go +++ b/skipmaskmanager/skip_mask_manager.go @@ -225,7 +225,7 @@ func (smm *SkipMaskManager) GetIDsIteratorByFrac( return NewNMergedIterators(iterators), has, nil } -// GetSkipMaskAsRoaringBitmap returns skip masks as roaring bitmap. +// GetIDsBitmapByFrac returns skip masks as roaring bitmap. // Currently used in fetch resuests func (smm *SkipMaskManager) GetIDsBitmapByFrac( fracName string, @@ -239,18 +239,19 @@ func (smm *SkipMaskManager) GetIDsBitmapByFrac( return nil, nil } - bitmap := roaring.New() - mu := &sync.Mutex{} + bitmaps := make([]*roaring.Bitmap, len(fracFiles)) wg := &sync.WaitGroup{} var loaderErr error - for _, f := range fracFiles { + for i, f := range fracFiles { wg.Go(func() { loader := newLoader(f, smm.headersCache) - if err := loader.loadToBitmap(bitmap, mu, minLID, maxLID); err != nil { + bitmap := roaring.New() + if err := loader.loadToBitmap(bitmap, minLID, maxLID); err != nil { logger.Error("can't load skip mask to bitmap", zap.String("path", f), zap.Error(err)) loaderErr = err } + bitmaps[i] = bitmap }) } @@ -260,7 +261,7 @@ func (smm *SkipMaskManager) GetIDsBitmapByFrac( return nil, loaderErr } - return bitmap, nil + return roaring.FastOr(bitmaps...), nil } // RefreshFrac recomputes skip mask files for a fraction after it has been sealed. From a750a3616109080868b934629d3cd63ea62e8f1b Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Fri, 29 May 2026 16:23:33 +0500 Subject: [PATCH 3/3] chore(skipmaskmanager): release loader after search is done --- frac/active_index.go | 8 +++---- frac/fraction_test.go | 4 ++-- frac/processor/search.go | 12 ++++++---- frac/sealed_index.go | 4 ++-- fracmanager/fracmanager_test.go | 4 ++-- fracmanager/fraction_provider.go | 2 +- skipmaskmanager/iterator_asc.go | 3 --- skipmaskmanager/iterator_asc_test.go | 2 ++ skipmaskmanager/iterator_desc.go | 3 --- skipmaskmanager/iterator_desc_test.go | 2 ++ skipmaskmanager/loader.go | 13 +++++----- skipmaskmanager/loader_test.go | 3 +-- skipmaskmanager/skip_mask_manager.go | 34 +++++++++++++++++---------- 13 files changed, 53 insertions(+), 41 deletions(-) diff --git a/frac/active_index.go b/frac/active_index.go index 752c2ae1..20528fad 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -191,13 +191,13 @@ type activeSearchIndex struct { fracName string } -func (si *activeSearchIndex) GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { +func (si *activeSearchIndex) GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, func() error, error) { // active fraction doesn't meet min and max lid minLID, maxLID = uint32(0), uint32(math.MaxUint32) - iterator, has, err := si.skipMaskProvider.GetIDsIteratorByFrac(si.fracName, minLID, maxLID, reverse) + iterator, has, release, err := si.skipMaskProvider.GetIDsIteratorByFrac(si.fracName, minLID, maxLID, reverse) if err != nil { - return nil, false, err + return nil, false, release, err } res := make([]uint32, 0) @@ -215,7 +215,7 @@ func (si *activeSearchIndex) GetSkipLIDs(minLID, maxLID uint32, reverse bool) (n // we need to sort inversed values since they may be out of order after replay of active fraction slices.Sort(res) - return node.NewStatic(res, reverse), has, nil + return node.NewStatic(res, reverse), has, release, nil } type activeTokenIndex struct { diff --git a/frac/fraction_test.go b/frac/fraction_test.go index d63a9406..9a6b7344 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -35,8 +35,8 @@ import ( type testSkipMaskProvider struct{} -func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { - return node.NewStatic([]uint32{}, false), false, nil +func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, func() error, error) { + return node.NewStatic([]uint32{}, false), false, func() error { return nil }, nil } func (testSkipMaskProvider) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) { return nil, nil diff --git a/frac/processor/search.go b/frac/processor/search.go index 3c08eae0..108a0591 100644 --- a/frac/processor/search.go +++ b/frac/processor/search.go @@ -2,6 +2,7 @@ package processor import ( "context" + "errors" "math" "time" @@ -38,7 +39,7 @@ type tokenIndex interface { type searchIndex interface { tokenIndex idsIndex - GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, func() error, error) } func IndexSearch( @@ -47,7 +48,7 @@ func IndexSearch( index searchIndex, aggLimits AggLimits, sw *stopwatch.Stopwatch, -) (*seq.QPR, error) { +) (qpr *seq.QPR, err error) { stats := &searchStats{} m := sw.Start("get_lids_borders") @@ -95,7 +96,10 @@ func IndexSearch( } m = sw.Start("get_skip_lids") - skipLIDs, hasSkipLIDs, err := index.GetSkipLIDs(minLID, maxLID, params.Order.IsReverse()) + skipLIDs, hasSkipLIDs, release, err := index.GetSkipLIDs(minLID, maxLID, params.Order.IsReverse()) + defer func() { + err = errors.Join(err, release()) + }() m.Stop() if err != nil { return nil, err @@ -138,7 +142,7 @@ func IndexSearch( total = 0 } - qpr := &seq.QPR{ + qpr = &seq.QPR{ IDs: ids, Aggs: aggsResult, Total: uint64(total), diff --git a/frac/sealed_index.go b/frac/sealed_index.go index 80e7a3a1..39876537 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -24,7 +24,7 @@ import ( ) type skipMaskProvider interface { - GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, func() error, error) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) RemoveFrac(fracName string) } @@ -373,6 +373,6 @@ type sealedSearchIndex struct { skipMaskProvider skipMaskProvider } -func (si *sealedSearchIndex) GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { +func (si *sealedSearchIndex) GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, func() error, error) { return si.skipMaskProvider.GetIDsIteratorByFrac(si.fracName, minLID, maxLID, reverse) } diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 687329a2..b2f8636f 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -15,8 +15,8 @@ import ( type testSkipMaskProvider struct{} -func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) { - return node.NewStatic([]uint32{}, reverse), false, nil +func (testSkipMaskProvider) GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, func() error, error) { + return node.NewStatic([]uint32{}, reverse), false, func() error { return nil }, nil } func (testSkipMaskProvider) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) { return nil, nil diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 5bcd92d9..f5ce5002 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -22,7 +22,7 @@ import ( const fileBasePattern = "seq-db-" type skipMaskProvider interface { - GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, error) + GetIDsIteratorByFrac(fracName string, minLID, maxLID uint32, reverse bool) (node.Node, bool, func() error, error) GetIDsBitmapByFrac(fracName string, minLID, maxLID uint32) (*roaring.Bitmap, error) RefreshFrac(frac frac.Fraction) RemoveFrac(fracName string) diff --git a/skipmaskmanager/iterator_asc.go b/skipmaskmanager/iterator_asc.go index 94dbfeba..92ec0c53 100644 --- a/skipmaskmanager/iterator_asc.go +++ b/skipmaskmanager/iterator_asc.go @@ -25,9 +25,6 @@ func (it *IteratorAsc) Next() node.LID { for len(it.lids) == 0 { if !it.tryNextBlock { - if err := it.loader.release(); err != nil { - logger.Panic("error closing loader", zap.Error(err)) - } return node.NullLID() } diff --git a/skipmaskmanager/iterator_asc_test.go b/skipmaskmanager/iterator_asc_test.go index 3f77bb53..53c7341f 100644 --- a/skipmaskmanager/iterator_asc_test.go +++ b/skipmaskmanager/iterator_asc_test.go @@ -66,6 +66,8 @@ func TestIteratorAsc(t *testing.T) { resLIDs = append(resLIDs, lid.Unpack()) } require.Equal(t, tc.expected, resLIDs) + + require.NoError(t, loader.release()) }) } } diff --git a/skipmaskmanager/iterator_desc.go b/skipmaskmanager/iterator_desc.go index fc04ae03..e77ffdd3 100644 --- a/skipmaskmanager/iterator_desc.go +++ b/skipmaskmanager/iterator_desc.go @@ -24,9 +24,6 @@ func (it *IteratorDesc) Next() node.LID { for len(it.lids) == 0 { if !it.tryNextBlock { - if err := it.loader.release(); err != nil { - logger.Panic("error closing loader", zap.Error(err)) - } return node.NullLID() } diff --git a/skipmaskmanager/iterator_desc_test.go b/skipmaskmanager/iterator_desc_test.go index 063e1b0d..3e4f8b00 100644 --- a/skipmaskmanager/iterator_desc_test.go +++ b/skipmaskmanager/iterator_desc_test.go @@ -61,6 +61,8 @@ func TestIteratorDesc(t *testing.T) { resLIDs = append(resLIDs, lid.Unpack()) } require.Equal(t, tc.expected, resLIDs) + + require.NoError(t, loader.release()) }) } } diff --git a/skipmaskmanager/loader.go b/skipmaskmanager/loader.go index 4fdc680e..e3a2f1a3 100644 --- a/skipmaskmanager/loader.go +++ b/skipmaskmanager/loader.go @@ -2,6 +2,7 @@ package skipmaskmanager import ( "encoding/binary" + "errors" "fmt" "io" "os" @@ -137,7 +138,11 @@ func (l *loader) loadBlock(index int, add func(uint32)) error { return nil } -func (l *loader) loadToBitmap(bitmap *roaring.Bitmap, minLID, maxLID uint32) error { +func (l *loader) loadToBitmap(bitmap *roaring.Bitmap, minLID, maxLID uint32) (err error) { + defer func() { + err = errors.Join(err, l.release()) + }() + if err := l.ensureHeaders(); err != nil { return err } @@ -155,11 +160,7 @@ func (l *loader) loadToBitmap(bitmap *roaring.Bitmap, minLID, maxLID uint32) err } } - if err := l.release(); err != nil { - return err - } - - return nil + return } func (l *loader) ensureHeaders() error { diff --git a/skipmaskmanager/loader_test.go b/skipmaskmanager/loader_test.go index 237ebde9..a8d28032 100644 --- a/skipmaskmanager/loader_test.go +++ b/skipmaskmanager/loader_test.go @@ -37,12 +37,11 @@ func TestLoader(t *testing.T) { require.NoError(t, err) } require.Equal(t, lidsToUint32s(multipleBlocksLIDs), resLIDs) + require.NoError(t, loader.release()) // test load to bitmap bitmap := roaring.New() err = loader.loadToBitmap(bitmap, 0, math.MaxUint32) require.NoError(t, err) require.Equal(t, lidsToUint32s(multipleBlocksLIDs), bitmap.ToArray()) - - require.NoError(t, loader.release()) } diff --git a/skipmaskmanager/skip_mask_manager.go b/skipmaskmanager/skip_mask_manager.go index 43d02d5c..8104ac4f 100644 --- a/skipmaskmanager/skip_mask_manager.go +++ b/skipmaskmanager/skip_mask_manager.go @@ -15,6 +15,7 @@ import ( "github.com/RoaringBitmap/roaring" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac" @@ -203,18 +204,21 @@ func (smm *SkipMaskManager) GetIDsIteratorByFrac( fracName string, minLID, maxLID uint32, reverse bool, -) (node.Node, bool, error) { +) (node.Node, bool, func() error, error) { smm.fracsMu.RLock() defer smm.fracsMu.RUnlock() fracFiles, has := smm.fracs[fracName] if !has { - return &EmptyIterator{}, has, nil + return &EmptyIterator{}, has, func() error { return nil }, nil } + releaseFuncs := make([]func() error, len(fracFiles)) + iterators := make([]node.Node, 0, len(fracFiles)) - for _, f := range fracFiles { + for i, f := range fracFiles { loader := newLoader(f, smm.headersCache) + releaseFuncs[i] = loader.release if reverse { iterators = append(iterators, (*IteratorAsc)(NewIterator(loader, minLID, maxLID))) } else { @@ -222,7 +226,15 @@ func (smm *SkipMaskManager) GetIDsIteratorByFrac( } } - return NewNMergedIterators(iterators), has, nil + release := func() error { + var err error + for _, f := range releaseFuncs { + err = errors.Join(err, f()) + } + return err + } + + return NewNMergedIterators(iterators), has, release, nil } // GetIDsBitmapByFrac returns skip masks as roaring bitmap. @@ -240,25 +252,23 @@ func (smm *SkipMaskManager) GetIDsBitmapByFrac( } bitmaps := make([]*roaring.Bitmap, len(fracFiles)) - wg := &sync.WaitGroup{} - var loaderErr error + var eg errgroup.Group for i, f := range fracFiles { - wg.Go(func() { + eg.Go(func() error { loader := newLoader(f, smm.headersCache) bitmap := roaring.New() if err := loader.loadToBitmap(bitmap, minLID, maxLID); err != nil { logger.Error("can't load skip mask to bitmap", zap.String("path", f), zap.Error(err)) - loaderErr = err + return err } bitmaps[i] = bitmap + return nil }) } - wg.Wait() - - if loaderErr != nil { - return nil, loaderErr + if err := eg.Wait(); err != nil { + return nil, err } return roaring.FastOr(bitmaps...), nil