diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBUserDefinedTimeIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBUserDefinedTimeIT.java new file mode 100644 index 000000000000..60cb94d5bba3 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBUserDefinedTimeIT.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.schema; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; + +import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBUserDefinedTimeIT { + + private static final String TABLE_DATABASE = "user_defined_time"; + private static final String VIEW_DATABASE = "user_defined_time_for_view"; + private static final String SELECT_DATABASE = "select_agg_function"; + private static final String[] SQLS = + new String[] { + "CREATE DATABASE " + TABLE_DATABASE, + "CREATE DATABASE " + VIEW_DATABASE, + "CREATE DATABASE " + SELECT_DATABASE + }; + private final String header = "ColumnName,DataType,Category,"; + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().initClusterEnvironment(); + prepareTableData(SQLS); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testCreateTable() { + try (final Connection connection = + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute("use " + TABLE_DATABASE); + + // create table and do not assign the time column name + try { + statement.execute( + "create table default_not_assign_time(device string tag, s1 int32 field)"); + TestUtils.assertResultSetEqual( + statement.executeQuery("describe default_not_assign_time"), + header, + Arrays.asList("time,TIMESTAMP,TIME,", "device,STRING,TAG,", "s1,INT32,FIELD,")); + } catch (SQLException e) { + fail("create table without time info fails, the specific message: " + e.getMessage()); + } + + // create table and assign the time column name + try { + statement.execute( + "create table time_in_first(date_time timestamp time, device string tag, s1 int32 field)"); + TestUtils.assertResultSetEqual( + statement.executeQuery("describe time_in_first"), + header, + Arrays.asList("date_time,TIMESTAMP,TIME,", "device,STRING,TAG,", "s1,INT32,FIELD,")); + } catch (SQLException e) { + fail("assign the name of time column fails, the specific message: " + e.getMessage()); + } + + // create table which of the time column not at the first column + try { + statement.execute( + "create table time_not_in_first(device string tag, date_time timestamp time, s1 int32 field)"); + TestUtils.assertResultSetEqual( + statement.executeQuery("describe time_not_in_first"), + header, + Arrays.asList("device,STRING,TAG,", "date_time,TIMESTAMP,TIME,", "s1,INT32,FIELD,")); + } catch (SQLException e) { + fail("assign the name of time column fails, the specific message: " + e.getMessage()); + } + + // create table with multi time-column + try { + statement.execute( + "create table with_multi_time(time_type timestamp time, device string tag, date_time timestamp time, s1 int32 field)"); + fail("Creating table is not be allowed to assign two time columns"); + } catch (SQLException e) { + assertEquals("701: A table cannot have more than one time column", e.getMessage()); + } + + // create table with time column that is not timestamp data type + try { + statement.execute( + "create table time_other_type(device string tag, date_time int64 time, s1 int32 field)"); + fail("The time column has to be assigned a timestamp data type when creating table"); + } catch (SQLException e) { + assertEquals("701: The time column's type shall be 'timestamp'.", e.getMessage()); + } + + // Columns in table shall not share the same name time when creating table + try { + statement.execute( + "create table shared_time_name(device string tag, time int64 field, s1 int32 field)"); + fail("Columns in table shall not share the same name time when creating table"); + } catch (SQLException e) { + assertEquals("701: Columns in table shall not share the same name time.", e.getMessage()); + } + + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void prepareTreeData() { + try (final Connection connection = EnvFactory.getEnv().getConnection(BaseEnv.TREE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute("create timeseries root.tt.device.s1 with datatype=int32"); + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testCreateView() { + prepareTreeData(); + + try (final Connection connection = + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute("use " + VIEW_DATABASE); + + // create view and do not assign the time column name + try { + statement.execute( + "create view default_not_assign_time(device string tag, s1 int32 field) as root.tt.**"); + TestUtils.assertResultSetEqual( + statement.executeQuery("describe default_not_assign_time"), + header, + Arrays.asList("time,TIMESTAMP,TIME,", "device,STRING,TAG,", "s1,INT32,FIELD,")); + } catch (SQLException e) { + fail("create table without time info fails, the specific message: " + e.getMessage()); + } + + // create view which of the time column at the first column + try { + statement.execute( + "create view time_in_first(date_time timestamp time, device string tag, s1 int32 field) as root.tt.**"); + TestUtils.assertResultSetEqual( + statement.executeQuery("describe time_in_first"), + header, + Arrays.asList("date_time,TIMESTAMP,TIME,", "device,STRING,TAG,", "s1,INT32,FIELD,")); + } catch (SQLException e) { + fail("assign the name of time column fails, the specific message: " + e.getMessage()); + } + + // create view which of the time column not at the first column + try { + statement.execute( + "create view time_not_in_first(device string tag, date_time timestamp time, s1 int32 field) as root.tt.**"); + TestUtils.assertResultSetEqual( + statement.executeQuery("describe time_not_in_first"), + header, + Arrays.asList("device,STRING,TAG,", "date_time,TIMESTAMP,TIME,", "s1,INT32,FIELD,")); + } catch (SQLException e) { + fail("assign the name of time column fails, the specific message: " + e.getMessage()); + } + + // create view with multi time-column + try { + statement.execute( + "create view with_multi_time(time_type timestamp time, device string tag, date_time timestamp time, s1 int32 field) as root.tt.**"); + fail("Creating view is not be allowed to assign two time columns"); + } catch (SQLException e) { + assertEquals("701: A table cannot have more than one time column", e.getMessage()); + } + + // create table with time column that is not timestamp data type + try { + statement.execute( + "create view time_other_type(device string tag, date_time int64 time, s1 int32 field) as root.tt.**"); + fail("The time column has to be assigned a timestamp data type when creating view"); + } catch (SQLException e) { + assertEquals("701: The time column's type shall be 'timestamp'.", e.getMessage()); + } + + // Columns in table shall not share the same name time when creating table + try { + statement.execute( + "create view shared_time_time(device string tag, time int64 field, s1 int32 field) as root.tt.**"); + fail("Columns in view shall not share the same name time when creating table"); + } catch (SQLException e) { + assertEquals("701: Columns in table shall not share the same name time.", e.getMessage()); + } + + } catch (SQLException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java index 45617de31d1c..90dcb82c2f08 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java @@ -78,6 +78,7 @@ import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.FIRST; import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.FIRST_BY; import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.LAST; import static org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.LAST_BY; @@ -108,11 +109,14 @@ public static TableAccumulator createAccumulator( && inputExpressions.size() > 1) { boolean xIsTimeColumn = isTimeColumn(inputExpressions.get(0), timeColumnName); boolean yIsTimeColumn = isTimeColumn(inputExpressions.get(1), timeColumnName); - // When used in AggTableScanOperator, we can finish calculation of + boolean orderKeyIsTimeColumn = isTimeColumn(inputExpressions.get(2), timeColumnName); + + // When used in AggTableScanOperator and the order column is time column, we can finish + // calculation of // LastDesc/LastByDesc/First/First_by after the result has been initialized if (LAST_BY.getFunctionName().equals(functionName)) { result = - ascending + ascending || !orderKeyIsTimeColumn ? new LastByAccumulator( inputDataTypes.get(0), inputDataTypes.get(1), xIsTimeColumn, yIsTimeColumn) : new LastByDescAccumulator( @@ -125,7 +129,7 @@ public static TableAccumulator createAccumulator( isAggTableScan); } else { result = - ascending + ascending && orderKeyIsTimeColumn ? new FirstByAccumulator( inputDataTypes.get(0), inputDataTypes.get(1), @@ -136,13 +140,21 @@ public static TableAccumulator createAccumulator( inputDataTypes.get(0), inputDataTypes.get(1), xIsTimeColumn, yIsTimeColumn); } } else if (LAST.getFunctionName().equals(functionName)) { - return ascending - ? new LastAccumulator(inputDataTypes.get(0)) - : new LastDescAccumulator( - inputDataTypes.get(0), - isTimeColumn(inputExpressions.get(0), timeColumnName), - isMeasurementColumn(inputExpressions.get(0), measurementColumnNames), - isAggTableScan); + boolean orderKeyIsTimeColumn = isTimeColumn(inputExpressions.get(1), timeColumnName); + result = + ascending || !orderKeyIsTimeColumn + ? new LastAccumulator(inputDataTypes.get(0)) + : new LastDescAccumulator( + inputDataTypes.get(0), + isTimeColumn(inputExpressions.get(0), timeColumnName), + isMeasurementColumn(inputExpressions.get(0), measurementColumnNames), + isAggTableScan); + } else if (FIRST.getFunctionName().equals(functionName)) { + boolean orderKeyIsTimeColumn = isTimeColumn(inputExpressions.get(1), timeColumnName); + result = + ascending && orderKeyIsTimeColumn + ? new FirstAccumulator(inputDataTypes.get(0), isAggTableScan) + : new FirstDescAccumulator(inputDataTypes.get(0)); } else { result = createBuiltinAccumulator( @@ -307,34 +319,18 @@ public static TableAccumulator createBuiltinAccumulator( case SUM: return new SumAccumulator(inputDataTypes.get(0)); case LAST: - return ascending - ? new LastAccumulator(inputDataTypes.get(0)) - : new LastDescAccumulator(inputDataTypes.get(0), false, false, isAggTableScan); + return new LastAccumulator(inputDataTypes.get(0)); case FIRST: - return ascending - ? new FirstAccumulator(inputDataTypes.get(0), isAggTableScan) - : new FirstDescAccumulator(inputDataTypes.get(0)); + return new FirstDescAccumulator(inputDataTypes.get(0)); case MAX: return new MaxAccumulator(inputDataTypes.get(0)); case MIN: return new MinAccumulator(inputDataTypes.get(0)); case LAST_BY: - return ascending - ? new LastByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1), false, false) - : new LastByDescAccumulator( - inputDataTypes.get(0), - inputDataTypes.get(1), - false, - false, - false, - false, - isAggTableScan); + return new LastByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1), false, false); case FIRST_BY: - return ascending - ? new FirstByAccumulator( - inputDataTypes.get(0), inputDataTypes.get(1), false, false, isAggTableScan) - : new FirstByDescAccumulator( - inputDataTypes.get(0), inputDataTypes.get(1), false, false); + return new FirstByDescAccumulator( + inputDataTypes.get(0), inputDataTypes.get(1), false, false); case MAX_BY: return new TableMaxByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1)); case MIN_BY: diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationTableScanTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationTableScanTest.java new file mode 100644 index 000000000000..c03e5e9c0ce1 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationTableScanTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.distribution; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; + +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.concurrent.ExecutorService; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.analyze.Analyzer; +import org.apache.iotdb.db.queryengine.plan.analyze.FakePartitionFetcherImpl; +import org.apache.iotdb.db.queryengine.plan.analyze.FakeSchemaFetcherImpl; +import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; +import org.apache.iotdb.db.queryengine.plan.planner.LogicalPlanner; +import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; +import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; +import org.junit.Test; +import org.mockito.Mockito; + +public class AggregationTableScanTest { + + @Test + public void lastAggTest() { + final String sql = null; + DataNodeQueryContext dataNodeQueryContext = new DataNodeQueryContext(1); + + SessionInfo sessionInfo = + new SessionInfo( + 0, "root", ZoneId.systemDefault(), "last_agg_db", IClientSession.SqlDialect.TABLE); + QueryId queryId = new QueryId("test"); + MPPQueryContext context = + new MPPQueryContext( + sql, + queryId, + sessionInfo, + new TEndPoint("127.0.0.1", 6667), + new TEndPoint("127.0.0.1", 6667)); + Analyzer analyzer = + new Analyzer(context, new FakePartitionFetcherImpl(), new FakeSchemaFetcherImpl()); + Statement statement = StatementGenerator.createStatement(sql, ZonedDateTime.now().getOffset()); + Analysis analysis = analyzer.analyze(statement); + LogicalPlanner logicalPlanner = new LogicalPlanner(context); + LogicalQueryPlan logicalPlan = logicalPlanner.plan(analysis); + DistributionPlanner distributionPlanner = new DistributionPlanner(analysis, logicalPlan); + FragmentInstance instance = distributionPlanner.planFragments().getInstances().get(0); + + LocalExecutionPlanner localExecutionPlanner = LocalExecutionPlanner.getInstance(); + localExecutionPlanner.plan( + instance.getFragment().getPlanNodeTree(), + instance.getFragment().getTypeProvider(), + mockFIContext(queryId), + dataNodeQueryContext); + } + + private FragmentInstanceContext mockFIContext(QueryId queryId) { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "last_agg-instance-notification"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext instanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + IDataRegionForQuery dataRegionForQuery = Mockito.mock(DataRegion.class); + instanceContext.setDataRegion(dataRegionForQuery); + return instanceContext; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsFileTableSchemaUtil.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsFileTableSchemaUtil.java index 7a74f4be651d..eaa1b6a6a9e7 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsFileTableSchemaUtil.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/TsFileTableSchemaUtil.java @@ -34,6 +34,8 @@ import java.util.HashMap; import java.util.List; +import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME; + /** Utility class for converting between TsTable and TSFile TableSchema */ public class TsFileTableSchemaUtil { @@ -156,9 +158,18 @@ public static TableSchema toTsFileTableSchemaNoAttribute(final TsTable table) { // Directly iterate through columns and filter out TIME and ATTRIBUTE columns int columnIndex = 0; - for (final TsTableColumnSchema columnSchema : tsTableColumnSchemas) { + + for (int i = 0; i < tsTableColumnSchemas.size(); i++) { + TsTableColumnSchema columnSchema = tsTableColumnSchemas.get(i); final TsTableColumnCategory category = columnSchema.getColumnCategory(); + // if the time columns is named as "time" and in first position, drop it + if (i == 0 + && category == TsTableColumnCategory.TIME + && columnSchema.getColumnName().equalsIgnoreCase(TIME_COLUMN_NAME)) { + continue; + } + // Skip ATTRIBUTE columns (only include TIME, TAG and FIELD) if (category == TsTableColumnCategory.ATTRIBUTE) { continue;