From 8838044679f2f960cca06da763f8d768ba022879 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 26 Jan 2026 16:07:00 +0800 Subject: [PATCH 1/2] Fi --- .../env/cluster/node/AbstractNodeWrapper.java | 2 +- .../it/schema/IoTDBAlterTimeSeriesTypeIT.java | 35 +++++++++++++++++++ .../storageengine/dataregion/DataRegion.java | 13 +++---- .../iotdb/commons/conf/CommonConfig.java | 3 +- 4 files changed, 42 insertions(+), 11 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 1a34903446991..cbcf4ca3fd59e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -572,7 +572,7 @@ public void stopForcibly() { Thread.currentThread().interrupt(); logger.error("Waiting node to shutdown error.", e); } - logger.info("In test {} {} started forcibly.", getTestLogDirName(), getId()); + logger.info("In test {} {} stopped forcibly.", getTestLogDirName(), getId()); } @Override diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java index f7d12f10b8fe6..e74f3c05c6e8d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.it.schema; +import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.utils.MetadataUtils; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.iotdb.isession.ISession; @@ -2749,4 +2750,38 @@ public void testAlterIllegalDataType() { throw new RuntimeException(e); } } + + @Test + public void testCrossPartitionWrite() + throws IoTDBConnectionException, StatementExecutionException { + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + session.executeNonQueryStatement("CREATE DATABASE root.cross_partition"); + session.executeNonQueryStatement( + "CREATE TIMESERIES root.cross_partition.device1.sensor1 WITH DATATYPE=INT32,ENCODING=RLE"); + + // Insert data into two partitions + Tablet tablet = new Tablet("root.cross_partition.device1", Arrays.asList( + new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE)) + ); + tablet.addTimestamp(0, 0); + tablet.addValue("sensor1", 0, 0); + tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL); + tablet.addValue("sensor1", 1, 1); + session.insertTablet(tablet); + + session.executeNonQueryStatement( + "ALTER TIMESERIES root.cross_partition.device1.sensor1 SET DATA TYPE INT64"); + + // Insert data with altered type + tablet = new Tablet("root.cross_partition.device1", Arrays.asList( + new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.RLE)) + ); + tablet.addTimestamp(0, 0); + tablet.addValue("sensor1", 0, 0L); + tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL); + tablet.addValue("sensor1", 1, 1L); + session.insertTablet(tablet); + } + + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 3768f05941b91..0061f41f99524 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1487,15 +1487,10 @@ private boolean insertTabletToTsFileProcessor( registerToTsFile(insertTabletNode, tsFileProcessor); tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics); } catch (DataTypeInconsistentException e) { - // flush both MemTables so that the new type can be inserted into a new MemTable - TsFileProcessor workSequenceProcessor = workSequenceTsFileProcessors.get(timePartitionId); - if (workSequenceProcessor != null) { - fileFlushPolicy.apply(this, workSequenceProcessor, workSequenceProcessor.isSequence()); - } - TsFileProcessor workUnsequenceProcessor = workUnsequenceTsFileProcessors.get(timePartitionId); - if (workUnsequenceProcessor != null) { - fileFlushPolicy.apply(this, workUnsequenceProcessor, workUnsequenceProcessor.isSequence()); - } + // flush all MemTables so that the new type can be inserted into a new MemTable + // cannot just flush the current TsFileProcessor, because the new type may be inserted into + // other TsFileProcessors of this region + asyncCloseAllWorkingTsFileProcessors(); throw e; } catch (WriteProcessRejectException e) { logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index e0225b698a999..b0657272fc5cc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -55,6 +55,7 @@ public class CommonConfig { public static final String SYSTEM_CONFIG_NAME = "iotdb-system.properties"; public static final String SYSTEM_CONFIG_TEMPLATE_NAME = "iotdb-system.properties.template"; private static final Logger logger = LoggerFactory.getLogger(CommonConfig.class); + public static final long DEFAULT_TIME_PARTITION_INTERVAL = 604_800_000L; // Open ID Secret private String openIdProviderUrl = ""; @@ -184,7 +185,7 @@ public class CommonConfig { private long timePartitionOrigin = 0; /** Time partition interval in milliseconds. */ - private long timePartitionInterval = 604_800_000; + private long timePartitionInterval = DEFAULT_TIME_PARTITION_INTERVAL; /** This variable set timestamp precision as millisecond, microsecond or nanosecond. */ private String timestampPrecision = "ms"; From d7f0f1112ad92b0727e7364f62b20bd1ebab4c29 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 26 Jan 2026 16:22:48 +0800 Subject: [PATCH 2/2] spotless --- .../it/schema/IoTDBAlterTimeSeriesTypeIT.java | 15 +++---- .../manual/IoTDBPipePermissionIT.java | 42 +++++++++++++++++++ 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java index e74f3c05c6e8d..181fd2da2b25e 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBAlterTimeSeriesTypeIT.java @@ -2760,9 +2760,10 @@ public void testCrossPartitionWrite() "CREATE TIMESERIES root.cross_partition.device1.sensor1 WITH DATATYPE=INT32,ENCODING=RLE"); // Insert data into two partitions - Tablet tablet = new Tablet("root.cross_partition.device1", Arrays.asList( - new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE)) - ); + Tablet tablet = + new Tablet( + "root.cross_partition.device1", + Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE))); tablet.addTimestamp(0, 0); tablet.addValue("sensor1", 0, 0); tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL); @@ -2773,15 +2774,15 @@ public void testCrossPartitionWrite() "ALTER TIMESERIES root.cross_partition.device1.sensor1 SET DATA TYPE INT64"); // Insert data with altered type - tablet = new Tablet("root.cross_partition.device1", Arrays.asList( - new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.RLE)) - ); + tablet = + new Tablet( + "root.cross_partition.device1", + Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.RLE))); tablet.addTimestamp(0, 0); tablet.addValue("sensor1", 0, 0L); tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL); tablet.addValue("sensor1", 1, 1L); session.insertTablet(tablet); } - } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java index 7ff4ff75efef2..35403c4dc6d05 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java @@ -233,6 +233,48 @@ public void testNoPermission() throws Exception { } } + @Test + public void testSourcePermissionRestart() throws SQLException { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + TestUtils.executeNonQuery(senderEnv, "create user `thulab` 'passwD@123456'", connection); + TestUtils.executeNonQueries( + senderEnv, Collections.singletonList("grant READ on root.** to user thulab")); + + statement.execute( + String.format( + "create pipe a2b" + + " with source (" + + "'user'='thulab'" + + ", 'password'='passwD@123456')" + + " with sink (" + + "'node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)")); + TestUtils.executeNonQueries( + receiverEnv, + Arrays.asList( + "create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)")); + + TestUtils.executeNonQueries(senderEnv, Collections.singletonList("start pipe a2b")); + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)")); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(pressure) from root.vehicle.plane", + "count(root.vehicle.plane.pressure),", + Collections.singleton("1,")); + } + } + @Test public void testSourcePermission() { TestUtils.executeNonQuery(senderEnv, "create user `thulab` 'passwD@123456'", null);