Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ public void testOPCUAServerSink() throws Exception {
env,
Arrays.asList(
"create aligned timeSeries root.db.opc(value double, quality boolean, other int32)",
"insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)"),
"create aligned timeSeries root.db.opc1(value double, quality boolean, other int32)",
"create aligned timeSeries root.db.opc2(value double, quality boolean, other int32)",
"insert into root.db.opc(time, value, quality, other) values (0, 0, true, 1)",
"insert into root.db.opc1(time, value, quality, other) values (0, 0, true, 1)",
"insert into root.db.opc2(time, value, quality, other) values (0, 0, true, 1)"),
null);

while (true) {
Expand Down Expand Up @@ -175,9 +179,13 @@ public void testOPCUAServerSink() throws Exception {
break;
}

TestUtils.executeNonQuery(
// Test multiple regions
TestUtils.executeNonQueries(
env,
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
Arrays.asList(
"insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
"insert into root.db.opc1(time, value, quality, other) values (1, 1, false, 1)",
"insert into root.db.opc2(time, value, quality, other) values (1, 1, false, 1)"),
null);

long startTime = System.currentTimeMillis();
Expand All @@ -188,6 +196,22 @@ public void testOPCUAServerSink() throws Exception {
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());

value =
opcUaClient
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc1"))
.get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());

value =
opcUaClient
.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc2"))
.get();
Assert.assertEquals(new Variant(1.0), value.getValue());
Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
break;
} catch (final Throwable t) {
if (System.currentTimeMillis() - startTime > 10_000L) {
Expand Down Expand Up @@ -345,7 +369,7 @@ private static OpcUaClient getOpcUaClient(
+ UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));

client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
new ClientRunner(client, securityDir, password).run();
new ClientRunner(client, securityDir, password, userName, 10).run();
return client.getClient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
Expand All @@ -118,6 +120,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TIMEOUT_SECONDS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;

Expand All @@ -137,8 +140,11 @@ public class OpcUaSink implements PipeConnector {

private static final Map<String, Pair<AtomicInteger, OpcUaNameSpace>>
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>();
private static final Map<String, Pair<AtomicInteger, IoTDBOpcUaClient>>
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP = new ConcurrentHashMap<>();

private String serverKey;
private String nodeUrl;
private boolean isClientServerModel;
private String databaseName;
private String placeHolder4NullTag;
Expand Down Expand Up @@ -238,16 +244,15 @@ public void customize(
"When the OPC UA sink sets 'with-quality' to true, the table model data is not supported.");
}

final String nodeUrl =
parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY);
nodeUrl = parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY);
if (Objects.isNull(nodeUrl)) {
customizeServer(parameters);
} else {
if (PathUtils.isTableModelDatabase(databaseName)) {
throw new PipeException(
"When the OPC UA sink points to an outer server, the table model data is not supported.");
}
customizeClient(nodeUrl, parameters);
customizeClient(parameters);
}
}

Expand Down Expand Up @@ -350,7 +355,7 @@ private void customizeServer(final PipeParameters parameters) {
}
}

private void customizeClient(final String nodeUrl, final PipeParameters parameters) {
private void customizeClient(final PipeParameters parameters) {
final SecurityPolicy policy =
getSecurityPolicy(
parameters
Expand Down Expand Up @@ -380,15 +385,39 @@ private void customizeClient(final String nodeUrl, final PipeParameters paramete
+ File.separatorChar
+ UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET))));

client =
new IoTDBOpcUaClient(
nodeUrl,
policy,
provider,
parameters.getBooleanOrDefault(
Arrays.asList(CONNECTOR_OPC_UA_HISTORIZING_KEY, SINK_OPC_UA_HISTORIZING_KEY),
CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
new ClientRunner(client, securityDir, password).run();
final long timeoutSeconds =
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_OPC_UA_TIMEOUT_SECONDS_KEY, SINK_OPC_UA_TIMEOUT_SECONDS_KEY),
CONNECTOR_OPC_UA_TIMEOUT_SECONDS_DEFAULT_VALUE);

synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
client =
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP
.compute(
nodeUrl,
(key, oldValue) -> {
if (Objects.isNull(oldValue)) {
final IoTDBOpcUaClient result =
new IoTDBOpcUaClient(
nodeUrl,
policy,
provider,
parameters.getBooleanOrDefault(
Arrays.asList(
CONNECTOR_OPC_UA_HISTORIZING_KEY,
SINK_OPC_UA_HISTORIZING_KEY),
CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
final ClientRunner runner =
new ClientRunner(result, securityDir, password, userName, timeoutSeconds);
runner.run();
return new Pair<>(new AtomicInteger(0), result);
}
oldValue.getRight().checkEquals(userName, password, securityDir, policy);
return oldValue;
})
.getRight();
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl).getLeft().incrementAndGet();
}
}

