Skip to content

Commit 2dc7e9e

Browse files
var-nanvar-nanyezhizi
authored
feat(ts): add the support of TS.ALTER command (#3264)
It closes #3214. --------- Co-authored-by: var-nan <[email protected]> Co-authored-by: DeEMO <[email protected]>
1 parent e19164f commit 2dc7e9e

File tree

5 files changed

+204
-1
lines changed

5 files changed

+204
-1
lines changed

src/commands/cmd_timeseries.cc

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,6 @@ class CommandTSCreateBase : public KeywordCommandBase {
315315
return Status::OK();
316316
}
317317

318-
private:
319318
TSCreateOption create_option_;
320319
};
321320

@@ -342,6 +341,52 @@ class CommandTSCreate : public CommandTSCreateBase {
342341
}
343342
};
344343

344+
class CommandTSAlter : public CommandTSCreateBase {
345+
public:
346+
CommandTSAlter() { registerDefaultHandlers(); }
347+
Status Parse(const std::vector<std::string> &args) override {
348+
if (args.size() < 2) {
349+
return {Status::RedisParseErr, errWrongNumOfArguments};
350+
}
351+
CommandTSCreateBase::setSkipNum(2);
352+
return CommandTSCreateBase::Parse(args);
353+
}
354+
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
355+
auto sc = CommandTSCreateBase::Execute(ctx, srv, conn, output);
356+
if (!sc.IsOK()) return sc;
357+
358+
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
359+
auto s = timeseries_db.Alter(ctx, args_[1], getCreateOption(), mask_);
360+
if (!s.ok() && s.IsInvalidArgument()) return {Status::RedisExecErr, errKeyNotFound};
361+
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
362+
*output = redis::RESP_OK;
363+
return Status::OK();
364+
}
365+
366+
private:
367+
uint8_t mask_ = 0;
368+
369+
void registerDefaultHandlers() override {
370+
using AlterMode = std::underlying_type<TSAlterMode>::type;
371+
registerHandler("RETENTION", [this](TSOptionsParser &parser) {
372+
mask_ |= static_cast<AlterMode>(TSAlterMode::RETENTION);
373+
return handleRetention(parser, create_option_.retention_time);
374+
});
375+
registerHandler("CHUNK_SIZE", [this](TSOptionsParser &parser) {
376+
mask_ |= static_cast<AlterMode>(TSAlterMode::CHUNK_SIZE);
377+
return handleChunkSize(parser, create_option_.chunk_size);
378+
});
379+
registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) {
380+
mask_ |= static_cast<AlterMode>(TSAlterMode::DUPLICATE_POLICY);
381+
return handleDuplicatePolicy(parser, create_option_.duplicate_policy);
382+
});
383+
registerHandler("LABELS", [this](TSOptionsParser &parser) {
384+
mask_ |= static_cast<AlterMode>(TSAlterMode::LABELS);
385+
return handleLabels(parser, create_option_.labels);
386+
});
387+
}
388+
};
389+
345390
class CommandTSInfo : public Commander {
346391
public:
347392
Status Parse(const std::vector<std::string> &args) override { return Commander::Parse(args); }
@@ -1227,6 +1272,7 @@ class CommandTSQueryIndex : public Commander {
12271272
};
12281273

12291274
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", -2, "write", 1, 1, 1),
1275+
MakeCmdAttr<CommandTSAlter>("ts.alter", -2, "write", 1, 1, 1),
12301276
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 1),
12311277
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, -3, 1),
12321278
MakeCmdAttr<CommandTSRange>("ts.range", -4, "read-only", 1, 1, 1),

src/types/redis_timeseries.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1860,6 +1860,68 @@ rocksdb::Status TimeSeries::Create(engine::Context &ctx, const Slice &user_key,
18601860
return createTimeSeries(ctx, ns_key, &metadata, &option);
18611861
}
18621862

