From 14a9cd73bf01074be9b68be8772e6d7a3d86d6f0 Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Fri, 6 Mar 2026 15:17:34 +0500 Subject: [PATCH 1/4] feat: add DocsFilter --- asyncsearcher/async_searcher.go | 80 +------ config/config.go | 20 +- docsfilter/docs_filter.go | 413 ++++++++++++++++++++++++++++++++ docsfilter/encoding.go | 285 ++++++++++++++++++++++ docsfilter/encoding_test.go | 42 ++++ docsfilter/filter.go | 64 +++++ docsfilter/metrics.go | 26 ++ frac/active.go | 14 ++ frac/active_index.go | 11 + frac/active_indexer.go | 4 + frac/active_lids_map.go | 37 +++ frac/fraction.go | 1 + frac/remote.go | 10 + frac/sealed.go | 7 + frac/sealed_index.go | 4 + fracmanager/proxy_frac.go | 8 + fracmanager/searcher_test.go | 4 + util/fs.go | 90 ++++++- 18 files changed, 1034 insertions(+), 86 deletions(-) create mode 100644 docsfilter/docs_filter.go create mode 100644 docsfilter/encoding.go create mode 100644 docsfilter/encoding_test.go create mode 100644 docsfilter/filter.go create mode 100644 docsfilter/metrics.go create mode 100644 frac/active_lids_map.go diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index a34fec29..01a7eb3c 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -281,7 +281,7 @@ func (as *AsyncSearcher) storeSearchInfoLocked(id string, info asyncSearchInfo) panic(err) } fpath := path.Join(as.config.DataDir, id+asyncSearchExtInfo) - mustWriteFileAtomic(fpath, b) + util.MustWriteFileAtomic(fpath, b, asyncSearchTmpFile) info.infoSize.Store(int64(len(b))) as.requests[id] = info } @@ -420,7 +420,7 @@ func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err name := getQPRFilename(info.Request.ID, f.Info().Name()) fpath := path.Join(as.config.DataDir, name) - mustWriteFileAtomic(fpath, rawQPR) + util.MustWriteFileAtomic(fpath, rawQPR, asyncSearchTmpFile) info.qprsSize.Add(int64(len(rawQPR))) return nil @@ -465,7 +465,7 @@ func (as *AsyncSearcher) findQPRs(id string) ([]string, error) { files = append(files, path.Join(as.config.DataDir, name)) return nil } - if err := visitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil { return nil, err } return files, nil @@ -490,7 +490,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { areQPRsMerged[requestID] = true return nil } - if err := visitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil { return nil, err } @@ -510,7 +510,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { anyRemove = true return nil } - if err := visitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil { return nil, err } @@ -522,11 +522,11 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { anyRemove = true return nil } - if err := visitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil { return nil, err } if anyRemove { - mustFsyncFile(dataDir) + util.MustFsyncFile(dataDir) } qprsDuByID := make(map[string]int) @@ -592,7 +592,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { requests[requestID] = info return nil } - if err := visitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil { return nil, err } return requests, nil @@ -807,7 +807,7 @@ func (as *AsyncSearcher) mergeQPRs(job mergeJob) { storeMQPR := func(compressed []byte) error { sizeAfter = len(compressed) mqprPath := path.Join(as.config.DataDir, job.ID+asyncSearchExtMergedQPR) - mustWriteFileAtomic(mqprPath, compressed) + util.MustWriteFileAtomic(mqprPath, compressed, asyncSearchTmpFile) return nil } if err := compressQPR(&qpr, storeMQPR); err != nil { @@ -1026,65 +1026,3 @@ func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []* return items } - -func mustWriteFileAtomic(fpath string, data []byte) { - fpathTmp := fpath + asyncSearchTmpFile - - f, err := os.Create(fpathTmp) - if err != nil { - logger.Fatal("can't create file", zap.Error(err)) - } - defer func() { - if err := f.Close(); err != nil { - logger.Fatal("can't close file", zap.Error(err)) - } - }() - - if _, err := f.Write(data); err != nil { - logger.Fatal("can't write to file", zap.Error(err)) - } - - if err := f.Sync(); err != nil { - logger.Fatal("can't sync file", zap.Error(err)) - } - - if err := os.Rename(fpathTmp, fpath); err != nil { - logger.Fatal("can't rename file", zap.Error(err)) - } - - absFpath, err := filepath.Abs(fpath) - if err != nil { - logger.Fatal("can't get absolute path", zap.String("path", fpath), zap.Error(err)) - } - dir := path.Dir(absFpath) - mustFsyncFile(dir) -} - -func mustFsyncFile(fpath string) { - dirFile, err := os.Open(fpath) - if err != nil { - logger.Fatal("can't open dir", zap.Error(err)) - } - if err := dirFile.Sync(); err != nil { - logger.Fatal("can't sync dir", zap.Error(err)) - } - if err := dirFile.Close(); err != nil { - logger.Fatal("can't close dir", zap.Error(err)) - } -} - -func visitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error { - for _, de := range des { - if de.IsDir() { - continue - } - name := de.Name() - if path.Ext(name) != ext { - continue - } - if err := cb(name); err != nil { - return err - } - } - return nil -} diff --git a/config/config.go b/config/config.go index 00df0d94..84e7582d 100644 --- a/config/config.go +++ b/config/config.go @@ -277,13 +277,13 @@ type Config struct { } `config:"tracing"` // Additional filtering options - Filtering struct { - // If a search query time range overlaps with the [from; to] range - // the search query will be `AND`-ed with an additional predicate with the provided query expression - Query string `config:"query"` - From time.Time `config:"from"` - To time.Time `config:"to"` - } `config:"filtering"` + Filtering Filter `config:"filtering"` + DocsFilter struct { + DataDir string `config:"data_dir"` + Concurrency int `config:"concurrency"` + Filters []Filter `config:"filters"` + CacheSize Bytes `config:"cache_size" default:"100MiB"` + } `config:"docs_filter"` // Experimental provides flags // For configuring experimental features. @@ -305,3 +305,9 @@ func (b *Bytes) UnmarshalString(s string) error { *b = Bytes(bytes) return nil } + +type Filter struct { + Query string `config:"query"` + From time.Time `config:"from"` + To time.Time `config:"to"` +} diff --git a/docsfilter/docs_filter.go b/docsfilter/docs_filter.go new file mode 100644 index 00000000..2a092f7e --- /dev/null +++ b/docsfilter/docs_filter.go @@ -0,0 +1,413 @@ +package docsfilter + +import ( + "context" + "fmt" + "math" + "os" + "path" + "runtime" + "strings" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/fracmanager" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" +) + +const ( + fracInQueueExt = ".queue" + fracDoneExt = ".filter" + tmpExt = ".tmp" +) + +const ( + defaultMaintenanceInterval = 30 * time.Second +) + +type MappingProvider interface { + GetMapping() seq.Mapping +} + +type Config struct { + DataDir string + Workers int + CacheSizeLimit uint64 +} + +type DocsFilter struct { + ctx context.Context + + config Config + filters map[string]*Filter + + fracs map[string][]string + fracsMu *sync.RWMutex + + mp MappingProvider + + rateLimit chan struct{} + createDirOnce *sync.Once + + maintenanceInterval time.Duration +} + +func New( + ctx context.Context, + cfg Config, + params []Params, + mp MappingProvider, +) *DocsFilter { + workers := cfg.Workers + if workers <= 0 { + workers = runtime.GOMAXPROCS(0) + } + + filtersMap := make(map[string]*Filter, len(params)) + + for _, p := range params { + f := NewFilter(p) + filtersMap[string(f.Hash())] = f + } + + return &DocsFilter{ + ctx: ctx, + config: cfg, + filters: filtersMap, + fracs: make(map[string][]string), + fracsMu: &sync.RWMutex{}, + mp: mp, + rateLimit: make(chan struct{}, workers), + createDirOnce: &sync.Once{}, + maintenanceInterval: defaultMaintenanceInterval, + } +} + +func (df *DocsFilter) Start(fracs fracmanager.List) { + df.createDataDir() + + err := df.loadFilters() + if err != nil { + logger.Fatal("failed to load previous docs filters", zap.Error(err)) + } + + err = df.buildQueue(fracs) + if err != nil { + logger.Fatal("failed to build docs filters queue", zap.Error(err)) + } + + go df.maintenance() + + mapping := df.mp.GetMapping() + + for _, f := range df.filters { + ast, err := parser.ParseSeqQL(f.params.Query, mapping) + if err != nil { + panic(fmt.Errorf("BUG: search query must be valid: %s", err)) + } + f.ast = ast + + df.processFilter(f, fracs.FilterInRange(seq.MID(f.params.From), seq.MID(f.params.To))) + } +} + +// RefreshFrac replaces frac's tombstone files with newly found results. Used after active frac is sealed. +func (df *DocsFilter) RefreshFrac(fraction frac.Fraction) { + df.fracsMu.RLock() + fracsFiles, has := df.fracs[fraction.Info().Name()] + df.fracsMu.RUnlock() + + if !has { + return + } + + for _, fileName := range fracsFiles { + filter := df.filters[filterNameFromTombstonesPath(fileName)] + + queueFilePath := path.Join(filter.dirPath, makeFileName(fraction.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt) + + filter.processWg.Add(1) + go func() { + if err := df.processFrac(fraction, filter, false); err != nil { + panic(fmt.Errorf("docs filter refresh frac err: %s", err)) + } + }() + } +} + +// RemoveFrac removes fraction's tombstones. Used after frac is deleted +func (df *DocsFilter) RemoveFrac(fracName string) { + df.fracsMu.RLock() + fracsFiles, has := df.fracs[fracName] + df.fracsMu.RUnlock() + + if !has { + return + } + + df.fracsMu.Lock() + delete(df.fracs, fracName) + df.fracsMu.Unlock() + + for _, fileName := range fracsFiles { + util.RemoveFile(fileName) + } +} + +func filterNameFromTombstonesPath(p string) string { + return path.Base(path.Dir(p)) +} + +func (df *DocsFilter) addDoneFrac(fracName, fracPath string) { + df.fracsMu.Lock() + defer df.fracsMu.Unlock() + + df.fracs[fracName] = append(df.fracs[fracName], fracPath) +} + +// loadFilters loads existing filters +func (df *DocsFilter) loadFilters() error { + des, err := os.ReadDir(df.config.DataDir) + if err != nil { + return err + } + + var anyRemove bool + + for _, de := range des { + if !de.IsDir() { + continue + } + + if _, ok := df.filters[de.Name()]; !ok { + logger.Info("there is filter folder on disk, but not in config. need to delete it.") + err := os.RemoveAll(path.Join(df.config.DataDir, de.Name())) + if err != nil && !os.IsNotExist(err) { + return err + } + anyRemove = true + continue + } + + f := df.filters[de.Name()] + f.status = StatusInProgress + f.dirPath = path.Join(df.config.DataDir, de.Name()) + + filterDes, err := os.ReadDir(f.dirPath) + if err != nil { + return fmt.Errorf("reading directory: %s", err) + } + + var hasFracsInQueue bool + + for _, fde := range filterDes { + if fde.IsDir() { + continue + } + name := fde.Name() + + switch path.Ext(name) { + case fracInQueueExt: + hasFracsInQueue = true + case fracDoneExt: + df.addDoneFrac(fracNameFromFilePath(name), path.Join(f.dirPath, name)) + } + } + + if !hasFracsInQueue { + f.status = StatusDone + } + } + + if anyRemove { + util.MustFsyncFile(df.config.DataDir) + } + + return nil +} + +// buildQueue creates a directory for each of unprocessed filters and creates .queue files +func (df *DocsFilter) buildQueue(fracs fracmanager.List) error { + for _, filter := range df.filters { + if filter.status != StatusCreated { + continue + } + filter.dirPath = path.Join(df.config.DataDir, filter.Hash()) + util.MustCreateDir(filter.dirPath) + + filterFracs := fracs.FilterInRange(seq.MID(filter.params.From), seq.MID(filter.params.To)) + for _, f := range filterFracs { + queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt) + } + } + + return nil +} + +// handleFilter finds docs and writes to fs +func (df *DocsFilter) processFilter(filter *Filter, fracs fracmanager.List) { + if len(fracs) == 0 { + return + } + + fracsByName := make(map[string]frac.Fraction) + for _, f := range fracs { + fracsByName[f.Info().Name()] = f + } + + filterDes, err := os.ReadDir(filter.dirPath) + if err != nil { + panic(fmt.Errorf("BUG: reading directory must be successful: %s", err)) + } + + inProgressFilters.Add(1) + + processFracInQueue := func(name string) error { + f, ok := fracsByName[fracNameFromFilePath(name)] + if !ok { // skip missing fracs + return nil + } + filter.processWg.Add(1) + go func() { + if err := df.processFrac(f, filter, false); err != nil { + panic(fmt.Errorf("docs filter process frac err: %s", err)) + } + }() + return nil + } + _ = util.VisitFilesWithExt(filterDes, fracInQueueExt, processFracInQueue) + + go func() { + filter.processWg.Wait() + filter.markAsDone() + inProgressFilters.Add(-1) + }() +} + +func (df *DocsFilter) processFrac(f frac.Fraction, filter *Filter, refresh bool) error { + defer filter.processWg.Done() + + df.rateLimit <- struct{}{} + defer func() { <-df.rateLimit }() + + qpr, err := f.Search(df.ctx, processor.SearchParams{ + AST: filter.ast.Root, + From: seq.MID(filter.params.From), + To: seq.MID(filter.params.To), + Limit: math.MaxInt64, + }) + if err != nil { + return err + } + + queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt)) + doneFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracDoneExt)) + + if len(qpr.IDs) == 0 { + util.RemoveFile(queueFilePath) + return nil + } + + storeDocsFilter := func(rawDocsFilter []byte) error { + util.MustWriteFileAtomic(doneFilePath, rawDocsFilter, tmpExt) + util.RemoveFile(queueFilePath) + return nil + } + + // TODO: here we doing part of the work twice: + // first time we find LIDs inside f.Search() and then find IDs by these LIDs. + // Then we again find LIDs by earlier found IDs in f.FindLIDs(). + // We did it like this because otherwise we had to do serious f.Search() rewrite. + // For now we're ok with some performance penalty. + lids, err := f.FindLIDs(df.ctx, qpr.IDs.IDs()) + if err != nil { + return err + } + + docsFilterBin := DocsFilterBinIn{LIDs: lids} + if err := writeDocsFilter(&docsFilterBin, storeDocsFilter); err != nil { + return err + } + + if !refresh { + df.addDoneFrac(f.Info().Name(), doneFilePath) + } + + return nil +} + +func (df *DocsFilter) maintenance() { + for { + logger.Info("docs filter maintenance iteration") + df.checkDiskUsage() + time.Sleep(df.maintenanceInterval) + } +} + +func (df *DocsFilter) checkDiskUsage() { + du := int64(0) + + for _, f := range df.filters { + des, err := os.ReadDir(f.dirPath) + if err != nil { + logger.Error("docs filter: can't read filter's dir", + zap.String("filter", f.String()), zap.Error(err)) + return + } + + for _, fde := range des { + if fde.IsDir() { + continue + } + info, err := fde.Info() + if err != nil { + logger.Error("docs filter: can't read tombstones file info", + zap.String("filter", f.String()), zap.Error(err)) + return + } + du += info.Size() + } + } + + diskUsage.Set(float64(du)) + storedFilters.Set(float64(len(df.filters))) +} + +func makeFileName(name, ext string) string { + return name + ext +} + +func fracNameFromFilePath(filterFilePath string) string { + return strings.Split(path.Base(filterFilePath), ".")[0] +} + +var marshalBufferPool util.BufferPool + +func writeDocsFilter(df *DocsFilterBinIn, cb func(compressed []byte) error) error { + rawDocsFilter := marshalBufferPool.Get() + defer marshalBufferPool.Put(rawDocsFilter) + + rawDocsFilter.B = marshalDocsFilter(rawDocsFilter.B, df) + if err := cb(rawDocsFilter.B); err != nil { + return err + } + return nil +} + +// createDataDir creates dir data lazily to avoid creating extra folders. +func (df *DocsFilter) createDataDir() { + df.createDirOnce.Do(func() { + if err := os.MkdirAll(df.config.DataDir, 0o777); err != nil { + panic(err) + } + }) +} diff --git a/docsfilter/encoding.go b/docsfilter/encoding.go new file mode 100644 index 00000000..dccfc62f --- /dev/null +++ b/docsfilter/encoding.go @@ -0,0 +1,285 @@ +package docsfilter + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + "unsafe" + + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" + "github.com/ozontech/seq-db/zstd" +) + +type DocsFilterBinIn struct { + LIDs []seq.LID +} + +type DocsFilterBinOut struct { + LIDs []uint32 +} + +type docsFilterBinVersion uint8 + +const ( + docsFilterBinVersion1 docsFilterBinVersion = iota + 1 +) + +var availableVersions = map[docsFilterBinVersion]struct{}{ + docsFilterBinVersion1: {}, +} + +type lidsCodec byte + +const ( + lidsCodecDelta = 1 + lidsCodecDeltaZstd = 2 +) + +type lidsBlockHeader struct { + Codec lidsCodec + Length uint32 // Number of LIDs in block + MinLID uint32 + MaxLID uint32 + Size uint32 // Size of ids block in bytes. + Offset uint64 // block's offset in file +} + +func (h *lidsBlockHeader) marshal(dst []byte) { + if len(dst) < int(lidsBlockHeaderSizeBytes) { + panic("BUG: marshal lidsBlockHeader: len(dst) is less than header size") + } + + dst[0] = byte(h.Codec) + dst = dst[1:] + binary.BigEndian.PutUint32(dst, h.Length) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.MinLID) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.MaxLID) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint32(dst, h.Size) + dst = dst[sizeOfUint32:] + binary.BigEndian.PutUint64(dst, h.Offset) + dst = dst[sizeOfUint64:] +} + +func (h *lidsBlockHeader) unmarshal(src []byte) ([]byte, error) { + if len(src) < int(lidsBlockHeaderSizeBytes) { + return src, errors.New("too few bytes") + } + + h.Codec = lidsCodec(src[0]) + src = src[1:] + h.Length = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.MinLID = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.MaxLID = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.Size = binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + h.Offset = binary.BigEndian.Uint64(src) + src = src[sizeOfUint64:] + + return src, nil +} + +func marshalDocsFilter(dst []byte, in *DocsFilterBinIn) []byte { + dst = append(dst, uint8(docsFilterBinVersion1)) + dst = marshalLIDsBlocks(dst, in.LIDs) + return dst +} + +const ( + sizeOfUint32 = unsafe.Sizeof(uint32(0)) + sizeOfUint64 = unsafe.Sizeof(uint64(0)) +) + +const ( + lidsBlockHeaderSizeBytes = 1 + (4 * sizeOfUint32) + sizeOfUint64 + maxLIDsBlockLen = 1024 +) + +var lidsBlockBufPool util.BufferPool + +func marshalLIDsBlocks(dst []byte, in []seq.LID) []byte { + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + + numberOfBlocks := (len(in) + maxLIDsBlockLen - 1) / maxLIDsBlockLen + dst = binary.BigEndian.AppendUint32(dst, uint32(numberOfBlocks)) + + // reserve space for headers + curHeaderOffset := len(dst) + dst = append(dst, make([]byte, numberOfBlocks*int(lidsBlockHeaderSizeBytes))...) + + var start int + for range numberOfBlocks { + end := min(maxLIDsBlockLen, len(in[start:])) + chunk := in[start : start+end] + + var codec lidsCodec + b.B, codec = marshalLIDsBlock(b.B[:0], chunk) + if len(b.B) > math.MaxUint32 { + panic(fmt.Errorf("unexpected block length %d; want up to %d", len(b.B), math.MaxUint32)) + } + + header := lidsBlockHeader{ + Codec: codec, + Length: uint32(len(chunk)), + MinLID: uint32(chunk[0]), + MaxLID: uint32(chunk[len(chunk)-1]), + Size: uint32(len(b.B)), + Offset: uint64(len(dst)), + } + header.marshal(dst[curHeaderOffset:]) + curHeaderOffset += int(lidsBlockHeaderSizeBytes) + + dst = append(dst, b.B...) + start += end + } + + return dst +} + +func marshalLIDsBlock(dst []byte, in []seq.LID) ([]byte, lidsCodec) { + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + + prev := seq.LID(0) + for i := range len(in) { + lid := in[i] + deltaLID := lid - prev + prev = lid + b.B = binary.AppendVarint(b.B, int64(deltaLID)) + } + + orig := dst + dst = zstd.CompressLevel(b.B, dst, getCompressLevel(len(b.B))) + + compressRatio := float64(len(dst)-len(orig)) / float64(len(b.B)) + if compressRatio < 1.05 { + orig = append(orig, b.B...) + return orig, lidsCodecDelta + } + + return dst, lidsCodecDeltaZstd +} + +const minLIDsFIlterBytesLen = 10 // 1 byte lidsBinVersion + 8 byte number of LIDs + N (min 1) bytes varint + delta encoded LIDs + +func unmarshalDocsFilter(dst *DocsFilterBinOut, src []byte) (_ []byte, err error) { + if len(src) < minLIDsFIlterBytesLen { + return nil, fmt.Errorf("invalid LIDs filter format; want %d bytes, got %d", minLIDsFIlterBytesLen, len(src)) + } + + version := docsFilterBinVersion(src[0]) + src = src[1:] + if _, ok := availableVersions[version]; !ok { + return nil, fmt.Errorf("invalid LIDs binary version: %d", version) + } + + dst.LIDs, src, err = unmarshalLIDsBlocks(dst.LIDs, src) + if err != nil { + return src, err + } + + return src, nil +} + +func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { + numberOfBlocks := binary.BigEndian.Uint32(src) + src = src[sizeOfUint32:] + + var err error + + headers := make([]lidsBlockHeader, 0, numberOfBlocks) + for range numberOfBlocks { + header := lidsBlockHeader{} + src, err = header.unmarshal(src) + if err != nil { + return dst, src, fmt.Errorf("can't unmarshal lids header: %s", err) + } + headers = append(headers, header) + } + + for i := range numberOfBlocks { + dst, src, err = unmarshalLIDsBlock(dst, src, headers[i]) + if err != nil { + return dst, src, err + } + } + + if len(src) > 0 { + return dst, src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks") + } + + return dst, src, nil +} + +func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uint32, []byte, error) { + if len(src) == 0 { + return dst, src, fmt.Errorf("empty LIDs block") + } + + if header.Size == 0 || int(header.Size) > len(src) { + return nil, src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size) + } + + block := src[:header.Size] + src = src[header.Size:] + + var err error + + switch header.Codec { + case lidsCodecDeltaZstd: + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + b.B, err = zstd.Decompress(block, b.B) + if err != nil { + return dst, src, fmt.Errorf("can't decompress ids block: %s", err) + } + dst, err = unmarshalLIDsDelta(dst, b.B, header) + if err != nil { + return dst, src, err + } + return dst, src, nil + case lidsCodecDelta: + dst, err = unmarshalLIDsDelta(dst, block, header) + if err != nil { + return dst, src, err + } + return dst, src, nil + default: + return dst, src, fmt.Errorf("unknown ids codec: %d", header.Codec) + } +} + +func unmarshalLIDsDelta(dst []uint32, block []byte, header lidsBlockHeader) ([]uint32, error) { + prevLID := uint32(0) + for range header.Length { + v, n := binary.Varint(block) + block = block[n:] + lid := prevLID + uint32(v) + prevLID = lid + dst = append(dst, lid) + } + + if len(block) > 0 { + return dst, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + } + + return dst, nil +} + +func getCompressLevel(size int) int { + level := 3 + if size <= 512 { + level = 1 + } else if size <= 4*1024 { + level = 2 + } + return level +} diff --git a/docsfilter/encoding_test.go b/docsfilter/encoding_test.go new file mode 100644 index 00000000..777285fb --- /dev/null +++ b/docsfilter/encoding_test.go @@ -0,0 +1,42 @@ +package docsfilter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/seq" +) + +func TestMarshalUnmarshalLIDsFilter(t *testing.T) { + test := func(df DocsFilterBinIn) { + t.Helper() + + rawDocsFilter := marshalDocsFilter(nil, &df) + var out DocsFilterBinOut + tail, err := unmarshalDocsFilter(&out, rawDocsFilter) + require.NoError(t, err) + require.Equal(t, 0, len(tail)) + assert.Equal(t, lidsToUint32s(df.LIDs), out.LIDs) + } + + test(DocsFilterBinIn{LIDs: []seq.LID{0, 1, 2, 3}}) + test(DocsFilterBinIn{LIDs: []seq.LID{10, 15, 22, 18, 105, 1010}}) + test(DocsFilterBinIn{LIDs: []seq.LID{11}}) + + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + test(DocsFilterBinIn{LIDs: multipleBlocksLIDs}) +} + +func lidsToUint32s(in []seq.LID) []uint32 { + out := make([]uint32, 0, len(in)) + for _, i := range in { + out = append(out, uint32(i)) + } + return out +} diff --git a/docsfilter/filter.go b/docsfilter/filter.go new file mode 100644 index 00000000..a5bc7770 --- /dev/null +++ b/docsfilter/filter.go @@ -0,0 +1,64 @@ +package docsfilter + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "sync" + + "github.com/ozontech/seq-db/parser" +) + +type FilterStatus byte + +const ( + StatusCreated FilterStatus = iota + StatusInProgress + StatusDone + StatusError +) + +type Params struct { + Query string + From int64 + To int64 +} + +type Filter struct { + params Params + + status FilterStatus + + ast parser.SeqQLQuery + + hash string + dirPath string + + processWg *sync.WaitGroup +} + +func NewFilter(params Params) *Filter { + return &Filter{ + params: params, + status: StatusCreated, + processWg: &sync.WaitGroup{}, + } +} + +func (f *Filter) String() string { + return fmt.Sprintf("%s_%d_%d", f.params.Query, f.params.From, f.params.To) +} + +func (f *Filter) Hash() string { + if f.hash == "" { + h := sha256.New() + h.Write([]byte(f.String())) + bs := h.Sum(nil) + f.hash = hex.EncodeToString(bs) + } + return f.hash +} + +func (f *Filter) markAsDone() { + f.status = StatusDone +} diff --git a/docsfilter/metrics.go b/docsfilter/metrics.go new file mode 100644 index 00000000..de45bc40 --- /dev/null +++ b/docsfilter/metrics.go @@ -0,0 +1,26 @@ +package docsfilter + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + inProgressFilters = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "in_progress", + Help: "Number of doc filters in progress", + }) + diskUsage = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "disk_usage_bytes", + }) + storedFilters = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "stored", + Help: "Number of active doc filters", + }) +) diff --git a/frac/active.go b/frac/active.go index b586db51..3e890d01 100644 --- a/frac/active.go +++ b/frac/active.go @@ -46,6 +46,7 @@ type Active struct { TokenList *TokenList DocsPositions *DocsPositions + IDsToLIDs *ActiveLIDs docsFile *os.File docsReader storage.DocsReader @@ -86,6 +87,7 @@ func NewActive( f := &Active{ TokenList: NewActiveTokenList(config.IndexWorkers), DocsPositions: NewSyncDocsPositions(), + IDsToLIDs: NewActiveLIDs(), MIDs: NewIDs(), RIDs: NewIDs(), DocBlocks: NewIDs(), @@ -385,6 +387,17 @@ func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Active) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + if f.Info().DocsTotal == 0 { // it is empty active fraction state + return nil, nil + } + + dp := f.createDataProvider(ctx) + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { return &activeDataProvider{ ctx: ctx, @@ -397,6 +410,7 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { blocksOffsets: f.DocBlocks.GetVals(), docsPositions: f.DocsPositions, + idsToLids: f.IDsToLIDs, docsReader: &f.docsReader, } } diff --git a/frac/active_index.go b/frac/active_index.go index 350a8e0d..71831c26 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -25,6 +25,7 @@ type activeDataProvider struct { blocksOffsets []uint64 docsPositions *DocsPositions + idsToLids *ActiveLIDs docsReader *storage.DocsReader idsIndex *activeIDsIndex @@ -136,6 +137,16 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e return res, nil } +func (dp *activeDataProvider) FindLIDs(ids []seq.ID) ([]seq.LID, error) { + res := make([]seq.LID, 0, len(ids)) + for _, id := range ids { + if lid, ok := dp.idsToLids.Get(id); ok { + res = append(res, lid) + } + } + return res, nil +} + type activeIDsIndex struct { mids []uint64 rids []uint64 diff --git a/frac/active_indexer.go b/frac/active_indexer.go index 9f3cc6ae..00b1ccf5 100644 --- a/frac/active_indexer.go +++ b/frac/active_indexer.go @@ -161,6 +161,10 @@ func (ai *ActiveIndexer) appendWorker(index int) { lids := active.AppendIDs(collector.IDs) m.Stop() + m = sw.Start("active_lids_map_set") + active.IDsToLIDs.SetMultiple(collector.IDs, lids) + m.Stop() + m = sw.Start("token_list_append") tokenLIDsPlaces := collector.PrepareTokenLIDsPlaces() active.TokenList.Append(collector.TokensValues, collector.FieldsLengths, tokenLIDsPlaces) diff --git a/frac/active_lids_map.go b/frac/active_lids_map.go new file mode 100644 index 00000000..bae64854 --- /dev/null +++ b/frac/active_lids_map.go @@ -0,0 +1,37 @@ +package frac + +import ( + "sync" + + "github.com/ozontech/seq-db/seq" +) + +type ActiveLIDs struct { + mu *sync.RWMutex + idToLid map[seq.ID]seq.LID +} + +func NewActiveLIDs() *ActiveLIDs { + al := ActiveLIDs{ + mu: &sync.RWMutex{}, + idToLid: make(map[seq.ID]seq.LID), + } + return &al +} + +func (al *ActiveLIDs) Get(id seq.ID) (seq.LID, bool) { + al.mu.RLock() + defer al.mu.RUnlock() + + val, ok := al.idToLid[id] + return val, ok +} + +func (al *ActiveLIDs) SetMultiple(ids []seq.ID, lids []uint32) { + al.mu.Lock() + defer al.mu.Unlock() + + for i, id := range ids { + al.idToLid[id] = seq.LID(lids[i]) + } +} diff --git a/frac/fraction.go b/frac/fraction.go index 59995639..a3027c97 100644 --- a/frac/fraction.go +++ b/frac/fraction.go @@ -21,6 +21,7 @@ type Fraction interface { Contains(mid seq.MID) bool Fetch(context.Context, []seq.ID) ([][]byte, error) Search(context.Context, processor.SearchParams) (*seq.QPR, error) + FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) } var ( diff --git a/frac/remote.go b/frac/remote.go index c2088caa..e15aa73f 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -131,6 +131,16 @@ func (f *Remote) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Remote) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + dp, err := f.createDataProvider(ctx) + if err != nil { + return nil, err + } + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, error) { if err := f.load(); err != nil { logger.Error( diff --git a/frac/sealed.go b/frac/sealed.go index c4c033d8..ddae8ccf 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -312,6 +312,13 @@ func (f *Sealed) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Sealed) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + dp := f.createDataProvider(ctx) + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { f.load() return &sealedDataProvider{ diff --git a/frac/sealed_index.go b/frac/sealed_index.go index f97c6e84..3899124a 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -122,6 +122,10 @@ func (dp *sealedDataProvider) Search(params processor.SearchParams) (*seq.QPR, e return qpr, nil } +func (dp *sealedDataProvider) FindLIDs(ids []seq.ID) ([]seq.LID, error) { + return dp.getFetchIndex().findLIDs(ids), nil +} + type sealedIDsIndex struct { fracName string table *seqids.Table diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go index ffc31854..6d4df41f 100644 --- a/fracmanager/proxy_frac.go +++ b/fracmanager/proxy_frac.go @@ -70,6 +70,10 @@ func (p *fractionProxy) Search(ctx context.Context, params processor.SearchParam return p.impl.Search(ctx, params) } +func (p *fractionProxy) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + return p.impl.FindLIDs(ctx, ids) +} + // activeProxy manages an active (writable) fraction // Tracks pending write operations and provides freeze capability. // Lifecycle: Created when fraction becomes active, destroyed after sealing. @@ -191,3 +195,7 @@ func (emptyFraction) Search(_ context.Context, params processor.SearchParams) (* metric.CountersTotal.WithLabelValues("empty_data_provider").Inc() return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil } + +func (emptyFraction) FindLIDs(_ context.Context, _ []seq.ID) ([]seq.LID, error) { + return nil, nil +} diff --git a/fracmanager/searcher_test.go b/fracmanager/searcher_test.go index eb70a2d2..bdb07ec6 100644 --- a/fracmanager/searcher_test.go +++ b/fracmanager/searcher_test.go @@ -62,6 +62,10 @@ func (f *testFakeFrac) Search(context.Context, processor.SearchParams) (*seq.QPR return f.qpr, nil } +func (f *testFakeFrac) FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) { + return []seq.LID{}, nil +} + func newFakeFrac(from, to seq.MID, qpr *seq.QPR) *testFakeFrac { return &testFakeFrac{ info: &common.Info{From: from, To: to, DocsTotal: 1}, diff --git a/util/fs.go b/util/fs.go index 57fd7b17..b6e9eaca 100644 --- a/util/fs.go +++ b/util/fs.go @@ -6,30 +6,32 @@ package util import ( "errors" "os" + "path" + "path/filepath" "go.uber.org/zap" "github.com/ozontech/seq-db/logger" ) -func MustSyncPath(path string) { - if err := SyncPath(path); err != nil { - logger.Panic("cannot sync path", zap.String("path", path), zap.Error(err)) +func MustSyncPath(dirPath string) { + if err := SyncPath(dirPath); err != nil { + logger.Panic("cannot sync path", zap.String("path", dirPath), zap.Error(err)) } } -func MustRemoveFileByPath(path string) { - if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { +func MustRemoveFileByPath(fpath string) { + if err := os.Remove(fpath); err != nil && !errors.Is(err, os.ErrNotExist) { logger.Panic( "cannot remove file by path", - zap.String("path", path), + zap.String("path", fpath), zap.Error(err), ) } } -func SyncPath(path string) error { - d, err := os.Open(path) +func SyncPath(dirPath string) error { + d, err := os.Open(dirPath) if err != nil { return err } @@ -53,3 +55,75 @@ func RemoveFile(file string) { logger.Error("file removing error", zap.Error(err)) } } + +func MustWriteFileAtomic(fpath string, data []byte, tmpFileExt string) { + fpathTmp := fpath + tmpFileExt + + f, err := os.Create(fpathTmp) + if err != nil { + logger.Panic("can't create file", zap.Error(err)) + } + defer func() { + if err := f.Close(); err != nil { + logger.Panic("can't close file", zap.Error(err)) + } + }() + + if _, err := f.Write(data); err != nil { + logger.Panic("can't write to file", zap.Error(err)) + } + + if err := f.Sync(); err != nil { + logger.Panic("can't sync file", zap.Error(err)) + } + + if err := os.Rename(fpathTmp, fpath); err != nil { + logger.Panic("can't rename file", zap.Error(err)) + } + + absFpath, err := filepath.Abs(fpath) + if err != nil { + logger.Panic("can't get absolute path", zap.String("path", fpath), zap.Error(err)) + } + dir := path.Dir(absFpath) + MustFsyncFile(dir) +} + +func MustFsyncFile(fpath string) { + dirFile, err := os.Open(fpath) + if err != nil { + logger.Panic("can't open dir", zap.Error(err)) + } + if err := dirFile.Sync(); err != nil { + logger.Panic("can't sync dir", zap.Error(err)) + } + if err := dirFile.Close(); err != nil { + logger.Panic("can't close dir", zap.Error(err)) + } +} + +// MustCreateDir creates directory at dirPath. +// Handles the case when directory already exists. +func MustCreateDir(dirPath string) { + err := os.MkdirAll(dirPath, 0o777) + if err != nil && !os.IsExist(err) { + logger.Panic("can't create file", zap.Error(err)) + } +} + +// VisitFilesWithExt traverses all the files with `ext` extension in `des` directory and calls a `cb` func for each of files. +func VisitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error { + for _, de := range des { + if de.IsDir() { + continue + } + name := de.Name() + if path.Ext(name) != ext { + continue + } + if err := cb(name); err != nil { + return err + } + } + return nil +} From be241aa1538113305067e734c3720f218ec4176c Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Mon, 6 Apr 2026 18:39:06 +0500 Subject: [PATCH 2/4] fix: review fixes --- asyncsearcher/async_searcher.go | 6 +- {docsfilter => filtermanager}/encoding.go | 26 +-- .../encoding_test.go | 2 +- {docsfilter => filtermanager}/filter.go | 7 +- .../filter_manager.go | 219 ++++++++++-------- {docsfilter => filtermanager}/metrics.go | 3 +- fracmanager/storage_state.go | 44 +--- util/fs.go | 49 ++-- 8 files changed, 174 insertions(+), 182 deletions(-) rename {docsfilter => filtermanager}/encoding.go (91%) rename {docsfilter => filtermanager}/encoding_test.go (97%) rename {docsfilter => filtermanager}/filter.go (91%) rename docsfilter/docs_filter.go => filtermanager/filter_manager.go (56%) rename {docsfilter => filtermanager}/metrics.go (89%) diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index 01a7eb3c..eab092cf 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -281,7 +281,7 @@ func (as *AsyncSearcher) storeSearchInfoLocked(id string, info asyncSearchInfo) panic(err) } fpath := path.Join(as.config.DataDir, id+asyncSearchExtInfo) - util.MustWriteFileAtomic(fpath, b, asyncSearchTmpFile) + util.MustWriteFileAtomic(fpath, b, 0o666, asyncSearchTmpFile) info.infoSize.Store(int64(len(b))) as.requests[id] = info } @@ -420,7 +420,7 @@ func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err name := getQPRFilename(info.Request.ID, f.Info().Name()) fpath := path.Join(as.config.DataDir, name) - util.MustWriteFileAtomic(fpath, rawQPR, asyncSearchTmpFile) + util.MustWriteFileAtomic(fpath, rawQPR, 0o666, asyncSearchTmpFile) info.qprsSize.Add(int64(len(rawQPR))) return nil @@ -807,7 +807,7 @@ func (as *AsyncSearcher) mergeQPRs(job mergeJob) { storeMQPR := func(compressed []byte) error { sizeAfter = len(compressed) mqprPath := path.Join(as.config.DataDir, job.ID+asyncSearchExtMergedQPR) - util.MustWriteFileAtomic(mqprPath, compressed, asyncSearchTmpFile) + util.MustWriteFileAtomic(mqprPath, compressed, 0o666, asyncSearchTmpFile) return nil } if err := compressQPR(&qpr, storeMQPR); err != nil { diff --git a/docsfilter/encoding.go b/filtermanager/encoding.go similarity index 91% rename from docsfilter/encoding.go rename to filtermanager/encoding.go index dccfc62f..26a6fa46 100644 --- a/docsfilter/encoding.go +++ b/filtermanager/encoding.go @@ -1,4 +1,4 @@ -package docsfilter +package filtermanager import ( "encoding/binary" @@ -53,15 +53,15 @@ func (h *lidsBlockHeader) marshal(dst []byte) { dst[0] = byte(h.Codec) dst = dst[1:] - binary.BigEndian.PutUint32(dst, h.Length) + binary.LittleEndian.PutUint32(dst, h.Length) dst = dst[sizeOfUint32:] - binary.BigEndian.PutUint32(dst, h.MinLID) + binary.LittleEndian.PutUint32(dst, h.MinLID) dst = dst[sizeOfUint32:] - binary.BigEndian.PutUint32(dst, h.MaxLID) + binary.LittleEndian.PutUint32(dst, h.MaxLID) dst = dst[sizeOfUint32:] - binary.BigEndian.PutUint32(dst, h.Size) + binary.LittleEndian.PutUint32(dst, h.Size) dst = dst[sizeOfUint32:] - binary.BigEndian.PutUint64(dst, h.Offset) + binary.LittleEndian.PutUint64(dst, h.Offset) dst = dst[sizeOfUint64:] } @@ -72,15 +72,15 @@ func (h *lidsBlockHeader) unmarshal(src []byte) ([]byte, error) { h.Codec = lidsCodec(src[0]) src = src[1:] - h.Length = binary.BigEndian.Uint32(src) + h.Length = binary.LittleEndian.Uint32(src) src = src[sizeOfUint32:] - h.MinLID = binary.BigEndian.Uint32(src) + h.MinLID = binary.LittleEndian.Uint32(src) src = src[sizeOfUint32:] - h.MaxLID = binary.BigEndian.Uint32(src) + h.MaxLID = binary.LittleEndian.Uint32(src) src = src[sizeOfUint32:] - h.Size = binary.BigEndian.Uint32(src) + h.Size = binary.LittleEndian.Uint32(src) src = src[sizeOfUint32:] - h.Offset = binary.BigEndian.Uint64(src) + h.Offset = binary.LittleEndian.Uint64(src) src = src[sizeOfUint64:] return src, nil @@ -109,7 +109,7 @@ func marshalLIDsBlocks(dst []byte, in []seq.LID) []byte { defer lidsBlockBufPool.Put(b) numberOfBlocks := (len(in) + maxLIDsBlockLen - 1) / maxLIDsBlockLen - dst = binary.BigEndian.AppendUint32(dst, uint32(numberOfBlocks)) + dst = binary.LittleEndian.AppendUint32(dst, uint32(numberOfBlocks)) // reserve space for headers curHeaderOffset := len(dst) @@ -190,7 +190,7 @@ func unmarshalDocsFilter(dst *DocsFilterBinOut, src []byte) (_ []byte, err error } func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { - numberOfBlocks := binary.BigEndian.Uint32(src) + numberOfBlocks := binary.LittleEndian.Uint32(src) src = src[sizeOfUint32:] var err error diff --git a/docsfilter/encoding_test.go b/filtermanager/encoding_test.go similarity index 97% rename from docsfilter/encoding_test.go rename to filtermanager/encoding_test.go index 777285fb..8af0b7a3 100644 --- a/docsfilter/encoding_test.go +++ b/filtermanager/encoding_test.go @@ -1,4 +1,4 @@ -package docsfilter +package filtermanager import ( "testing" diff --git a/docsfilter/filter.go b/filtermanager/filter.go similarity index 91% rename from docsfilter/filter.go rename to filtermanager/filter.go index a5bc7770..f7d03566 100644 --- a/docsfilter/filter.go +++ b/filtermanager/filter.go @@ -1,4 +1,4 @@ -package docsfilter +package filtermanager import ( "crypto/sha256" @@ -7,6 +7,7 @@ import ( "sync" "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" ) type FilterStatus byte @@ -20,8 +21,8 @@ const ( type Params struct { Query string - From int64 - To int64 + From seq.MID + To seq.MID } type Filter struct { diff --git a/docsfilter/docs_filter.go b/filtermanager/filter_manager.go similarity index 56% rename from docsfilter/docs_filter.go rename to filtermanager/filter_manager.go index 2a092f7e..ffd43aa0 100644 --- a/docsfilter/docs_filter.go +++ b/filtermanager/filter_manager.go @@ -1,4 +1,4 @@ -package docsfilter +package filtermanager import ( "context" @@ -26,6 +26,8 @@ const ( fracInQueueExt = ".queue" fracDoneExt = ".filter" tmpExt = ".tmp" + + tmpDirSuffix = "_tmp" ) const ( @@ -42,7 +44,7 @@ type Config struct { CacheSizeLimit uint64 } -type DocsFilter struct { +type FilterManager struct { ctx context.Context config Config @@ -53,10 +55,11 @@ type DocsFilter struct { mp MappingProvider - rateLimit chan struct{} - createDirOnce *sync.Once + rateLimit chan struct{} + maintenanceWG *sync.WaitGroup maintenanceInterval time.Duration + maintenanceStop context.CancelFunc } func New( @@ -64,7 +67,7 @@ func New( cfg Config, params []Params, mp MappingProvider, -) *DocsFilter { +) *FilterManager { workers := cfg.Workers if workers <= 0 { workers = runtime.GOMAXPROCS(0) @@ -74,10 +77,10 @@ func New( for _, p := range params { f := NewFilter(p) - filtersMap[string(f.Hash())] = f + filtersMap[f.Hash()] = f } - return &DocsFilter{ + return &FilterManager{ ctx: ctx, config: cfg, filters: filtersMap, @@ -85,97 +88,106 @@ func New( fracsMu: &sync.RWMutex{}, mp: mp, rateLimit: make(chan struct{}, workers), - createDirOnce: &sync.Once{}, maintenanceInterval: defaultMaintenanceInterval, } } -func (df *DocsFilter) Start(fracs fracmanager.List) { - df.createDataDir() +func (fm *FilterManager) Start(ctx context.Context, fracs fracmanager.List) { + fm.createDataDir() - err := df.loadFilters() + err := fm.loadFilters() if err != nil { logger.Fatal("failed to load previous docs filters", zap.Error(err)) } - err = df.buildQueue(fracs) + err = fm.buildQueue(fracs) if err != nil { logger.Fatal("failed to build docs filters queue", zap.Error(err)) } - go df.maintenance() + ctx, cancel := context.WithCancel(ctx) + fm.maintenanceStop = cancel + fm.startMaintenance(ctx) - mapping := df.mp.GetMapping() + mapping := fm.mp.GetMapping() - for _, f := range df.filters { - ast, err := parser.ParseSeqQL(f.params.Query, mapping) - if err != nil { - panic(fmt.Errorf("BUG: search query must be valid: %s", err)) + go func() { + for _, f := range fm.filters { + ast, err := parser.ParseSeqQL(f.params.Query, mapping) + if err != nil { + panic(fmt.Errorf("BUG: search query must be valid: %s", err)) + } + f.ast = ast + + fm.processFilter(ctx, f, fracs.FilterInRange(f.params.From, f.params.To)) } - f.ast = ast + }() +} - df.processFilter(f, fracs.FilterInRange(seq.MID(f.params.From), seq.MID(f.params.To))) - } +func (fm *FilterManager) Stop() { + fm.maintenanceStop() + fm.maintenanceWG.Wait() } -// RefreshFrac replaces frac's tombstone files with newly found results. Used after active frac is sealed. -func (df *DocsFilter) RefreshFrac(fraction frac.Fraction) { - df.fracsMu.RLock() - fracsFiles, has := df.fracs[fraction.Info().Name()] - df.fracsMu.RUnlock() +// RefreshFrac replaces frac's filter files with newly found results. Used after active frac is sealed. +func (fm *FilterManager) RefreshFrac(fraction frac.Fraction) { + fm.fracsMu.RLock() + fracsFiles, has := fm.fracs[fraction.Info().Name()] + fm.fracsMu.RUnlock() if !has { return } for _, fileName := range fracsFiles { - filter := df.filters[filterNameFromTombstonesPath(fileName)] + filter := fm.filters[filterNameFromPath(fileName)] queueFilePath := path.Join(filter.dirPath, makeFileName(fraction.Info().Name(), fracInQueueExt)) - util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt) + util.MustWriteFileAtomic(queueFilePath, []byte{}, 0o666, tmpExt) - filter.processWg.Add(1) + fm.rateLimit <- struct{}{} go func() { - if err := df.processFrac(fraction, filter, false); err != nil { + defer func() { <-fm.rateLimit }() + if err := fm.processFrac(fraction, filter, false); err != nil { panic(fmt.Errorf("docs filter refresh frac err: %s", err)) } }() } } -// RemoveFrac removes fraction's tombstones. Used after frac is deleted -func (df *DocsFilter) RemoveFrac(fracName string) { - df.fracsMu.RLock() - fracsFiles, has := df.fracs[fracName] - df.fracsMu.RUnlock() +// RemoveFrac removes fraction's filter files. Used after frac is deleted +func (fm *FilterManager) RemoveFrac(fracName string) { + fm.fracsMu.RLock() + fracsFiles, has := fm.fracs[fracName] + fm.fracsMu.RUnlock() if !has { return } - df.fracsMu.Lock() - delete(df.fracs, fracName) - df.fracsMu.Unlock() + fm.fracsMu.Lock() + delete(fm.fracs, fracName) + fm.fracsMu.Unlock() for _, fileName := range fracsFiles { util.RemoveFile(fileName) } } -func filterNameFromTombstonesPath(p string) string { +func filterNameFromPath(p string) string { return path.Base(path.Dir(p)) } -func (df *DocsFilter) addDoneFrac(fracName, fracPath string) { - df.fracsMu.Lock() - defer df.fracsMu.Unlock() +func (fm *FilterManager) addDoneFrac(fracName, fracPath string) { + fm.fracsMu.Lock() + defer fm.fracsMu.Unlock() - df.fracs[fracName] = append(df.fracs[fracName], fracPath) + fm.fracs[fracName] = append(fm.fracs[fracName], fracPath) } // loadFilters loads existing filters -func (df *DocsFilter) loadFilters() error { - des, err := os.ReadDir(df.config.DataDir) +func (fm *FilterManager) loadFilters() error { + des, err := os.ReadDir(fm.config.DataDir) if err != nil { return err } @@ -187,9 +199,9 @@ func (df *DocsFilter) loadFilters() error { continue } - if _, ok := df.filters[de.Name()]; !ok { + if _, ok := fm.filters[de.Name()]; !ok { logger.Info("there is filter folder on disk, but not in config. need to delete it.") - err := os.RemoveAll(path.Join(df.config.DataDir, de.Name())) + err := os.RemoveAll(path.Join(fm.config.DataDir, de.Name())) if err != nil && !os.IsNotExist(err) { return err } @@ -197,9 +209,9 @@ func (df *DocsFilter) loadFilters() error { continue } - f := df.filters[de.Name()] + f := fm.filters[de.Name()] f.status = StatusInProgress - f.dirPath = path.Join(df.config.DataDir, de.Name()) + f.dirPath = path.Join(fm.config.DataDir, de.Name()) filterDes, err := os.ReadDir(f.dirPath) if err != nil { @@ -218,7 +230,7 @@ func (df *DocsFilter) loadFilters() error { case fracInQueueExt: hasFracsInQueue = true case fracDoneExt: - df.addDoneFrac(fracNameFromFilePath(name), path.Join(f.dirPath, name)) + fm.addDoneFrac(fracNameFromFilePath(name), path.Join(f.dirPath, name)) } } @@ -228,33 +240,43 @@ func (df *DocsFilter) loadFilters() error { } if anyRemove { - util.MustFsyncFile(df.config.DataDir) + util.MustFsyncFile(fm.config.DataDir) } return nil } // buildQueue creates a directory for each of unprocessed filters and creates .queue files -func (df *DocsFilter) buildQueue(fracs fracmanager.List) error { - for _, filter := range df.filters { +func (fm *FilterManager) buildQueue(fracs fracmanager.List) error { + for _, filter := range fm.filters { if filter.status != StatusCreated { continue } - filter.dirPath = path.Join(df.config.DataDir, filter.Hash()) - util.MustCreateDir(filter.dirPath) + + // create tmp dir + tmpDir := path.Join(fm.config.DataDir, filter.Hash(), tmpDirSuffix) + util.MustCreateDir(tmpDir) filterFracs := fracs.FilterInRange(seq.MID(filter.params.From), seq.MID(filter.params.To)) for _, f := range filterFracs { - queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt)) - util.MustWriteFileAtomic(queueFilePath, []byte{}, tmpExt) + queueFilePath := path.Join(tmpDir, makeFileName(f.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, 0o666, tmpExt) + } + + // rename tmp dir + dir := path.Join(fm.config.DataDir, filter.Hash()) + if err := os.Rename(tmpDir, dir); err != nil { + return err } + util.MustFsyncFile(fm.config.DataDir) + filter.dirPath = dir } return nil } // handleFilter finds docs and writes to fs -func (df *DocsFilter) processFilter(filter *Filter, fracs fracmanager.List) { +func (fm *FilterManager) processFilter(ctx context.Context, filter *Filter, fracs fracmanager.List) { if len(fracs) == 0 { return } @@ -276,12 +298,18 @@ func (df *DocsFilter) processFilter(filter *Filter, fracs fracmanager.List) { if !ok { // skip missing fracs return nil } - filter.processWg.Add(1) - go func() { - if err := df.processFrac(f, filter, false); err != nil { - panic(fmt.Errorf("docs filter process frac err: %s", err)) - } - }() + + select { + case <-ctx.Done(): + return nil + case fm.rateLimit <- struct{}{}: + filter.processWg.Go(func() { + defer func() { <-fm.rateLimit }() + if err := fm.processFrac(f, filter, false); err != nil { + panic(fmt.Errorf("docs filter process frac err: %s", err)) + } + }) + } return nil } _ = util.VisitFilesWithExt(filterDes, fracInQueueExt, processFracInQueue) @@ -293,13 +321,8 @@ func (df *DocsFilter) processFilter(filter *Filter, fracs fracmanager.List) { }() } -func (df *DocsFilter) processFrac(f frac.Fraction, filter *Filter, refresh bool) error { - defer filter.processWg.Done() - - df.rateLimit <- struct{}{} - defer func() { <-df.rateLimit }() - - qpr, err := f.Search(df.ctx, processor.SearchParams{ +func (fm *FilterManager) processFrac(f frac.Fraction, filter *Filter, refresh bool) error { + qpr, err := f.Search(fm.ctx, processor.SearchParams{ AST: filter.ast.Root, From: seq.MID(filter.params.From), To: seq.MID(filter.params.To), @@ -317,46 +340,42 @@ func (df *DocsFilter) processFrac(f frac.Fraction, filter *Filter, refresh bool) return nil } - storeDocsFilter := func(rawDocsFilter []byte) error { - util.MustWriteFileAtomic(doneFilePath, rawDocsFilter, tmpExt) - util.RemoveFile(queueFilePath) - return nil - } - // TODO: here we doing part of the work twice: // first time we find LIDs inside f.Search() and then find IDs by these LIDs. // Then we again find LIDs by earlier found IDs in f.FindLIDs(). // We did it like this because otherwise we had to do serious f.Search() rewrite. // For now we're ok with some performance penalty. - lids, err := f.FindLIDs(df.ctx, qpr.IDs.IDs()) + lids, err := f.FindLIDs(fm.ctx, qpr.IDs.IDs()) if err != nil { return err } docsFilterBin := DocsFilterBinIn{LIDs: lids} - if err := writeDocsFilter(&docsFilterBin, storeDocsFilter); err != nil { + if err := writeDocsFilter(&docsFilterBin, queueFilePath, doneFilePath); err != nil { return err } if !refresh { - df.addDoneFrac(f.Info().Name(), doneFilePath) + fm.addDoneFrac(f.Info().Name(), doneFilePath) } return nil } -func (df *DocsFilter) maintenance() { - for { - logger.Info("docs filter maintenance iteration") - df.checkDiskUsage() - time.Sleep(df.maintenanceInterval) - } +func (fm *FilterManager) startMaintenance(ctx context.Context) { + fm.maintenanceWG.Go(func() { + logger.Info("start docs filter maintenance") + util.RunEvery(ctx.Done(), fm.maintenanceInterval, func() { + logger.Info("docs filter maintenance iteration") + fm.checkDiskUsage() + }) + }) } -func (df *DocsFilter) checkDiskUsage() { +func (fm *FilterManager) checkDiskUsage() { du := int64(0) - for _, f := range df.filters { + for _, f := range fm.filters { des, err := os.ReadDir(f.dirPath) if err != nil { logger.Error("docs filter: can't read filter's dir", @@ -370,7 +389,7 @@ func (df *DocsFilter) checkDiskUsage() { } info, err := fde.Info() if err != nil { - logger.Error("docs filter: can't read tombstones file info", + logger.Error("docs filter: can't read filter file info", zap.String("filter", f.String()), zap.Error(err)) return } @@ -379,7 +398,7 @@ func (df *DocsFilter) checkDiskUsage() { } diskUsage.Set(float64(du)) - storedFilters.Set(float64(len(df.filters))) + storedFilters.Set(float64(len(fm.filters))) } func makeFileName(name, ext string) string { @@ -392,22 +411,20 @@ func fracNameFromFilePath(filterFilePath string) string { var marshalBufferPool util.BufferPool -func writeDocsFilter(df *DocsFilterBinIn, cb func(compressed []byte) error) error { +func writeDocsFilter(df *DocsFilterBinIn, queueFilePath, doneFilePath string) error { rawDocsFilter := marshalBufferPool.Get() defer marshalBufferPool.Put(rawDocsFilter) rawDocsFilter.B = marshalDocsFilter(rawDocsFilter.B, df) - if err := cb(rawDocsFilter.B); err != nil { - return err - } + util.MustWriteFileAtomic(doneFilePath, rawDocsFilter.B, 0o666, tmpExt) + util.RemoveFile(queueFilePath) + return nil } -// createDataDir creates dir data lazily to avoid creating extra folders. -func (df *DocsFilter) createDataDir() { - df.createDirOnce.Do(func() { - if err := os.MkdirAll(df.config.DataDir, 0o777); err != nil { - panic(err) - } - }) +// createDataDir creates data dir. +func (fm *FilterManager) createDataDir() { + if err := os.MkdirAll(fm.config.DataDir, 0o777); err != nil { + panic(err) + } } diff --git a/docsfilter/metrics.go b/filtermanager/metrics.go similarity index 89% rename from docsfilter/metrics.go rename to filtermanager/metrics.go index de45bc40..ed47171e 100644 --- a/docsfilter/metrics.go +++ b/filtermanager/metrics.go @@ -1,4 +1,4 @@ -package docsfilter +package filtermanager import ( "github.com/prometheus/client_golang/prometheus" @@ -16,6 +16,7 @@ var ( Namespace: "seq_db_store", Subsystem: "filters", Name: "disk_usage_bytes", + Help: "Disk space used by filter files in bytes", }) storedFilters = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "seq_db_store", diff --git a/fracmanager/storage_state.go b/fracmanager/storage_state.go index 262f5a1b..f1aaf950 100644 --- a/fracmanager/storage_state.go +++ b/fracmanager/storage_state.go @@ -2,7 +2,6 @@ package fracmanager import ( "encoding/json" - "fmt" "os" "path/filepath" "sync" @@ -83,46 +82,5 @@ func (m *StateManager) save() error { if err != nil { return err } - return atomicWrite(m.filePath, data, 0o644) -} - -// atomicWrite safely writes data to file using atomic replacement pattern -func atomicWrite(path string, data []byte, perm os.FileMode) error { - tmpPath := path + ".tmp" - f, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) - if err != nil { - return fmt.Errorf("create temp file: %w", err) - } - - defer func() { - if f != nil { - f.Close() - } - if err != nil { - os.Remove(tmpPath) - } - }() - - if _, err = f.Write(data); err != nil { - return fmt.Errorf("write data: %w", err) - } - - if err = f.Sync(); err != nil { - return fmt.Errorf("sync data: %w", err) - } - - if err = f.Close(); err != nil { - return fmt.Errorf("close file: %w", err) - } - f = nil // mark as closed so defer doesn't close again - - if err = os.Rename(tmpPath, path); err != nil { - return fmt.Errorf("rename file: %w", err) - } - - if err = util.SyncPath(filepath.Dir(path)); err != nil { // also sync parent directory - return fmt.Errorf("sync dir: %w", err) - } - - return nil + return util.WriteFileAtomic(m.filePath, data, 0o644, ".txt") } diff --git a/util/fs.go b/util/fs.go index b6e9eaca..8f148f6e 100644 --- a/util/fs.go +++ b/util/fs.go @@ -5,6 +5,7 @@ package util import ( "errors" + "fmt" "os" "path" "path/filepath" @@ -56,37 +57,51 @@ func RemoveFile(file string) { } } -func MustWriteFileAtomic(fpath string, data []byte, tmpFileExt string) { - fpathTmp := fpath + tmpFileExt +func MustWriteFileAtomic(fpath string, data []byte, perm os.FileMode, tmpFileExt string) { + if err := WriteFileAtomic(fpath, data, perm, tmpFileExt); err != nil { + logger.Panic("can't write file atomic", zap.String("path", fpath), zap.Error(err)) + } +} - f, err := os.Create(fpathTmp) +// atomicWrite safely writes data to file using atomic replacement pattern +func WriteFileAtomic(fpath string, data []byte, perm os.FileMode, tmpFileExt string) error { + tmpPath := fpath + tmpFileExt + f, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) if err != nil { - logger.Panic("can't create file", zap.Error(err)) + return fmt.Errorf("create temp file: %w", err) } + defer func() { - if err := f.Close(); err != nil { - logger.Panic("can't close file", zap.Error(err)) + if f != nil { + f.Close() + } + if err != nil { + os.Remove(tmpPath) } }() - if _, err := f.Write(data); err != nil { - logger.Panic("can't write to file", zap.Error(err)) + if _, err = f.Write(data); err != nil { + return fmt.Errorf("write data: %w", err) } - if err := f.Sync(); err != nil { - logger.Panic("can't sync file", zap.Error(err)) + if err = f.Sync(); err != nil { + return fmt.Errorf("sync data: %w", err) } - if err := os.Rename(fpathTmp, fpath); err != nil { - logger.Panic("can't rename file", zap.Error(err)) + if err = f.Close(); err != nil { + return fmt.Errorf("close file: %w", err) } + f = nil // mark as closed so defer doesn't close again - absFpath, err := filepath.Abs(fpath) - if err != nil { - logger.Panic("can't get absolute path", zap.String("path", fpath), zap.Error(err)) + if err = os.Rename(tmpPath, fpath); err != nil { + return fmt.Errorf("rename file: %w", err) + } + + if err = SyncPath(filepath.Dir(fpath)); err != nil { // also sync parent directory + return fmt.Errorf("sync dir: %w", err) } - dir := path.Dir(absFpath) - MustFsyncFile(dir) + + return nil } func MustFsyncFile(fpath string) { From 650ca4e69f393e1ab44706f283fc2a570a9b874d Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Mon, 6 Apr 2026 18:57:21 +0500 Subject: [PATCH 3/4] fix: move config to last pr --- config/config.go | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index 84e7582d..00df0d94 100644 --- a/config/config.go +++ b/config/config.go @@ -277,13 +277,13 @@ type Config struct { } `config:"tracing"` // Additional filtering options - Filtering Filter `config:"filtering"` - DocsFilter struct { - DataDir string `config:"data_dir"` - Concurrency int `config:"concurrency"` - Filters []Filter `config:"filters"` - CacheSize Bytes `config:"cache_size" default:"100MiB"` - } `config:"docs_filter"` + Filtering struct { + // If a search query time range overlaps with the [from; to] range + // the search query will be `AND`-ed with an additional predicate with the provided query expression + Query string `config:"query"` + From time.Time `config:"from"` + To time.Time `config:"to"` + } `config:"filtering"` // Experimental provides flags // For configuring experimental features. @@ -305,9 +305,3 @@ func (b *Bytes) UnmarshalString(s string) error { *b = Bytes(bytes) return nil } - -type Filter struct { - Query string `config:"query"` - From time.Time `config:"from"` - To time.Time `config:"to"` -} From 7a227d0e229152e69b4c37637dc94d438ddd3ba2 Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Tue, 7 Apr 2026 15:20:23 +0500 Subject: [PATCH 4/4] fix(filter manager): fix build queue --- filtermanager/filter_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filtermanager/filter_manager.go b/filtermanager/filter_manager.go index ffd43aa0..d2688e49 100644 --- a/filtermanager/filter_manager.go +++ b/filtermanager/filter_manager.go @@ -254,7 +254,7 @@ func (fm *FilterManager) buildQueue(fracs fracmanager.List) error { } // create tmp dir - tmpDir := path.Join(fm.config.DataDir, filter.Hash(), tmpDirSuffix) + tmpDir := path.Join(fm.config.DataDir, fmt.Sprintf("%s%s", filter.Hash(), tmpDirSuffix)) util.MustCreateDir(tmpDir) filterFracs := fracs.FilterInRange(seq.MID(filter.params.From), seq.MID(filter.params.To))