Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ public void stopForcibly() {
Thread.currentThread().interrupt();
logger.error("Waiting node to shutdown error.", e);
}
logger.info("In test {} {} started forcibly.", getTestLogDirName(), getId());
logger.info("In test {} {} stopped forcibly.", getTestLogDirName(), getId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.it.schema;

import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.utils.MetadataUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.isession.ISession;
Expand Down Expand Up @@ -2749,4 +2750,39 @@ public void testAlterIllegalDataType() {
throw new RuntimeException(e);
}
}

@Test
public void testCrossPartitionWrite()
throws IoTDBConnectionException, StatementExecutionException {
try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
session.executeNonQueryStatement("CREATE DATABASE root.cross_partition");
session.executeNonQueryStatement(
"CREATE TIMESERIES root.cross_partition.device1.sensor1 WITH DATATYPE=INT32,ENCODING=RLE");

// Insert data into two partitions
Tablet tablet =
new Tablet(
"root.cross_partition.device1",
Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT32, TSEncoding.RLE)));
tablet.addTimestamp(0, 0);
tablet.addValue("sensor1", 0, 0);
tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL);
tablet.addValue("sensor1", 1, 1);
session.insertTablet(tablet);

session.executeNonQueryStatement(
"ALTER TIMESERIES root.cross_partition.device1.sensor1 SET DATA TYPE INT64");

// Insert data with altered type
tablet =
new Tablet(
"root.cross_partition.device1",
Arrays.asList(new MeasurementSchema("sensor1", TSDataType.INT64, TSEncoding.RLE)));
tablet.addTimestamp(0, 0);
tablet.addValue("sensor1", 0, 0L);
tablet.addTimestamp(1, CommonConfig.DEFAULT_TIME_PARTITION_INTERVAL);
tablet.addValue("sensor1", 1, 1L);
session.insertTablet(tablet);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,48 @@ public void testNoPermission() throws Exception {
}
}

@Test
public void testSourcePermissionRestart() throws SQLException {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
TestUtils.executeNonQuery(senderEnv, "create user `thulab` 'passwD@123456'", connection);
TestUtils.executeNonQueries(
senderEnv, Collections.singletonList("grant READ on root.** to user thulab"));

statement.execute(
String.format(
"create pipe a2b"
+ " with source ("
+ "'user'='thulab'"
+ ", 'password'='passwD@123456')"
+ " with sink ("
+ "'node-urls'='%s')",
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)"));
TestUtils.executeNonQueries(
receiverEnv,
Arrays.asList(
"create aligned timeSeries root.vehicle.plane(temperature DOUBLE, pressure INT32)"));

TestUtils.executeNonQueries(senderEnv, Collections.singletonList("start pipe a2b"));

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.vehicle.plane(temperature, pressure) values (36.5, 1103)"));

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(pressure) from root.vehicle.plane",
"count(root.vehicle.plane.pressure),",
Collections.singleton("1,"));
}
}

@Test
public void testSourcePermission() {
TestUtils.executeNonQuery(senderEnv, "create user `thulab` 'passwD@123456'", null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1487,15 +1487,10 @@ private boolean insertTabletToTsFileProcessor(
registerToTsFile(insertTabletNode, tsFileProcessor);
tsFileProcessor.insertTablet(insertTabletNode, rangeList, results, noFailure, infoForMetrics);
} catch (DataTypeInconsistentException e) {
// flush both MemTables so that the new type can be inserted into a new MemTable
TsFileProcessor workSequenceProcessor = workSequenceTsFileProcessors.get(timePartitionId);
if (workSequenceProcessor != null) {
fileFlushPolicy.apply(this, workSequenceProcessor, workSequenceProcessor.isSequence());
}
TsFileProcessor workUnsequenceProcessor = workUnsequenceTsFileProcessors.get(timePartitionId);
if (workUnsequenceProcessor != null) {
fileFlushPolicy.apply(this, workUnsequenceProcessor, workUnsequenceProcessor.isSequence());
}
// flush all MemTables so that the new type can be inserted into a new MemTable
// cannot just flush the current TsFileProcessor, because the new type may be inserted into
// other TsFileProcessors of this region
asyncCloseAllWorkingTsFileProcessors();
throw e;
} catch (WriteProcessRejectException e) {
logger.warn("insert to TsFileProcessor rejected, {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class CommonConfig {
public static final String SYSTEM_CONFIG_NAME = "iotdb-system.properties";
public static final String SYSTEM_CONFIG_TEMPLATE_NAME = "iotdb-system.properties.template";
private static final Logger logger = LoggerFactory.getLogger(CommonConfig.class);
public static final long DEFAULT_TIME_PARTITION_INTERVAL = 604_800_000L;

// Open ID Secret
private String openIdProviderUrl = "";
Expand Down Expand Up @@ -184,7 +185,7 @@ public class CommonConfig {
private long timePartitionOrigin = 0;

/** Time partition interval in milliseconds. */
private long timePartitionInterval = 604_800_000;
private long timePartitionInterval = DEFAULT_TIME_PARTITION_INTERVAL;

/** This variable set timestamp precision as millisecond, microsecond or nanosecond. */
private String timestampPrecision = "ms";
Expand Down