1863+
rocksdb::Status TimeSeries::Alter(engine::Context &ctx, const Slice &user_key, const TSCreateOption &option,
1864+
uint8_t mask) {
1865+
std::string ns_key = AppendNamespacePrefix(user_key);
1866+
TimeSeriesMetadata metadata;
1867+
if (auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata); !s.ok()) {
1868+
return rocksdb::Status::InvalidArgument("key not exists");
1869+
}
1870+
auto batch = storage_->GetWriteBatchBase();
1871+
WriteBatchLogData log_data(kRedisTimeSeries, {"Alter"});
1872+
if (auto s = batch->PutLogData(log_data.Encode()); !s.ok()) return s;
1873+
1874+
using AlterMode = std::underlying_type<TSAlterMode>::type;
1875+
bool update_retention = mask & static_cast<AlterMode>(TSAlterMode::RETENTION);
1876+
bool update_chunk_size = mask & static_cast<AlterMode>(TSAlterMode::CHUNK_SIZE);
1877+
bool update_duplicate_policy = mask & static_cast<AlterMode>(TSAlterMode::DUPLICATE_POLICY);
1878+
bool update_labels = mask & static_cast<AlterMode>(TSAlterMode::LABELS);
1879+
1880+
if (update_retention || update_chunk_size || update_duplicate_policy) {
1881+
metadata.retention_time = update_retention ? option.retention_time : metadata.retention_time;
1882+
metadata.chunk_size = update_chunk_size ? option.chunk_size : metadata.chunk_size;
1883+
metadata.duplicate_policy = update_duplicate_policy ? option.duplicate_policy : metadata.duplicate_policy;
1884+
std::string bytes;
1885+
metadata.Encode(&bytes);
1886+
if (auto s = batch->Put(metadata_cf_handle_, ns_key, bytes); !s.ok()) return s;
1887+
}
1888+
if (update_labels) {
1889+
LabelKVList prev_labels;
1890+
if (auto s = getLabelKVList(ctx, ns_key, metadata, &prev_labels); !s.ok()) return s;
1891+
1892+
std::unordered_map<std::string, std::pair<std::string, bool>> labels_map; // True : (key,val) should be updated
1893+
for (auto &label : option.labels) {
1894+
labels_map.insert(std::make_pair(label.k, std::make_pair(label.v, true)));
1895+
}
1896+
for (auto &label : prev_labels) {
1897+
if (auto it = labels_map.find(label.k); it != labels_map.end()) {
1898+
it->second.second = it->second.first != label.v;
1899+
if (it->second.second) { // Remove reverse index key. Val is updated later.
1900+
auto rev_index_key = TSRevLabelKey(namespace_, label.k, label.v, user_key).Encode();
1901+
if (auto s = batch->Delete(index_cf_handle_, rev_index_key); !s.ok()) return s;
1902+
} // Else key-val pair unchanged
1903+
} else { // Remove label.
1904+
auto internal_key = internalKeyFromLabelKey(ns_key, metadata, label.k);
1905+
if (auto s = batch->Delete(internal_key); !s.ok()) return s;
1906+
1907+
auto rev_index_key = TSRevLabelKey(namespace_, label.k, label.v, user_key).Encode();
1908+
if (auto s = batch->Delete(index_cf_handle_, rev_index_key); !s.ok()) return s;
1909+
}
1910+
}
1911+
1912+
for (auto &[k, v] : labels_map) {
1913+
if (v.second) { // Update label and insert reverse-index
1914+
auto internal_key = internalKeyFromLabelKey(ns_key, metadata, k);
1915+
if (auto s = batch->Put(internal_key, v.first); !s.ok()) return s;
1916+
1917+
auto rev_index_key = TSRevLabelKey(namespace_, k, v.first, user_key).Encode();
1918+
if (auto s = batch->Put(index_cf_handle_, rev_index_key, Slice()); !s.ok()) return s;
1919+
}
1920+
}
1921+
}
1922+
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
1923+
}
1924+
18631925
rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, TSSample sample,
18641926
const TSCreateOption &option, AddResult *res, const DuplicatePolicy *on_dup_policy) {
18651927
std::string ns_key = AppendNamespacePrefix(user_key);

src/types/redis_timeseries.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,14 @@ enum class TSCreateRuleResult : uint8_t {
253253
kSrcEqDst = 6,
254254
};
255255

256+
enum class TSAlterMode : uint8_t {
257+
RETENTION = 1,
258+
CHUNK_SIZE = 1 << 1,
259+
DUPLICATE_POLICY = 1 << 2,
260+
IGNORE = 1 << 3,
261+
LABELS = 1 << 4,
262+
};
263+
256264
std::vector<TSSample> GroupSamplesAndReduce(const std::vector<std::vector<TSSample>> &all_samples,
257265
TSMRangeOption::GroupReducerType reducer_type);
258266

@@ -267,6 +275,7 @@ class TimeSeries : public SubKeyScanner {
267275
TimeSeries(engine::Storage *storage, const std::string &ns)
268276
: SubKeyScanner(storage, ns), index_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Index)) {}
269277
rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const TSCreateOption &option);
278+
rocksdb::Status Alter(engine::Context &ctx, const Slice &user_key, const TSCreateOption &option, uint8_t mask);
270279
rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample sample, const TSCreateOption &option,
271280
AddResult *res, const DuplicatePolicy *on_dup_policy = nullptr);
272281
rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key, std::vector<TSSample> samples,

