Skip to content

Commit 8047d9b

Browse files
committed
Add iceberg partition transform evaluator and utility
1 parent d2a6822 commit 8047d9b

File tree

10 files changed

+679
-9
lines changed

10 files changed

+679
-9
lines changed

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ velox_add_library(
1919
IcebergSplitReader.cpp
2020
PartitionSpec.cpp
2121
PositionalDeleteFileReader.cpp
22+
TransformEvaluator.cpp
23+
TransformExprBuilder.cpp
24+
)
25+
26+
velox_link_libraries(
27+
velox_hive_iceberg_splitreader
28+
velox_connector
29+
velox_functions_iceberg
30+
Folly::folly
2231
)
2332

2433
velox_link_libraries(velox_hive_iceberg_splitreader velox_connector Folly::folly)

velox/connectors/hive/iceberg/PartitionSpec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ struct IcebergPartitionSpec {
113113
case TransformType::kTruncate:
114114
return type;
115115
}
116+
VELOX_UNREACHABLE("Unknown transform type");
116117
}
117118
};
118119

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/TransformEvaluator.h"
18+
19+
#include <mutex>
20+
21+
#include "velox/expression/Expr.h"
22+
#include "velox/functions/iceberg/Register.h"
23+
24+
namespace facebook::velox::connector::hive::iceberg {
25+
26+
TransformEvaluator::TransformEvaluator(
27+
const std::vector<core::TypedExprPtr>& expressions,
28+
const ConnectorQueryCtx* connectorQueryCtx)
29+
: connectorQueryCtx_(connectorQueryCtx) {
30+
VELOX_CHECK_NOT_NULL(connectorQueryCtx_);
31+
32+
// Register Iceberg functions once for expression evaluation.
33+
static std::once_flag registerFlag;
34+
static constexpr char const* kIcebergFunctionPrefix{"iceberg_"};
35+
36+
std::call_once(registerFlag, []() {
37+
functions::iceberg::registerFunctions(kIcebergFunctionPrefix);
38+
});
39+
40+
exprSet_ = connectorQueryCtx_->expressionEvaluator()->compile(expressions);
41+
VELOX_CHECK_NOT_NULL(exprSet_);
42+
}
43+
44+
std::vector<VectorPtr> TransformEvaluator::evaluate(
45+
const RowVectorPtr& input) const {
46+
const auto numRows = input->size();
47+
const auto numExpressions = exprSet_->exprs().size();
48+
49+
std::vector<VectorPtr> results(numExpressions);
50+
SelectivityVector rows(numRows);
51+
52+
// Evaluate all expressions in one pass.
53+
connectorQueryCtx_->expressionEvaluator()->evaluate(
54+
exprSet_.get(), rows, *input, results);
55+
56+
return results;
57+
}
58+
59+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "velox/connectors/Connector.h"
19+
#include "velox/core/QueryCtx.h"
20+
#include "velox/expression/Expr.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
/// Evaluates multiple expressions efficiently using batch evaluation.
25+
///
26+
/// Expressions are compiled once in the constructor and reused across multiple
27+
/// input batches. Iceberg partition transform functions with 'iceberg_'
28+
/// prefix are automatically registered during create first TransformEvaluator
29+
/// instance via std::call_once.
30+
class TransformEvaluator {
31+
public:
32+
/// Creates an evaluator with the given expressions and connector query
33+
/// context. Compiles the expressions once for reuse across multiple
34+
/// evaluations.
35+
///
36+
/// @param expressions Vector of typed expressions to evaluate. These are
37+
/// typically built using TransformExprBuilder::toExpressions() for Iceberg
38+
/// partition transforms, but can be any valid Velox expressions. The
39+
/// expressions are compiled once during construction.
40+
/// @param connectorQueryCtx Connector query context providing access to the
41+
/// expression evaluator (for compilation and evaluation) and memory pool.
42+
/// Must remain valid for the lifetime of this TransformEvaluator.
43+
TransformEvaluator(
44+
const std::vector<core::TypedExprPtr>& expressions,
45+
const ConnectorQueryCtx* connectorQueryCtx);
46+
47+
/// Evaluates all expressions on the input data in a single pass.
48+
/// Uses the pre-compiled ExprSet from the constructor for efficiency.
49+
///
50+
/// The input RowType must match the RowType used when building the
51+
/// expressions (passed to TransformExprBuilder::toExpressions). The column
52+
/// positions, names and types must align. Create new TransformEvaluator for
53+
/// input that has different RowType with the one when building the
54+
/// expressions.
55+
///
56+
/// @param input Input row vector containing the source data. Must have the
57+
/// same RowType (column positions, names and types) as used when building the
58+
/// expressions in the constructor.
59+
/// @return Vector of result columns, one for each expression, in the same
60+
/// order as the expressions provided to the constructor.
61+
std::vector<VectorPtr> evaluate(const RowVectorPtr& input) const;
62+
63+
private:
64+
const ConnectorQueryCtx* connectorQueryCtx_;
65+
std::unique_ptr<exec::ExprSet> exprSet_;
66+
};
67+
68+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/connectors/hive/iceberg/TransformExprBuilder.h"
17+
#include "velox/core/Expressions.h"
18+
19+
namespace facebook::velox::connector::hive::iceberg {
20+
21+
namespace {
22+
23+
constexpr char const* kBucketFunction{"iceberg_bucket"};
24+
constexpr char const* kTruncateFunction{"iceberg_truncate"};
25+
constexpr char const* kYearFunction{"iceberg_years"};
26+
constexpr char const* kMonthFunction{"iceberg_months"};
27+
constexpr char const* kDayFunction{"iceberg_days"};
28+
constexpr char const* kHourFunction{"iceberg_hours"};
29+
30+
/// Converts a single partition field to a typed expression.
31+
///
32+
/// Builds an expression tree for one partition transform. Identity transforms
33+
/// become FieldAccessTypedExpr, while other transforms (bucket, truncate,
34+
/// year, month, day, hour) become CallTypedExpr with appropriate function
35+
/// names and parameters.
36+
///
37+
/// @param field Partition field containing transform type, source column
38+
/// type, and optional parameter (e.g., bucket count, truncate width).
39+
/// @param inputFieldName Name of the source column in the input RowVector.
40+
/// @return Typed expression representing the transform.
41+
core::TypedExprPtr toExpression(
42+
const IcebergPartitionSpec::Field& field,
43+
const std::string& inputFieldName) {
44+
// For identity transform, just return a field access expression.
45+
if (field.transformType == TransformType::kIdentity) {
46+
return std::make_shared<core::FieldAccessTypedExpr>(
47+
field.type, inputFieldName);
48+
}
49+
50+
// For other transforms, build a CallTypedExpr with the appropriate function.
51+
std::string functionName;
52+
switch (field.transformType) {
53+
case TransformType::kBucket:
54+
functionName = kBucketFunction;
55+
break;
56+
case TransformType::kTruncate:
57+
functionName = kTruncateFunction;
58+
break;
59+
case TransformType::kYear:
60+
functionName = kYearFunction;
61+
break;
62+
case TransformType::kMonth:
63+
functionName = kMonthFunction;
64+
break;
65+
case TransformType::kDay:
66+
functionName = kDayFunction;
67+
break;
68+
case TransformType::kHour:
69+
functionName = kHourFunction;
70+
break;
71+
case TransformType::kIdentity:
72+
break;
73+
}
74+
75+
// Build the expression arguments.
76+
std::vector<core::TypedExprPtr> exprArgs;
77+
if (field.parameter.has_value()) {
78+
exprArgs.emplace_back(
79+
std::make_shared<core::ConstantTypedExpr>(
80+
INTEGER(), Variant(field.parameter.value())));
81+
}
82+
exprArgs.emplace_back(
83+
std::make_shared<core::FieldAccessTypedExpr>(field.type, inputFieldName));
84+
85+
return std::make_shared<core::CallTypedExpr>(
86+
field.resultType(), std::move(exprArgs), functionName);
87+
}
88+
89+
} // namespace
90+
91+
std::vector<core::TypedExprPtr> TransformExprBuilder::toExpressions(
92+
const IcebergPartitionSpecPtr& partitionSpec,
93+
const std::vector<column_index_t>& partitionChannels,
94+
const RowTypePtr& inputType) {
95+
VELOX_CHECK_EQ(
96+
partitionSpec->fields.size(),
97+
partitionChannels.size(),
98+
"Number of partition fields must match number of partition channels");
99+
100+
const auto numTransforms = partitionChannels.size();
101+
std::vector<core::TypedExprPtr> transformExprs;
102+
transformExprs.reserve(numTransforms);
103+
104+
for (auto i = 0; i < numTransforms; i++) {
105+
const auto channel = partitionChannels[i];
106+
transformExprs.emplace_back(
107+
toExpression(partitionSpec->fields.at(i), inputType->nameOf(channel)));
108+
}
109+
110+
return transformExprs;
111+
}
112+
113+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/connectors/hive/iceberg/PartitionSpec.h"
20+
#include "velox/expression/Expr.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
/// Converts Iceberg partition specification to Velox expressions.
25+
class TransformExprBuilder {
26+
public:
27+
/// Converts partition specification to a list of typed expressions.
28+
///
29+
/// @param partitionSpec Iceberg partition specification containing transform
30+
/// definitions for each partition field.
31+
/// @param partitionChannels Column indices (0-based) in the input RowVector
32+
/// that correspond to each partition field. Must have the same size as
33+
/// partitionSpec->fields. Provides the positional mapping from partition spec
34+
/// fields to input RowVector columns.
35+
/// @param inputType The row type of the input data. This is necessary for
36+
/// building expressions because the column names in partitionSpec reference
37+
/// table schema names, which might not match the column names in inputType
38+
/// (e.g., inputType may use generated names like c0, c1, c2). The
39+
/// FieldAccessTypedExpr must be built using the actual column names from
40+
/// inputType that will be present at runtime. The partitionChannels provide
41+
/// the positional mapping to locate the correct columns.
42+
/// @return Vector of typed expressions, one for each partition field.
43+
static std::vector<core::TypedExprPtr> toExpressions(
44+
const IcebergPartitionSpecPtr& partitionSpec,
45+
const std::vector<column_index_t>& partitionChannels,
46+
const RowTypePtr& inputType);
47+
};
48+
49+
} // namespace facebook::velox::connector::hive::iceberg

velox/connectors/hive/iceberg/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ if(NOT VELOX_DISABLE_GOOGLETEST)
6363
IcebergTestBase.cpp
6464
Main.cpp
6565
PartitionSpecTest.cpp
66+
TransformTest.cpp
6667
)
6768

