
@var-nan
Contributor

@var-nan var-nan commented Nov 15, 2025

Closes #3217.
rangeCommon() in redis_timeseries.cc now appends two extra samples, prev_sample and next_sample, to the res vector when the aggregator is twa. These two samples are used to compute the areas of the polygons in front of the first bucket and after the end of the last bucket.

prev_sample is the latest sample in the data whose timestamp is less than or equal to that of the first sample in the filtered range, and next_sample is the earliest sample in the data whose timestamp is greater than or equal to that of the last sample in the filtered range.
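
The boundary lookup described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in this PR: Sample is a stand-in for TSSample, and FindBoundaries, Boundaries, first_ts and last_ts are hypothetical names. It assumes the series is held in memory and sorted by timestamp.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <optional>
#include <vector>

// Hypothetical stand-in for TSSample in redis_timeseries.cc.
struct Sample {
  uint64_t ts;
  double v;
};

// Hypothetical result type: either boundary may be missing.
struct Boundaries {
  std::optional<Sample> prev;  // latest sample with ts <= first_ts
  std::optional<Sample> next;  // earliest sample with ts >= last_ts
};

// Assumes `data` is sorted by timestamp in ascending order.
Boundaries FindBoundaries(const std::vector<Sample> &data, uint64_t first_ts, uint64_t last_ts) {
  assert(std::is_sorted(data.begin(), data.end(),
                        [](const Sample &a, const Sample &b) { return a.ts < b.ts; }));
  Boundaries out;

  // First element with ts > first_ts; its predecessor (if any) has ts <= first_ts.
  auto after_first = std::upper_bound(data.begin(), data.end(), first_ts,
                                      [](uint64_t ts, const Sample &s) { return ts < s.ts; });
  if (after_first != data.begin()) out.prev = *std::prev(after_first);

  // First element with ts >= last_ts.
  auto at_or_after_last = std::lower_bound(data.begin(), data.end(), last_ts,
                                           [](const Sample &s, uint64_t ts) { return s.ts < ts; });
  if (at_or_after_last != data.end()) out.next = *at_or_after_last;

  return out;
}
```

Because both comparisons are inclusive, a boundary can coincide with the first or last sample of the range itself, which is why the caller still has to compare timestamps before treating it as an extra point.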

TODO: test TWA with FILTER_BY_TS/FILTER_BY_VALUE option.

var-nan and others added 2 commits November 14, 2025 23:04
@PragmaTwice PragmaTwice requested a review from yezhizi November 17, 2025 15:25
When filtered with FILTER_BY_TS/FILTER_BY_VALUE, the `next` and `prev`
samples are discarded while computing the area.
@var-nan
Contributor Author

var-nan commented Nov 17, 2025

279c9bd now calculates TWA correctly when samples are filtered with FILTER_BY_TS/FILTER_BY_VALUE.

@PragmaTwice
Member

PragmaTwice commented Nov 18, 2025

/home/runner/work/kvrocks/kvrocks/src/types/redis_timeseries.cc:91:70: error: variable 'prev_available' is not initialized [cppcoreguidelines-init-variables,-warnings-as-errors]
   91 |   bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available, next_available;
      |                                                                      ^
      |                                                                                     = false
/home/runner/work/kvrocks/kvrocks/src/types/redis_timeseries.cc:91:86: error: variable 'next_available' is not initialized [cppcoreguidelines-init-variables,-warnings-as-errors]
   91 |   bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available, next_available;
      |                                                                                      ^
      |                                                                                                     = false
Suppressed 8485 warnings (8464 in non-user code, 21 NOLINT).
Use -header-filter=.* to display errors from all non-system headers. Use -system-headers to display errors from system headers as well.
2 warnings treated as errors

The CI failed due to a clang-tidy report in the changes. Could you fix it to pass the CI?

@var-nan
Contributor Author

var-nan commented Nov 18, 2025

Some of these errors didn't show up when I ran ./x.py check tidy locally; I guess it's due to a version change.

@yezhizi
Contributor

yezhizi commented Nov 18, 2025

Sorry for the wait! Been a bit busy these days. I'll review this later today. : )

@var-nan
Contributor Author

var-nan commented Nov 26, 2025

Thanks @yezhizi . I was about to push the clang-tidy fixes.


Comment on lines +124 to +135
TSSample prev_sample, next_sample;
bool is_twa_aggregator = aggregator.type == TSAggregatorType::TWA, prev_available = false, next_available = false;
if (is_twa_aggregator) {
  const bool discard_boundaries = !option.filter_by_ts.empty() || option.filter_by_value.has_value();
  next_sample = samples.back();
  samples.pop_back();
  prev_sample = samples.back();
  samples.pop_back();
  // When FILTER_BY_TS/FILTER_BY_VALUE is enabled, discard out-of-boundary samples.
  prev_available = discard_boundaries ? false : !samples.empty() && (samples.front().ts != prev_sample.ts);
  next_available = discard_boundaries ? false : !samples.empty() && (samples.back().ts != next_sample.ts);
}

I think we can pass next_sample, next_available, etc. as function parameters instead of calculating them inside the AggregateSamplesByRangeOption function. For example, we can add a struct:

struct TWABounds {
    std::optional<TSSample> prev_sample;
    std::optional<TSSample> next_sample;
};

And modify the function interface to:

AggregateSamplesByRangeOption(std::vector<TSSample> samples, const TSRangeOption &option, const TWABounds&)
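
One way the caller could fill such a struct is sketched below. It assumes the same availability rules as the snippet under review (drop both boundaries when a filter is active, and drop a boundary that coincides with the first or last in-range sample). TSSample here is a simplified stand-in and MakeTWABounds is a hypothetical helper, not an existing Kvrocks function.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Simplified stand-in for the real TSSample type.
struct TSSample {
  uint64_t ts;
  double v;
};

struct TWABounds {
  std::optional<TSSample> prev_sample;
  std::optional<TSSample> next_sample;
};

// Hypothetical helper: builds the bounds on the caller side so the aggregation
// function no longer has to pop boundary samples off the end of `samples`.
TWABounds MakeTWABounds(const std::vector<TSSample> &samples, const TSSample &prev,
                        const TSSample &next, bool discard_boundaries) {
  TWABounds bounds;
  // With FILTER_BY_TS/FILTER_BY_VALUE, or with no in-range samples, neither boundary is used.
  if (discard_boundaries || samples.empty()) return bounds;
  // A boundary that is the first/last in-range sample itself adds no extra area.
  if (samples.front().ts != prev.ts) bounds.prev_sample = prev;
  if (samples.back().ts != next.ts) bounds.next_sample = next;
  return bounds;
}
```

With std::optional, the separate prev_available and next_available flags become has_value() checks inside the aggregation code.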

Comment on lines +177 to +193
auto non_empty_left_bucket_idx = [&spans](size_t curr) {
  while (--curr && spans[curr].empty());
  return curr;
};
auto non_empty_right_bucket_idx = [&spans](size_t curr) {
  while (++curr < spans.size() && spans[curr].empty());
  return curr;
};

std::vector<std::pair<TSSample, TSSample>> neighbors;
neighbors.reserve(spans.size());
for (size_t i = 0; i < spans.size(); i++) {
  TSSample prev = (i != 0) ? spans[non_empty_left_bucket_idx(i)].back() : prev_sample;
  TSSample next = (i != (spans.size() - 1)) ? spans[non_empty_right_bucket_idx(i)].front() : next_sample;
  neighbors.emplace_back(prev, next);
}


The nested while loops inside the for loop result in O(N^2) time complexity, which can be optimized to O(N). We could:

  1. Iterate from 0 to N to resolve all prev neighbors by maintaining a "last seen non-empty" variable.
  2. Iterate from N to 0 to resolve all next neighbors similarly.
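
A minimal sketch of the two passes suggested above, assuming the buckets are modeled as a vector of vectors and the boundary samples are optional. TSSample is a simplified stand-in, and ResolveNeighbors and Neighbors are hypothetical names. Unlike the snippet under review, a bucket with no non-empty bucket on one side falls back to the boundary sample on that side, which is an assumption about the intended behavior.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Simplified stand-in for the real TSSample type.
struct TSSample {
  uint64_t ts;
  double v;
};

using Neighbors = std::pair<std::optional<TSSample>, std::optional<TSSample>>;

// Resolves the (prev, next) neighbor of every bucket in O(N) bucket visits.
std::vector<Neighbors> ResolveNeighbors(const std::vector<std::vector<TSSample>> &spans,
                                        const std::optional<TSSample> &prev_sample,
                                        const std::optional<TSSample> &next_sample) {
  std::vector<Neighbors> neighbors(spans.size());

  // Forward pass: remember the last sample of the most recent non-empty bucket.
  std::optional<TSSample> last_seen = prev_sample;
  for (size_t i = 0; i < spans.size(); i++) {
    neighbors[i].first = last_seen;
    if (!spans[i].empty()) last_seen = spans[i].back();
  }

  // Backward pass: remember the first sample of the nearest non-empty bucket to the right.
  std::optional<TSSample> next_seen = next_sample;
  for (size_t i = spans.size(); i-- > 0;) {
    neighbors[i].second = next_seen;
    if (!spans[i].empty()) next_seen = spans[i].front();
  }

  return neighbors;
}
```

Each bucket is visited exactly twice, and the unsigned index never wraps below zero because the backward loop tests i before decrementing it.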

Development

Successfully merging this pull request may close these issues.

TimeSeries: Support twa aggregator for range queries
