Skip to content

Commit 8c9af76

Browse files
committed
Support timestamp reader for Parquet file format (4680)
1 parent adc4631 commit 8c9af76

File tree

13 files changed

+210
-8
lines changed

13 files changed

+210
-8
lines changed

velox/dwio/common/SelectiveColumnReader.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ void SelectiveColumnReader::getIntValues(
214214
VELOX_FAIL("Unsupported value size: {}", valueSize_);
215215
}
216216
break;
217+
case TypeKind::TIMESTAMP:
218+
getFlatValues<Timestamp, Timestamp>(rows, result, requestedType);
219+
break;
217220
default:
218221
VELOX_FAIL(
219222
"Not a valid type for integer reader: {}", requestedType->toString());

velox/dwio/parquet/reader/PageReader.cpp

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,51 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
524524
}
525525
break;
526526
}
527+
case thrift::Type::INT96: {
528+
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
529+
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
530+
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
531+
if (pageData_) {
532+
memcpy(dictionary_.values->asMutable<char>(), pageData_, numBytes);
533+
} else {
534+
dwio::common::readBytes(
535+
numBytes,
536+
inputStream_.get(),
537+
dictionary_.values->asMutable<char>(),
538+
bufferStart_,
539+
bufferEnd_);
540+
}
541+
// Expand the Parquet type length values to Velox type length.
542+
// We start from the end to allow in-place expansion.
543+
auto values = dictionary_.values->asMutable<Timestamp>();
544+
auto parquetValues = dictionary_.values->asMutable<char>();
545+
static constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
546+
static constexpr int64_t kSecondsPerDay = 86400LL;
547+
static constexpr int64_t kNanosPerSecond =
548+
Timestamp::kNanosecondsInMillisecond *
549+
Timestamp::kMillisecondsInSecond;
550+
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
551+
// Convert the timestamp into seconds and nanos since the Unix epoch,
552+
// 00:00:00.000000 on 1 January 1970.
553+
uint64_t nanos;
554+
memcpy(
555+
&nanos,
556+
parquetValues + i * sizeof(Int96Timestamp),
557+
sizeof(uint64_t));
558+
int32_t days;
559+
memcpy(
560+
&days,
561+
parquetValues + i * sizeof(Int96Timestamp) + sizeof(uint64_t),
562+
sizeof(int32_t));
563+
int64_t seconds = (days - kJulianToUnixEpochDays) * kSecondsPerDay;
564+
if (nanos > Timestamp::kMaxNanos) {
565+
seconds += nanos / kNanosPerSecond;
566+
nanos -= (nanos / kNanosPerSecond) * kNanosPerSecond;
567+
}
568+
values[i] = Timestamp(seconds, nanos);
569+
}
570+
break;
571+
}
527572
case thrift::Type::BYTE_ARRAY: {
528573
dictionary_.values =
529574
AlignedBuffer::allocate<StringView>(dictionary_.numValues, &pool_);
@@ -614,7 +659,6 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
614659
VELOX_UNSUPPORTED(
615660
"Parquet type {} not supported for dictionary", parquetType);
616661
}
617-
case thrift::Type::INT96:
618662
default:
619663
VELOX_UNSUPPORTED(
620664
"Parquet type {} not supported for dictionary", parquetType);
@@ -641,6 +685,8 @@ int32_t parquetTypeBytes(thrift::Type::type type) {
641685
case thrift::Type::INT64:
642686
case thrift::Type::DOUBLE:
643687
return 8;
688+
case thrift::Type::INT96:
689+
return 12;
644690
default:
645691
VELOX_FAIL("Type does not have a byte width {}", type);
646692
}

velox/dwio/parquet/reader/ParquetColumnReader.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "velox/dwio/parquet/reader/StructColumnReader.h"
2929

3030
#include "velox/dwio/parquet/reader/Statistics.h"
31+
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"
3132
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"
3233

