Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -81,15 +82,30 @@ public void registerTable(
public void generateData(final String tableName, final int number, final long timeGap)
throws IOException, WriteProcessException {
final List<IMeasurementSchema> schemas = table2MeasurementSchema.get(tableName);
final List<ColumnCategory> columnCategoryList = table2ColumnCategory.get(tableName);
int timeIndex = -1;
for (int i = 0; i < columnCategoryList.size(); ++i) {
if (columnCategoryList.get(i) == ColumnCategory.TIME) {
timeIndex = i;
break;
}
}
final List<IMeasurementSchema> schemaWithoutTime = new ArrayList<>(schemas);
final List<ColumnCategory> columnCategoriesWithoutTime = new ArrayList<>(columnCategoryList);
if (timeIndex > -1) {
schemaWithoutTime.remove(timeIndex);
columnCategoriesWithoutTime.remove(timeIndex);
}
final List<String> columnNameList =
schemas.stream().map(IMeasurementSchema::getMeasurementName).collect(Collectors.toList());
schemaWithoutTime.stream()
.map(IMeasurementSchema::getMeasurementName)
.collect(Collectors.toList());
final List<TSDataType> dataTypeList =
schemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
final List<ColumnCategory> columnCategoryList = table2ColumnCategory.get(tableName);
schemaWithoutTime.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
final TreeSet<Long> timeSet = table2TimeSet.get(tableName);
final Tablet tablet = new Tablet(tableName, columnNameList, dataTypeList, columnCategoryList);
final Object[] values = tablet.getValues();
final long sensorNum = schemas.size();
final Tablet tablet =
new Tablet(tableName, columnNameList, dataTypeList, columnCategoriesWithoutTime);
final long sensorNum = schemaWithoutTime.size();
long startTime = timeSet.isEmpty() ? 0L : timeSet.last();

for (long r = 0; r < number; r++) {
Expand All @@ -98,7 +114,7 @@ public void generateData(final String tableName, final int number, final long ti
tablet.addTimestamp(row, startTime);
timeSet.add(startTime);
for (int i = 0; i < sensorNum; i++) {
generateDataPoint(tablet, i, row, schemas.get(i));
generateDataPoint(tablet, i, row, schemaWithoutTime.get(i));
}
// write
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -116,14 +118,48 @@ private List<Pair<MeasurementSchema, MeasurementSchema>> generateMeasurementSche
return pairs;
}

private List<Pair<MeasurementSchema, MeasurementSchema>> generateMeasurementSchemasWithTime(
final int timeColumnIndex, final String timeColumnName) {
List<TSDataType> dataTypes =
new ArrayList<>(
Arrays.asList(
TSDataType.STRING,
TSDataType.TEXT,
TSDataType.BLOB,
TSDataType.TIMESTAMP,
TSDataType.BOOLEAN,
TSDataType.DATE,
TSDataType.DOUBLE,
TSDataType.FLOAT,
TSDataType.INT32,
TSDataType.INT64));
List<Pair<MeasurementSchema, MeasurementSchema>> pairs = new ArrayList<>();
for (TSDataType type : dataTypes) {
for (TSDataType dataType : dataTypes) {
String id = String.format("%s2%s", type.name(), dataType.name());
pairs.add(new Pair<>(new MeasurementSchema(id, type), new MeasurementSchema(id, dataType)));
}
}

if (timeColumnIndex >= 0) {
pairs.add(
timeColumnIndex,
new Pair<>(
new MeasurementSchema(timeColumnName, TSDataType.TIMESTAMP),
new MeasurementSchema(timeColumnName, TSDataType.TIMESTAMP)));
}

return pairs;
}

@Test
public void testLoadWithEmptyDatabaseForTableModel() throws Exception {
final int lineCount = 10000;

final List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
generateMeasurementSchemas();
final List<ColumnCategory> columnCategories =
generateTabletColumnCategory(0, measurementSchemas.size());
generateTabletColumnCategory(measurementSchemas.size());

final File file = new File(tmpDir, "1-0-0-0.tsfile");

Expand Down Expand Up @@ -177,8 +213,7 @@ public void testLoadWithConvertOnTypeMismatchForTableModel() throws Exception {

List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
generateMeasurementSchemas();
List<ColumnCategory> columnCategories =
generateTabletColumnCategory(0, measurementSchemas.size());
List<ColumnCategory> columnCategories = generateTabletColumnCategory(measurementSchemas.size());

final File file = new File(tmpDir, "1-0-0-0.tsfile");

Expand Down Expand Up @@ -219,8 +254,7 @@ public void testLoadWithTableMod() throws Exception {

List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
generateMeasurementSchemas();
List<ColumnCategory> columnCategories =
generateTabletColumnCategory(0, measurementSchemas.size());
List<ColumnCategory> columnCategories = generateTabletColumnCategory(measurementSchemas.size());

final File file = new File(tmpDir, "1-0-0-0.tsfile");

Expand Down Expand Up @@ -260,14 +294,137 @@ public void testLoadWithTableMod() throws Exception {
}
}

private List<ColumnCategory> generateTabletColumnCategory(int tagNum, int filedNum) {
List<ColumnCategory> columnTypes = new ArrayList<>(tagNum + filedNum);
@Test
public void testLoadWithTimeColumn() throws Exception {
final int lineCount = 10000;

List<Pair<MeasurementSchema, MeasurementSchema>> measurementSchemas =
generateMeasurementSchemasWithTime(1, "time");
List<ColumnCategory> columnCategories =
generateTabletColumnCategory(0, measurementSchemas.size(), 1);

File file = new File(tmpDir, "1-0-0-0.tsfile");

List<MeasurementSchema> schemaList1 =
measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());

try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) {
generator.registerTable(SchemaConfig.TABLE_0, new ArrayList<>(schemaList1), columnCategories);
generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000);
}

testWithTimeColumn(lineCount, schemaList1, columnCategories, file);
testWithTimeColumn(lineCount, null, null, file);

measurementSchemas = generateMeasurementSchemasWithTime(2, "time");
columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 2);
schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
testWithTimeColumn(lineCount, schemaList1, columnCategories, file);

measurementSchemas = generateMeasurementSchemasWithTime(-1, "time");
columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), -1);
schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
testWithTimeColumn(lineCount, schemaList1, columnCategories, file);

measurementSchemas = generateMeasurementSchemasWithTime(2, "time1");
columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 2);
schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
testWithTimeColumn(lineCount, schemaList1, columnCategories, file);