6869
add_test(velox_hive_iceberg_insert_test velox_hive_iceberg_insert_test)

velox/connectors/hive/iceberg/tests/IcebergTestBase.cpp

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "velox/connectors/hive/iceberg/IcebergSplit.h"
2222
#include "velox/connectors/hive/iceberg/PartitionSpec.h"
23+
#include "velox/expression/Expr.h"
2324

2425
namespace facebook::velox::connector::hive::iceberg::test {
2526

@@ -51,6 +52,7 @@ void IcebergTestBase::TearDown() {
5152
connectorPool_.reset();
5253
opPool_.reset();
5354
root_.reset();
55+
queryCtx_.reset();
5456
HiveConnectorTestBase::TearDown();
5557
}
5658

@@ -59,20 +61,25 @@ void IcebergTestBase::setupMemoryPools() {
5961
opPool_.reset();
6062
connectorPool_.reset();
6163
connectorQueryCtx_.reset();
64+
queryCtx_.reset();
6265

6366
root_ = memory::memoryManager()->addRootPool(
6467
"IcebergTest", 1L << 30, exec::MemoryReclaimer::create());
6568
opPool_ = root_->addLeafChild("operator");
6669
connectorPool_ =
6770
root_->addAggregateChild("connector", exec::MemoryReclaimer::create());
6871

69-
connectorQueryCtx_ = std::make_unique<connector::ConnectorQueryCtx>(
72+
queryCtx_ = core::QueryCtx::create(nullptr, core::QueryConfig({}));
73+
auto expressionEvaluator = std::make_unique<exec::SimpleExpressionEvaluator>(
74+
queryCtx_.get(), opPool_.get());
75+
76+
connectorQueryCtx_ = std::make_unique<ConnectorQueryCtx>(
7077
opPool_.get(),
7178
connectorPool_.get(),
7279
connectorSessionProperties_.get(),
7380
nullptr,
7481
common::PrefixSortConfig(),
75-
nullptr,
82+
std::move(expressionEvaluator),
7683
nullptr,
7784
"query.IcebergTest",
7885
"task.IcebergTest",
@@ -103,8 +110,8 @@ std::vector<RowVectorPtr> IcebergTestBase::createTestData(
103110
}
104111

105112
std::shared_ptr<IcebergPartitionSpec> IcebergTestBase::createPartitionSpec(
106-
const std::vector<PartitionField>& partitionFields,
107-
const RowTypePtr& rowType) {
113+
const RowTypePtr& rowType,
114+
const std::vector<PartitionField>& partitionFields) {
108115
std::vector<IcebergPartitionSpec::Field> fields;
109116
for (const auto& partitionField : partitionFields) {
110117
fields.push_back(
@@ -154,7 +161,7 @@ IcebergInsertTableHandlePtr IcebergTestBase::createInsertTableHandle(
154161
outputDirectoryPath,
155162
LocationHandle::TableType::kNew);
156163

157-
auto partitionSpec = createPartitionSpec(partitionFields, rowType);
164+
auto partitionSpec = createPartitionSpec(rowType, partitionFields);
158165

159166
return std::make_shared<const IcebergInsertTableHandle>(
160167
/*inputColumns=*/columnHandles,

0 commit comments

Comments
 (0)