Skip to content

Commit adc4631

Browse files
committed
Support struct column reading with different schemas (5962)
1 parent 61ee125 commit adc4631

File tree

11 files changed

+223
-35
lines changed

11 files changed

+223
-35
lines changed

velox/dwio/common/SelectiveStructColumnReader.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,7 @@ void SelectiveStructColumnReaderBase::read(
130130
activeRows = outputRows_;
131131
}
132132

133-
auto& childSpecs = scanSpec_->children();
134-
VELOX_CHECK(!childSpecs.empty());
133+
auto& childSpecs = scanSpec_->stableChildren();
135134
for (size_t i = 0; i < childSpecs.size(); ++i) {
136135
auto& childSpec = childSpecs[i];
137136
if (isChildConstant(*childSpec)) {
@@ -216,7 +215,7 @@ bool SelectiveStructColumnReaderBase::isChildConstant(
216215
fileType_->type()->kind() !=
217216
TypeKind::MAP && // If this is the case it means this is a flat map,
218217
// so it can't have "missing" fields.
219-
childSpec.channel() >= fileType_->size());
218+
!fileType_->containsChild(childSpec.fieldName()));
220219
}
221220

222221
namespace {
@@ -296,7 +295,6 @@ void setNullField(vector_size_t size, VectorPtr& field) {
296295
void SelectiveStructColumnReaderBase::getValues(
297296
RowSet rows,
298297
VectorPtr* result) {
299-
VELOX_CHECK(!scanSpec_->children().empty());
300298
VELOX_CHECK(
301299
*result != nullptr,
302300
"SelectiveStructColumnReaderBase expects a non-null result");
@@ -331,7 +329,7 @@ void SelectiveStructColumnReaderBase::getValues(
331329
resultRow->clearNulls(0, rows.size());
332330
}
333331
bool lazyPrepared = false;
334-
for (auto& childSpec : scanSpec_->children()) {
332+
for (auto& childSpec : scanSpec_->stableChildren()) {
335333
if (!childSpec->projectOut()) {
336334
continue;
337335
}

velox/dwio/common/TypeWithId.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ class TypeWithId : public velox::Tree<std::shared_ptr<const TypeWithId>> {
5959

6060
const std::shared_ptr<const TypeWithId>& childAt(uint32_t idx) const override;
6161

62+
bool containsChild(const std::string& name) const {
63+
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);
64+
return type_->as<velox::TypeKind::ROW>().containsChild(name);
65+
}
66+
6267
const std::shared_ptr<const TypeWithId>& childByName(
6368
const std::string& name) const {
6469
VELOX_CHECK_EQ(type_->kind(), velox::TypeKind::ROW);

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ namespace facebook::velox::parquet {
3636
std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
3737
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
3838
ParquetParams& params,
39-
common::ScanSpec& scanSpec) {
39+
common::ScanSpec& scanSpec,
40+
const TypePtr& outputType,
41+
memory::MemoryPool& pool) {
4042
auto colName = scanSpec.fieldName();
4143

4244
switch (dataType->type()->kind()) {
@@ -56,17 +58,34 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
5658
dataType->type(), dataType, params, scanSpec);
5759

5860
case TypeKind::ROW:
59-
return std::make_unique<StructColumnReader>(dataType, params, scanSpec);
61+
return std::make_unique<StructColumnReader>(
62+
dataType,
63+
params,
64+
scanSpec,
65+
outputType ? asRowType(outputType) : nullptr,
66+
pool);
6067

6168
case TypeKind::VARBINARY:
6269
case TypeKind::VARCHAR:
6370
return std::make_unique<StringColumnReader>(dataType, params, scanSpec);
6471

6572
case TypeKind::ARRAY:
66-
return std::make_unique<ListColumnReader>(dataType, params, scanSpec);
73+
return std::make_unique<ListColumnReader>(
74+
dataType,
75+
params,
76+
scanSpec,
77+
outputType ? std::dynamic_pointer_cast<const ArrayType>(outputType)
78+
: nullptr,
79+
pool);
6780

6881
case TypeKind::MAP:
69-
return std::make_unique<MapColumnReader>(dataType, params, scanSpec);
82+
return std::make_unique<MapColumnReader>(
83+
dataType,
84+
params,
85+
scanSpec,
86+
outputType ? std::dynamic_pointer_cast<const MapType>(outputType)
87+
: nullptr,
88+
pool);
7089

7190
case TypeKind::BOOLEAN:
7291
return std::make_unique<BooleanColumnReader>(dataType, params, scanSpec);

velox/dwio/parquet/reader/ParquetColumnReader.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ class ParquetColumnReader {
4444
static std::unique_ptr<dwio::common::SelectiveColumnReader> build(
4545
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
4646
ParquetParams& params,
47-
common::ScanSpec& scanSpec);
47+
common::ScanSpec& scanSpec,
48+
const TypePtr& outputType,
49+
memory::MemoryPool& pool);
4850
};
4951
} // namespace facebook::velox::parquet

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ class ReaderBase {
8181
/// the data still exists in the buffered inputs.
8282
bool isRowGroupBuffered(int32_t rowGroupIndex) const;
8383

84+
/// @brief Convert the names of row type to lower case when
85+
/// fileColumnNamesReadAsLowerCase is true.
86+
/// @param rowTypePtr the input row type.
87+
/// @param fileColumnNamesReadAsLowerCase whether to convert names into lower
88+
/// case.
89+
/// @return row type with names converted.
90+
static std::shared_ptr<const RowType> convertRowTypeNames(
91+
const RowTypePtr& rowTypePtr,
92+
bool fileColumnNamesReadAsLowerCase);
93+
8494
private:
8595
// Reads and parses file footer.
8696
void loadFileMetaData();
@@ -543,22 +553,39 @@ TypePtr ReaderBase::convertType(
543553
}
544554
}
545555

556+
std::shared_ptr<const RowType> ReaderBase::convertRowTypeNames(
557+
const RowTypePtr& rowTypePtr,
558+
bool fileColumnNamesReadAsLowerCase) {
559+
if (!fileColumnNamesReadAsLowerCase) {
560+
return rowTypePtr;
561+
}
562+
std::vector<std::string> names;
563+
names.reserve(rowTypePtr->names().size());
564+
std::vector<TypePtr> types = rowTypePtr->children();
565+
for (const auto& name : rowTypePtr->names()) {
566+
std::string childName = name;
567+
folly::toLowerAscii(childName);
568+
names.emplace_back(childName);
569+
}
570+
return TypeFactory<TypeKind::ROW>::create(std::move(names), std::move(types));
571+
}
572+
546573
std::shared_ptr<const RowType> ReaderBase::createRowType(
547574
std::vector<std::shared_ptr<const ParquetTypeWithId::TypeWithId>> children,
548575
bool fileColumnNamesReadAsLowerCase) {
549576
std::vector<std::string> childNames;
577+
childNames.reserve(children.size());
550578
std::vector<TypePtr> childTypes;
551-
for (auto& child : children) {
552-
auto childName =
553-
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_;
554-
if (fileColumnNamesReadAsLowerCase) {
555-
folly::toLowerAscii(childName);
556-
}
557-
childNames.push_back(std::move(childName));
558-
childTypes.push_back(child->type());
579+
childTypes.reserve(children.size());
580+
for (const auto& child : children) {
581+
childNames.emplace_back(
582+
std::static_pointer_cast<const ParquetTypeWithId>(child)->name_);
583+
childTypes.emplace_back(child->type());
559584
}
560-
return TypeFactory<TypeKind::ROW>::create(
561-
std::move(childNames), std::move(childTypes));
585+
return convertRowTypeNames(
586+
TypeFactory<TypeKind::ROW>::create(
587+
std::move(childNames), std::move(childTypes)),
588+
fileColumnNamesReadAsLowerCase);
562589
}
563590

564591
void ReaderBase::scheduleRowGroups(
@@ -639,7 +666,11 @@ ParquetRowReader::ParquetRowReader(
639666
columnReader_ = ParquetColumnReader::build(
640667
readerBase_->schemaWithId(), // Id is schema id
641668
params,
642-
*options_.getScanSpec());
669+
*options_.getScanSpec(),
670+
ReaderBase::convertRowTypeNames(
671+
asRowType(options_.getSelector()->getSchemaWithId()->type()),
672+
readerBase_->isFileColumnNamesReadAsLowerCase()),
673+
pool_);
643674

644675
filterRowGroups();
645676
if (!rowGroupIds_.empty()) {

velox/dwio/parquet/reader/RepeatedColumnReader.cpp

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ PageReader* FOLLY_NULLABLE readLeafRepDefs(
3131
return nullptr;
3232
}
3333
auto pageReader = reader->formatData().as<ParquetData>().reader();
34+
if (pageReader == nullptr) {
35+
return nullptr;
36+
}
3437
pageReader->decodeRepDefs(numTop);
3538
return pageReader;
3639
}
@@ -110,18 +113,28 @@ void ensureRepDefs(
110113
MapColumnReader::MapColumnReader(
111114
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
112115
ParquetParams& params,
113-
common::ScanSpec& scanSpec)
116+
common::ScanSpec& scanSpec,
117+
const std::shared_ptr<const MapType>& outputType,
118+
memory::MemoryPool& pool)
114119
: dwio::common::SelectiveMapColumnReader(
115120
requestedType,
116121
requestedType,
117122
params,
118123
scanSpec) {
119124
auto& keyChildType = requestedType->childAt(0);
120125
auto& elementChildType = requestedType->childAt(1);
121-
keyReader_ =
122-
ParquetColumnReader::build(keyChildType, params, *scanSpec.children()[0]);
126+
keyReader_ = ParquetColumnReader::build(
127+
keyChildType,
128+
params,
129+
*scanSpec.children()[0],
130+
outputType ? outputType->keyType() : nullptr,
131+
pool);
123132
elementReader_ = ParquetColumnReader::build(
124-
elementChildType, params, *scanSpec.children()[1]);
133+
elementChildType,
134+
params,
135+
*scanSpec.children()[1],
136+
outputType ? outputType->valueType() : nullptr,
137+
pool);
125138
reinterpret_cast<const ParquetTypeWithId*>(requestedType.get())
126139
->makeLevelInfo(levelInfo_);
127140
children_ = {keyReader_.get(), elementReader_.get()};
@@ -218,15 +231,21 @@ void MapColumnReader::filterRowGroups(
218231
ListColumnReader::ListColumnReader(
219232
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
220233
ParquetParams& params,
221-
common::ScanSpec& scanSpec)
234+
common::ScanSpec& scanSpec,
235+
const std::shared_ptr<const ArrayType>& outputType,
236+
memory::MemoryPool& pool)
222237
: dwio::common::SelectiveListColumnReader(
223238
requestedType,
224239
requestedType,
225240
params,
226241
scanSpec) {
227242
auto& childType = requestedType->childAt(0);
228-
child_ =
229-
ParquetColumnReader::build(childType, params, *scanSpec.children()[0]);
243+
child_ = ParquetColumnReader::build(
244+
childType,
245+
params,
246+
*scanSpec.children()[0],
247+
outputType ? outputType->elementType() : nullptr,
248+
pool);
230249
reinterpret_cast<const ParquetTypeWithId*>(requestedType.get())
231250
->makeLevelInfo(levelInfo_);
232251
children_ = {child_.get()};

velox/dwio/parquet/reader/RepeatedColumnReader.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ class MapColumnReader : public dwio::common::SelectiveMapColumnReader {
5858
MapColumnReader(
5959
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
6060
ParquetParams& params,
61-
common::ScanSpec& scanSpec);
61+
common::ScanSpec& scanSpec,
62+
const std::shared_ptr<const MapType>& outputType,
63+
memory::MemoryPool& pool);
6264

6365
void prepareRead(
6466
vector_size_t offset,
@@ -113,7 +115,9 @@ class ListColumnReader : public dwio::common::SelectiveListColumnReader {
113115
ListColumnReader(
114116
std::shared_ptr<const dwio::common::TypeWithId> requestedType,
115117
ParquetParams& params,
116-
common::ScanSpec& scanSpec);
118+
common::ScanSpec& scanSpec,
119+
const std::shared_ptr<const ArrayType>& outputType,
120+
memory::MemoryPool& pool);
117121

118122
void prepareRead(
119123
vector_size_t offset,

velox/dwio/parquet/reader/StructColumnReader.cpp

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,47 @@ namespace facebook::velox::parquet {
2222
StructColumnReader::StructColumnReader(
2323
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
2424
ParquetParams& params,
25-
common::ScanSpec& scanSpec)
25+
common::ScanSpec& scanSpec,
26+
const RowTypePtr& outputType,
27+
memory::MemoryPool& pool)
2628
: SelectiveStructColumnReader(dataType, dataType, params, scanSpec) {
2729
auto& childSpecs = scanSpec_->stableChildren();
30+
std::vector<int> missingFields;
2831
for (auto i = 0; i < childSpecs.size(); ++i) {
2932
if (childSpecs[i]->isConstant()) {
3033
continue;
3134
}
32-
auto childDataType = fileType_->childByName(childSpecs[i]->fieldName());
35+
const auto& fieldName = childSpecs[i]->fieldName();
36+
if (outputType && !fileType_->containsChild(fieldName)) {
37+
missingFields.emplace_back(i);
38+
continue;
39+
}
40+
auto childDataType = fileType_->childByName(fieldName);
3341

34-
addChild(ParquetColumnReader::build(childDataType, params, *childSpecs[i]));
42+
addChild(ParquetColumnReader::build(
43+
childDataType,
44+
params,
45+
*childSpecs[i],
46+
outputType ? outputType->findChild(fieldName) : nullptr,
47+
pool));
3548
childSpecs[i]->setSubscript(children_.size() - 1);
3649
}
50+
51+
if (outputType) {
52+
// Set the struct as null if all the children fields in the output type are
53+
// missing and the number of child fields is more than one.
54+
if (childSpecs.size() > 1 && missingFields.size() == childSpecs.size()) {
55+
scanSpec_->setConstantValue(
56+
BaseVector::createNullConstant(outputType, 1, &pool));
57+
} else {
58+
// Set null constant for the missing child field of output type.
59+
for (int channel : missingFields) {
60+
childSpecs[channel]->setConstantValue(BaseVector::createNullConstant(
61+
outputType->findChild(childSpecs[channel]->fieldName()), 1, &pool));
62+
}
63+
}
64+
}
65+
3766
auto type = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get());
3867
if (type->parent()) {
3968
levelMode_ = reinterpret_cast<const ParquetTypeWithId*>(fileType_.get())
@@ -43,7 +72,10 @@ StructColumnReader::StructColumnReader(
4372
// this and the child.
4473
auto child = childForRepDefs_;
4574
for (;;) {
46-
assert(child);
75+
if (child == nullptr) {
76+
levelMode_ = LevelMode::kNulls;
77+
break;
78+
}
4779
if (child->fileType().type()->kind() == TypeKind::ARRAY ||
4880
child->fileType().type()->kind() == TypeKind::MAP) {
4981
levelMode_ = LevelMode::kStructOverLists;

velox/dwio/parquet/reader/StructColumnReader.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ class StructColumnReader : public dwio::common::SelectiveStructColumnReader {
2626
StructColumnReader(
2727
const std::shared_ptr<const dwio::common::TypeWithId>& dataType,
2828
ParquetParams& params,
29-
common::ScanSpec& scanSpec);
29+
common::ScanSpec& scanSpec,
30+
const RowTypePtr& outputType,
31+
memory::MemoryPool& pool);
3032

3133
void read(vector_size_t offset, RowSet rows, const uint64_t* incomingNulls)
3234
override;
1.32 KB
Binary file not shown.

0 commit comments

Comments
 (0)