Skip to content

Commit b642b73

Browse files
PingLiuPingLakehouse Engine Bot
authored andcommitted
Revert "feat: Add Iceberg partition name generator (facebookincubator#15461)"
This reverts commit 7576f4e. Alchemy-item: (ID = 885) Iceberg staging hub commit 1/15 - 9582b8b
1 parent 6242914 commit b642b73

24 files changed

+402
-1345
lines changed

velox/connectors/hive/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ velox_add_library(
2727
HiveConnectorSplit.cpp
2828
HiveDataSink.cpp
2929
HiveDataSource.cpp
30-
HivePartitionName.cpp
30+
HivePartitionUtil.cpp
3131
PartitionIdGenerator.cpp
3232
SplitReader.cpp
3333
TableHandle.cpp

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 8 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ FOLLY_ALWAYS_INLINE int32_t
200200
getBucketCount(const HiveBucketProperty* bucketProperty) {
201201
return bucketProperty == nullptr ? 0 : bucketProperty->bucketCount();
202202
}
203-
204203
} // namespace
205204

206205
const HiveWriterId& HiveWriterId::unpartitionedId() {
@@ -383,9 +382,7 @@ HiveDataSink::HiveDataSink(
383382
? createBucketFunction(
384383
*insertTableHandle->bucketProperty(),
385384
inputType)
386-
: nullptr,
387-
getPartitionChannels(insertTableHandle),
388-
nullptr) {}
385+
: nullptr) {}
389386

