Skip to content

Commit ccb4075

Browse files
authored
Support empty relation write (#370)
Co-authored-by: youxiduo <[email protected]>
1 parent 1bb2da3 commit ccb4075

File tree

2 files changed

+27
-10
lines changed

2 files changed

+27
-10
lines changed

velox/dwio/parquet/writer/Writer.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,26 @@ Writer::Writer(
135135
stream_(std::make_shared<ArrowDataBufferSink>(
136136
std::move(sink),
137137
*generalPool_,
138-
bufferGrowRatio_)),
139-
schema_(schema) {
138+
bufferGrowRatio_)) {
140139
arrowContext_ = std::make_shared<ArrowContext>();
141140
arrowContext_->properties = getArrowParquetWriterOptions(options);
141+
arrowContext_->schema = schema;
142+
143+
if (arrowContext_->schema) {
144+
// If the input iterator is empty, the writer will do nothing and build an
145+
// empty file. We should at least write the parquet magic header so the
146+
// reader can recognize it as a valid parquet file. So, we initialize the
147+
// writer up front even if there is no data.
148+
auto arrowProperties = ::parquet::ArrowWriterProperties::Builder().build();
149+
PARQUET_ASSIGN_OR_THROW(
150+
arrowContext_->writer,
151+
::parquet::arrow::FileWriter::Open(
152+
*arrowContext_->schema.get(),
153+
arrow::default_memory_pool(),
154+
stream_,
155+
arrowContext_->properties,
156+
arrowProperties));
157+
}
142158
}
143159

144160
Writer::Writer(
@@ -200,20 +216,23 @@ void Writer::flush() {
200216
*/
201217
void Writer::write(const VectorPtr& data) {
202218
ArrowArray array;
203-
ArrowSchema schema;
204219
exportToArrow(data, array, generalPool_.get());
205-
exportToArrow(data, schema);
206220
std::shared_ptr<arrow::RecordBatch> recordBatch;
207-
if (schema_) {
221+
if (arrowContext_->schema) {
208222
PARQUET_ASSIGN_OR_THROW(
209-
recordBatch, arrow::ImportRecordBatch(&array, schema_));
223+
recordBatch, arrow::ImportRecordBatch(&array, arrowContext_->schema));
210224
} else {
225+
ArrowSchema schema;
226+
exportToArrow(data, schema);
211227
PARQUET_ASSIGN_OR_THROW(
212228
recordBatch, arrow::ImportRecordBatch(&array, &schema));
213229
}
214230

215-
if (!arrowContext_->schema) {
216-
arrowContext_->schema = recordBatch->schema();
231+
if (arrowContext_->stagingChunks.empty()) {
232+
if (!arrowContext_->schema) {
233+
arrowContext_->schema = recordBatch->schema();
234+
}
235+
217236
for (int colIdx = 0; colIdx < arrowContext_->schema->num_fields();
218237
colIdx++) {
219238
arrowContext_->stagingChunks.push_back(

velox/dwio/parquet/writer/Writer.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ class Writer : public dwio::common::Writer {
9393
std::shared_ptr<ArrowDataBufferSink> stream_;
9494

9595
std::shared_ptr<ArrowContext> arrowContext_;
96-
97-
std::shared_ptr<arrow::Schema> schema_;
9896
};
9997

10098
class ParquetWriterFactory : public dwio::common::WriterFactory {

0 commit comments

Comments
 (0)