Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 239 additions & 0 deletions src/ailego/buffer/buffer_pool.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#include <zvec/ailego/buffer/buffer_pool.h>
#include <zvec/core/framework/index_logger.h>

namespace zvec {
namespace ailego {

//! Initialize the cache with the given block granularity.
//! Creates one concurrent queue per cache lane; multiple lanes reduce
//! contention between threads pushing/popping eviction candidates.
int LRUCache::init(size_t block_size) {
  block_size_ = block_size;
  for (size_t lane = 0; lane < CATCH_QUEUE_NUM; ++lane) {
    queues_.emplace_back(block_size);
  }
  return 0;
}

//! Pop one eviction candidate into `item`.
//! Scans the lanes in order and returns true on the first successful
//! dequeue; returns false when every lane is empty.
bool LRUCache::evict_single_block(BlockType &item) {
  for (size_t lane = 0; lane < CATCH_QUEUE_NUM; ++lane) {
    if (queues_[lane].try_dequeue(item)) {
      return true;
    }
  }
  return false;
}

//! Queue `block` as an eviction candidate on lane `block_type`.
//! Every block_size_ insertions (globally) a dead-node sweep is triggered.
//! Returns whether the enqueue itself succeeded.
bool LRUCache::add_single_block(const LPMap *lp_map, const BlockType &block,
                                int block_type) {
  bool ok = queues_[block_type].try_enqueue(block);
  // BUG FIX: use the value returned by fetch_add instead of re-reading the
  // atomic counter. With a separate relaxed re-read, two racing threads
  // could both (or neither) observe a multiple of block_size_, triggering
  // duplicate or missed cleanup sweeps for the same window.
  const auto inserted =
      evict_queue_insertions_.fetch_add(1, std::memory_order_relaxed) + 1;
  if (inserted % block_size_ == 0) {
    this->clear_dead_node(lp_map);
  }
  return ok;
}

//! Compact each lane by dropping entries whose block is already dead.
//! Scans at most 2 * block_size_ entries per lane, and only for lanes
//! whose approximate size exceeds 4x that budget.
void LRUCache::clear_dead_node(const LPMap *lp_map) {
  for (size_t i = 0; i < CATCH_QUEUE_NUM; i++) {
    const size_t clear_size = block_size_ * 2;
    // Only compact lanes that have grown well beyond the scan budget.
    if (queues_[i].size_approx() < clear_size * 4) {
      continue;
    }
    size_t scanned = 0;
    ConcurrentQueue tmp(block_size_);
    BlockType item;
    // BUG FIX: the original evaluated try_dequeue() BEFORE the budget check
    // (`try_dequeue(item) && clear_count++ < clear_size`), so the block
    // dequeued on the final iteration was silently dropped once the budget
    // ran out, leaking it from the eviction cache. Check the budget first.
    while (scanned++ < clear_size && queues_[i].try_dequeue(item)) {
      if (!lp_map->isDeadBlock(item)) {
        tmp.try_enqueue(item);
      }
    }
    // Re-insert survivors, re-checking liveness in case a block died while
    // parked in the temporary queue.
    while (tmp.try_dequeue(item)) {
      if (!lp_map->isDeadBlock(item)) {
        queues_[i].try_enqueue(item);
      }
    }
  }
}

void LPMap::init(size_t entry_num) {
if (entries_) {
delete[] entries_;
}
entry_num_ = entry_num;
entries_ = new Entry[entry_num_];
for (size_t i = 0; i < entry_num_; i++) {
entries_[i].ref_count.store(std::numeric_limits<int>::min());
entries_[i].load_count.store(0);
entries_[i].buffer = nullptr;
}
cache_.init(entry_num * 4);
}

//! Take one reference on `block_id` and return its buffer.
//! Returns nullptr when the block is not resident (ref_count negative,
//! i.e. pinned at INT_MIN by evict_block or never loaded).
char *LPMap::acquire_block(block_id_t block_id) {
  assert(block_id < entry_num_);
  Entry &entry = entries_[block_id];
  // BUG FIX: use the value returned by fetch_add. The original performed a
  // separate relaxed load before the increment (to bump load_count on the
  // 0 -> 1 transition) and another after it (to detect eviction); two
  // racing acquirers could both observe 0 and double-count load_count, or
  // observe an inconsistent post-increment value. The fetch_add return
  // value gives one consistent snapshot.
  const int prev = entry.ref_count.fetch_add(1, std::memory_order_relaxed);
  if (prev == 0) {
    // First reader after the block went idle: record one more load.
    entry.load_count.fetch_add(1, std::memory_order_relaxed);
  }
  if (prev < 0) {
    // Block is evicted / not resident.
    return nullptr;
  }
  return entry.buffer;
}

//! Drop one reference to `block_id`. The last releaser (fetch_sub returns
//! the pre-decrement value 1) pushes the block onto the LRU eviction cache
//! so its buffer becomes reclaimable.
void LPMap::release_block(block_id_t block_id) {
  assert(block_id < entry_num_);
  Entry &entry = entries_[block_id];

  // release decrement + acquire fence on the "last out" path: the fence
  // synchronizes with other releasers' decrements before the block is
  // handed to the eviction cache.
  if (entry.ref_count.fetch_sub(1, std::memory_order_release) == 1) {
    std::atomic_thread_fence(std::memory_order_acquire);
    LRUCache::BlockType block;
    block.first = block_id;                   // which block to evict later
    block.second = entry.load_count.load();   // how often it has been loaded
    cache_.add_single_block(this, block, 0);  // lane 0
  }
}

//! Atomically claim an idle block (ref_count == 0) for eviction.
//! On success the entry's ref_count is pinned at INT_MIN so no reader can
//! acquire it afterwards, and the detached buffer is returned. Returns
//! nullptr when the block is still referenced (or already evicted).
char *LPMap::evict_block(block_id_t block_id) {
  assert(block_id < entry_num_);
  Entry &slot = entries_[block_id];
  int idle = 0;
  if (!slot.ref_count.compare_exchange_strong(
          idle, std::numeric_limits<int>::min())) {
    return nullptr;
  }
  char *detached = slot.buffer;
  slot.buffer = nullptr;
  return detached;
}

//! Install `buffer` for `block_id` with one reference held by the caller,
//! or — if another thread already installed a buffer — take a reference on
//! the resident one instead. Returns the buffer actually in the entry, so
//! the caller can detect (and recycle) its losing duplicate.
char *LPMap::set_block_acquired(block_id_t block_id, char *buffer) {
  assert(block_id < entry_num_);
  Entry &slot = entries_[block_id];
  const bool resident = slot.ref_count.load(std::memory_order_relaxed) >= 0;
  if (resident) {
    // Lost the race: just add a reference to the already-installed buffer.
    slot.ref_count.fetch_add(1, std::memory_order_relaxed);
    return slot.buffer;
  }
  // Install the freshly read buffer, counting one load and one reference.
  slot.buffer = buffer;
  slot.ref_count.store(1, std::memory_order_relaxed);
  slot.load_count.fetch_add(1, std::memory_order_relaxed);
  return buffer;
}

//! Reclaim one buffer from the eviction cache into `free_buffers`.
//! Dead cache entries are skipped (their buffer was already detached);
//! if the chosen block was re-acquired concurrently, evict_block() yields
//! nullptr and nothing is recycled this round.
void LPMap::recycle(moodycamel::ConcurrentQueue<char *> &free_buffers) {
  LRUCache::BlockType candidate;
  for (;;) {
    if (!cache_.evict_single_block(candidate)) {
      return;  // cache exhausted, nothing to recycle
    }
    if (!isDeadBlock(candidate)) {
      break;  // found a live candidate
    }
  }
  if (char *buffer = evict_block(candidate.first)) {
    free_buffers.try_enqueue(buffer);
  }
}

//! Open `filename` read-only and record its size.
//! @throws std::runtime_error when the file cannot be opened or stat'ed.
VecBufferPool::VecBufferPool(const std::string &filename) {
  fd_ = open(filename.c_str(), O_RDONLY);
  if (fd_ < 0) {
    throw std::runtime_error("Failed to open file: " + filename);
  }
  struct stat st;
  if (fstat(fd_, &st) < 0) {
    // BUG FIX: throwing from a constructor skips the destructor, so the
    // descriptor must be closed here or it leaks on every failed stat.
    close(fd_);
    fd_ = -1;
    throw std::runtime_error("Failed to stat file: " + filename);
  }
  file_size_ = st.st_size;
}

int VecBufferPool::init(size_t pool_capacity, size_t block_size) {
pool_capacity_ = pool_capacity;
size_t buffer_num = pool_capacity_ / block_size + 10;
size_t block_num = file_size_ / block_size + 10;
lp_map_.init(block_num);
for (size_t i = 0; i < buffer_num; i++) {
char *buffer = (char *)aligned_alloc(64, block_size);
if (buffer != nullptr) {
bool ok = free_buffers_.try_enqueue(buffer);
}
}
LOG_DEBUG("Buffer pool num: %zu, entry num: %zu", buffer_num,
lp_map_.entry_num());
return 0;
}

//! Create a handle bound to this pool for per-caller block access.
VecBufferPoolHandle VecBufferPool::get_handle() {
  return VecBufferPoolHandle{*this};
}

//! Return a referenced buffer holding block `block_id`, reading `size`
//! bytes from `offset` on a cache miss. Recycles cold blocks up to `retry`
//! times when the free list is empty.
//! @return buffer pointer on success, nullptr on exhaustion or read error.
char *VecBufferPool::acquire_buffer(block_id_t block_id, size_t offset,
                                    size_t size, int retry) {
  // Fast path: block already resident, reference already taken.
  char *buffer = lp_map_.acquire_block(block_id);
  if (buffer) {
    return buffer;
  }

  // Slow path: grab a free buffer, recycling cold blocks as needed.
  bool found = free_buffers_.try_dequeue(buffer);
  for (int i = 0; !found && i < retry; i++) {
    lp_map_.recycle(free_buffers_);
    found = free_buffers_.try_dequeue(buffer);
  }
  if (!found) {
    LOG_ERROR("Buffer pool failed to get free buffer");
    return nullptr;
  }

  ssize_t read_bytes = pread(fd_, buffer, size, offset);
  if (read_bytes != static_cast<ssize_t>(size)) {
    LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset);
    // BUG FIX: the original returned without giving the buffer back, so
    // every failed read permanently shrank the pool.
    free_buffers_.try_enqueue(buffer);
    return nullptr;
  }