390387
HiveDataSink::HiveDataSink(
391388
RowTypePtr inputType,
@@ -395,27 +392,6 @@ HiveDataSink::HiveDataSink(
395392
const std::shared_ptr<const HiveConfig>& hiveConfig,
396393
uint32_t bucketCount,
397394
std::unique_ptr<core::PartitionFunction> bucketFunction)
398-
: HiveDataSink(
399-
std::move(inputType),
400-
insertTableHandle,
401-
connectorQueryCtx,
402-
commitStrategy,
403-
hiveConfig,
404-
bucketCount,
405-
std::move(bucketFunction),
406-
getPartitionChannels(insertTableHandle),
407-
nullptr) {}
408-
409-
HiveDataSink::HiveDataSink(
410-
RowTypePtr inputType,
411-
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
412-
const ConnectorQueryCtx* connectorQueryCtx,
413-
CommitStrategy commitStrategy,
414-
const std::shared_ptr<const HiveConfig>& hiveConfig,
415-
uint32_t bucketCount,
416-
std::unique_ptr<core::PartitionFunction> bucketFunction,
417-
const std::vector<column_index_t>& partitionChannels,
418-
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator)
419395
: inputType_(std::move(inputType)),
420396
insertTableHandle_(std::move(insertTableHandle)),
421397
connectorQueryCtx_(connectorQueryCtx),
@@ -424,15 +400,16 @@ HiveDataSink::HiveDataSink(
424400
updateMode_(getUpdateMode()),
425401
maxOpenWriters_(hiveConfig_->maxPartitionsPerWriters(
426402
connectorQueryCtx->sessionProperties())),
427-
partitionChannels_(partitionChannels),
403+
partitionChannels_(getPartitionChannels(insertTableHandle_)),
428404
partitionIdGenerator_(
429-
partitionIdGenerator ? std::move(partitionIdGenerator)
430-
: !partitionChannels_.empty()
405+
!partitionChannels_.empty()
431406
? std::make_unique<PartitionIdGenerator>(
432407
inputType_,
433408
partitionChannels_,
434409
maxOpenWriters_,
435-
connectorQueryCtx_->memoryPool())
410+
connectorQueryCtx_->memoryPool(),
411+
hiveConfig_->isPartitionPathAsLowerCase(
412+
connectorQueryCtx->sessionProperties()))
436413
: nullptr),
437414
dataChannels_(
438415
getNonPartitionChannels(partitionChannels_, inputType_->size())),
@@ -444,8 +421,6 @@ HiveDataSink::HiveDataSink(
444421
sortWriterFinishTimeSliceLimitMs_(getFinishTimeSliceLimitMsFromHiveConfig(
445422
hiveConfig_,
446423
connectorQueryCtx->sessionProperties())),
447-
partitionKeyAsLowerCase_(hiveConfig_->isPartitionPathAsLowerCase(
448-
connectorQueryCtx_->sessionProperties())),
449424
fileNameGenerator_(insertTableHandle_->fileNameGenerator()) {
450425
fileSystemStats_ = std::make_unique<filesystems::File::IoStats>();
451426
if (isBucketed()) {
@@ -782,7 +757,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
782757

783758
std::optional<std::string> partitionName;
784759
if (isPartitioned()) {
785-
partitionName = getPartitionName(id.partitionId.value());
760+
partitionName =
761+
partitionIdGenerator_->partitionName(id.partitionId.value());
786762
}
787763

788764
// Without explicitly setting flush policy, the default memory based flush
@@ -888,15 +864,6 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
888864
return writerIndexMap_[id];
889865
}
890866

891-
std::string HiveDataSink::getPartitionName(uint32_t partitionId) const {
892-
VELOX_CHECK_NOT_NULL(partitionIdGenerator_);
893-
894-
return HivePartitionName::partitionName(
895-
partitionId,
896-
partitionIdGenerator_->partitionValues(),
897-
partitionKeyAsLowerCase_);
898-
}
899-
900867
std::unique_ptr<facebook::velox::dwio::common::Writer>
901868
HiveDataSink::maybeCreateBucketSortWriter(
902869
std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {

velox/connectors/hive/HiveDataSink.h

Lines changed: 1 addition & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#include "velox/common/compression/Compression.h"
1919
#include "velox/connectors/Connector.h"
2020
#include "velox/connectors/hive/HiveConfig.h"
21-
#include "velox/connectors/hive/HivePartitionName.h"
2221
#include "velox/connectors/hive/PartitionIdGenerator.h"
2322
#include "velox/connectors/hive/TableHandle.h"
2423
#include "velox/dwio/common/Options.h"
@@ -517,17 +516,6 @@ class HiveDataSink : public DataSink {
517516
};
518517
static std::string stateString(State state);
519518

520-
/// Creates a HiveDataSink for writing data to Hive table files.
521-
///
522-
/// @param inputType The schema of input data rows to be written.
523-
/// @param insertTableHandle Metadata about the table write operation,
524-
/// including storage format, compression, bucketing, and partitioning
525-
/// configuration.
526-
/// @param connectorQueryCtx Query context with session properties, memory
527-
/// pools, and spill configuration.
528-
/// @param commitStrategy Strategy for committing written data (kNoCommit or
529-
/// kTaskCommit).
530-
/// @param hiveConfig Hive connector configuration.
531519
HiveDataSink(
532520
RowTypePtr inputType,
533521
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
@@ -544,40 +532,6 @@ class HiveDataSink : public DataSink {
544532
uint32_t bucketCount,
545533
std::unique_ptr<core::PartitionFunction> bucketFunction);
546534

547-
/// Constructor with explicit bucketing and partitioning parameters.
548-
///
549-
/// @param inputType The schema of input data rows to be written.
550-
/// @param insertTableHandle Metadata about the table write operation,
551-
/// including storage format, compression, location, and serialization
552-
/// parameters.
553-
/// @param connectorQueryCtx Query context with session properties, memory
554-
/// pools, and spill configuration.
555-
/// @param commitStrategy Strategy for committing written data (kNoCommit or
556-
/// kTaskCommit). Determines whether temporary files need to be renamed on
557-
/// commit.
558-
/// @param hiveConfig Hive connector configuration with settings for max
559-
/// partitions, bucketing limits etc.
560-
/// @param bucketCount Number of buckets for bucketed tables (0 if not
561-
/// bucketed). Must be less than the configured max bucket count.
562-
/// @param bucketFunction Function to compute bucket IDs from row data
563-
/// (nullptr if not bucketed). Used to distribute rows across buckets.
564-
/// @param partitionChannels Column indices used for partitioning (empty if
565-
/// not partitioned). These columns are extracted to determine partition
566-
/// directories.
567-
/// @param partitionIdGenerator Generates partition IDs from partition column
568-
/// values (nullptr if not partitioned). Compute partition key combinations to
569-
/// unique IDs.
570-
HiveDataSink(
571-
RowTypePtr inputType,
572-
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
573-
const ConnectorQueryCtx* connectorQueryCtx,
574-
CommitStrategy commitStrategy,
575-
const std::shared_ptr<const HiveConfig>& hiveConfig,
576-
uint32_t bucketCount,
577-
std::unique_ptr<core::PartitionFunction> bucketFunction,
578-
const std::vector<column_index_t>& partitionChannels,
579-
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator);
580-
581535
void appendData(RowVectorPtr input) override;
582536

583537
bool finish() override;
@@ -667,7 +621,7 @@ class HiveDataSink : public DataSink {
667621
io::IoStatistics* ioStats);
668622

669623
// Compute the partition id and bucket id for each row in 'input'.
670-
virtual void computePartitionAndBucketIds(const RowVectorPtr& input);
624+
void computePartitionAndBucketIds(const RowVectorPtr& input);
671625

672626
// Get the HiveWriter corresponding to the row
673627
// from partitionIds and bucketIds.
@@ -687,12 +641,6 @@ class HiveDataSink : public DataSink {
687641
// the newly created writer in 'writers_'.
688642
uint32_t appendWriter(const HiveWriterId& id);
689643

690-
// Returns the Hive partition directory name for the given partition ID.
691-
// Converts the partition values associated with the partition ID into a
692-
// Hive-formatted directory path. Returns std::nullopt if the table is
693-
// unpartitioned. Should be called only when writing to a partitioned table.
694-
virtual std::string getPartitionName(uint32_t partitionId) const;
695-
696644
std::unique_ptr<facebook::velox::dwio::common::Writer>
697645
maybeCreateBucketSortWriter(
698646
std::unique_ptr<facebook::velox::dwio::common::Writer> writer);
@@ -747,7 +695,6 @@ class HiveDataSink : public DataSink {
747695
const std::shared_ptr<dwio::common::WriterFactory> writerFactory_;
748696
const common::SpillConfig* const spillConfig_;
749697
const uint64_t sortWriterFinishTimeSliceLimitMs_{0};
750-
const bool partitionKeyAsLowerCase_;
751698

752699
std::vector<column_index_t> sortColumnIndices_;
753700
std::vector<CompareFlags> sortCompareFlags_;

velox/connectors/hive/HivePartitionName.cpp

Lines changed: 0 additions & 102 deletions
This file was deleted.

0 commit comments

Comments
 (0)