Skip to content

Commit ff2e9e0

Browse files
committed
Add iceberg partition transform evaluator and utility
1 parent d2a6822 commit ff2e9e0

File tree

11 files changed

+752
-9
lines changed

11 files changed

+752
-9
lines changed

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,20 @@
1515
# Sources of the velox_hive_iceberg_splitreader library, including the
# partition path formatter and the partition transform expression
# builder/evaluator.
velox_add_library(
  velox_hive_iceberg_splitreader
  IcebergDataSink.cpp
  IcebergPartitionPath.cpp
  IcebergSplit.cpp
  IcebergSplitReader.cpp
  PartitionSpec.cpp
  PositionalDeleteFileReader.cpp
  TransformEvaluator.cpp
  TransformExprBuilder.cpp
)

# Single link declaration for the target. velox_functions_iceberg is
# presumably what provides velox/functions/iceberg/Register.h, which
# TransformEvaluator.cpp includes. The former one-line call listing only
# velox_connector and Folly::folly was a strict subset of this list and has
# been removed as redundant.
velox_link_libraries(
  velox_hive_iceberg_splitreader
  velox_connector
  velox_functions_iceberg
  Folly::folly
)
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/connectors/hive/iceberg/IcebergPartitionPath.h"
17+
#include "velox/common/encode/Base64.h"
18+
19+
namespace facebook::velox::connector::hive::iceberg {
20+
21+
// Calendar year that a year/month transform value of 0 maps to. It is added
// to those transform values when they are rendered as partition strings.
constexpr int32_t kEpochYear = 1970;
22+
23+
std::string IcebergPartitionPath::toPartitionString(
24+
int32_t value,
25+
const TypePtr& type) const {
26+
switch (transformType_) {
27+
case TransformType::kIdentity: {
28+
if (type->isDate()) {
29+
return DATE()->toString(value);
30+
}
31+
return folly::to<std::string>(value);
32+
}
33+
case TransformType::kDay:
34+
return DATE()->toString(value);
35+
case TransformType::kYear:
36+
return fmt::format("{:04d}", kEpochYear + value);
37+
case TransformType::kMonth: {
38+
int32_t year = kEpochYear + value / 12;
39+
int32_t month = 1 + value % 12;
40+
if (month <= 0) {
41+
month += 12;
42+
year -= 1;
43+
}
44+
return fmt::format("{:04d}-{:02d}", year, month);
45+
}
46+
case TransformType::kHour: {
47+
int64_t seconds = static_cast<int64_t>(value) * 3600;
48+
std::tm tmValue;
49+
VELOX_USER_CHECK(
50+
Timestamp::epochToCalendarUtc(seconds, tmValue),
51+
"Failed to convert seconds to time: {}",
52+
seconds);
53+
return fmt::format(
54+
"{:04d}-{:02d}-{:02d}-{:02d}",
55+
tmValue.tm_year + 1900,
56+
tmValue.tm_mon + 1,
57+
tmValue.tm_mday,
58+
tmValue.tm_hour);
59+
}
60+
default:
61+
return folly::to<std::string>(value);
62+
}
63+
}
64+
65+
/// Formats a timestamp partition value with millisecond precision, a
/// zero-padded year, trailing zeros skipped and the leading-positive-sign
/// option enabled.
///
/// @param value Timestamp to format.
/// @param type Not consulted by this overload.
std::string IcebergPartitionPath::toPartitionString(
    Timestamp value,
    const TypePtr& type) const {
  TimestampToStringOptions format;
  format.leadingPositiveSign = true;
  format.skipTrailingZeros = true;
  format.zeroPaddingYear = true;
  format.precision = TimestampPrecision::kMilliseconds;
  return value.toString(format);
}
75+
76+
/// Formats a string-like partition value: VARBINARY values are Base64
/// encoded, every other type is copied through unchanged.
///
/// @param value Bytes of the partition value.
/// @param type Type of the partition column; decides whether to encode.
std::string IcebergPartitionPath::toPartitionString(
    StringView value,
    const TypePtr& type) const {
  return type->isVarbinary()
      ? encoding::Base64::encode(value.data(), value.size())
      : std::string(value);
}
84+
85+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/connectors/hive/HivePartitionUtil.h"
20+
#include "velox/connectors/hive/iceberg/PartitionSpec.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
/// Converts a partition value to its string representation for use in a
25+
/// partition directory path. The format follows the Iceberg specification
26+
/// for partition path encoding. How a value is rendered depends on the
/// transform type given at construction.
27+
class IcebergPartitionPath : public HivePartitionUtil {
28+
public:
29+
/// @param transformType Partition transform whose output values this
/// instance formats.
explicit IcebergPartitionPath(TransformType transformType)
30+
: transformType_(transformType) {}
31+
32+
~IcebergPartitionPath() override = default;
33+
34+
// Keeps the base-class overloads that are not overridden below visible.
using HivePartitionUtil::toPartitionString;
35+
36+
/// Formats a 32-bit value: day values and identity values of DATE type as
/// dates, year as "YYYY", month as "YYYY-MM", hour as "YYYY-MM-DD-HH", and
/// everything else as a plain decimal number.
std::string toPartitionString(int32_t value, const TypePtr& type)
37+
const override;
38+
39+
/// Formats a timestamp with millisecond precision; 'type' is not consulted.
std::string toPartitionString(Timestamp value, const TypePtr& type)
40+
const override;
41+
42+
/// Returns the value Base64-encoded when 'type' is VARBINARY, otherwise a
/// plain copy of the value.
std::string toPartitionString(StringView value, const TypePtr& type)
43+
const override;
44+
45+
private:
46+
// Transform type that selects the format used by the int32_t overload.
const TransformType transformType_;
47+
};
48+
49+
using IcebergPartitionPathPtr = std::shared_ptr<const IcebergPartitionPath>;
50+
51+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/TransformEvaluator.h"
18+
19+
#include <mutex>
20+
21+
#include "velox/expression/Expr.h"
22+
#include "velox/functions/iceberg/Register.h"
23+
24+
namespace facebook::velox::connector::hive::iceberg {
25+
26+
namespace {
27+
28+
// Prefix passed to functions::iceberg::registerFunctions() when the Iceberg
// functions are registered for expression evaluation.
constexpr char const* kIcebergFunctionPrefix{"iceberg_"};
29+
30+
}
31+
32+
TransformEvaluator::TransformEvaluator(
33+
const std::vector<core::TypedExprPtr>& expressions,
34+
const ConnectorQueryCtx* connectorQueryCtx)
35+
: connectorQueryCtx_(connectorQueryCtx) {
36+
VELOX_CHECK_NOT_NULL(connectorQueryCtx_);
37+
// Register Iceberg functions once for expression evaluation.
38+
static std::once_flag registerFlag;
39+
std::call_once(registerFlag, []() {
40+
functions::iceberg::registerFunctions(kIcebergFunctionPrefix);
41+
});
42+
43+
exprSet_ = connectorQueryCtx_->expressionEvaluator()->compile(expressions);
44+
VELOX_CHECK_NOT_NULL(exprSet_);
45+
}
46+
47+
std::vector<VectorPtr> TransformEvaluator::evaluate(
48+
const RowVectorPtr& input) const {
49+
const auto numRows = input->size();
50+
const auto numExpressions = exprSet_->exprs().size();
51+
52+
std::vector<VectorPtr> results(numExpressions);
53+
SelectivityVector rows(numRows);
54+
55+
// Evaluate all expressions in one pass.
56+
connectorQueryCtx_->expressionEvaluator()->evaluate(
57+
exprSet_.get(), rows, *input, results);
58+
59+
return results;
60+
}
61+
62+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "velox/connectors/Connector.h"
19+
#include "velox/core/QueryCtx.h"
20+
#include "velox/expression/Expr.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
/// Evaluates multiple expressions efficiently using batch evaluation.
25+
///
26+
/// Expressions are compiled once in the constructor and reused across multiple
27+
/// input batches. Iceberg partition transform functions with the 'iceberg_'
28+
/// prefix are registered automatically, via std::call_once, when the first
29+
/// TransformEvaluator instance is created.
30+
class TransformEvaluator {
31+
public:
32+
/// Creates an evaluator with the given expressions and connector query
33+
/// context. Compiles the expressions once for reuse across multiple
34+
/// evaluations.
35+
///
36+
/// @param expressions Vector of typed expressions to evaluate. These are
37+
/// typically built using TransformExprBuilder::toExpressions() for Iceberg
38+
/// partition transforms, but can be any valid Velox expressions. The
39+
/// expressions are compiled once during construction.
40+
/// @param connectorQueryCtx Connector query context providing access to the
41+
/// expression evaluator (for compilation and evaluation) and memory pool.
42+
/// Must not be null and must remain valid for the lifetime of this
/// TransformEvaluator.
43+
TransformEvaluator(
44+
const std::vector<core::TypedExprPtr>& expressions,
45+
const ConnectorQueryCtx* connectorQueryCtx);
46+
47+
/// Evaluates all expressions on the input data in a single pass.
48+
/// Uses the pre-compiled ExprSet from the constructor for efficiency.
49+
///
50+
/// The input RowType must match the RowType used when building the
51+
/// expressions (passed to TransformExprBuilder::toExpressions). The column
52+
/// positions, names and types must align. Create a new TransformEvaluator
53+
/// for input whose RowType differs from the one used when building the
54+
/// expressions.
55+
///
56+
/// @param input Input row vector containing the source data. Must have the
57+
/// same RowType (column positions, names and types) as used when building the
58+
/// expressions passed to the constructor.
59+
/// @return Vector of result columns, one for each expression, in the same
60+
/// order as the expressions provided to the constructor.
61+
std::vector<VectorPtr> evaluate(const RowVectorPtr& input) const;
62+
63+
private:
64+
// Raw pointer supplied by the caller; must outlive this object.
const ConnectorQueryCtx* connectorQueryCtx_;
65+
// Compiled form of the constructor's expressions, reused by evaluate().
std::unique_ptr<exec::ExprSet> exprSet_;
66+
};
67+
68+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/connectors/hive/iceberg/TransformExprBuilder.h"
17+
#include "velox/core/Expressions.h"
18+
19+
namespace facebook::velox::connector::hive::iceberg {
20+
21+
namespace {
22+
23+
// Function names used by toExpression() when building the call expression for
// each non-identity partition transform.
// NOTE(review): presumably these must match the names registered by
// functions::iceberg::registerFunctions() with the "iceberg_" prefix - confirm.
constexpr char const* kBucketFunction{"iceberg_bucket"};
24+
constexpr char const* kTruncateFunction{"iceberg_truncate"};
25+
constexpr char const* kYearFunction{"iceberg_years"};
26+
constexpr char const* kMonthFunction{"iceberg_months"};
27+
constexpr char const* kDayFunction{"iceberg_days"};
28+
constexpr char const* kHourFunction{"iceberg_hours"};
29+
30+
} // namespace
31+
32+
core::TypedExprPtr TransformExprBuilder::toExpression(
33+
const IcebergPartitionSpec::Field& field,
34+
const std::string& inputFieldName) {
35+
// For identity transform, just return a field access expression.
36+
if (field.transformType == TransformType::kIdentity) {
37+
return std::make_shared<core::FieldAccessTypedExpr>(
38+
field.type, inputFieldName);
39+
}
40+
41+
// For other transforms, build a CallTypedExpr with the appropriate function.
42+
std::string functionName;
43+
switch (field.transformType) {
44+
case TransformType::kBucket:
45+
functionName = kBucketFunction;
46+
break;
47+
case TransformType::kTruncate:
48+
functionName = kTruncateFunction;
49+
break;
50+
case TransformType::kYear:
51+
functionName = kYearFunction;
52+
break;
53+
case TransformType::kMonth:
54+
functionName = kMonthFunction;
55+
break;
56+
case TransformType::kDay:
57+
functionName = kDayFunction;
58+
break;
59+
case TransformType::kHour:
60+
functionName = kHourFunction;
61+
break;
62+
case TransformType::kIdentity:
63+
break;
64+
}
65+
66+
// Build the expression arguments.
67+
std::vector<core::TypedExprPtr> exprArgs;
68+
if (field.parameter.has_value()) {
69+
exprArgs.emplace_back(
70+
std::make_shared<core::ConstantTypedExpr>(
71+
INTEGER(), Variant(field.parameter.value())));
72+
}
73+
exprArgs.emplace_back(
74+
std::make_shared<core::FieldAccessTypedExpr>(field.type, inputFieldName));
75+
76+
return std::make_shared<core::CallTypedExpr>(
77+
field.resultType(), std::move(exprArgs), functionName);
78+
}
79+
80+
std::vector<core::TypedExprPtr> TransformExprBuilder::toExpressions(
81+
const IcebergPartitionSpecPtr& partitionSpec,
82+
const std::vector<column_index_t>& partitionChannels,
83+
const RowTypePtr& inputType) {
84+
VELOX_CHECK_EQ(
85+
partitionSpec->fields.size(),
86+
partitionChannels.size(),
87+
"Number of partition fields must match number of partition channels");
88+
89+
const auto numTransforms = partitionChannels.size();
90+
std::vector<core::TypedExprPtr> transformExprs;
91+
transformExprs.reserve(numTransforms);
92+
93+
for (auto i = 0; i < numTransforms; i++) {
94+
const auto channel = partitionChannels[i];
95+
transformExprs.emplace_back(
96+
toExpression(partitionSpec->fields.at(i), inputType->nameOf(channel)));
97+
}
98+
99+
return transformExprs;
100+
}
101+
102+
} // namespace facebook::velox::connector::hive::iceberg

0 commit comments

Comments
 (0)