diff --git a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileTableGenerator.java b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileTableGenerator.java index 7c2bd1623deb8..613ea8bd15984 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileTableGenerator.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileTableGenerator.java @@ -37,6 +37,7 @@ import java.io.File; import java.io.IOException; import java.time.LocalDate; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -81,15 +82,30 @@ public void registerTable( public void generateData(final String tableName, final int number, final long timeGap) throws IOException, WriteProcessException { final List schemas = table2MeasurementSchema.get(tableName); + final List columnCategoryList = table2ColumnCategory.get(tableName); + int timeIndex = -1; + for (int i = 0; i < columnCategoryList.size(); ++i) { + if (columnCategoryList.get(i) == ColumnCategory.TIME) { + timeIndex = i; + break; + } + } + final List schemaWithoutTime = new ArrayList<>(schemas); + final List columnCategoriesWithoutTime = new ArrayList<>(columnCategoryList); + if (timeIndex > -1) { + schemaWithoutTime.remove(timeIndex); + columnCategoriesWithoutTime.remove(timeIndex); + } final List columnNameList = - schemas.stream().map(IMeasurementSchema::getMeasurementName).collect(Collectors.toList()); + schemaWithoutTime.stream() + .map(IMeasurementSchema::getMeasurementName) + .collect(Collectors.toList()); final List dataTypeList = - schemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); - final List columnCategoryList = table2ColumnCategory.get(tableName); + schemaWithoutTime.stream().map(IMeasurementSchema::getType).collect(Collectors.toList()); final TreeSet timeSet = table2TimeSet.get(tableName); - final Tablet tablet = new Tablet(tableName, columnNameList, dataTypeList, columnCategoryList); - final Object[] values = tablet.getValues(); - final long sensorNum = schemas.size(); + final Tablet tablet = + new Tablet(tableName, columnNameList, dataTypeList, columnCategoriesWithoutTime); + final long sensorNum = schemaWithoutTime.size(); long startTime = timeSet.isEmpty() ? 0L : timeSet.last(); for (long r = 0; r < number; r++) { @@ -98,7 +114,7 @@ public void generateData(final String tableName, final int number, final long ti tablet.addTimestamp(row, startTime); timeSet.add(startTime); for (int i = 0; i < sensorNum; i++) { - generateDataPoint(tablet, i, row, schemas.get(i)); + generateDataPoint(tablet, i, row, schemaWithoutTime.get(i)); } // write if (tablet.getRowSize() == tablet.getMaxRowNumber()) { diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java index 5f88d50c9d90c..d14c089681640 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java @@ -47,7 +47,9 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -116,6 +118,40 @@ private List> generateMeasurementSche return pairs; } + private List> generateMeasurementSchemasWithTime( + final int timeColumnIndex, final String timeColumnName) { + List dataTypes = + new ArrayList<>( + Arrays.asList( + TSDataType.STRING, + TSDataType.TEXT, + TSDataType.BLOB, + TSDataType.TIMESTAMP, + TSDataType.BOOLEAN, + TSDataType.DATE, + TSDataType.DOUBLE, + TSDataType.FLOAT, + TSDataType.INT32, + TSDataType.INT64)); + List> pairs = new ArrayList<>(); + for (TSDataType type : dataTypes) { + for (TSDataType dataType : dataTypes) { + String id = String.format("%s2%s", type.name(), dataType.name()); + pairs.add(new Pair<>(new MeasurementSchema(id, type), new MeasurementSchema(id, dataType))); + } + } + + if (timeColumnIndex >= 0) { + pairs.add( + timeColumnIndex, + new Pair<>( + new MeasurementSchema(timeColumnName, TSDataType.TIMESTAMP), + new MeasurementSchema(timeColumnName, TSDataType.TIMESTAMP))); + } + + return pairs; + } + @Test public void testLoadWithEmptyDatabaseForTableModel() throws Exception { final int lineCount = 10000; @@ -123,7 +159,7 @@ public void testLoadWithEmptyDatabaseForTableModel() throws Exception { final List> measurementSchemas = generateMeasurementSchemas(); final List columnCategories = - generateTabletColumnCategory(0, measurementSchemas.size()); + generateTabletColumnCategory(measurementSchemas.size()); final File file = new File(tmpDir, "1-0-0-0.tsfile"); @@ -177,8 +213,7 @@ public void testLoadWithConvertOnTypeMismatchForTableModel() throws Exception { List> measurementSchemas = generateMeasurementSchemas(); - List columnCategories = - generateTabletColumnCategory(0, measurementSchemas.size()); + List columnCategories = generateTabletColumnCategory(measurementSchemas.size()); final File file = new File(tmpDir, "1-0-0-0.tsfile"); @@ -219,8 +254,7 @@ public void testLoadWithTableMod() throws Exception { List> measurementSchemas = generateMeasurementSchemas(); - List columnCategories = - generateTabletColumnCategory(0, measurementSchemas.size()); + List columnCategories = generateTabletColumnCategory(measurementSchemas.size()); final File file = new File(tmpDir, "1-0-0-0.tsfile"); @@ -260,14 +294,137 @@ public void testLoadWithTableMod() throws Exception { } } - private List generateTabletColumnCategory(int tagNum, int filedNum) { - List columnTypes = new ArrayList<>(tagNum + filedNum); + @Test + public void testLoadWithTimeColumn() throws Exception { + final int lineCount = 10000; + + List> measurementSchemas = + generateMeasurementSchemasWithTime(1, "time"); + List columnCategories = + generateTabletColumnCategory(0, measurementSchemas.size(), 1); + + File file = new File(tmpDir, "1-0-0-0.tsfile"); + + List schemaList1 = + measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + + try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) { + generator.registerTable(SchemaConfig.TABLE_0, new ArrayList<>(schemaList1), columnCategories); + generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000); + } + + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + testWithTimeColumn(lineCount, null, null, file); + + measurementSchemas = generateMeasurementSchemasWithTime(2, "time"); + columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 2); + schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + + measurementSchemas = generateMeasurementSchemasWithTime(-1, "time"); + columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), -1); + schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + + measurementSchemas = generateMeasurementSchemasWithTime(2, "time1"); + columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 2); + schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + + file = new File(tmpDir, "2-0-0-0.tsfile"); + try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) { + generator.registerTable(SchemaConfig.TABLE_0, new ArrayList<>(schemaList1), columnCategories); + generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000); + } + + measurementSchemas = generateMeasurementSchemasWithTime(2, "time"); + columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 2); + schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + + measurementSchemas = generateMeasurementSchemasWithTime(1, "time1"); + columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 1); + schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + + testWithTimeColumn(lineCount, null, null, file); + + measurementSchemas = generateMeasurementSchemasWithTime(-1, "time"); + columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), -1); + schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + + file = new File(tmpDir, "3-0-0-0.tsfile"); + try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) { + generator.registerTable(SchemaConfig.TABLE_0, new ArrayList<>(schemaList1), columnCategories); + generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000); + } + + measurementSchemas = generateMeasurementSchemasWithTime(2, "time"); + columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 2); + schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + + measurementSchemas = generateMeasurementSchemasWithTime(1, "time1"); + columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 1); + schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList()); + testWithTimeColumn(lineCount, schemaList1, columnCategories, file); + + testWithTimeColumn(lineCount, null, null, file); + } + + private void testWithTimeColumn( + final long lineCount, + final List schemaList1, + final List columnCategories, + final File file) + throws Exception { + try (final Connection connection = + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute(String.format("create database if not exists %s", SchemaConfig.DATABASE_0)); + statement.execute(String.format("use %s", SchemaConfig.DATABASE_0)); + if (Objects.nonNull(schemaList1)) { + statement.execute(convert2TableSQL(SchemaConfig.TABLE_0, schemaList1, columnCategories)); + } + statement.execute( + String.format( + "load '%s' with ('database'='%s')", file.getAbsolutePath(), SchemaConfig.DATABASE_0)); + try (final ResultSet resultSet = + statement.executeQuery(String.format("select count(*) from %s", SchemaConfig.TABLE_0))) { + if (resultSet.next()) { + Assert.assertEquals(lineCount, resultSet.getLong(1)); + } else { + Assert.fail("This ResultSet is empty."); + } + } + + try (final ResultSet resultSet = statement.executeQuery("show tables")) { + Assert.assertTrue(resultSet.next()); + Assert.assertFalse(resultSet.next()); + } + + statement.execute(String.format("drop database %s", SchemaConfig.DATABASE_0)); + } + } + + private List generateTabletColumnCategory(final int fieldNum) { + return generateTabletColumnCategory(0, fieldNum, -1); + } + + private List generateTabletColumnCategory( + int tagNum, int fieldNum, final int timeIndex) { + List columnTypes = + new ArrayList<>(tagNum + fieldNum + (timeIndex >= 0 ? 1 : 0)); for (int i = 0; i < tagNum; i++) { columnTypes.add(ColumnCategory.TAG); } - for (int i = 0; i < filedNum; i++) { + for (int i = 0; i < fieldNum; i++) { columnTypes.add(ColumnCategory.FIELD); } + if (timeIndex >= 0) { + columnTypes.add(timeIndex, ColumnCategory.TIME); + } return columnTypes; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 8202de5c49947..1bcb6a26426ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -573,6 +573,9 @@ private void doAnalyzeSingleTableFile( } getOrCreateTableSchemaCache().flush(); + if (getOrCreateTableSchemaCache().isNeedDecode4DifferentTimeColumn()) { + loadTsFileTableStatement.enableNeedDecode4TimeColumn(); + } getOrCreateTableSchemaCache().clearTagColumnMapper(); TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java index 690a18dbd46ec..79ede0f459e58 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java @@ -68,6 +68,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.commons.schema.MemUsageUtil.computeStringMemUsage; @@ -111,6 +112,7 @@ public class LoadTsFileTableSchemaCache { private long currentTimeIndexMemoryUsageSizeInBytes = 0; private int currentBatchDevicesCount = 0; + private final AtomicBoolean needDecode4DifferentTimeColumn = new AtomicBoolean(false); public LoadTsFileTableSchemaCache( final Metadata metadata, final MPPQueryContext context, final boolean needToCreateDatabase) @@ -298,7 +300,10 @@ public void createTableAndDatabaseIfNecessary(final String tableName) org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema.fromTsFileTableSchema( tableName, schema); final TableSchema realSchema = - metadata.validateTableHeaderSchema(database, fileSchema, context, true, true).orElse(null); + metadata + .validateTableHeaderSchema4TsFile( + database, fileSchema, context, true, true, needDecode4DifferentTimeColumn) + .orElse(null); if (Objects.isNull(realSchema)) { throw new LoadAnalyzeException( String.format( @@ -308,6 +313,10 @@ public void createTableAndDatabaseIfNecessary(final String tableName) verifyTableDataTypeAndGenerateTagColumnMapper(fileSchema, realSchema); } + public boolean isNeedDecode4DifferentTimeColumn() { + return needDecode4DifferentTimeColumn.get(); + } + private void autoCreateTableDatabaseIfAbsent(final String database) throws LoadAnalyzeException { validateDatabaseName(database); if (DataNodeTableCache.getInstance().isDatabaseExist(database)) { @@ -449,6 +458,7 @@ public void close() { currentBatchTable2Devices = null; tableTagColumnMapper = null; + needDecode4DifferentTimeColumn.set(false); } private void clearDevices() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index 2274869c35543..a36564878df23 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -548,7 +548,8 @@ public PlanNode visitLoadFile( context.getQueryId().genPlanNodeId(), loadTsFileStatement.getResources(), isTableModel, - loadTsFileStatement.getDatabase()); + loadTsFileStatement.getDatabase(), + false); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 604fda6e1e8d5..c8170a4880a08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -66,12 +66,13 @@ public class LoadSingleTsFileNode extends WritePlanNode { private TRegionReplicaSet localRegionReplicaSet; public LoadSingleTsFileNode( - PlanNodeId id, - TsFileResource resource, - boolean isTableModel, - String database, - boolean deleteAfterLoad, - long writePointCount) { + final PlanNodeId id, + final TsFileResource resource, + final boolean isTableModel, + final String database, + final boolean deleteAfterLoad, + final long writePointCount, + final boolean needDecodeTsFile) { super(id); this.tsFile = resource.getTsFile(); this.resource = resource; @@ -79,6 +80,7 @@ public LoadSingleTsFileNode( this.database = database; this.deleteAfterLoad = deleteAfterLoad; this.writePointCount = writePointCount; + this.needDecodeTsFile = needDecodeTsFile; } public boolean isTsFileEmpty() { @@ -89,6 +91,10 @@ public boolean isTsFileEmpty() { public boolean needDecodeTsFile( Function>, List> partitionFetcher) { + if (needDecodeTsFile) { + return true; + } + List> slotList = new ArrayList<>(); resource .getDevices() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java index 3588b6ddbb052..25ad9f3a5c7fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java @@ -46,13 +46,19 @@ public class LoadTsFileNode extends WritePlanNode { private final List resources; private final List isTableModel; private final String database; + private final boolean needDecode4TimeColumn; public LoadTsFileNode( - PlanNodeId id, List resources, List isTableModel, String database) { + final PlanNodeId id, + final List resources, + final List isTableModel, + final String database, + final boolean needDecode4TimeColumn) { super(id); this.resources = resources; this.isTableModel = isTableModel; this.database = database; + this.needDecode4TimeColumn = needDecode4TimeColumn; } @Override @@ -121,7 +127,8 @@ private List splitByPartitionForTreeModel(Analysis analysis) { isTableModel.get(i), database, statement.isDeleteAfterLoad(), - statement.getWritePointCount(i))); + statement.getWritePointCount(i), + false)); } return res; } @@ -143,7 +150,8 @@ private List splitByPartitionForTableModel( isTableModel.get(i), database, statement.isDeleteAfterLoad(), - statement.getWritePointCount(i))); + statement.getWritePointCount(i), + needDecode4TimeColumn)); } } return res; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java index db706d4980cba..82ee332361409 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; // All the input databases shall not contain "root" public interface Metadata { @@ -102,28 +103,31 @@ Map> indexScan( *

When table or column is missing, this method will execute auto creation if the user have * corresponding authority. * - *

When using SQL, the columnSchemaList could be null and there won't be any validation. + *

When using SQL, the columnSchemaList could be {@code null} and there won't be any + * validation. * - *

When the input dataType or category of one column is null, the column won't be auto created. + *

When the input dataType or category of one column is {@code null}, the column won't be auto + * created. * *

The caller need to recheck the dataType of measurement columns to decide whether to do * partial insert * - * @param isStrictIdColumn if true, when the table already exists, the id columns in the existing - * table should be the prefix of those in the input tableSchema, or input id columns be the - * prefix of existing id columns. + * @param isStrictTagColumn if {@code true}, when the table already exists, the tag columns in the + * existing table should be the prefix of those in the input tableSchema, or input tag columns + * be the prefix of existing tag columns. * @return If table doesn't exist and the user have no authority to create table, Optional.empty() * will be returned. The returned table may not include all the columns * in @param{tableSchema}, if the user have no authority to alter table. - * @throws SemanticException if column category mismatch or data types of id or attribute column - * are not STRING or Category, Type of any missing ColumnSchema is null + * @throws SemanticException if column category mismatch or data types of tag or attribute column + * are not STRING or Category, Type of any missing ColumnSchema is {@code null} */ - Optional validateTableHeaderSchema( + Optional validateTableHeaderSchema4TsFile( final String database, final TableSchema tableSchema, final MPPQueryContext context, final boolean allowCreateTable, - final boolean isStrictIdColumn) + final boolean isStrictTagColumn, + final AtomicBoolean needDecode4DifferentTimeColumn) throws LoadAnalyzeTableColumnDisorderException; void validateInsertNodeMeasurements( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index 5df442d4d321f..ed0be5b10d02c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@ -74,6 +74,7 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.transformation.dag.column.FailFunctionColumnTransformer.FAIL_FUNCTION_NAME; @@ -1396,16 +1397,22 @@ public Map> indexScan( } @Override - public Optional validateTableHeaderSchema( - String database, - TableSchema tableSchema, - MPPQueryContext context, - boolean allowCreateTable, - boolean isStrictTagColumn) + public Optional validateTableHeaderSchema4TsFile( + final String database, + final TableSchema tableSchema, + final MPPQueryContext context, + final boolean allowCreateTable, + final boolean isStrictTagColumn, + final AtomicBoolean needDecode4DifferentTimeColumn) throws LoadAnalyzeTableColumnDisorderException { return TableHeaderSchemaValidator.getInstance() - .validateTableHeaderSchema( - database, tableSchema, context, allowCreateTable, isStrictTagColumn); + .validateTableHeaderSchema4TsFile( + database, + tableSchema, + context, + allowCreateTable, + isStrictTagColumn, + needDecode4DifferentTimeColumn); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java index 2d629cb26ee93..8becef3efb114 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableHeaderSchemaValidator.java @@ -60,6 +60,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; @@ -68,8 +69,10 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; @@ -93,12 +96,13 @@ public static TableHeaderSchemaValidator getInstance() { return TableHeaderSchemaValidatorHolder.INSTANCE; } - public Optional validateTableHeaderSchema( + public Optional validateTableHeaderSchema4TsFile( final String database, final TableSchema tableSchema, final MPPQueryContext context, final boolean allowCreateTable, - final boolean isStrictTagColumn) + final boolean isStrictTagColumn, + final @Nonnull AtomicBoolean needDecode4DifferentTimeColumn) throws LoadAnalyzeTableColumnDisorderException { // The schema cache R/W and fetch operation must be locked together thus the cache clean // operation executed by delete timeSeries will be effective. @@ -136,8 +140,7 @@ public Optional validateTableHeaderSchema( } else { DataNodeTreeViewSchemaUtils.checkTableInWrite(database, table); // If table with this name already exists and isStrictTagColumn is true, make sure the - // existing - // id columns are the prefix of the incoming tag columns, or vice versa + // existing tag columns are a prefix of the incoming tag columns, or vice versa if (isStrictTagColumn) { final List realTagColumns = table.getTagColumnSchemaList(); final List incomingTagColumns = tableSchema.getTagColumns(); @@ -173,6 +176,32 @@ public Optional validateTableHeaderSchema( } } } + long realTimeIndex = 0; + boolean realWithoutTimeColumn = true; + + for (final TsTableColumnSchema schema : table.getColumnList()) { + if (schema.getColumnCategory() == TsTableColumnCategory.TIME) { + realWithoutTimeColumn = false; + break; + } + if (schema.getColumnCategory() != TsTableColumnCategory.ATTRIBUTE) { + ++realTimeIndex; + } + } + + long inputTimeIndex = 0; + boolean inputWithoutTimeColumn = true; + for (final ColumnSchema schema : tableSchema.getColumns()) { + if (schema.getColumnCategory() == TsTableColumnCategory.TIME) { + inputWithoutTimeColumn = false; + break; + } + ++inputTimeIndex; + } + if (inputWithoutTimeColumn != realWithoutTimeColumn + || !inputWithoutTimeColumn && inputTimeIndex != realTimeIndex) { + needDecode4DifferentTimeColumn.set(true); + } } boolean refreshed = false; @@ -212,7 +241,7 @@ public Optional validateTableHeaderSchema( noField = false; } } else { - // leave measurement columns' dataType checking to the caller, then the caller can decide + // leave field columns' dataType checking to the caller, then the caller can decide // whether to do partial insert // only check column category @@ -234,7 +263,7 @@ public Optional validateTableHeaderSchema( final List resultColumnList = new ArrayList<>(); if (!missingColumnList.isEmpty() && isAutoCreateSchemaEnabled) { // TODO table metadata: authority check for table alter - // check id or attribute column data type in this method + // check tag or attribute column data type in this method autoCreateColumn(database, tableSchema.getTableName(), missingColumnList, context); table = DataNodeTableCache.getInstance().getTable(database, tableSchema.getTableName()); } else if (!missingColumnList.isEmpty() @@ -572,12 +601,10 @@ private void autoCreateColumnsFromMeasurements( final ListenableFuture future = task.execute(configTaskExecutor); final ConfigTaskResult result = future.get(); if (result.getStatusCode().getStatusCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - throw new RuntimeException( - new IoTDBException( - String.format( - "Auto add table column failed: %s.%s", - database, measurementInfo.getTableName()), - result.getStatusCode().getStatusCode())); + throw new IoTDBRuntimeException( + String.format( + "Auto add table column failed: %s.%s", database, measurementInfo.getTableName()), + result.getStatusCode().getStatusCode()); } DataNodeSchemaLockManager.getInstance() .takeReadLock(context, SchemaLockType.VALIDATE_VS_DELETION_TABLE); @@ -676,6 +703,21 @@ public TsTable toTsTable(InsertNodeMeasurementInfo measurementInfo) { } private void addColumnSchema(final List columnSchemas, final TsTable tsTable) { + // check if the time column has been specified + long timeColumnCount = + columnSchemas.stream() + .filter( + columnDefinition -> + columnDefinition.getColumnCategory() == TsTableColumnCategory.TIME) + .count(); + if (timeColumnCount > 1) { + throw new SemanticException("A table cannot have more than one time column"); + } + if (timeColumnCount == 0) { + // append the time column with default name "time" if user do not specify the time column + tsTable.addColumnSchema(new TimeColumnSchema(TIME_COLUMN_NAME, TSDataType.TIMESTAMP)); + } + for (final ColumnSchema columnSchema : columnSchemas) { TsTableColumnCategory category = columnSchema.getColumnCategory(); if (category == null) { @@ -813,10 +855,8 @@ private List parseInputColumnSchema( TSFileDescriptor.getInstance().getConfig().getCompressor(dataType))); break; case TIME: - throw new SemanticException( - "Adding column for column category " - + inputColumn.getColumnCategory() - + " is not supported"); + columnSchemaList.add(new TimeColumnSchema(inputColumn.getName(), TSDataType.TIMESTAMP)); + break; default: throw new IllegalStateException( "Unknown ColumnCategory for adding column: " + inputColumn.getColumnCategory()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index d442d51a21418..ded699588c1c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -1388,7 +1388,11 @@ protected RelationPlan visitLoadTsFile(final LoadTsFile node, final Void context } return new RelationPlan( new LoadTsFileNode( - idAllocator.genPlanNodeId(), node.getResources(), isTableModel, node.getDatabase()), + idAllocator.genPlanNodeId(), + node.getResources(), + isTableModel, + node.getDatabase(), + node.isNeedDecode4TimeColumn()), analysis.getRootScope(), Collections.emptyList(), outerContext); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 9b7cd372ee940..8deb97c2e5af6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -58,12 +58,13 @@ public class LoadTsFile extends Statement { private boolean isGeneratedByPipe = false; - private Map loadAttributes; + private final Map loadAttributes; private List tsFiles; private List resources; private List writePointCountList; private List isTableModel; + private boolean needDecode4TimeColumn; public LoadTsFile(NodeLocation location, String filePath, Map loadAttributes) { super(location); @@ -167,6 +168,14 @@ public void setIsTableModel(List isTableModel) { this.isTableModel = isTableModel; } + public boolean isNeedDecode4TimeColumn() { + return needDecode4TimeColumn; + } + + public void enableNeedDecode4TimeColumn() { + this.needDecode4TimeColumn = true; + } + public List getTsFiles() { return tsFiles; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 082106bb5fd64..83372a5a6c0b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -92,9 +93,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT; -import static org.apache.iotdb.db.utils.constant.SqlConstant.TREE_MODEL_DATABASE_PREFIX; - /** * {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link * LoadCommand}. This class turn the content of a piece of loading TsFile into a new TsFile. When @@ -489,7 +487,7 @@ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) final String tableName = chunkData.getDevice() != null ? chunkData.getDevice().getTableName() : null; if (tableName != null - && !(tableName.startsWith(TREE_MODEL_DATABASE_PREFIX) || tableName.equals(ROOT))) { + && PathUtils.isTableModelDatabase(partitionInfo.getDataRegion().getDatabaseName())) { // If the table does not exist, it means that the table is all deleted by mods final TsTable table = DataNodeTableCache.getInstance() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java index e425c709815c3..a22ab8f6739ae 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java @@ -41,7 +41,7 @@ public void testLoadSingleTsFileNode() { TsFileResource resource = new TsFileResource(new File("1")); String database = "root.db"; LoadSingleTsFileNode node = - new LoadSingleTsFileNode(new PlanNodeId(""), resource, false, database, true, 0L); + new LoadSingleTsFileNode(new PlanNodeId(""), resource, false, database, true, 0L, false); Assert.assertTrue(node.isDeleteAfterLoad()); Assert.assertEquals(resource, node.getTsFileResource()); Assert.assertEquals(database, node.getDatabase()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java index 6666c180c5e04..36bed6932a877 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java @@ -96,6 +96,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1058,12 +1059,13 @@ private Metadata mockMetadataForInsertion() { DataNodeTableCache.getInstance().commitUpdateTable(database, table, null); return new TestMetadata() { @Override - public Optional validateTableHeaderSchema( + public Optional validateTableHeaderSchema4TsFile( String database, TableSchema schema, MPPQueryContext context, boolean allowCreateTable, - boolean isStrictIdColumn) { + boolean isStrictTagColumn, + final AtomicBoolean needDecode4DifferentTimeColumn) { TableSchema tableSchema = StatementTestUtils.genTableSchema(); assertEquals(tableSchema, schema); return Optional.of(tableSchema); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSMetadata.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSMetadata.java index 79c031560973a..49f23e0358c59 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSMetadata.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TSBSMetadata.java @@ -64,6 +64,7 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTSBSDataPartition.T1_DEVICE_1; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTSBSDataPartition.T1_DEVICE_2; @@ -342,12 +343,13 @@ public Map> indexScan( } @Override - public Optional validateTableHeaderSchema( + public Optional validateTableHeaderSchema4TsFile( String database, TableSchema tableSchema, MPPQueryContext context, boolean allowCreateTable, - boolean isStrictIdColumn) { + boolean isStrictTagColumn, + final AtomicBoolean needDecode4DifferentTimeColumn) { throw new UnsupportedOperationException(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMetadata.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMetadata.java index 4b1d18944b732..722212b78b78a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMetadata.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMetadata.java @@ -80,6 +80,7 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.apache.iotdb.commons.schema.table.InformationSchema.INFORMATION_DATABASE; @@ -481,12 +482,13 @@ private boolean compareNotEqualsMatch( } @Override - public Optional validateTableHeaderSchema( + public Optional validateTableHeaderSchema4TsFile( final String database, final TableSchema tableSchema, final MPPQueryContext context, final boolean allowCreateTable, - final boolean isStrictIdColumn) { + final boolean isStrictTagColumn, + final AtomicBoolean needDecode4DifferentTimeColumn) { throw new UnsupportedOperationException(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/InsertStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/InsertStatementTest.java index 81a9d88d3d3c4..dbbadd1be91fb 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/InsertStatementTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/InsertStatementTest.java @@ -51,6 +51,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertThrows; @@ -100,12 +101,13 @@ public void setUp() throws Exception { DataNodeTableCache.getInstance().preUpdateTable("test", tsTable, null); DataNodeTableCache.getInstance().commitUpdateTable("test", "table1", null); - when(metadata.validateTableHeaderSchema( + when(metadata.validateTableHeaderSchema4TsFile( any(String.class), any(TableSchema.class), any(MPPQueryContext.class), any(Boolean.class), - any(Boolean.class))) + any(Boolean.class), + any(AtomicBoolean.class))) .thenReturn(Optional.of(tableSchema)); doAnswer( @@ -252,12 +254,13 @@ public void testConflictCategory() { DataNodeTableCache.getInstance().preUpdateTable("test", tsTable, null); DataNodeTableCache.getInstance().commitUpdateTable("test", "table1", null); - when(metadata.validateTableHeaderSchema( + when(metadata.validateTableHeaderSchema4TsFile( any(String.class), any(TableSchema.class), any(MPPQueryContext.class), any(Boolean.class), - any(Boolean.class))) + any(Boolean.class), + any(AtomicBoolean.class))) .thenReturn(Optional.of(tableSchema)); assertThrows( @@ -287,12 +290,13 @@ public void testMissingIdColumn() { DataNodeTableCache.getInstance().preUpdateTable("test", tsTable, null); DataNodeTableCache.getInstance().commitUpdateTable("test", "table1", null); - when(metadata.validateTableHeaderSchema( + when(metadata.validateTableHeaderSchema4TsFile( any(String.class), any(TableSchema.class), any(MPPQueryContext.class), any(Boolean.class), - any(Boolean.class))) + any(Boolean.class), + any(AtomicBoolean.class))) .thenReturn(Optional.of(tableSchema)); assertThrows( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java index 61b00d060bd63..df3bbc045e58f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsTable.java @@ -69,7 +69,7 @@ public class TsTable { "When there are object fields, the %s %s shall not be '.', '..' or contain './', '.\\'."; protected String tableName; - private Map columnSchemaMap = new LinkedHashMap<>(); + private final Map columnSchemaMap = new LinkedHashMap<>(); private final Map tagColumnIndexMap = new HashMap<>(); private final Map idColumnIndexMap = new HashMap<>();