Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/IStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ class IStorage
virtual bool enumerate(callback fn) const = 0;
virtual size_t count() const = 0;

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) {
beginWriteBatch();
for (size_t ielem = 0; ielem < celem; ++ielem) {
insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false);
bool fOverwrite = (rgfOverwrite != nullptr) ? rgfOverwrite[ielem] : false;
insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], fOverwrite);
}
endWriteBatch();
}
Expand Down
4 changes: 2 additions & 2 deletions src/StorageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
}

long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing);
void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem)
void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem)
{
std::vector<dictEntry*> vechashes;
if (m_pdict != nullptr) {
Expand Down Expand Up @@ -152,7 +152,7 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si
}
ul.unlock();

m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem);
m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, rgfOverwrite, celem);

bulkInsertsInProgress--;
}
Expand Down
2 changes: 1 addition & 1 deletion src/StorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class StorageCache
void clear(void(callback)(void*));
void clearAsync();
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem);
void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key);
void emergencyFreeCache();
Expand Down
56 changes: 47 additions & 9 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2905,15 +2905,13 @@ void redisDbPersistentData::storeDatabase()
dictReleaseIterator(di);
}

/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate)
/* static */ sds redisDbPersistentData::serializeChange(redisDbPersistentData *db, const char *key)
{
auto itr = db->find_cached_threadsafe(key);
if (itr == nullptr)
return;
return nullptr;
robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o);
storage->insert((sds)key, temp, sdslen(temp), fUpdate);
sdsfree(temp);
return serializeStoredObjectAndExpire(db, (const char*) itr.key(), o);
}

bool redisDbPersistentData::processChanges(bool fSnapshot)
Expand Down Expand Up @@ -2956,10 +2954,30 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
{
dictIterator *di = dictGetIterator(m_dictChanged);
dictEntry *de;
std::vector<char*> veckeys;
std::vector<size_t> veccbkeys;
std::vector<char*> vecvals;
std::vector<size_t> veccbvals;
std::vector<char> vecoverwrite;
veckeys.reserve(dictSize(m_dictChanged));
veccbkeys.reserve(dictSize(m_dictChanged));
vecvals.reserve(dictSize(m_dictChanged));
veccbvals.reserve(dictSize(m_dictChanged));
vecoverwrite.reserve(dictSize(m_dictChanged));
while ((de = dictNext(di)) != nullptr)
{
serializeAndStoreChange(m_spstorage.get(), this, (const char*)dictGetKey(de), (bool)dictGetVal(de));
sds val = serializeChange(this, (const char*)dictGetKey(de));
if (val != nullptr) {
veckeys.push_back((char*)dictGetKey(de));
veccbkeys.push_back(sdslen((sds)dictGetKey(de)));
vecvals.push_back(val);
veccbvals.push_back(sdslen(val));
vecoverwrite.push_back((bool)dictGetVal(de));
}
}
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
}
}
Expand Down Expand Up @@ -2993,7 +3011,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
vecvals.push_back(temp);
veccbvals.push_back(sdslen(temp));
}
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size());
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), nullptr, veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
Expand All @@ -3004,7 +3022,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)

void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem)
{
m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem);
m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, nullptr, celem);
}

void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
Expand All @@ -3013,10 +3031,30 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
{
dictIterator *di = dictGetIterator(m_dictChangedStorageFlush);
dictEntry *de;
std::vector<char*> veckeys;
std::vector<size_t> veccbkeys;
std::vector<char*> vecvals;
std::vector<size_t> veccbvals;
std::vector<char> vecoverwrite;
veckeys.reserve(dictSize(m_dictChanged));
veccbkeys.reserve(dictSize(m_dictChanged));
vecvals.reserve(dictSize(m_dictChanged));
veccbvals.reserve(dictSize(m_dictChanged));
vecoverwrite.resize(dictSize(m_dictChanged));
while ((de = dictNext(di)) != nullptr)
{
serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de), (bool)dictGetVal(de));
sds val = serializeChange((redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de));
if (val != nullptr) {
veckeys.push_back((char*)dictGetKey(de));
veccbkeys.push_back(sdslen((sds)dictGetKey(de)));
vecvals.push_back(val);
veccbvals.push_back(sdslen(val));
vecoverwrite.push_back((bool)dictGetVal(de));
}
}
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
dictRelease(m_dictChangedStorageFlush);
m_dictChangedStorageFlush = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,7 @@ class redisDbPersistentData
uint64_t m_mvccCheckpoint = 0;

private:
static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate);
static sds serializeChange(redisDbPersistentData *db, const char *key);

void ensure(const char *key);
void ensure(const char *key, dictEntry **de);
Expand Down
12 changes: 10 additions & 2 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
++m_count;
}

void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem)
void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem)
{
size_t coverwrites = 0;
if (celem >= 16384) {
rocksdb::Options options = DefaultRocksDBOptions();
rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), options, options.comparator);
Expand Down Expand Up @@ -92,8 +93,15 @@ void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **
m_spdb->Write(WriteOptions(), spbatch.get());
}

if (rgfOverwrite != nullptr) {
for (size_t ielem = 0; ielem < celem; ++ielem) {
if (rgfOverwrite[ielem])
++coverwrites;
}
}

std::unique_lock<fastlock> l(m_lock);
m_count += celem;
m_count += celem - coverwrites;
}

bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
Expand Down
2 changes: 1 addition & 1 deletion src/storage/rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class RocksDBStorageProvider : public IStorage
virtual void beginWriteBatch() override;
virtual void endWriteBatch() override;

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override;
virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) override;

virtual void batch_lock() override;
virtual void batch_unlock() override;
Expand Down