Skip to content
Open

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,10 +1,25 @@
/***************************** BEGIN LICENSE BLOCK ***************************

The contents of this file are subject to the Mozilla Public License, v. 2.0.
If a copy of the MPL was not distributed with this file, You can obtain one
at http://mozilla.org/MPL/2.0/.

Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the License.

Copyright (C) 2025 GeoRobotix. All Rights Reserved.

******************************* END LICENSE BLOCK ***************************/

package org.sensorhub.impl.service.consys.client;

import org.sensorhub.api.client.ClientConfig;
import org.sensorhub.api.config.DisplayInfo;
import org.sensorhub.impl.comm.HTTPConfig;
import org.sensorhub.impl.comm.RobustIPConnectionConfig;
import org.sensorhub.impl.datastore.view.ObsSystemDatabaseViewConfig;
import org.sensorhub.impl.service.consys.client.http.JavaHttpClient;

public class ConSysApiClientConfig extends ClientConfig {

Expand All @@ -20,26 +35,17 @@ public class ConSysApiClientConfig extends ClientConfig {
@DisplayInfo(label="Connection Options")
public RobustIPConnectionConfig connection = new RobustIPConnectionConfig();

@DisplayInfo(label="OAuth Options", desc="Allows for the usage of OAuth Client Credentials (\"bearer\") tokens for instead of basic authentication")
public ConSysOAuthConfig conSysOAuth = new ConSysOAuthConfig();

// public static class ConSysConnectionConfig extends RobustIPConnectionConfig
// {
// @DisplayInfo(desc="Enable to use a persistent HTTP connection for InsertResult")
// public boolean usePersistentConnection;
//
//
// @DisplayInfo(desc="Maximum number of records in upload queue (used to compensate for variable bandwidth)")
// public int maxQueueSize = 10;
//
//
// @DisplayInfo(desc="Maximum number of stream errors before we try to reconnect to remote server")
// public int maxConnectErrors = 10;
// }

@DisplayInfo(label="Http Client Implementation", desc="Fully qualified class name of the HTTP client implementation to use")
public String httpClientImplClass;

public ConSysApiClientConfig()
{
this.moduleClass = ConSysApiClientModule.class.getCanonicalName();
this.conSys.resourcePath = "/sensorhub/api";
this.httpClientImplClass = JavaHttpClient.class.getCanonicalName();
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/***************************** BEGIN LICENSE BLOCK ***************************

The contents of this file are subject to the Mozilla Public License, v. 2.0.
If a copy of the MPL was not distributed with this file, You can obtain one
at http://mozilla.org/MPL/2.0/.

Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the License.

Copyright (C) 2025 GeoRobotix. All Rights Reserved.

******************************* END LICENSE BLOCK ***************************/

package org.sensorhub.impl.service.consys.client;

import org.sensorhub.api.module.IModule;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/***************************** BEGIN LICENSE BLOCK ***************************

The contents of this file are subject to the Mozilla Public License, v. 2.0.
If a copy of the MPL was not distributed with this file, You can obtain one
at http://mozilla.org/MPL/2.0/.

Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the License.

Copyright (C) 2025 GeoRobotix. All Rights Reserved.

******************************* END LICENSE BLOCK ***************************/

package org.sensorhub.impl.service.consys.client;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -61,6 +75,9 @@ public static class StreamInfo
private BigId internalID;
private String sysUID;
private Flow.Subscription subscription;
public long lastEventTime = Long.MIN_VALUE;
public int measPeriodMs = 1000;
public int errorCount = 0;
}

public ConSysApiClientModule()
Expand All @@ -86,23 +103,21 @@ public void setConfiguration(ConSysApiClientConfig config)
if (config.conSys.enableTLS)
scheme += "s";
apiEndpointUrl = scheme + "://" + config.conSys.remoteHost + ":" + config.conSys.remotePort;
if (config.conSys.resourcePath != null)
{
if (config.conSys.resourcePath.charAt(0) != '/')
apiEndpointUrl += '/';
apiEndpointUrl += config.conSys.resourcePath;
if (config.conSys.resourcePath != null && !config.conSys.resourcePath.isEmpty()) {
String path = config.conSys.resourcePath;
if (!path.startsWith("/"))
path = "/" + path;
if (!path.endsWith("/"))
path = path + "/";
apiEndpointUrl += path;
}
}

@Override
protected void doInit() throws SensorHubException
{
this.dataBaseView = config.dataSourceSelector.getFilteredView(getParentHub());

this.client = ConSysApiClient.
newBuilder(apiEndpointUrl)
.simpleAuth(config.conSys.user, !config.conSys.password.isEmpty() ? config.conSys.password.toCharArray() : null)
.build();
this.client = new ConSysApiClient(config);
}

@Override
Expand Down Expand Up @@ -198,7 +213,7 @@ private String tryUpdateSystem(ISystemWithDesc system)
var responseCode = client.updateSystem(systemID, system).get();
boolean successful = responseCode == 204;
if(!successful)
throw new ClientException("Failed to update resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID);
throw new ClientException("Failed to update resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID);
return systemID;
}
} catch (ExecutionException | InterruptedException | ClientException e) {
Expand Down Expand Up @@ -316,19 +331,75 @@ protected List<StreamInfo> registerSystemDataStreams(SystemRegInfo system)
{
List<StreamInfo> addedStreams = new ArrayList<>();

dataBaseView.getDataStreamStore().selectEntries(
var dataStreamEntries = dataBaseView.getDataStreamStore().selectEntries(
new DataStreamFilter.Builder()
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(system.system.getUniqueIdentifier())
.build())
.build())
.forEach((entry) -> {
if(Objects.equals(entry.getValue().getSystemID().getUniqueID(), system.system.getUniqueIdentifier()))
addedStreams.add(registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue()));
});
.withUniqueIDs(system.system.getUniqueIdentifier())
.build())
.build()).toList();

var systemDataStreamList = new ArrayList<Map.Entry<String,IDataStreamInfo>>();
try {
var systemDataStream = client.getSystemDataStreams(system.systemID, ResourceFormat.JSON).get().toList();

systemDataStreamList.addAll(systemDataStream);
} catch (InterruptedException | ExecutionException e) {
reportError("Error getting system datastreams", new Throwable(e.getMessage()));
}

for (var entry : dataStreamEntries) {
// check each entry against the systemDataStreamList with outputName and systemUID
var filteredListByUniqueId = systemDataStreamList.stream().filter(dataStreamInfo ->
dataStreamInfo.getValue().getSystemID().getUniqueID().equals(entry.getValue().getSystemID().getUniqueID()));

var filterListByOutputName = filteredListByUniqueId.filter(dataStreamInfo ->
dataStreamInfo.getValue().getOutputName().equals(entry.getValue().getOutputName())).toList();

if (filterListByOutputName.isEmpty()) {
if(Objects.equals(entry.getValue().getSystemID().getUniqueID(), system.system.getUniqueIdentifier()))
addedStreams.add(registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue()));
} else {
// get id and update the existing datastream
var datastream = filterListByOutputName.get(0);

var newEntry = new DataStreamInfo.Builder()
.withName(entry.getValue().getName())
.withDescription(entry.getValue().getDescription())
.withSystem(entry.getValue().getSystemID())
.withRecordDescription(entry.getValue().getRecordStructure())
.withRecordEncoding(entry.getValue().getRecordEncoding())
.withDeployment(entry.getValue().getDeploymentID())
.withProcedure(entry.getValue().getProcedureID())
.withFeatureOfInterest(datastream.getValue().getFeatureOfInterestID())
.withSamplingFeature(datastream.getValue().getSamplingFeatureID())
.withPhenomenonTimeInterval(entry.getValue().getPhenomenonTimeInterval())
.withResultTimeInterval(entry.getValue().getResultTimeInterval())
.withValidTime(entry.getValue().getValidTime())
.build();

addedStreams.add(registerUpdatedDataStream(entry.getKey().getInternalID(), datastream.getKey(), newEntry));
}
}

