diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index 9ac2f4e17b7d..d0c8186b5611 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -139,7 +139,11 @@ public void testOPCUAServerSink() throws Exception { env, Arrays.asList( "create aligned timeSeries root.db.opc(value double, quality boolean, other int32)", - "insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)"), + "create aligned timeSeries root.db.opc1(value double, quality boolean, other int32)", + "create aligned timeSeries root.db.opc2(value double, quality boolean, other int32)", + "insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)", + "insert into root.db.opc1(time, value, quality, other) values (0, 0, true, 1)", + "insert into root.db.opc2(time, value, quality, other) values (0, 0, true, 1)"), null); while (true) { @@ -175,9 +179,13 @@ public void testOPCUAServerSink() throws Exception { break; } - TestUtils.executeNonQuery( + // Test multiple regions + TestUtils.executeNonQueries( env, - "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", + Arrays.asList( + "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", + "insert into root.db.opc1(time, value, quality, other) values (1, 1, false, 1)", + "insert into root.db.opc2(time, value, quality, other) values (1, 1, false, 1)"), null); long startTime = System.currentTimeMillis(); @@ -188,6 +196,22 @@ public void testOPCUAServerSink() throws Exception { Assert.assertEquals(new Variant(1.0), value.getValue()); Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + + value = + opcUaClient + .readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc1")) + .get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + + value = + opcUaClient + .readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc2")) + .get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); break; } catch (final Throwable t) { if (System.currentTimeMillis() - startTime > 10_000L) { @@ -345,7 +369,7 @@ private static OpcUaClient getOpcUaClient( + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET)); client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false); - new ClientRunner(client, securityDir, password).run(); + new ClientRunner(client, securityDir, password, userName, 10).run(); return client.getClient(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java index 071db688dd8b..554ac3b0df4e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java @@ -100,6 +100,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE; @@ -118,6 +120,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TIMEOUT_SECONDS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY; @@ -137,8 +140,11 @@ public class OpcUaSink implements PipeConnector { private static final Map> SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>(); + private static final Map> + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>(); private String serverKey; + private String nodeUrl; private boolean isClientServerModel; private String databaseName; private String placeHolder4NullTag; @@ -238,8 +244,7 @@ public void customize( "When the OPC UA sink sets 'with-quality' to true, the table model data is not supported."); } - final String nodeUrl = - parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY); + nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY); if (Objects.isNull(nodeUrl)) { customizeServer(parameters); } else { @@ -247,7 +252,7 @@ public void customize( throw new PipeException( "When the OPC UA sink points to an outer server, the table model data is not supported."); } - customizeClient(nodeUrl, parameters); + customizeClient(parameters); } } @@ -350,7 +355,7 @@ private void customizeServer(final PipeParameters parameters) { } } - private void customizeClient(final String nodeUrl, final PipeParameters parameters) { + private void customizeClient(final PipeParameters parameters) { final SecurityPolicy policy = getSecurityPolicy( parameters @@ -380,15 +385,39 @@ private void customizeClient(final String nodeUrl, final PipeParameters paramete + File.separatorChar + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET)))); - client = - new IoTDBOpcUaClient( - nodeUrl, - policy, - provider, - parameters.getBooleanOrDefault( - Arrays.asList(CONNECTOR_OPC_UA_HISTORIZING_KEY, SINK_OPC_UA_HISTORIZING_KEY), - CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE)); - new ClientRunner(client, securityDir, password).run(); + final long timeoutSeconds = + parameters.getLongOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY, SINK_OPC_UA_TIMEOUT_SECONDS_KEY), + CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE); + + synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) { + client = + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP + .compute( + nodeUrl, + (key, oldValue) -> { + if (Objects.isNull(oldValue)) { + final IoTDBOpcUaClient result = + new IoTDBOpcUaClient( + nodeUrl, + policy, + provider, + parameters.getBooleanOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_HISTORIZING_KEY, + SINK_OPC_UA_HISTORIZING_KEY), + CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE)); + final ClientRunner runner = + new ClientRunner(result, securityDir, password, userName, timeoutSeconds); + runner.run(); + return new Pair<>(new AtomicInteger(0), result); + } + oldValue.getRight().checkEquals(userName, password, securityDir, policy); + return oldValue; + }) + .getRight(); + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl).getLeft().incrementAndGet(); + } } private SecurityPolicy getSecurityPolicy(final String securityPolicy) { @@ -521,10 +550,6 @@ public interface ThrowingBiConsumer { @Override public void close() throws Exception { - if (Objects.nonNull(client)) { - client.disconnect(); - } - if (serverKey == null) { return; } @@ -544,6 +569,26 @@ public void close() throws Exception { } } } + + if (nodeUrl == null) { + return; + } + + synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) { + final Pair pair = + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl); + if (pair == null) { + return; + } + + if (pair.getLeft().decrementAndGet() <= 0) { + try { + pair.getRight().disconnect(); + } finally { + CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.remove(nodeUrl); + } + } + } } /////////////////////////////// Getter /////////////////////////////// diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java index 402091598f13..69fe16f1aaa5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java @@ -25,15 +25,18 @@ import org.eclipse.milo.opcua.sdk.client.OpcUaClient; import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator; import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.security.Security; +import java.util.Objects; import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; @@ -49,14 +52,23 @@ public class ClientRunner { private final IoTDBOpcUaClient configurableUaClient; private final Path securityDir; private final String password; + private final long timeoutSeconds; + + // For conflict checking + private final String user; public ClientRunner( final IoTDBOpcUaClient configurableUaClient, final String securityDir, - final String password) { + final String password, + final String user, + final long timeoutSeconds) { this.configurableUaClient = configurableUaClient; this.securityDir = Paths.get(securityDir); this.password = password; + this.user = user; + this.timeoutSeconds = timeoutSeconds; + configurableUaClient.setRunner(this); } private OpcUaClient createClient() throws Exception { @@ -90,7 +102,9 @@ private OpcUaClient createClient() throws Exception { .setCertificateChain(loader.getClientCertificateChain()) .setCertificateValidator(certificateValidator) .setIdentityProvider(configurableUaClient.getIdentityProvider()) - .setRequestTimeout(uint(5000)) + .setRequestTimeout(uint(timeoutSeconds * 1000L)) + .setConnectTimeout(uint(timeoutSeconds * 1000L)) + .setMaxResponseMessageSize(uint(0)) .build()); } @@ -109,4 +123,37 @@ public void run() { "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e); } } + + long getTimeoutSeconds() { + return timeoutSeconds; + } + + /////////////////////////////// Conflict detection /////////////////////////////// + + void checkEquals( + final String user, + final String password, + final Path securityDir, + final SecurityPolicy securityPolicy) { + checkEquals("user", this.user, user); + checkEquals("password", this.password, password); + checkEquals( + "security dir", + FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()), + FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString())); + checkEquals("securityPolicy", configurableUaClient.getSecurityPolicy(), securityPolicy); + } + + private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) { + if (!Objects.equals(thisAttr, thatAttr)) { + if (attrName.equals("password")) { + thisAttr = "****"; + thatAttr = "****"; + } + throw new PipeException( + String.format( + "The existing server with nodeUrl %s's %s %s conflicts to the new %s %s, reject reusing.", + configurableUaClient.getNodeUrl(), attrName, thisAttr, attrName, thatAttr)); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java index c6d8da47878e..977d672df735 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java @@ -34,6 +34,7 @@ import org.eclipse.milo.opcua.sdk.core.ValueRanks; import org.eclipse.milo.opcua.stack.core.Identifiers; import org.eclipse.milo.opcua.stack.core.StatusCodes; +import org.eclipse.milo.opcua.stack.core.UaException; import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; @@ -55,14 +56,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.concurrent.ExecutionException; import java.util.function.Predicate; import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType; import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc; +import static org.eclipse.milo.opcua.stack.core.StatusCodes.Bad_Timeout; public class IoTDBOpcUaClient { private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class); @@ -78,6 +82,7 @@ public class IoTDBOpcUaClient { private final IdentityProvider identityProvider; private OpcUaClient client; private final boolean historizing; + private ClientRunner runner; public IoTDBOpcUaClient( final String nodeUrl, @@ -93,7 +98,20 @@ public IoTDBOpcUaClient( public void run(final OpcUaClient client) throws Exception { // synchronous connect this.client = client; - client.connect().get(); + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < runner.getTimeoutSeconds() * 1000L) { + try { + client.connect().get(); + } catch (final ExecutionException e) { + if (e.getCause() instanceof UaException + && ((UaException) e.getCause()).getStatusCode().getValue() == Bad_Timeout) { + Thread.sleep(1000L); + continue; + } + throw e; + } + break; + } } // Only support tree model & client-server @@ -300,4 +318,18 @@ private static ObjectAttributes createFolderAttributes(final String name) { null // notifier ); } + + /////////////////////////////// Conflict detection /////////////////////////////// + + public void setRunner(ClientRunner runner) { + this.runner = runner; + } + + public void checkEquals( + final String user, + final String password, + final String securityDir, + final SecurityPolicy securityPolicy) { + runner.checkEquals(user, password, Paths.get(securityDir), securityPolicy); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index fac08cfe0e8d..bf2ee3d17a02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -2333,9 +2333,9 @@ public SettableFuture alterPipe(final AlterPipeStatement alter // Construct temporary pipe static meta for validation final String pipeName = alterPipeStatement.getPipeName(); - final Map extractorAttributes; + final Map sourceAttributes; final Map processorAttributes; - final Map connectorAttributes; + final Map sinkAttributes; try { if (!alterPipeStatement.getSourceAttributes().isEmpty()) { // We don't allow changing the extractor plugin type @@ -2347,7 +2347,7 @@ public SettableFuture alterPipe(final AlterPipeStatement alter new PipeParameters(alterPipeStatement.getSourceAttributes())); } if (alterPipeStatement.isReplaceAllSourceAttributes()) { - extractorAttributes = alterPipeStatement.getSourceAttributes(); + sourceAttributes = alterPipeStatement.getSourceAttributes(); } else { final boolean onlyContainsUser = onlyContainsUser(alterPipeStatement.getSourceAttributes()); @@ -2356,14 +2356,14 @@ public SettableFuture alterPipe(final AlterPipeStatement alter .getSourceParameters() .addOrReplaceEquivalentAttributes( new PipeParameters(alterPipeStatement.getSourceAttributes())); - extractorAttributes = + sourceAttributes = pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute(); if (onlyContainsUser) { - checkSourceType(alterPipeStatement.getPipeName(), extractorAttributes); + checkSourceType(alterPipeStatement.getPipeName(), sourceAttributes); } } } else { - extractorAttributes = + sourceAttributes = pipeMetaFromCoordinator.getStaticMeta().getSourceParameters().getAttribute(); } @@ -2386,7 +2386,7 @@ public SettableFuture alterPipe(final AlterPipeStatement alter if (!alterPipeStatement.getSinkAttributes().isEmpty()) { if (alterPipeStatement.isReplaceAllSinkAttributes()) { - connectorAttributes = alterPipeStatement.getSinkAttributes(); + sinkAttributes = alterPipeStatement.getSinkAttributes(); } else { final boolean onlyContainsUser = onlyContainsUser(alterPipeStatement.getSinkAttributes()); pipeMetaFromCoordinator @@ -2394,19 +2394,18 @@ public SettableFuture alterPipe(final AlterPipeStatement alter .getSinkParameters() .addOrReplaceEquivalentAttributes( new PipeParameters(alterPipeStatement.getSinkAttributes())); - connectorAttributes = + sinkAttributes = pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute(); if (onlyContainsUser) { - checkSinkType(alterPipeStatement.getPipeName(), connectorAttributes); + checkSinkType(alterPipeStatement.getPipeName(), sinkAttributes); } } } else { - connectorAttributes = - pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute(); + sinkAttributes = pipeMetaFromCoordinator.getStaticMeta().getSinkParameters().getAttribute(); } PipeDataNodeAgent.plugin() - .validate(pipeName, extractorAttributes, processorAttributes, connectorAttributes); + .validate(pipeName, sourceAttributes, processorAttributes, sinkAttributes); } catch (final Exception e) { future.setException( new IoTDBException(e.getMessage(), TSStatusCode.PIPE_ERROR.getStatusCode())); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index b27bb59224da..ec9afce7c6f6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -229,6 +229,11 @@ public class PipeSinkConstant { public static final String SINK_OPC_UA_HISTORIZING_KEY = "sink.opcua.historizing"; public static final boolean CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE = false; + public static final String SINK_OPC_UA_TIMEOUT_SECONDS_KEY = "sink.opcua.timeout-seconds"; + public static final String CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY = + "connector.opcua.timeout-seconds"; + public static final long CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE = 10L; + public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable"; public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable"; public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true;