diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index a34fec29..eab092cf 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -281,7 +281,7 @@ func (as *AsyncSearcher) storeSearchInfoLocked(id string, info asyncSearchInfo) panic(err) } fpath := path.Join(as.config.DataDir, id+asyncSearchExtInfo) - mustWriteFileAtomic(fpath, b) + util.MustWriteFileAtomic(fpath, b, 0o666, asyncSearchTmpFile) info.infoSize.Store(int64(len(b))) as.requests[id] = info } @@ -420,7 +420,7 @@ func (as *AsyncSearcher) processFrac(f frac.Fraction, info asyncSearchInfo) (err name := getQPRFilename(info.Request.ID, f.Info().Name()) fpath := path.Join(as.config.DataDir, name) - mustWriteFileAtomic(fpath, rawQPR) + util.MustWriteFileAtomic(fpath, rawQPR, 0o666, asyncSearchTmpFile) info.qprsSize.Add(int64(len(rawQPR))) return nil @@ -465,7 +465,7 @@ func (as *AsyncSearcher) findQPRs(id string) ([]string, error) { files = append(files, path.Join(as.config.DataDir, name)) return nil } - if err := visitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, appendQPRInfoFile); err != nil { return nil, err } return files, nil @@ -490,7 +490,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { areQPRsMerged[requestID] = true return nil } - if err := visitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtMergedQPR, loadMergedQPRsInfo); err != nil { return nil, err } @@ -510,7 +510,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { anyRemove = true return nil } - if err := visitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtQPR, removeMergedQPRs); err != nil { return nil, err } @@ -522,11 +522,11 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { anyRemove = true return nil } - if err := visitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchTmpFile, removeTmpFiles); err != nil { return nil, err } if anyRemove { - mustFsyncFile(dataDir) + util.MustFsyncFile(dataDir) } qprsDuByID := make(map[string]int) @@ -592,7 +592,7 @@ func loadAsyncRequests(dataDir string) (map[string]asyncSearchInfo, error) { requests[requestID] = info return nil } - if err := visitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil { + if err := util.VisitFilesWithExt(des, asyncSearchExtInfo, loadInfos); err != nil { return nil, err } return requests, nil @@ -807,7 +807,7 @@ func (as *AsyncSearcher) mergeQPRs(job mergeJob) { storeMQPR := func(compressed []byte) error { sizeAfter = len(compressed) mqprPath := path.Join(as.config.DataDir, job.ID+asyncSearchExtMergedQPR) - mustWriteFileAtomic(mqprPath, compressed) + util.MustWriteFileAtomic(mqprPath, compressed, 0o666, asyncSearchTmpFile) return nil } if err := compressQPR(&qpr, storeMQPR); err != nil { @@ -1026,65 +1026,3 @@ func (as *AsyncSearcher) GetAsyncSearchesList(r GetAsyncSearchesListRequest) []* return items } - -func mustWriteFileAtomic(fpath string, data []byte) { - fpathTmp := fpath + asyncSearchTmpFile - - f, err := os.Create(fpathTmp) - if err != nil { - logger.Fatal("can't create file", zap.Error(err)) - } - defer func() { - if err := f.Close(); err != nil { - logger.Fatal("can't close file", zap.Error(err)) - } - }() - - if _, err := f.Write(data); err != nil { - logger.Fatal("can't write to file", zap.Error(err)) - } - - if err := f.Sync(); err != nil { - logger.Fatal("can't sync file", zap.Error(err)) - } - - if err := os.Rename(fpathTmp, fpath); err != nil { - logger.Fatal("can't rename file", zap.Error(err)) - } - - absFpath, err := filepath.Abs(fpath) - if err != nil { - logger.Fatal("can't get absolute path", zap.String("path", fpath), zap.Error(err)) - } - dir := path.Dir(absFpath) - mustFsyncFile(dir) -} - -func mustFsyncFile(fpath string) { - dirFile, err := os.Open(fpath) - if err != nil { - logger.Fatal("can't open dir", zap.Error(err)) - } - if err := dirFile.Sync(); err != nil { - logger.Fatal("can't sync dir", zap.Error(err)) - } - if err := dirFile.Close(); err != nil { - logger.Fatal("can't close dir", zap.Error(err)) - } -} - -func visitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error { - for _, de := range des { - if de.IsDir() { - continue - } - name := de.Name() - if path.Ext(name) != ext { - continue - } - if err := cb(name); err != nil { - return err - } - } - return nil -} diff --git a/filtermanager/encoding.go b/filtermanager/encoding.go new file mode 100644 index 00000000..26a6fa46 --- /dev/null +++ b/filtermanager/encoding.go @@ -0,0 +1,285 @@ +package filtermanager + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + "unsafe" + + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" + "github.com/ozontech/seq-db/zstd" +) + +type DocsFilterBinIn struct { + LIDs []seq.LID +} + +type DocsFilterBinOut struct { + LIDs []uint32 +} + +type docsFilterBinVersion uint8 + +const ( + docsFilterBinVersion1 docsFilterBinVersion = iota + 1 +) + +var availableVersions = map[docsFilterBinVersion]struct{}{ + docsFilterBinVersion1: {}, +} + +type lidsCodec byte + +const ( + lidsCodecDelta = 1 + lidsCodecDeltaZstd = 2 +) + +type lidsBlockHeader struct { + Codec lidsCodec + Length uint32 // Number of LIDs in block + MinLID uint32 + MaxLID uint32 + Size uint32 // Size of ids block in bytes. + Offset uint64 // block's offset in file +} + +func (h *lidsBlockHeader) marshal(dst []byte) { + if len(dst) < int(lidsBlockHeaderSizeBytes) { + panic("BUG: marshal lidsBlockHeader: len(dst) is less than header size") + } + + dst[0] = byte(h.Codec) + dst = dst[1:] + binary.LittleEndian.PutUint32(dst, h.Length) + dst = dst[sizeOfUint32:] + binary.LittleEndian.PutUint32(dst, h.MinLID) + dst = dst[sizeOfUint32:] + binary.LittleEndian.PutUint32(dst, h.MaxLID) + dst = dst[sizeOfUint32:] + binary.LittleEndian.PutUint32(dst, h.Size) + dst = dst[sizeOfUint32:] + binary.LittleEndian.PutUint64(dst, h.Offset) + dst = dst[sizeOfUint64:] +} + +func (h *lidsBlockHeader) unmarshal(src []byte) ([]byte, error) { + if len(src) < int(lidsBlockHeaderSizeBytes) { + return src, errors.New("too few bytes") + } + + h.Codec = lidsCodec(src[0]) + src = src[1:] + h.Length = binary.LittleEndian.Uint32(src) + src = src[sizeOfUint32:] + h.MinLID = binary.LittleEndian.Uint32(src) + src = src[sizeOfUint32:] + h.MaxLID = binary.LittleEndian.Uint32(src) + src = src[sizeOfUint32:] + h.Size = binary.LittleEndian.Uint32(src) + src = src[sizeOfUint32:] + h.Offset = binary.LittleEndian.Uint64(src) + src = src[sizeOfUint64:] + + return src, nil +} + +func marshalDocsFilter(dst []byte, in *DocsFilterBinIn) []byte { + dst = append(dst, uint8(docsFilterBinVersion1)) + dst = marshalLIDsBlocks(dst, in.LIDs) + return dst +} + +const ( + sizeOfUint32 = unsafe.Sizeof(uint32(0)) + sizeOfUint64 = unsafe.Sizeof(uint64(0)) +) + +const ( + lidsBlockHeaderSizeBytes = 1 + (4 * sizeOfUint32) + sizeOfUint64 + maxLIDsBlockLen = 1024 +) + +var lidsBlockBufPool util.BufferPool + +func marshalLIDsBlocks(dst []byte, in []seq.LID) []byte { + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + + numberOfBlocks := (len(in) + maxLIDsBlockLen - 1) / maxLIDsBlockLen + dst = binary.LittleEndian.AppendUint32(dst, uint32(numberOfBlocks)) + + // reserve space for headers + curHeaderOffset := len(dst) + dst = append(dst, make([]byte, numberOfBlocks*int(lidsBlockHeaderSizeBytes))...) + + var start int + for range numberOfBlocks { + end := min(maxLIDsBlockLen, len(in[start:])) + chunk := in[start : start+end] + + var codec lidsCodec + b.B, codec = marshalLIDsBlock(b.B[:0], chunk) + if len(b.B) > math.MaxUint32 { + panic(fmt.Errorf("unexpected block length %d; want up to %d", len(b.B), math.MaxUint32)) + } + + header := lidsBlockHeader{ + Codec: codec, + Length: uint32(len(chunk)), + MinLID: uint32(chunk[0]), + MaxLID: uint32(chunk[len(chunk)-1]), + Size: uint32(len(b.B)), + Offset: uint64(len(dst)), + } + header.marshal(dst[curHeaderOffset:]) + curHeaderOffset += int(lidsBlockHeaderSizeBytes) + + dst = append(dst, b.B...) + start += end + } + + return dst +} + +func marshalLIDsBlock(dst []byte, in []seq.LID) ([]byte, lidsCodec) { + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + + prev := seq.LID(0) + for i := range len(in) { + lid := in[i] + deltaLID := lid - prev + prev = lid + b.B = binary.AppendVarint(b.B, int64(deltaLID)) + } + + orig := dst + dst = zstd.CompressLevel(b.B, dst, getCompressLevel(len(b.B))) + + compressRatio := float64(len(dst)-len(orig)) / float64(len(b.B)) + if compressRatio < 1.05 { + orig = append(orig, b.B...) + return orig, lidsCodecDelta + } + + return dst, lidsCodecDeltaZstd +} + +const minLIDsFIlterBytesLen = 10 // 1 byte lidsBinVersion + 8 byte number of LIDs + N (min 1) bytes varint + delta encoded LIDs + +func unmarshalDocsFilter(dst *DocsFilterBinOut, src []byte) (_ []byte, err error) { + if len(src) < minLIDsFIlterBytesLen { + return nil, fmt.Errorf("invalid LIDs filter format; want %d bytes, got %d", minLIDsFIlterBytesLen, len(src)) + } + + version := docsFilterBinVersion(src[0]) + src = src[1:] + if _, ok := availableVersions[version]; !ok { + return nil, fmt.Errorf("invalid LIDs binary version: %d", version) + } + + dst.LIDs, src, err = unmarshalLIDsBlocks(dst.LIDs, src) + if err != nil { + return src, err + } + + return src, nil +} + +func unmarshalLIDsBlocks(dst []uint32, src []byte) ([]uint32, []byte, error) { + numberOfBlocks := binary.LittleEndian.Uint32(src) + src = src[sizeOfUint32:] + + var err error + + headers := make([]lidsBlockHeader, 0, numberOfBlocks) + for range numberOfBlocks { + header := lidsBlockHeader{} + src, err = header.unmarshal(src) + if err != nil { + return dst, src, fmt.Errorf("can't unmarshal lids header: %s", err) + } + headers = append(headers, header) + } + + for i := range numberOfBlocks { + dst, src, err = unmarshalLIDsBlock(dst, src, headers[i]) + if err != nil { + return dst, src, err + } + } + + if len(src) > 0 { + return dst, src, fmt.Errorf("unexpected tail when unmarshaling LIDs blocks") + } + + return dst, src, nil +} + +func unmarshalLIDsBlock(dst []uint32, src []byte, header lidsBlockHeader) ([]uint32, []byte, error) { + if len(src) == 0 { + return dst, src, fmt.Errorf("empty LIDs block") + } + + if header.Size == 0 || int(header.Size) > len(src) { + return nil, src, fmt.Errorf("invalid LIDs block length %d; want %d", len(src), header.Size) + } + + block := src[:header.Size] + src = src[header.Size:] + + var err error + + switch header.Codec { + case lidsCodecDeltaZstd: + b := lidsBlockBufPool.Get() + defer lidsBlockBufPool.Put(b) + b.B, err = zstd.Decompress(block, b.B) + if err != nil { + return dst, src, fmt.Errorf("can't decompress ids block: %s", err) + } + dst, err = unmarshalLIDsDelta(dst, b.B, header) + if err != nil { + return dst, src, err + } + return dst, src, nil + case lidsCodecDelta: + dst, err = unmarshalLIDsDelta(dst, block, header) + if err != nil { + return dst, src, err + } + return dst, src, nil + default: + return dst, src, fmt.Errorf("unknown ids codec: %d", header.Codec) + } +} + +func unmarshalLIDsDelta(dst []uint32, block []byte, header lidsBlockHeader) ([]uint32, error) { + prevLID := uint32(0) + for range header.Length { + v, n := binary.Varint(block) + block = block[n:] + lid := prevLID + uint32(v) + prevLID = lid + dst = append(dst, lid) + } + + if len(block) > 0 { + return dst, fmt.Errorf("unexpected tail when unmarshaling LIDs block") + } + + return dst, nil +} + +func getCompressLevel(size int) int { + level := 3 + if size <= 512 { + level = 1 + } else if size <= 4*1024 { + level = 2 + } + return level +} diff --git a/filtermanager/encoding_test.go b/filtermanager/encoding_test.go new file mode 100644 index 00000000..8af0b7a3 --- /dev/null +++ b/filtermanager/encoding_test.go @@ -0,0 +1,42 @@ +package filtermanager + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/seq" +) + +func TestMarshalUnmarshalLIDsFilter(t *testing.T) { + test := func(df DocsFilterBinIn) { + t.Helper() + + rawDocsFilter := marshalDocsFilter(nil, &df) + var out DocsFilterBinOut + tail, err := unmarshalDocsFilter(&out, rawDocsFilter) + require.NoError(t, err) + require.Equal(t, 0, len(tail)) + assert.Equal(t, lidsToUint32s(df.LIDs), out.LIDs) + } + + test(DocsFilterBinIn{LIDs: []seq.LID{0, 1, 2, 3}}) + test(DocsFilterBinIn{LIDs: []seq.LID{10, 15, 22, 18, 105, 1010}}) + test(DocsFilterBinIn{LIDs: []seq.LID{11}}) + + multipleBlocksSize := maxLIDsBlockLen*3 + 15 + multipleBlocksLIDs := make([]seq.LID, 0, multipleBlocksSize) + for i := range multipleBlocksSize { + multipleBlocksLIDs = append(multipleBlocksLIDs, seq.LID(i)) + } + test(DocsFilterBinIn{LIDs: multipleBlocksLIDs}) +} + +func lidsToUint32s(in []seq.LID) []uint32 { + out := make([]uint32, 0, len(in)) + for _, i := range in { + out = append(out, uint32(i)) + } + return out +} diff --git a/filtermanager/filter.go b/filtermanager/filter.go new file mode 100644 index 00000000..f7d03566 --- /dev/null +++ b/filtermanager/filter.go @@ -0,0 +1,65 @@ +package filtermanager + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "sync" + + "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" +) + +type FilterStatus byte + +const ( + StatusCreated FilterStatus = iota + StatusInProgress + StatusDone + StatusError +) + +type Params struct { + Query string + From seq.MID + To seq.MID +} + +type Filter struct { + params Params + + status FilterStatus + + ast parser.SeqQLQuery + + hash string + dirPath string + + processWg *sync.WaitGroup +} + +func NewFilter(params Params) *Filter { + return &Filter{ + params: params, + status: StatusCreated, + processWg: &sync.WaitGroup{}, + } +} + +func (f *Filter) String() string { + return fmt.Sprintf("%s_%d_%d", f.params.Query, f.params.From, f.params.To) +} + +func (f *Filter) Hash() string { + if f.hash == "" { + h := sha256.New() + h.Write([]byte(f.String())) + bs := h.Sum(nil) + f.hash = hex.EncodeToString(bs) + } + return f.hash +} + +func (f *Filter) markAsDone() { + f.status = StatusDone +} diff --git a/filtermanager/filter_manager.go b/filtermanager/filter_manager.go new file mode 100644 index 00000000..d2688e49 --- /dev/null +++ b/filtermanager/filter_manager.go @@ -0,0 +1,430 @@ +package filtermanager + +import ( + "context" + "fmt" + "math" + "os" + "path" + "runtime" + "strings" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/frac/processor" + "github.com/ozontech/seq-db/fracmanager" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" +) + +const ( + fracInQueueExt = ".queue" + fracDoneExt = ".filter" + tmpExt = ".tmp" + + tmpDirSuffix = "_tmp" +) + +const ( + defaultMaintenanceInterval = 30 * time.Second +) + +type MappingProvider interface { + GetMapping() seq.Mapping +} + +type Config struct { + DataDir string + Workers int + CacheSizeLimit uint64 +} + +type FilterManager struct { + ctx context.Context + + config Config + filters map[string]*Filter + + fracs map[string][]string + fracsMu *sync.RWMutex + + mp MappingProvider + + rateLimit chan struct{} + + maintenanceWG *sync.WaitGroup + maintenanceInterval time.Duration + maintenanceStop context.CancelFunc +} + +func New( + ctx context.Context, + cfg Config, + params []Params, + mp MappingProvider, +) *FilterManager { + workers := cfg.Workers + if workers <= 0 { + workers = runtime.GOMAXPROCS(0) + } + + filtersMap := make(map[string]*Filter, len(params)) + + for _, p := range params { + f := NewFilter(p) + filtersMap[f.Hash()] = f + } + + return &FilterManager{ + ctx: ctx, + config: cfg, + filters: filtersMap, + fracs: make(map[string][]string), + fracsMu: &sync.RWMutex{}, + mp: mp, + rateLimit: make(chan struct{}, workers), + maintenanceInterval: defaultMaintenanceInterval, + } +} + +func (fm *FilterManager) Start(ctx context.Context, fracs fracmanager.List) { + fm.createDataDir() + + err := fm.loadFilters() + if err != nil { + logger.Fatal("failed to load previous docs filters", zap.Error(err)) + } + + err = fm.buildQueue(fracs) + if err != nil { + logger.Fatal("failed to build docs filters queue", zap.Error(err)) + } + + ctx, cancel := context.WithCancel(ctx) + fm.maintenanceStop = cancel + fm.startMaintenance(ctx) + + mapping := fm.mp.GetMapping() + + go func() { + for _, f := range fm.filters { + ast, err := parser.ParseSeqQL(f.params.Query, mapping) + if err != nil { + panic(fmt.Errorf("BUG: search query must be valid: %s", err)) + } + f.ast = ast + + fm.processFilter(ctx, f, fracs.FilterInRange(f.params.From, f.params.To)) + } + }() +} + +func (fm *FilterManager) Stop() { + fm.maintenanceStop() + fm.maintenanceWG.Wait() +} + +// RefreshFrac replaces frac's filter files with newly found results. Used after active frac is sealed. +func (fm *FilterManager) RefreshFrac(fraction frac.Fraction) { + fm.fracsMu.RLock() + fracsFiles, has := fm.fracs[fraction.Info().Name()] + fm.fracsMu.RUnlock() + + if !has { + return + } + + for _, fileName := range fracsFiles { + filter := fm.filters[filterNameFromPath(fileName)] + + queueFilePath := path.Join(filter.dirPath, makeFileName(fraction.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, 0o666, tmpExt) + + fm.rateLimit <- struct{}{} + go func() { + defer func() { <-fm.rateLimit }() + if err := fm.processFrac(fraction, filter, false); err != nil { + panic(fmt.Errorf("docs filter refresh frac err: %s", err)) + } + }() + } +} + +// RemoveFrac removes fraction's filter files. Used after frac is deleted +func (fm *FilterManager) RemoveFrac(fracName string) { + fm.fracsMu.RLock() + fracsFiles, has := fm.fracs[fracName] + fm.fracsMu.RUnlock() + + if !has { + return + } + + fm.fracsMu.Lock() + delete(fm.fracs, fracName) + fm.fracsMu.Unlock() + + for _, fileName := range fracsFiles { + util.RemoveFile(fileName) + } +} + +func filterNameFromPath(p string) string { + return path.Base(path.Dir(p)) +} + +func (fm *FilterManager) addDoneFrac(fracName, fracPath string) { + fm.fracsMu.Lock() + defer fm.fracsMu.Unlock() + + fm.fracs[fracName] = append(fm.fracs[fracName], fracPath) +} + +// loadFilters loads existing filters +func (fm *FilterManager) loadFilters() error { + des, err := os.ReadDir(fm.config.DataDir) + if err != nil { + return err + } + + var anyRemove bool + + for _, de := range des { + if !de.IsDir() { + continue + } + + if _, ok := fm.filters[de.Name()]; !ok { + logger.Info("there is filter folder on disk, but not in config. need to delete it.") + err := os.RemoveAll(path.Join(fm.config.DataDir, de.Name())) + if err != nil && !os.IsNotExist(err) { + return err + } + anyRemove = true + continue + } + + f := fm.filters[de.Name()] + f.status = StatusInProgress + f.dirPath = path.Join(fm.config.DataDir, de.Name()) + + filterDes, err := os.ReadDir(f.dirPath) + if err != nil { + return fmt.Errorf("reading directory: %s", err) + } + + var hasFracsInQueue bool + + for _, fde := range filterDes { + if fde.IsDir() { + continue + } + name := fde.Name() + + switch path.Ext(name) { + case fracInQueueExt: + hasFracsInQueue = true + case fracDoneExt: + fm.addDoneFrac(fracNameFromFilePath(name), path.Join(f.dirPath, name)) + } + } + + if !hasFracsInQueue { + f.status = StatusDone + } + } + + if anyRemove { + util.MustFsyncFile(fm.config.DataDir) + } + + return nil +} + +// buildQueue creates a directory for each of unprocessed filters and creates .queue files +func (fm *FilterManager) buildQueue(fracs fracmanager.List) error { + for _, filter := range fm.filters { + if filter.status != StatusCreated { + continue + } + + // create tmp dir + tmpDir := path.Join(fm.config.DataDir, fmt.Sprintf("%s%s", filter.Hash(), tmpDirSuffix)) + util.MustCreateDir(tmpDir) + + filterFracs := fracs.FilterInRange(seq.MID(filter.params.From), seq.MID(filter.params.To)) + for _, f := range filterFracs { + queueFilePath := path.Join(tmpDir, makeFileName(f.Info().Name(), fracInQueueExt)) + util.MustWriteFileAtomic(queueFilePath, []byte{}, 0o666, tmpExt) + } + + // rename tmp dir + dir := path.Join(fm.config.DataDir, filter.Hash()) + if err := os.Rename(tmpDir, dir); err != nil { + return err + } + util.MustFsyncFile(fm.config.DataDir) + filter.dirPath = dir + } + + return nil +} + +// handleFilter finds docs and writes to fs +func (fm *FilterManager) processFilter(ctx context.Context, filter *Filter, fracs fracmanager.List) { + if len(fracs) == 0 { + return + } + + fracsByName := make(map[string]frac.Fraction) + for _, f := range fracs { + fracsByName[f.Info().Name()] = f + } + + filterDes, err := os.ReadDir(filter.dirPath) + if err != nil { + panic(fmt.Errorf("BUG: reading directory must be successful: %s", err)) + } + + inProgressFilters.Add(1) + + processFracInQueue := func(name string) error { + f, ok := fracsByName[fracNameFromFilePath(name)] + if !ok { // skip missing fracs + return nil + } + + select { + case <-ctx.Done(): + return nil + case fm.rateLimit <- struct{}{}: + filter.processWg.Go(func() { + defer func() { <-fm.rateLimit }() + if err := fm.processFrac(f, filter, false); err != nil { + panic(fmt.Errorf("docs filter process frac err: %s", err)) + } + }) + } + return nil + } + _ = util.VisitFilesWithExt(filterDes, fracInQueueExt, processFracInQueue) + + go func() { + filter.processWg.Wait() + filter.markAsDone() + inProgressFilters.Add(-1) + }() +} + +func (fm *FilterManager) processFrac(f frac.Fraction, filter *Filter, refresh bool) error { + qpr, err := f.Search(fm.ctx, processor.SearchParams{ + AST: filter.ast.Root, + From: seq.MID(filter.params.From), + To: seq.MID(filter.params.To), + Limit: math.MaxInt64, + }) + if err != nil { + return err + } + + queueFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracInQueueExt)) + doneFilePath := path.Join(filter.dirPath, makeFileName(f.Info().Name(), fracDoneExt)) + + if len(qpr.IDs) == 0 { + util.RemoveFile(queueFilePath) + return nil + } + + // TODO: here we doing part of the work twice: + // first time we find LIDs inside f.Search() and then find IDs by these LIDs. + // Then we again find LIDs by earlier found IDs in f.FindLIDs(). + // We did it like this because otherwise we had to do serious f.Search() rewrite. + // For now we're ok with some performance penalty. + lids, err := f.FindLIDs(fm.ctx, qpr.IDs.IDs()) + if err != nil { + return err + } + + docsFilterBin := DocsFilterBinIn{LIDs: lids} + if err := writeDocsFilter(&docsFilterBin, queueFilePath, doneFilePath); err != nil { + return err + } + + if !refresh { + fm.addDoneFrac(f.Info().Name(), doneFilePath) + } + + return nil +} + +func (fm *FilterManager) startMaintenance(ctx context.Context) { + fm.maintenanceWG.Go(func() { + logger.Info("start docs filter maintenance") + util.RunEvery(ctx.Done(), fm.maintenanceInterval, func() { + logger.Info("docs filter maintenance iteration") + fm.checkDiskUsage() + }) + }) +} + +func (fm *FilterManager) checkDiskUsage() { + du := int64(0) + + for _, f := range fm.filters { + des, err := os.ReadDir(f.dirPath) + if err != nil { + logger.Error("docs filter: can't read filter's dir", + zap.String("filter", f.String()), zap.Error(err)) + return + } + + for _, fde := range des { + if fde.IsDir() { + continue + } + info, err := fde.Info() + if err != nil { + logger.Error("docs filter: can't read filter file info", + zap.String("filter", f.String()), zap.Error(err)) + return + } + du += info.Size() + } + } + + diskUsage.Set(float64(du)) + storedFilters.Set(float64(len(fm.filters))) +} + +func makeFileName(name, ext string) string { + return name + ext +} + +func fracNameFromFilePath(filterFilePath string) string { + return strings.Split(path.Base(filterFilePath), ".")[0] +} + +var marshalBufferPool util.BufferPool + +func writeDocsFilter(df *DocsFilterBinIn, queueFilePath, doneFilePath string) error { + rawDocsFilter := marshalBufferPool.Get() + defer marshalBufferPool.Put(rawDocsFilter) + + rawDocsFilter.B = marshalDocsFilter(rawDocsFilter.B, df) + util.MustWriteFileAtomic(doneFilePath, rawDocsFilter.B, 0o666, tmpExt) + util.RemoveFile(queueFilePath) + + return nil +} + +// createDataDir creates data dir. +func (fm *FilterManager) createDataDir() { + if err := os.MkdirAll(fm.config.DataDir, 0o777); err != nil { + panic(err) + } +} diff --git a/filtermanager/metrics.go b/filtermanager/metrics.go new file mode 100644 index 00000000..ed47171e --- /dev/null +++ b/filtermanager/metrics.go @@ -0,0 +1,27 @@ +package filtermanager + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + inProgressFilters = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "in_progress", + Help: "Number of doc filters in progress", + }) + diskUsage = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "disk_usage_bytes", + Help: "Disk space used by filter files in bytes", + }) + storedFilters = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: "seq_db_store", + Subsystem: "filters", + Name: "stored", + Help: "Number of active doc filters", + }) +) diff --git a/frac/active.go b/frac/active.go index b586db51..3e890d01 100644 --- a/frac/active.go +++ b/frac/active.go @@ -46,6 +46,7 @@ type Active struct { TokenList *TokenList DocsPositions *DocsPositions + IDsToLIDs *ActiveLIDs docsFile *os.File docsReader storage.DocsReader @@ -86,6 +87,7 @@ func NewActive( f := &Active{ TokenList: NewActiveTokenList(config.IndexWorkers), DocsPositions: NewSyncDocsPositions(), + IDsToLIDs: NewActiveLIDs(), MIDs: NewIDs(), RIDs: NewIDs(), DocBlocks: NewIDs(), @@ -385,6 +387,17 @@ func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Active) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + if f.Info().DocsTotal == 0 { // it is empty active fraction state + return nil, nil + } + + dp := f.createDataProvider(ctx) + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { return &activeDataProvider{ ctx: ctx, @@ -397,6 +410,7 @@ func (f *Active) createDataProvider(ctx context.Context) *activeDataProvider { blocksOffsets: f.DocBlocks.GetVals(), docsPositions: f.DocsPositions, + idsToLids: f.IDsToLIDs, docsReader: &f.docsReader, } } diff --git a/frac/active_index.go b/frac/active_index.go index 350a8e0d..71831c26 100644 --- a/frac/active_index.go +++ b/frac/active_index.go @@ -25,6 +25,7 @@ type activeDataProvider struct { blocksOffsets []uint64 docsPositions *DocsPositions + idsToLids *ActiveLIDs docsReader *storage.DocsReader idsIndex *activeIDsIndex @@ -136,6 +137,16 @@ func (dp *activeDataProvider) Search(params processor.SearchParams) (*seq.QPR, e return res, nil } +func (dp *activeDataProvider) FindLIDs(ids []seq.ID) ([]seq.LID, error) { + res := make([]seq.LID, 0, len(ids)) + for _, id := range ids { + if lid, ok := dp.idsToLids.Get(id); ok { + res = append(res, lid) + } + } + return res, nil +} + type activeIDsIndex struct { mids []uint64 rids []uint64 diff --git a/frac/active_indexer.go b/frac/active_indexer.go index 9f3cc6ae..00b1ccf5 100644 --- a/frac/active_indexer.go +++ b/frac/active_indexer.go @@ -161,6 +161,10 @@ func (ai *ActiveIndexer) appendWorker(index int) { lids := active.AppendIDs(collector.IDs) m.Stop() + m = sw.Start("active_lids_map_set") + active.IDsToLIDs.SetMultiple(collector.IDs, lids) + m.Stop() + m = sw.Start("token_list_append") tokenLIDsPlaces := collector.PrepareTokenLIDsPlaces() active.TokenList.Append(collector.TokensValues, collector.FieldsLengths, tokenLIDsPlaces) diff --git a/frac/active_lids_map.go b/frac/active_lids_map.go new file mode 100644 index 00000000..bae64854 --- /dev/null +++ b/frac/active_lids_map.go @@ -0,0 +1,37 @@ +package frac + +import ( + "sync" + + "github.com/ozontech/seq-db/seq" +) + +type ActiveLIDs struct { + mu *sync.RWMutex + idToLid map[seq.ID]seq.LID +} + +func NewActiveLIDs() *ActiveLIDs { + al := ActiveLIDs{ + mu: &sync.RWMutex{}, + idToLid: make(map[seq.ID]seq.LID), + } + return &al +} + +func (al *ActiveLIDs) Get(id seq.ID) (seq.LID, bool) { + al.mu.RLock() + defer al.mu.RUnlock() + + val, ok := al.idToLid[id] + return val, ok +} + +func (al *ActiveLIDs) SetMultiple(ids []seq.ID, lids []uint32) { + al.mu.Lock() + defer al.mu.Unlock() + + for i, id := range ids { + al.idToLid[id] = seq.LID(lids[i]) + } +} diff --git a/frac/fraction.go b/frac/fraction.go index 59995639..a3027c97 100644 --- a/frac/fraction.go +++ b/frac/fraction.go @@ -21,6 +21,7 @@ type Fraction interface { Contains(mid seq.MID) bool Fetch(context.Context, []seq.ID) ([][]byte, error) Search(context.Context, processor.SearchParams) (*seq.QPR, error) + FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) } var ( diff --git a/frac/remote.go b/frac/remote.go index c2088caa..e15aa73f 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -131,6 +131,16 @@ func (f *Remote) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Remote) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + dp, err := f.createDataProvider(ctx) + if err != nil { + return nil, err + } + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, error) { if err := f.load(); err != nil { logger.Error( diff --git a/frac/sealed.go b/frac/sealed.go index c4c033d8..ddae8ccf 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -312,6 +312,13 @@ func (f *Sealed) Search(ctx context.Context, params processor.SearchParams) (*se return dp.Search(params) } +func (f *Sealed) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + dp := f.createDataProvider(ctx) + defer dp.release() + + return dp.FindLIDs(ids) +} + func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { f.load() return &sealedDataProvider{ diff --git a/frac/sealed_index.go b/frac/sealed_index.go index f97c6e84..3899124a 100644 --- a/frac/sealed_index.go +++ b/frac/sealed_index.go @@ -122,6 +122,10 @@ func (dp *sealedDataProvider) Search(params processor.SearchParams) (*seq.QPR, e return qpr, nil } +func (dp *sealedDataProvider) FindLIDs(ids []seq.ID) ([]seq.LID, error) { + return dp.getFetchIndex().findLIDs(ids), nil +} + type sealedIDsIndex struct { fracName string table *seqids.Table diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go index ffc31854..6d4df41f 100644 --- a/fracmanager/proxy_frac.go +++ b/fracmanager/proxy_frac.go @@ -70,6 +70,10 @@ func (p *fractionProxy) Search(ctx context.Context, params processor.SearchParam return p.impl.Search(ctx, params) } +func (p *fractionProxy) FindLIDs(ctx context.Context, ids []seq.ID) ([]seq.LID, error) { + return p.impl.FindLIDs(ctx, ids) +} + // activeProxy manages an active (writable) fraction // Tracks pending write operations and provides freeze capability. // Lifecycle: Created when fraction becomes active, destroyed after sealing. @@ -191,3 +195,7 @@ func (emptyFraction) Search(_ context.Context, params processor.SearchParams) (* metric.CountersTotal.WithLabelValues("empty_data_provider").Inc() return &seq.QPR{Aggs: make([]seq.AggregatableSamples, len(params.AggQ))}, nil } + +func (emptyFraction) FindLIDs(_ context.Context, _ []seq.ID) ([]seq.LID, error) { + return nil, nil +} diff --git a/fracmanager/searcher_test.go b/fracmanager/searcher_test.go index eb70a2d2..bdb07ec6 100644 --- a/fracmanager/searcher_test.go +++ b/fracmanager/searcher_test.go @@ -62,6 +62,10 @@ func (f *testFakeFrac) Search(context.Context, processor.SearchParams) (*seq.QPR return f.qpr, nil } +func (f *testFakeFrac) FindLIDs(context.Context, []seq.ID) ([]seq.LID, error) { + return []seq.LID{}, nil +} + func newFakeFrac(from, to seq.MID, qpr *seq.QPR) *testFakeFrac { return &testFakeFrac{ info: &common.Info{From: from, To: to, DocsTotal: 1}, diff --git a/fracmanager/storage_state.go b/fracmanager/storage_state.go index 262f5a1b..f1aaf950 100644 --- a/fracmanager/storage_state.go +++ b/fracmanager/storage_state.go @@ -2,7 +2,6 @@ package fracmanager import ( "encoding/json" - "fmt" "os" "path/filepath" "sync" @@ -83,46 +82,5 @@ func (m *StateManager) save() error { if err != nil { return err } - return atomicWrite(m.filePath, data, 0o644) -} - -// atomicWrite safely writes data to file using atomic replacement pattern -func atomicWrite(path string, data []byte, perm os.FileMode) error { - tmpPath := path + ".tmp" - f, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) - if err != nil { - return fmt.Errorf("create temp file: %w", err) - } - - defer func() { - if f != nil { - f.Close() - } - if err != nil { - os.Remove(tmpPath) - } - }() - - if _, err = f.Write(data); err != nil { - return fmt.Errorf("write data: %w", err) - } - - if err = f.Sync(); err != nil { - return fmt.Errorf("sync data: %w", err) - } - - if err = f.Close(); err != nil { - return fmt.Errorf("close file: %w", err) - } - f = nil // mark as closed so defer doesn't close again - - if err = os.Rename(tmpPath, path); err != nil { - return fmt.Errorf("rename file: %w", err) - } - - if err = util.SyncPath(filepath.Dir(path)); err != nil { // also sync parent directory - return fmt.Errorf("sync dir: %w", err) - } - - return nil + return util.WriteFileAtomic(m.filePath, data, 0o644, ".txt") } diff --git a/util/fs.go b/util/fs.go index 57fd7b17..8f148f6e 100644 --- a/util/fs.go +++ b/util/fs.go @@ -5,31 +5,34 @@ package util import ( "errors" + "fmt" "os" + "path" + "path/filepath" "go.uber.org/zap" "github.com/ozontech/seq-db/logger" ) -func MustSyncPath(path string) { - if err := SyncPath(path); err != nil { - logger.Panic("cannot sync path", zap.String("path", path), zap.Error(err)) +func MustSyncPath(dirPath string) { + if err := SyncPath(dirPath); err != nil { + logger.Panic("cannot sync path", zap.String("path", dirPath), zap.Error(err)) } } -func MustRemoveFileByPath(path string) { - if err := os.Remove(path); err != nil && !errors.Is(err, os.ErrNotExist) { +func MustRemoveFileByPath(fpath string) { + if err := os.Remove(fpath); err != nil && !errors.Is(err, os.ErrNotExist) { logger.Panic( "cannot remove file by path", - zap.String("path", path), + zap.String("path", fpath), zap.Error(err), ) } } -func SyncPath(path string) error { - d, err := os.Open(path) +func SyncPath(dirPath string) error { + d, err := os.Open(dirPath) if err != nil { return err } @@ -53,3 +56,89 @@ func RemoveFile(file string) { logger.Error("file removing error", zap.Error(err)) } } + +func MustWriteFileAtomic(fpath string, data []byte, perm os.FileMode, tmpFileExt string) { + if err := WriteFileAtomic(fpath, data, perm, tmpFileExt); err != nil { + logger.Panic("can't write file atomic", zap.String("path", fpath), zap.Error(err)) + } +} + +// atomicWrite safely writes data to file using atomic replacement pattern +func WriteFileAtomic(fpath string, data []byte, perm os.FileMode, tmpFileExt string) error { + tmpPath := fpath + tmpFileExt + f, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return fmt.Errorf("create temp file: %w", err) + } + + defer func() { + if f != nil { + f.Close() + } + if err != nil { + os.Remove(tmpPath) + } + }() + + if _, err = f.Write(data); err != nil { + return fmt.Errorf("write data: %w", err) + } + + if err = f.Sync(); err != nil { + return fmt.Errorf("sync data: %w", err) + } + + if err = f.Close(); err != nil { + return fmt.Errorf("close file: %w", err) + } + f = nil // mark as closed so defer doesn't close again + + if err = os.Rename(tmpPath, fpath); err != nil { + return fmt.Errorf("rename file: %w", err) + } + + if err = SyncPath(filepath.Dir(fpath)); err != nil { // also sync parent directory + return fmt.Errorf("sync dir: %w", err) + } + + return nil +} + +func MustFsyncFile(fpath string) { + dirFile, err := os.Open(fpath) + if err != nil { + logger.Panic("can't open dir", zap.Error(err)) + } + if err := dirFile.Sync(); err != nil { + logger.Panic("can't sync dir", zap.Error(err)) + } + if err := dirFile.Close(); err != nil { + logger.Panic("can't close dir", zap.Error(err)) + } +} + +// MustCreateDir creates directory at dirPath. +// Handles the case when directory already exists. +func MustCreateDir(dirPath string) { + err := os.MkdirAll(dirPath, 0o777) + if err != nil && !os.IsExist(err) { + logger.Panic("can't create file", zap.Error(err)) + } +} + +// VisitFilesWithExt traverses all the files with `ext` extension in `des` directory and calls a `cb` func for each of files. +func VisitFilesWithExt(des []os.DirEntry, ext string, cb func(name string) error) error { + for _, de := range des { + if de.IsDir() { + continue + } + name := de.Name() + if path.Ext(name) != ext { + continue + } + if err := cb(name); err != nil { + return err + } + } + return nil +}