return addedStreams;
}

protected StreamInfo registerUpdatedDataStream(BigId dsId, String dataStreamId, IDataStreamInfo dataStream)
{
var dsTopicId = EventUtils.getDataStreamDataTopicID(dataStream);
client.updateDataStream(dataStreamId, dataStream);

StreamInfo streamInfo = new StreamInfo();
streamInfo.dataStreamID = dataStreamId;
streamInfo.dataStream = dataStream;
streamInfo.topicID = dsTopicId;
streamInfo.sysUID = dataStream.getSystemID().getUniqueID();
streamInfo.internalID = dsId;

dataStreams.put(dsTopicId, streamInfo);
return streamInfo;
}

protected StreamInfo registerDataStream(BigId dsId, String systemID, IDataStreamInfo dataStream)
{
var dsTopicId = EventUtils.getDataStreamDataTopicID(dataStream);
Expand Down Expand Up @@ -403,7 +474,11 @@ protected synchronized void startStream(StreamInfo streamInfo) throws ClientExce
.withLatestResult()
.build())
.forEach(obs ->
client.pushObs(streamInfo.dataStreamID, streamInfo.dataStream, obs));
client.pushObs(
streamInfo.dataStreamID,
streamInfo.dataStream,
obs
));

getLogger().info("Starting Connected Systems data push for stream {} with UID {} to Connected Systems endpoint {}",
streamInfo.dataStreamID, streamInfo.sysUID, apiEndpointUrl);
Expand Down Expand Up @@ -432,10 +507,19 @@ public boolean isConnected()
protected void handleEvent(final ObsEvent e, StreamInfo streamInfo)
{
for(var obs : e.getObservations()) {
String foiID = null;
if (obs.hasFoi()) {
var registeredFoiID = registeredFeatureIDs.get(obs.getFoiID());
if (registeredFoiID != null)
foiID = registeredFoiID;
}
client.pushObs(
streamInfo.dataStreamID,
streamInfo.dataStream,
obs);
obs,
foiID
);
streamInfo.lastEventTime = e.getTimeStamp();
}
}