  char *placed_buffer = nullptr;
  {
    // Serialize installation so only one thread publishes a buffer for
    // this block.
    std::lock_guard<std::mutex> lock(mutex_);
    placed_buffer = lp_map_.set_block_acquired(block_id, buffer);
  }
  if (placed_buffer != buffer) {
    // Another thread installed the block first; recycle our duplicate.
    free_buffers_.try_enqueue(buffer);
  }
  return placed_buffer;
}

//! Read `length` bytes at `offset` directly into `buffer`, bypassing the
//! block cache. Returns 0 on a full read, -1 otherwise.
int VecBufferPool::get_meta(size_t offset, size_t length, char *buffer) {
  const ssize_t got = pread(fd_, buffer, length, offset);
  if (got == static_cast<ssize_t>(length)) {
    return 0;
  }
  LOG_ERROR("Buffer pool failed to read file at offset: %zu", offset);
  return -1;
}

//! Fetch one block through the owning pool (up to 5 recycle retries).
char *VecBufferPoolHandle::get_block(size_t offset, size_t size,
                                     size_t block_id) {
  return pool.acquire_buffer(block_id, offset, size, 5);
}

//! Forward a cache-bypassing metadata read to the owning pool.
int VecBufferPoolHandle::get_meta(size_t offset, size_t length, char *buffer) {
  return pool.get_meta(offset, length, buffer);
}

//! Drop one reference to `block_id` in the pool's block map.
void VecBufferPoolHandle::release_one(block_id_t block_id) {
  pool.lp_map_.release_block(block_id);
}

//! Take one additional reference on `block_id` in the pool's block map.
//! NOTE(review): acquire_block's return value (nullptr when the block is
//! not resident) is discarded here — confirm callers only use this on
//! blocks they already hold.
void VecBufferPoolHandle::acquire_one(block_id_t block_id) {
  pool.lp_map_.acquire_block(block_id);
}

} // namespace ailego
} // namespace zvec
10 changes: 9 additions & 1 deletion src/core/algorithm/flat/flat_streamer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,18 @@ class FlatStreamerContext : public IndexStreamer::Context {
group_topk_heaps_.clear();
}

