Skip to content

Commit 5380a18

Browse files
rui-mo and Lakehouse Engine Bot
authored and committed
[11067] Support scan filter for decimal in ORC
Alchemy-item: (ID = 840) [11067] Support scan filter for decimal in ORC commit 1/1 - fee4403
1 parent 98fe203 commit 5380a18

File tree

5 files changed

+370
-14
lines changed

5 files changed

+370
-14
lines changed

velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.cpp

Lines changed: 143 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,17 @@ void SelectiveDecimalColumnReader<DataT>::seekToRowGroup(int64_t index) {
7575

7676
template <typename DataT>
7777
template <bool kDense>
78-
void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
79-
vector_size_t numRows = rows.back() + 1;
78+
void SelectiveDecimalColumnReader<DataT>::readHelper(
79+
const common::Filter* filter,
80+
RowSet rows) {
8081
ExtractToReader extractValues(this);
81-
common::AlwaysTrue filter;
82+
common::AlwaysTrue alwaysTrue;
8283
DirectRleColumnVisitor<
8384
int64_t,
8485
common::AlwaysTrue,
8586
decltype(extractValues),
8687
kDense>
87-
visitor(filter, this, rows, extractValues);
88+
visitor(alwaysTrue, this, rows, extractValues);
8889

8990
// decode scale stream
9091
if (version_ == velox::dwrf::RleVersion_1) {
@@ -104,46 +105,176 @@ void SelectiveDecimalColumnReader<DataT>::readHelper(RowSet rows) {
104105
// reset numValues_ before reading values
105106
numValues_ = 0;
106107
valueSize_ = sizeof(DataT);
108+
vector_size_t numRows = rows.back() + 1;
107109
ensureValuesCapacity<DataT>(numRows);
108110

109111
// decode value stream
110112
facebook::velox::dwio::common::
111113
ColumnVisitor<DataT, common::AlwaysTrue, decltype(extractValues), kDense>
112-
valueVisitor(filter, this, rows, extractValues);
114+
valueVisitor(alwaysTrue, this, rows, extractValues);
113115
decodeWithVisitor<DirectDecoder<true>>(valueDecoder_.get(), valueVisitor);
114116
readOffset_ += numRows;
117+
118+
// Fill decimals before applying filter.
119+
fillDecimals();
120+
121+
// 'nullsInReadRange_' is the nulls for the entire read range, and if the row
122+
// set is not dense, result nulls should be allocated, which represents the
123+
// nulls for the selected rows before filtering.
124+
const auto rawNulls = nullsInReadRange_
125+
? (kDense ? nullsInReadRange_->as<uint64_t>() : rawResultNulls_)
126+
: nullptr;
127+
// Process filter.
128+
process(filter, rows, rawNulls);
129+
}
130+
131+
template <typename DataT>
132+
void SelectiveDecimalColumnReader<DataT>::processNulls(
133+
bool isNull,
134+
const RowSet& rows,
135+
const uint64_t* rawNulls) {
136+
if (!rawNulls) {
137+
return;
138+
}
139+
returnReaderNulls_ = false;
140+
anyNulls_ = !isNull;
141+
allNull_ = isNull;
142+
143+
auto rawDecimal = values_->asMutable<DataT>();
144+
auto rawScale = scaleBuffer_->asMutable<int64_t>();
145+
146+
vector_size_t idx = 0;
147+
if (isNull) {
148+
for (vector_size_t i = 0; i < numValues_; i++) {
149+
if (bits::isBitNull(rawNulls, i)) {
150+
bits::setNull(rawResultNulls_, idx);
151+
addOutputRow(rows[i]);
152+
idx++;
153+
}
154+
}
155+
} else {
156+
for (vector_size_t i = 0; i < numValues_; i++) {
157+
if (!bits::isBitNull(rawNulls, i)) {
158+
bits::setNull(rawResultNulls_, idx, false);
159+
rawDecimal[idx] = rawDecimal[i];
160+
rawScale[idx] = rawScale[i];
161+
addOutputRow(rows[i]);
162+
idx++;
163+
}
164+
}
165+
}
166+
}
167+
168+
template <typename DataT>
169+
void SelectiveDecimalColumnReader<DataT>::processFilter(
170+
const common::Filter* filter,
171+
const RowSet& rows,
172+
const uint64_t* rawNulls) {
173+
VELOX_CHECK_NOT_NULL(filter, "Filter must not be null.");
174+
returnReaderNulls_ = false;
175+
anyNulls_ = false;
176+
allNull_ = true;
177+
178+
vector_size_t idx = 0;
179+
auto rawDecimal = values_->asMutable<DataT>();
180+
for (vector_size_t i = 0; i < numValues_; i++) {
181+
if (rawNulls && bits::isBitNull(rawNulls, i)) {
182+
if (filter->testNull()) {
183+
bits::setNull(rawResultNulls_, idx);
184+
addOutputRow(rows[i]);
185+
anyNulls_ = true;
186+
idx++;
187+
}
188+
} else {
189+
bool tested;
190+
if constexpr (std::is_same_v<DataT, int64_t>) {
191+
tested = filter->testInt64(rawDecimal[i]);
192+
} else {
193+
tested = filter->testInt128(rawDecimal[i]);
194+
}
195+
196+
if (tested) {
197+
if (rawNulls) {
198+
bits::setNull(rawResultNulls_, idx, false);
199+
}
200+
rawDecimal[idx] = rawDecimal[i];
201+
addOutputRow(rows[i]);
202+
allNull_ = false;
203+
idx++;
204+
}
205+
}
206+
}
207+
}
208+
209+
template <typename DataT>
210+
void SelectiveDecimalColumnReader<DataT>::process(
211+
const common::Filter* filter,
212+
const RowSet& rows,
213+
const uint64_t* rawNulls) {
214+
if (!filter) {
215+
// No filter and "hasDeletion" is false so input rows will be
216+
// reused.
217+
return;
218+
}
219+
220+
switch (filter->kind()) {
221+
case common::FilterKind::kIsNull:
222+
processNulls(true, rows, rawNulls);
223+
break;
224+
case common::FilterKind::kIsNotNull: {
225+
if (rawNulls) {
226+
processNulls(false, rows, rawNulls);
227+
} else {
228+
for (vector_size_t i = 0; i < numValues_; i++) {
229+
addOutputRow(rows[i]);
230+
}
231+
}
232+
break;
233+
}
234+
default:
235+
processFilter(filter, rows, rawNulls);
236+
}
115237
}
116238

117239
template <typename DataT>
118240
void SelectiveDecimalColumnReader<DataT>::read(
119241
int64_t offset,
120242
const RowSet& rows,
121243
const uint64_t* incomingNulls) {
122-
VELOX_CHECK(!scanSpec_->filter());
123244
VELOX_CHECK(!scanSpec_->valueHook());
124245
prepareRead<int64_t>(offset, rows, incomingNulls);
246+
if (DictionaryValues::hasFilter(scanSpec_->filter()) &&
247+
(!resultNulls_ || !resultNulls_->unique() ||
248+
resultNulls_->capacity() * 8 < rows.size())) {
249+
// Make sure a dedicated resultNulls_ is allocated with enough capacity as
250+
// RleDecoder always assumes it is available.
251+
resultNulls_ = AlignedBuffer::allocate<bool>(rows.size(), memoryPool_);
252+
rawResultNulls_ = resultNulls_->asMutable<uint64_t>();
253+
}
254+
rawValues_ = values_->asMutable<char>();
125255
bool isDense = rows.back() == rows.size() - 1;
126256
if (isDense) {
127-
readHelper<true>(rows);
257+
readHelper<true>(scanSpec_->filter(), rows);
128258
} else {
129-
readHelper<false>(rows);
259+
readHelper<false>(scanSpec_->filter(), rows);
130260
}
131261
}
132262

133263
template <typename DataT>
134264
void SelectiveDecimalColumnReader<DataT>::getValues(
135265
const RowSet& rows,
136266
VectorPtr* result) {
267+
getIntValues(rows, requestedType_, result);
268+
}
269+
270+
template <typename DataT>
271+
void SelectiveDecimalColumnReader<DataT>::fillDecimals() {
137272
auto nullsPtr =
138273
resultNulls() ? resultNulls()->template as<uint64_t>() : nullptr;
139274
auto scales = scaleBuffer_->as<int64_t>();
140275
auto values = values_->asMutable<DataT>();
141-
142276
DecimalUtil::fillDecimals<DataT>(
143277
values, nullsPtr, values, scales, numValues_, scale_);
144-
145-
rawValues_ = values_->asMutable<char>();
146-
getIntValues(rows, requestedType_, result);
147278
}
148279

149280
template class SelectiveDecimalColumnReader<int64_t>;

velox/dwio/dwrf/reader/SelectiveDecimalColumnReader.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,24 @@ class SelectiveDecimalColumnReader : public SelectiveColumnReader {
4949

5050
private:
5151
template <bool kDense>
52-
void readHelper(RowSet rows);
52+
void readHelper(const common::Filter* filter, RowSet rows);
53+
54+
// Process IsNull and IsNotNull filters.
55+
void processNulls(bool isNull, const RowSet& rows, const uint64_t* rawNulls);
56+
57+
// Process filters on decimal values.
58+
void processFilter(
59+
const common::Filter* filter,
60+
const RowSet& rows,
61+
const uint64_t* rawNulls);
62+
63+
// Dispatch to the respective filter processing based on the filter type.
64+
void process(
65+
const common::Filter* filter,
66+
const RowSet& rows,
67+
const uint64_t* rawNulls);
68+
69+
void fillDecimals();
5370

5471
std::unique_ptr<IntDecoder<true>> valueDecoder_;
5572
std::unique_ptr<IntDecoder<true>> scaleDecoder_;

velox/dwio/dwrf/test/E2EFilterTest.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,62 @@ TEST_F(E2EFilterTest, floatAndDouble) {
241241
false);
242242
}
243243

244+
TEST_F(E2EFilterTest, DISABLED_shortDecimal) {
  // Disabled because ORC write functionality is not yet supported. Once it is
  // available, enable this test and switch the file format to ORC:
  // options.format = DwrfFormat::kOrc;
  const std::unordered_map<std::string, TypePtr> types = {
      {"shortdecimal_val:decimal(8, 5)", DECIMAL(8, 5)},
      {"shortdecimal_val:decimal(10, 5)", DECIMAL(10, 5)},
      {"shortdecimal_val:decimal(17, 5)", DECIMAL(17, 5)}};

  // Value distribution for the decimal column, shared by every schema below.
  auto customizeData = [&]() {
    makeIntDistribution<int64_t>(
        "shortdecimal_val",
        10, // min
        100, // max
        22, // repeats
        19, // rareFrequency
        -999, // rareMin
        30000, // rareMax
        true);
  };

  for (const auto& schemaAndType : types) {
    testWithTypes(
        schemaAndType.first, customizeData, false, {"shortdecimal_val"}, 20);
  }
}
272+
273+
TEST_F(E2EFilterTest, DISABLED_longDecimal) {
  // Disabled because ORC write functionality is not yet supported. Once it is
  // available, enable this test and switch the file format to ORC:
  // options.format = DwrfFormat::kOrc;
  const std::unordered_map<std::string, TypePtr> types = {
      {"longdecimal_val:decimal(30, 10)", DECIMAL(30, 10)},
      {"longdecimal_val:decimal(37, 15)", DECIMAL(37, 15)}};

  // Value distribution for the decimal column, shared by every schema below.
  auto customizeData = [&]() {
    makeIntDistribution<int128_t>(
        "longdecimal_val",
        10, // min
        100, // max
        22, // repeats
        19, // rareFrequency
        -999, // rareMin
        30000, // rareMax
        true);
  };

  for (const auto& schemaAndType : types) {
    testWithTypes(
        schemaAndType.first, customizeData, false, {"longdecimal_val"}, 20);
  }
}
299+
244300
TEST_F(E2EFilterTest, stringDirect) {
245301
testutil::TestValue::enable();
246302
bool coverage[2][2]{};

0 commit comments

Comments
 (0)