Expand Down Expand Up @@ -589,4 +673,9 @@ else if (e instanceof FoiAddedEvent foiAddedEvent)
}
}

public Map<String, StreamInfo> getDataStreams()
{
return dataStreams;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/***************************** BEGIN LICENSE BLOCK ***************************
The contents of this file are subject to the Mozilla Public License, v. 2.0.
If a copy of the MPL was not distributed with this file, You can obtain one
at http://mozilla.org/MPL/2.0/.
Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the License.
Copyright (C) 2025 GeoRobotix. All Rights Reserved.
******************************* END LICENSE BLOCK ***************************/

package org.sensorhub.impl.service.consys.client;

import org.sensorhub.api.config.DisplayInfo;

public class ConSysOAuthConfig {

@DisplayInfo(label = "Use OAuth Authentication")
public boolean oAuthEnabled = true;

@DisplayInfo(label="Token Endpoint", desc="URL of OAuth provider's token endpoint")
public String tokenEndpoint;

@DisplayInfo(desc="Client ID as provided by your OAuth provider")
public String clientID;


@DisplayInfo(desc="Client Secret as provided by your OAuth provider")
public String clientSecret;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/***************************** BEGIN LICENSE BLOCK ***************************

The contents of this file are subject to the Mozilla Public License, v. 2.0.
If a copy of the MPL was not distributed with this file, You can obtain one
at http://mozilla.org/MPL/2.0/.

Software distributed under the License is distributed on an "AS IS" basis,
WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
for the specific language governing rights and limitations under the License.

Copyright (C) 2025 GeoRobotix. All Rights Reserved.

******************************* END LICENSE BLOCK ***************************/

package org.sensorhub.impl.service.consys.client;

public interface ITokenHandler {

/**
* gets the current access token
*/
String getToken();

/**
* refreshes a new access token
*/
void refreshAccessToken();

/**
* checks if the current token has expired
*/
boolean isExpired();
}
Loading
Loading