3334
namespace facebook::velox::parquet {
@@ -90,6 +91,10 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
9091
case TypeKind::BOOLEAN:
9192
return std::make_unique<BooleanColumnReader>(dataType, params, scanSpec);
9293

94+
case TypeKind::TIMESTAMP:
95+
return std::make_unique<TimestampColumnReader>(
96+
dataType, params, scanSpec);
97+
9398
default:
9499
VELOX_FAIL(
95100
"buildReader unhandled type: " +

velox/dwio/parquet/reader/ParquetReader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -534,7 +534,7 @@ TypePtr ReaderBase::convertType(
534534
case thrift::Type::type::INT64:
535535
return BIGINT();
536536
case thrift::Type::type::INT96:
537-
return DOUBLE(); // TODO: Lose precision
537+
return TIMESTAMP();
538538
case thrift::Type::type::FLOAT:
539539
return REAL();
540540
case thrift::Type::type::DOUBLE:
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
20+
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"
21+
22+
namespace facebook::velox::parquet {
23+
24+
class TimestampColumnReader : public IntegerColumnReader {
25+
public:
26+
TimestampColumnReader(
27+
const std::shared_ptr<const dwio::common::TypeWithId>& nodeType,
28+
ParquetParams& params,
29+
common::ScanSpec& scanSpec)
30+
: IntegerColumnReader(nodeType, nodeType, params, scanSpec) {}
31+
32+
void read(
33+
vector_size_t offset,
34+
RowSet rows,
35+
const uint64_t* /*incomingNulls*/) override {
36+
auto& data = formatData_->as<ParquetData>();
37+
// Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length.
38+
prepareRead<int128_t>(offset, rows, nullptr);
39+
readCommon<IntegerColumnReader>(rows);
40+
}
41+
};
42+
43+
} // namespace facebook::velox::parquet
560 Bytes
Binary file not shown.

velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,34 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
7272
assertQuery(plan, splits_, sql);
7373
}
7474

75+
void assertSelectWithFilter(
76+
std::vector<std::string>&& outputColumnNames,
77+
const std::vector<std::string>& subfieldFilters,
78+
const std::string& remainingFilter,
79+
const std::string& sql,
80+
bool isFilterPushdownEnabled) {
81+
auto rowType = getRowType(std::move(outputColumnNames));
82+
parse::ParseOptions options;
83+
options.parseDecimalAsDouble = false;
84+
85+
auto plan = PlanBuilder(pool_.get())
86+
.setParseOptions(options)
87+
// Function extractFiltersFromRemainingFilter will extract
88+
// filters to subfield filters, but for some types, filter
89+
// pushdown is not supported.
90+
.tableScan(
91+
"hive_table",
92+
rowType,
93+
{},
94+
subfieldFilters,
95+
remainingFilter,
96+
nullptr,
97+
isFilterPushdownEnabled)
98+
.planNode();
99+
100+
assertQuery(plan, splits_, sql);
101+
}
102+
75103
void assertSelectWithAgg(
76104
std::vector<std::string>&& outputColumnNames,
77105
const std::vector<std::string>& aggregates,

velox/exec/tests/utils/PlanBuilder.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ PlanBuilder& PlanBuilder::tableScan(
8383
const std::unordered_map<std::string, std::string>& columnAliases,
8484
const std::vector<std::string>& subfieldFilters,
8585
const std::string& remainingFilter,
86-
const RowTypePtr& dataColumns) {
86+
const RowTypePtr& dataColumns,
87+
bool isFilterPushdownEnabled) {
8788
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
8889
assignments;
8990
std::unordered_map<std::string, core::TypedExprPtr> typedMapping;
@@ -143,7 +144,7 @@ PlanBuilder& PlanBuilder::tableScan(
143144
auto tableHandle = std::make_shared<HiveTableHandle>(
144145
kHiveConnectorId,
145146
tableName,
146-
true,
147+
isFilterPushdownEnabled,
147148
std::move(filters),
148149
remainingFilterExpr,
149150
dataColumns);

velox/exec/tests/utils/PlanBuilder.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,8 @@ class PlanBuilder {
144144
const std::unordered_map<std::string, std::string>& columnAliases = {},
145145
const std::vector<std::string>& subfieldFilters = {},
146146
const std::string& remainingFilter = "",
147-
const RowTypePtr& dataColumns = nullptr);
147+
const RowTypePtr& dataColumns = nullptr,
148+
bool isFilterPushdownEnabled = true);
148149

149150
/// Add a TableScanNode using a connector-specific table handle and
150151
/// assignments. Supports any connector, not just Hive connector.

velox/type/Type.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ namespace facebook::velox {
4747
using int128_t = __int128_t;
4848
using int256_t = boost::multiprecision::int256_t;
4949

50+
struct __attribute__((__packed__)) Int96Timestamp {
51+
int32_t days;
52+
uint64_t nanos;
53+
};
54+
5055
/// Velox type system supports a small set of SQL-compatible composeable types:
5156
/// BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, HUGEINT, REAL, DOUBLE, VARCHAR,
5257
/// VARBINARY, TIMESTAMP, ARRAY, MAP, ROW

0 commit comments

Comments
 (0)