file = new File(tmpDir, "2-0-0-0.tsfile");
try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) {
generator.registerTable(SchemaConfig.TABLE_0, new ArrayList<>(schemaList1), columnCategories);
generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000);
}

measurementSchemas = generateMeasurementSchemasWithTime(2, "time");
columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 2);
schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
testWithTimeColumn(lineCount, schemaList1, columnCategories, file);

measurementSchemas = generateMeasurementSchemasWithTime(1, "time1");
columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 1);
schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
testWithTimeColumn(lineCount, schemaList1, columnCategories, file);

testWithTimeColumn(lineCount, null, null, file);

measurementSchemas = generateMeasurementSchemasWithTime(-1, "time");
columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), -1);
schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
testWithTimeColumn(lineCount, schemaList1, columnCategories, file);

file = new File(tmpDir, "3-0-0-0.tsfile");
try (final TsFileTableGenerator generator = new TsFileTableGenerator(file)) {
generator.registerTable(SchemaConfig.TABLE_0, new ArrayList<>(schemaList1), columnCategories);
generator.generateData(SchemaConfig.TABLE_0, lineCount, PARTITION_INTERVAL / 10_000);
}

measurementSchemas = generateMeasurementSchemasWithTime(2, "time");
columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 2);
schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
testWithTimeColumn(lineCount, schemaList1, columnCategories, file);

measurementSchemas = generateMeasurementSchemasWithTime(1, "time1");
columnCategories = generateTabletColumnCategory(0, measurementSchemas.size(), 1);
schemaList1 = measurementSchemas.stream().map(pair -> pair.left).collect(Collectors.toList());
testWithTimeColumn(lineCount, schemaList1, columnCategories, file);

testWithTimeColumn(lineCount, null, null, file);
}

private void testWithTimeColumn(
final long lineCount,
final List<MeasurementSchema> schemaList1,
final List<ColumnCategory> columnCategories,
final File file)
throws Exception {
try (final Connection connection =
EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute(String.format("create database if not exists %s", SchemaConfig.DATABASE_0));
statement.execute(String.format("use %s", SchemaConfig.DATABASE_0));
if (Objects.nonNull(schemaList1)) {
statement.execute(convert2TableSQL(SchemaConfig.TABLE_0, schemaList1, columnCategories));
}
statement.execute(
String.format(
"load '%s' with ('database'='%s')", file.getAbsolutePath(), SchemaConfig.DATABASE_0));
try (final ResultSet resultSet =
statement.executeQuery(String.format("select count(*) from %s", SchemaConfig.TABLE_0))) {
if (resultSet.next()) {
Assert.assertEquals(lineCount, resultSet.getLong(1));
} else {
Assert.fail("This ResultSet is empty.");
}
}

try (final ResultSet resultSet = statement.executeQuery("show tables")) {
Assert.assertTrue(resultSet.next());
Assert.assertFalse(resultSet.next());
}

statement.execute(String.format("drop database %s", SchemaConfig.DATABASE_0));
}
}

