diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
index d7a4c330c9..776fb45576 100644
--- a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
@@ -45,6 +45,7 @@ import org.apache.parquet.format.event.TypedConsumer.I64Consumer;
 import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
 import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
@@ -59,6 +60,7 @@ public class Util {

   private static final int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB

   public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException {
     writeColumnIndex(columnIndex, to, null, null);
@@ -156,6 +158,15 @@ public static FileMetaData readFileMetaData(InputStream from, BlockCipher.Decryp
     return read(from, new FileMetaData(), decryptor, AAD);
   }

+  public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException {
+    return readFileMetaData(from, null, null, maxMessageSize);
+  }
+
+  public static FileMetaData readFileMetaData(
+      InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException {
+    return read(from, new FileMetaData(), decryptor, AAD, maxMessageSize);
+  }
+
   public static void writeColumnMetaData(
       ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD)
       throws IOException {
@@ -190,6 +201,18 @@ public static FileMetaData readFileMetaData(
     return md;
   }

+  public static FileMetaData readFileMetaData(
+      InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
+    FileMetaData md = new FileMetaData();
+    if (skipRowGroups) {
+      readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD, maxMessageSize);
+    } else {
+      read(from, md, decryptor, AAD, maxMessageSize);
+    }
+    return md;
+  }
+
   public static void writeFileCryptoMetaData(
       org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException {
     write(cryptoMetadata, to, null, null);
@@ -293,6 +316,17 @@ public static void readFileMetaData(
       BlockCipher.Decryptor decryptor,
       byte[] AAD)
       throws IOException {
+    readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  public static void readFileMetaData(
+      final InputStream input,
+      final FileMetaDataConsumer consumer,
+      boolean skipRowGroups,
+      BlockCipher.Decryptor decryptor,
+      byte[] AAD,
+      int maxMessageSize)
+      throws IOException {
     try {
       DelegatingFieldConsumer eventConsumer = fieldConsumer()
           .onField(VERSION, new I32Consumer() {
@@ -358,26 +392,54 @@ public void consume(RowGroup rowGroup) {
         byte[] plainText = decryptor.decrypt(input, AAD);
         from = new ByteArrayInputStream(plainText);
       }
-      new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+      new EventBasedThriftReader(protocol(from, maxMessageSize)).readStruct(eventConsumer);
     } catch (TException e) {
       throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
     }
   }

   private static TProtocol protocol(OutputStream to) throws TTransportException {
-    return protocol(new TIOStreamTransport(to));
+    return protocol(new TIOStreamTransport(to), DEFAULT_MAX_MESSAGE_SIZE);
   }

   private static TProtocol protocol(InputStream from) throws TTransportException {
-    return protocol(new TIOStreamTransport(from));
+    return protocol(new TIOStreamTransport(from), DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  private static TProtocol protocol(InputStream from, int maxMessageSize) throws TTransportException {
+    return protocol(new TIOStreamTransport(from), maxMessageSize);
   }

-  private static InterningProtocol protocol(TIOStreamTransport t) {
+  private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize)
+      throws TTransportException, NumberFormatException {
+    int maxMessageSize = configuredMaxMessageSize;
+    if (configuredMaxMessageSize == -1) {
+      // Set to default 100 MB
+      maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+    } else if (configuredMaxMessageSize <= 0) {
+      throw new NumberFormatException("Max message size must be positive: " + configuredMaxMessageSize);
+    }
+
+    TConfiguration config = t.getConfiguration();
+    config.setMaxMessageSize(maxMessageSize);
+    /*
+     Reset known message size to 0 to force checking against the max message size.
+     This is necessary when reusing the same transport for multiple reads/writes,
+     as the known message size may be larger than the max message size.
+    */
+    t.updateKnownMessageSize(0);
     return new InterningProtocol(new TCompactProtocol(t));
   }

   private static <T extends TBase<?, ?>> T read(
       final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(input, tbase, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  private static <T extends TBase<?, ?>> T read(
+      final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
     final InputStream from;
     if (null == decryptor) {
       from = input;
@@ -387,7 +449,7 @@ private static InterningProtocol protocol(TIOStreamTransport t) {
     }

     try {
-      tbase.read(protocol(from));
+      tbase.read(protocol(from, maxMessageSize));
       return tbase;
     } catch (TException e) {
       throw new IOException("can not read " + tbase.getClass() + ": " + e.getMessage(), e);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index d20ac7faeb..a1a256329b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -147,6 +147,15 @@ public class ParquetMetadataConverter {
   public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
   public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k

+  /**
+   * Configuration property to control the Thrift max message size when reading Parquet metadata.
+   * This is useful for files with very large metadata.
+   * Default value is 100 MB.
+   */
+  public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT = "parquet.thrift.string.size.limit";
+
+  private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
+
   private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);

   private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR = new LogicalTypeConverterVisitor();
@@ -154,6 +163,7 @@ public class ParquetMetadataConverter {
       new ConvertedTypeConverterVisitor();
   private final int statisticsTruncateLength;
   private final boolean useSignedStringMinMax;
+  private final ParquetReadOptions options;

   public ParquetMetadataConverter() {
     this(false);
@@ -173,7 +183,7 @@ public ParquetMetadataConverter(Configuration conf) {
   }

   public ParquetMetadataConverter(ParquetReadOptions options) {
-    this(options.useSignedStringMinMax());
+    this(options.useSignedStringMinMax(), ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options);
   }

   private ParquetMetadataConverter(boolean useSignedStringMinMax) {
@@ -181,11 +191,30 @@ private ParquetMetadataConverter(boolean useSignedStringMinMax) {
   }

   private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+    this(useSignedStringMinMax, statisticsTruncateLength, null);
+  }
+
+  private ParquetMetadataConverter(
+      boolean useSignedStringMinMax, int statisticsTruncateLength, ParquetReadOptions options) {
     if (statisticsTruncateLength <= 0) {
       throw new IllegalArgumentException("Truncate length should be greater than 0");
     }
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.statisticsTruncateLength = statisticsTruncateLength;
+    this.options = options;
+  }
+
+  /**
+   * Gets the configured max message size for Thrift deserialization.
+   * Reads the value from the ParquetReadOptions configuration, or returns -1 if no configuration is available.
+   *
+   * @return the max message size in bytes, or -1 to use the default
+   */
+  private int getMaxMessageSize() {
+    if (options != null && options.getConfiguration() != null) {
+      return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE);
+    }
+    return -1;
   }

   // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -1694,21 +1723,27 @@ public ParquetMetadata readParquetMetadata(
         filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
           @Override
           public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
-            FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+            int maxMessageSize = getMaxMessageSize();
+            FileMetaData fileMetadata =
+                readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
             return new FileMetaDataAndRowGroupOffsetInfo(
                 fileMetadata, generateRowGroupOffsets(fileMetadata));
           }

           @Override
           public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
-            FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+            int maxMessageSize = getMaxMessageSize();
+            FileMetaData fileMetadata =
+                readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD, maxMessageSize);
             return new FileMetaDataAndRowGroupOffsetInfo(
                 fileMetadata, generateRowGroupOffsets(fileMetadata));
           }

           @Override
           public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
-            FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+            int maxMessageSize = getMaxMessageSize();
+            FileMetaData fileMetadata =
+                readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
             // We must generate the map *before* filtering because it modifies `fileMetadata`.
             Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
             FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
@@ -1717,7 +1752,9 @@ public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) thro
           @Override
           public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
-            FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+            int maxMessageSize = getMaxMessageSize();
+            FileMetaData fileMetadata =
+                readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
             // We must generate the map *before* filtering because it modifies `fileMetadata`.
             Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
             FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
new file mode 100644
index 0000000000..f9f121b998
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestParquetFileReaderMaxMessageSize {
+
+  public static Path TEST_FILE;
+  public MessageType schema;
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void testSetup() throws IOException {
+
+    File testParquetFile = temp.newFile();
+    testParquetFile.delete();
+
+    TEST_FILE = new Path(testParquetFile.toURI());
+    // Create a file with many columns
+    StringBuilder schemaBuilder = new StringBuilder("message test_schema {");
+    for (int i = 0; i < 2000; i++) {
+      schemaBuilder.append("required int64 col_").append(i).append(";");
+    }
+    schemaBuilder.append("}");
+
+    schema = MessageTypeParser.parseMessageType(schemaBuilder.toString());
+
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf))
+        .withConf(conf)
+        .withType(schema)
+        .build()) {
+
+      SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+      Group group = factory.newGroup();
+      for (int col = 0; col < 2000; col++) {
+        group.append("col_" + col, 1L);
+      }
+      writer.write(group);
+    }
+  }
+
+  /**
+   * Test reading a file with many columns using a custom max message size.
+   */
+  @Test
+  public void testReadFileWithManyColumns() throws IOException {
+    Configuration readConf = new Configuration();
+    readConf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(schema, metadata.getFileMetaData().getSchema());
+      assertTrue(metadata.getBlocks().size() > 0);
+    }
+  }
+
+  /**
+   * Test that the default configuration works for normal files.
+   */
+  @Test
+  public void testReadNormalFileWithDefaultConfig() throws IOException {
+    // Read with default configuration (no custom max message size)
+    Configuration readConf = new Configuration();
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+      ParquetMetadata metadata = reader.getFooter();
+      assertNotNull(metadata);
+      assertEquals(1, metadata.getBlocks().get(0).getRowCount());
+    }
+  }
+
+  /**
+   * Test that an insufficient max message size produces an error.
+   */
+  @Test
+  public void testInsufficientMaxMessageSizeError() throws IOException {
+    // Try to read with a very small max message size
+    Configuration readConf = new Configuration();
+    readConf.setInt("parquet.thrift.string.size.limit", 1); // Only 1 byte
+
+    ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+      fail("Expected the read to fail because the Thrift max message size is exceeded");
+    } catch (IOException e) {
+      e.printStackTrace();
+      assertTrue(
+          "Error should mention the Thrift message size limit",
+          e.getMessage().contains("Message size exceeds limit")
+              || e.getCause().getMessage().contains("Message size exceeds limit")
+              || e.getMessage().contains("MaxMessageSize reached")
+              || e.getCause().getMessage().contains("MaxMessageSize reached"));
+    }
+  }
+}
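
Usage sketch (not part of the patch): the snippet below shows how a caller could raise the Thrift message size limit introduced above when a file's footer is larger than the 100 MB default. It mirrors the test code; the property name `parquet.thrift.string.size.limit` and the reader APIs are the ones used in the patch, while the file path and the 256 MB limit are hypothetical values chosen for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ReadLargeFooterExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical path to a Parquet file whose footer exceeds Thrift's 100 MB default limit.
    Path footerHeavyFile = new Path(args[0]);

    Configuration conf = new Configuration();
    // Raise the Thrift max message size used when deserializing the footer (here: 256 MB).
    conf.setInt("parquet.thrift.string.size.limit", 256 * 1024 * 1024);

    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(footerHeavyFile, conf), options)) {
      ParquetMetadata footer = reader.getFooter();
      System.out.println("Row groups: " + footer.getBlocks().size());
    }
  }
}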