private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
Expand Down Expand Up @@ -521,10 +550,6 @@ public interface ThrowingBiConsumer<T, U, E extends Exception> {

@Override
public void close() throws Exception {
if (Objects.nonNull(client)) {
client.disconnect();
}

if (serverKey == null) {
return;
}
Expand All @@ -544,6 +569,26 @@ public void close() throws Exception {
}
}
}

if (nodeUrl == null) {
return;
}

synchronized (CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP) {
final Pair<AtomicInteger, IoTDBOpcUaClient> pair =
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.get(nodeUrl);
if (pair == null) {
return;
}

if (pair.getLeft().decrementAndGet() <= 0) {
try {
pair.getRight().disconnect();
} finally {
CLIENT_KEY_TO_REFERENCE_COUNT_AND_CLIENT_MAP.remove(nodeUrl);
}
}
}
}

/////////////////////////////// Getter ///////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.Security;
import java.util.Objects;

import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;

Expand All @@ -49,14 +52,23 @@ public class ClientRunner {
private final IoTDBOpcUaClient configurableUaClient;
private final Path securityDir;
private final String password;
private final long timeoutSeconds;

// For conflict checking
private final String user;

public ClientRunner(
final IoTDBOpcUaClient configurableUaClient,
final String securityDir,
final String password) {
final String password,
final String user,
final long timeoutSeconds) {
this.configurableUaClient = configurableUaClient;
this.securityDir = Paths.get(securityDir);
this.password = password;
this.user = user;
this.timeoutSeconds = timeoutSeconds;
configurableUaClient.setRunner(this);
}

private OpcUaClient createClient() throws Exception {
Expand Down Expand Up @@ -90,7 +102,9 @@ private OpcUaClient createClient() throws Exception {
.setCertificateChain(loader.getClientCertificateChain())
.setCertificateValidator(certificateValidator)
.setIdentityProvider(configurableUaClient.getIdentityProvider())
.setRequestTimeout(uint(5000))
.setRequestTimeout(uint(timeoutSeconds * 1000L))
.setConnectTimeout(uint(timeoutSeconds * 1000L))
.setMaxResponseMessageSize(uint(0))
.build());
}

Expand All @@ -109,4 +123,37 @@ public void run() {
"Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage(), e);
}
}

long getTimeoutSeconds() {
return timeoutSeconds;
}

/////////////////////////////// Conflict detection ///////////////////////////////

void checkEquals(
final String user,
final String password,
final Path securityDir,
final SecurityPolicy securityPolicy) {
checkEquals("user", this.user, user);
checkEquals("password", this.password, password);
checkEquals(
"security dir",
FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()),
FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString()));
checkEquals("securityPolicy", configurableUaClient.getSecurityPolicy(), securityPolicy);
}

private void checkEquals(final String attrName, Object thisAttr, Object thatAttr) {
if (!Objects.equals(thisAttr, thatAttr)) {
if (attrName.equals("password")) {
thisAttr = "****";
thatAttr = "****";
}
throw new PipeException(
String.format(
"The existing server with nodeUrl %s's %s %s conflicts to the new %s %s, reject reusing.",
configurableUaClient.getNodeUrl(), attrName, thisAttr, attrName, thatAttr));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.milo.opcua.sdk.core.ValueRanks;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.StatusCodes;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
Expand All @@ -55,14 +56,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
import static org.eclipse.milo.opcua.stack.core.StatusCodes.Bad_Timeout;

public class IoTDBOpcUaClient {
private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
Expand All @@ -78,6 +82,7 @@ public class IoTDBOpcUaClient {
private final IdentityProvider identityProvider;
private OpcUaClient client;
private final boolean historizing;
private ClientRunner runner;

public IoTDBOpcUaClient(
final String nodeUrl,
Expand All @@ -93,7 +98,20 @@ public IoTDBOpcUaClient(
public void run(final OpcUaClient client) throws Exception {
// synchronous connect
this.client = client;
client.connect().get();
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() - startTime < runner.getTimeoutSeconds() * 1000L) {
try {
client.connect().get();
} catch (final ExecutionException e) {
if (e.getCause() instanceof UaException
&& ((UaException) e.getCause()).getStatusCode().getValue() == Bad_Timeout) {
Thread.sleep(1000L);
continue;
}
throw e;
}
break;
}
}

// Only support tree model & client-server
Expand Down Expand Up @@ -300,4 +318,18 @@ private static ObjectAttributes createFolderAttributes(final String name) {
null // notifier
);
}

/////////////////////////////// Conflict detection ///////////////////////////////

public void setRunner(ClientRunner runner) {
this.runner = runner;
}

public void checkEquals(
final String user,
final String password,
final String securityDir,
final SecurityPolicy securityPolicy) {
runner.checkEquals(user, password, Paths.get(securityDir), securityPolicy);
}
}
Loading
Loading