private List<ColumnCategory> generateTabletColumnCategory(final int fieldNum) {
return generateTabletColumnCategory(0, fieldNum, -1);
}

private List<ColumnCategory> generateTabletColumnCategory(
int tagNum, int fieldNum, final int timeIndex) {
List<ColumnCategory> columnTypes =
new ArrayList<>(tagNum + fieldNum + (timeIndex >= 0 ? 1 : 0));
for (int i = 0; i < tagNum; i++) {
columnTypes.add(ColumnCategory.TAG);
}
for (int i = 0; i < filedNum; i++) {
for (int i = 0; i < fieldNum; i++) {
columnTypes.add(ColumnCategory.FIELD);
}
if (timeIndex >= 0) {
columnTypes.add(timeIndex, ColumnCategory.TIME);
}
return columnTypes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,9 @@ private void doAnalyzeSingleTableFile(
}

getOrCreateTableSchemaCache().flush();
if (getOrCreateTableSchemaCache().isNeedDecode4DifferentTimeColumn()) {
loadTsFileTableStatement.enableNeedDecode4TimeColumn();
}
getOrCreateTableSchemaCache().clearTagColumnMapper();

TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.iotdb.commons.schema.MemUsageUtil.computeStringMemUsage;
Expand Down Expand Up @@ -111,6 +112,7 @@ public class LoadTsFileTableSchemaCache {
private long currentTimeIndexMemoryUsageSizeInBytes = 0;

private int currentBatchDevicesCount = 0;
private final AtomicBoolean needDecode4DifferentTimeColumn = new AtomicBoolean(false);

public LoadTsFileTableSchemaCache(
final Metadata metadata, final MPPQueryContext context, final boolean needToCreateDatabase)
Expand Down Expand Up @@ -298,7 +300,10 @@ public void createTableAndDatabaseIfNecessary(final String tableName)
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema.fromTsFileTableSchema(
tableName, schema);
final TableSchema realSchema =
metadata.validateTableHeaderSchema(database, fileSchema, context, true, true).orElse(null);
metadata
.validateTableHeaderSchema4TsFile(
database, fileSchema, context, true, true, needDecode4DifferentTimeColumn)
.orElse(null);
if (Objects.isNull(realSchema)) {
throw new LoadAnalyzeException(
String.format(
Expand All @@ -308,6 +313,10 @@ public void createTableAndDatabaseIfNecessary(final String tableName)
verifyTableDataTypeAndGenerateTagColumnMapper(fileSchema, realSchema);
}

public boolean isNeedDecode4DifferentTimeColumn() {
return needDecode4DifferentTimeColumn.get();
}

private void autoCreateTableDatabaseIfAbsent(final String database) throws LoadAnalyzeException {
validateDatabaseName(database);
if (DataNodeTableCache.getInstance().isDatabaseExist(database)) {
Expand Down Expand Up @@ -449,6 +458,7 @@ public void close() {

currentBatchTable2Devices = null;
tableTagColumnMapper = null;
needDecode4DifferentTimeColumn.set(false);
}

private void clearDevices() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ public PlanNode visitLoadFile(
context.getQueryId().genPlanNodeId(),
loadTsFileStatement.getResources(),
isTableModel,
loadTsFileStatement.getDatabase());
loadTsFileStatement.getDatabase(),
false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,19 +66,21 @@ public class LoadSingleTsFileNode extends WritePlanNode {
private TRegionReplicaSet localRegionReplicaSet;

public LoadSingleTsFileNode(
PlanNodeId id,
TsFileResource resource,
boolean isTableModel,
String database,
boolean deleteAfterLoad,
long writePointCount) {
final PlanNodeId id,
final TsFileResource resource,
final boolean isTableModel,
final String database,
final boolean deleteAfterLoad,
final long writePointCount,
final boolean needDecodeTsFile) {
super(id);
this.tsFile = resource.getTsFile();
this.resource = resource;
this.isTableModel = isTableModel;
this.database = database;
this.deleteAfterLoad = deleteAfterLoad;
this.writePointCount = writePointCount;
this.needDecodeTsFile = needDecodeTsFile;
}

public boolean isTsFileEmpty() {
Expand All @@ -89,6 +91,10 @@ public boolean isTsFileEmpty() {
public boolean needDecodeTsFile(
Function<List<Pair<IDeviceID, TTimePartitionSlot>>, List<TRegionReplicaSet>>
partitionFetcher) {
if (needDecodeTsFile) {
return true;
}

List<Pair<IDeviceID, TTimePartitionSlot>> slotList = new ArrayList<>();
resource
.getDevices()
Expand Down
Loading