tests/cppunit/types/timeseries_test.cc

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,82 @@ TEST_F(TimeSeriesTest, Create) {
5353
EXPECT_EQ(s.ToString(), "Invalid argument: key already exists");
5454
}
5555

56+
TEST_F(TimeSeriesTest, Alter) {
57+
redis::TSCreateOption option;
58+
option.retention_time = 3600;
59+
option.chunk_size = 1024;
60+
option.labels = {{"type", "runtime"}, {"compiler", "gcc"}, {"machine", "Linux"}};
61+
key_ = "pa";
62+
63+
EXPECT_TRUE(ts_db_->Create(*ctx_, key_, option).ok());
64+
65+
redis::TSInfoResult res;
66+
option.retention_time = 200;
67+
EXPECT_TRUE(ts_db_->Alter(*ctx_, key_, option, static_cast<uint8_t>(redis::TSAlterMode::RETENTION)).ok());
68+
EXPECT_TRUE(ts_db_->Info(*ctx_, key_, &res).ok());
69+
EXPECT_EQ(res.metadata.retention_time, 200);
70+
EXPECT_EQ(res.metadata.chunk_size, 1024);
71+
EXPECT_EQ(res.labels.size(), 3);
72+
73+
// Update chunk size and verify other fields are unaffected.
74+
option.chunk_size = 128;
75+
EXPECT_TRUE(ts_db_->Alter(*ctx_, key_, option, static_cast<uint8_t>(redis::TSAlterMode::CHUNK_SIZE)).ok());
76+
ts_db_->Info(*ctx_, key_, &res);
77+
EXPECT_EQ(res.metadata.retention_time, 200);
78+
EXPECT_EQ(res.metadata.chunk_size, 128);
79+
EXPECT_EQ(res.labels.size(), 3);
80+
81+
// Verify records are properly inserted.
82+
std::vector<TSSample> samples = {{10, 23}, {12, 24.5}};
83+
std::vector<TSChunk::AddResult> results;
84+
results.resize(samples.size());
85+
EXPECT_TRUE(ts_db_->MAdd(*ctx_, key_, samples, &results).ok());
86+
EXPECT_EQ(results[0].sample.ts, 10);
87+
EXPECT_EQ(results[1].sample.ts, 12);
88+
89+
// Update labels and verify
90+
res.labels.clear();
91+
option.labels = {{"compiler", "gcc_12"}, {"version", "123"}};
92+
EXPECT_TRUE(ts_db_->Alter(*ctx_, key_, option, static_cast<uint8_t>(redis::TSAlterMode::LABELS)).ok());
93+
EXPECT_TRUE(ts_db_->Info(*ctx_, key_, &res).ok());
94+
EXPECT_EQ(res.metadata.retention_time, 200);
95+
EXPECT_EQ(res.metadata.chunk_size, 128);
96+
EXPECT_EQ(res.labels.size(), 2);
97+
redis::LabelKVPair first = {"version", "123"}, second = {"compiler", "gcc_12"};
98+
EXPECT_TRUE(std::find_if(res.labels.begin(), res.labels.end(), [first](const auto &label) {
99+
return first.k == label.k && first.v == label.v;
100+
}) != res.labels.end());
101+
EXPECT_TRUE(std::find_if(res.labels.begin(), res.labels.end(), [second](const auto &label) {
102+
return second.k == label.k && second.v == label.v;
103+
}) != res.labels.end());
104+
105+
// Verify reverse-indexes are properly updated.
106+
key_ = "pavani";
107+
redis::TSCreateOption second_option;
108+
second_option.labels = {{"compiler", "gcc_12"}, {"Desktop", "Lubuntu"}};
109+
EXPECT_TRUE(ts_db_->Create(*ctx_, key_, second_option).ok());
110+
111+
redis::TSMGetOption::FilterOption filters;
112+
filters.labels_equals = {{"compiler", {"gcc_12"}}};
113+
std::vector<std::string> query_res;
114+
EXPECT_TRUE(ts_db_->QueryIndex(*ctx_, filters, &query_res).ok());
115+
EXPECT_EQ(query_res.size(), 2);
116+
EXPECT_TRUE(std::find(query_res.begin(), query_res.end(), "pa") != query_res.end());
117+
EXPECT_TRUE(std::find(query_res.begin(), query_res.end(), "pavani") != query_res.end());
118+
119+
// old labels should be deleted.
120+
filters.labels_equals.clear();
121+
filters.labels_equals = {{"machine", {"Linux"}}};
122+
query_res.clear();
123+
EXPECT_TRUE(ts_db_->QueryIndex(*ctx_, filters, &query_res).ok());
124+
EXPECT_TRUE(query_res.empty());
125+
126+
key_ = "pavni";
127+
auto s = ts_db_->Alter(*ctx_, key_, option, 1);
128+
EXPECT_FALSE(s.ok());
129+
EXPECT_EQ(s.ToString(), "Invalid argument: key not exists");
130+
}
131+
56132
TEST_F(TimeSeriesTest, Add) {
57133
redis::TSCreateOption option;
58134
option.retention_time = 3600;

tests/gocase/unit/type/timeseries/timeseries_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,16 @@ func testTimeSeries(t *testing.T, configs util.KvrocksServerConfigs) {
167167
require.Equal(t, int64(2), vals[11])
168168
})
169169

170+
t.Run("TS.ALTER Verify Updates", func(t *testing.T) {
171+
require.NoError(t, rdb.Do(ctx, "ts.create", key, "retention", "3600", "chunk_size", "1024",
172+
"LABELS", "type", "runtime", "compiler", "gcc", "machine", "gcc").Err())
173+
174+
require.NoError(t, rdb.Do(ctx, "ts.alter", key, "retention", "200").Err())
175+
vals, err := rdb.Do(ctx, "ts.info", key).Slice()
176+
require.NoError(t, err)
177+
require.Equal(t, int64(200), vals[9])
178+
})
179+
170180
t.Run("TS.ADD Basic Add", func(t *testing.T) {
171181
require.NoError(t, rdb.Del(ctx, key).Err())
172182
require.NoError(t, rdb.Do(ctx, "ts.create", key).Err())

0 commit comments

Comments
 (0)