diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 4d17a1d6e4..c8d8bf7534 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -1804,14 +1804,27 @@ private static void copy(SeekableInputStream from, PositionOutputStream to, long * @throws IOException if there is an error while writing */ public void end(Map extraMetaData) throws IOException { + final long footerStart = out.getPos(); + + // Build the footer metadata) in memory using the helper stream + InMemoryPositionOutputStream buffer = new InMemoryPositionOutputStream(footerStart); + + serializeColumnIndexes(columnIndexes, blocks, buffer, fileEncryptor); + serializeOffsetIndexes(offsetIndexes, blocks, buffer, fileEncryptor); + serializeBloomFilters(bloomFilters, blocks, buffer, fileEncryptor); + + ParquetMetadata localFooter = + new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); + serializeFooter(localFooter, buffer, fileEncryptor, metadataConverter); + + byte[] footerBytes = buffer.toByteArray(); + try { state = state.end(); - serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor); - serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor); - serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor); - LOG.debug("{}: end", out.getPos()); - this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, out, fileEncryptor, metadataConverter); + + out.write(footerBytes); + out.flush(); + this.footer = localFooter; } finally { close(); } @@ -2441,4 +2454,44 @@ protected boolean isPaddingNeeded(long remaining) { return (remaining <= maxPaddingSize); } } + + /** + * Lightweight {@link PositionOutputStream} that writes into a byte buffer while + * keeping a virtual position that can be initialised to an arbitrary offset. + * The position offset lets us build the footer in memory but still record the + * *final* absolute offsets that will appear once the buffer is flushed to the + * underlying file. + */ + private static final class InMemoryPositionOutputStream extends PositionOutputStream { + private final java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream(); + private long pos; + + InMemoryPositionOutputStream(long startPos) { + this.pos = startPos; + } + + @Override + public long getPos() { + return pos; + } + + @Override + public void write(int b) { + buffer.write(b); + pos++; + } + + @Override + public void write(byte[] b, int off, int len) { + buffer.write(b, off, len); + pos += len; + } + + @Override + public void flush() {} + + byte[] toByteArray() { + return buffer.toByteArray(); + } + } }