void reset() override {}
//! Clear per-query result buffers so this context can be reused for the
//! next search without carrying over stale hits.
void reset() override {
  // Drop accumulated results.
  for (auto &it : results_) {
    it.clear();
  }
  // Drop accumulated group results.
  for (auto &it : group_results_) {
    it.clear();
  }
}

//! Reset the context
void reset(const FlatStreamer<BATCH_SIZE> *owner) {
this->reset();
magic_ = owner->magic();
feature_size_ = owner->meta().element_size();

Expand Down
4 changes: 4 additions & 0 deletions src/core/algorithm/hnsw/hnsw_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ class HnswContext : public IndexContext {

//! Reset context
void reset(void) override {
this->clear();
set_filter(nullptr);
reset_threshold();
set_fetch_vector(false);
Expand Down Expand Up @@ -422,6 +423,9 @@ class HnswContext : public IndexContext {
for (auto &it : results_) {
it.clear();
}
for (auto &it : group_results_) {
it.clear();
}
}

uint32_t *mutable_stats_get_neighbors() {
Expand Down
3 changes: 1 addition & 2 deletions src/core/algorithm/hnsw/hnsw_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ struct Neighbors {
Neighbors(uint32_t cnt_in, const node_id_t *data_in)
: cnt{cnt_in}, data{data_in} {}

Neighbors(IndexStorage::MemoryBlock &&mem_block)
: neighbor_block{std::move(mem_block)} {
Neighbors(IndexStorage::MemoryBlock &mem_block) : neighbor_block{mem_block} {
auto hd = reinterpret_cast<const NeighborsHeader *>(neighbor_block.data());
cnt = hd->neighbor_cnt;
data = hd->neighbors;
Expand Down
2 changes: 1 addition & 1 deletion src/core/algorithm/hnsw/hnsw_streamer_entity.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ const Neighbors HnswStreamerEntity::get_neighbors(level_t level,
LOG_ERROR("Read neighbor header failed, ret=%zu", size);
return Neighbors();
}
return Neighbors(std::move(neighbor_block));
return Neighbors(neighbor_block);
}

//! Get vector data by key
Expand Down
7 changes: 5 additions & 2 deletions src/core/interface/index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -406,8 +406,9 @@ int Index::Search(const VectorData &vector_data,
}

// dense support refiner, but sparse doesn't
int ret = 0;
if (search_param->refiner_param == nullptr) {
return _dense_search(vector_data, search_param, result, context);
ret = _dense_search(vector_data, search_param, result, context);
} else {
auto &reference_index = search_param->refiner_param->reference_index;
if (reference_index == nullptr) {
Expand Down Expand Up @@ -441,8 +442,10 @@ int Index::Search(const VectorData &vector_data,
// TODO: should copy other params?
flat_search_param->bf_pks = std::make_shared<std::vector<uint64_t>>(keys);

return reference_index->Search(vector_data, flat_search_param, result);
ret = reference_index->Search(vector_data, flat_search_param, result);
}
context->reset();
return ret;
}


Expand Down
Loading