diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java index 4cbb853a1..10ec4c018 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClient.java @@ -8,7 +8,7 @@ WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. -Copyright (C) 2023 Sensia Software LLC. All Rights Reserved. +Copyright (C) 2023-2025 Sensia Software LLC. All Rights Reserved. ******************************* END LICENSE BLOCK ***************************/ @@ -18,29 +18,18 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.StringReader; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.Authenticator; -import java.net.HttpURLConnection; -import java.net.PasswordAuthentication; import java.net.URI; import java.net.URISyntaxException; -import java.net.URL; import java.net.URLEncoder; import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; -import java.net.http.HttpResponse.BodyHandler; -import java.net.http.HttpResponse.BodyHandlers; -import java.net.http.HttpResponse.BodySubscriber; -import java.net.http.HttpResponse.BodySubscribers; import java.nio.charset.StandardCharsets; + import java.time.Instant; +import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.LinkedHashSet; +import java.util.Map; import java.util.Set; import java.util.Spliterator; import java.util.concurrent.CompletableFuture; @@ -56,6 +45,7 @@ import org.sensorhub.api.command.ICommandStatus; import org.sensorhub.api.command.ICommandStreamInfo; import org.sensorhub.api.common.BigId; +import org.sensorhub.api.common.SensorHubException; import org.sensorhub.api.data.DataStreamInfo; import org.sensorhub.api.data.IDataStreamInfo; import org.sensorhub.api.data.IObsData; @@ -63,7 +53,8 @@ import org.sensorhub.api.semantic.IDerivedProperty; import org.sensorhub.api.system.ISystemWithDesc; import org.sensorhub.impl.common.IdEncodersBase32; -import org.sensorhub.impl.service.consys.ResourceParseException; +import org.sensorhub.impl.service.consys.client.http.IHttpClient; +import org.sensorhub.impl.service.consys.client.http.JavaHttpClient; import org.sensorhub.impl.service.consys.feature.FoiBindingGeoJson; import org.sensorhub.impl.service.consys.obs.DataStreamBindingJson; import org.sensorhub.impl.service.consys.obs.DataStreamSchemaBindingOmJson; @@ -89,18 +80,15 @@ import org.sensorhub.impl.service.consys.task.CommandStatusHandler.CommandStatusHandlerContextData; import org.sensorhub.impl.service.consys.task.CommandStreamBindingJson; import org.sensorhub.impl.service.consys.task.CommandStreamSchemaBindingJson; -import org.sensorhub.utils.Lambdas; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.vast.ogc.gml.IFeature; import org.vast.util.Asserts; import org.vast.util.BaseBuilder; -import com.google.common.base.Strings; -import com.google.common.net.HttpHeaders; + import com.google.common.net.UrlEscapers; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; import com.google.gson.stream.JsonReader; import com.google.gson.stream.JsonWriter; @@ -119,26 +107,41 @@ public class ConSysApiClient static final String SF_COLLECTION = "samplingFeatures"; static final Logger log = LoggerFactory.getLogger(ConSysApiClient.class); + protected URI endpoint; + protected ITokenHandler tokenHandler; + protected String user; + protected char[] password; + protected String token; + IHttpClient httpAdapter; - protected static boolean isHttpClientAvailable; + protected ConSysApiClient() { + } - static { - // Check if HttpClient is available. Will not be available on Android. + public ConSysApiClient(ConSysApiClientConfig config) throws SensorHubException { try { - Class.forName("java.net.http.HttpClient"); - isHttpClientAvailable = true; - } catch (ClassNotFoundException e) { - isHttpClientAvailable = false; - } - } + httpAdapter = (IHttpClient) Class.forName(config.httpClientImplClass) + .getDeclaredConstructor().newInstance(); + httpAdapter.setConfig(config); - protected Authenticator authenticator; - protected HttpClient http; - protected URI endpoint; - protected String token; + String scheme= config.conSys.enableTLS ? "https" : "http"; + String endpointUrl = scheme +"://" + config.conSys.remoteHost + ":" + config.conSys.remotePort; + if (config.conSys.resourcePath != null && !config.conSys.resourcePath.isEmpty()) + { + String path = config.conSys.resourcePath; + if (!path.startsWith("/")) + path = "/" + path; + if (!path.endsWith("/")) + path = path + "/"; + endpointUrl = endpointUrl + path; + } + this.endpoint = new URI(endpointUrl); - protected ConSysApiClient() {} + } catch (Exception e) { + throw new SensorHubException("Failed to instantiate http client", e); + } + + } /*------------*/ @@ -716,7 +719,7 @@ public Spliterator trySplit() /* Datastreams */ /*-------------*/ - public CompletableFuture getDatastreamById(String id, ResourceFormat format, boolean fetchSchema) + public CompletableFuture getDataStreamById(String id, ResourceFormat format, boolean fetchSchema) { var cf1 = sendGetRequest(endpoint.resolve(DATASTREAMS_COLLECTION + "/" + urlPathEncode(id)), format, body -> { try @@ -734,7 +737,7 @@ public CompletableFuture getDatastreamById(String id, ResourceF if (fetchSchema) { - return cf1.thenCombine(getDatastreamSchema(id, ResourceFormat.SWE_JSON, ResourceFormat.JSON), (dsInfo, schemaInfo) -> { + return cf1.thenCombine(getDataStreamSchema(id, ResourceFormat.SWE_JSON, ResourceFormat.JSON), (dsInfo, schemaInfo) -> { schemaInfo.getRecordStructure().setName(dsInfo.getOutputName()); @@ -749,9 +752,81 @@ public CompletableFuture getDatastreamById(String id, ResourceF return cf1; } - - - public CompletableFuture getDatastreamSchema(String id, ResourceFormat obsFormat, ResourceFormat format) + + public CompletableFuture> getDataStreams(ResourceFormat format, int pageSize, int offset) + { + var request = DATASTREAMS_COLLECTION + "?f=" + format + "&limit=" + pageSize + "&offset=" + offset; + log.debug("{}", request); + + return sendGetRequest(endpoint.resolve(request), format, body -> { + try + { + /*body.mark(100000); + ByteStreams.copy(body, System.out); + body.reset();*/ + + var ctx = new RequestContext(body); + var binding = new DataStreamBindingJson(ctx, new IdEncodersBase32(), null, true, Collections.emptyMap()) { + protected JsonReader getJsonReader(InputStream is) throws IOException + { + var reader = super.getJsonReader(is); + skipToCollectionItems(reader); + return reader; + } + }; + + return StreamSupport.stream(new Spliterator() { + + @Override + public int characteristics() + { + return Spliterator.ORDERED | Spliterator.DISTINCT; + } + + @Override + public long estimateSize() + { + return Long.MAX_VALUE; + } + + @Override + public boolean tryAdvance(Consumer consumer) + { + try + { + var f = binding.deserialize(); + if (f != null) + { + consumer.accept(f); + return true; + } + + return false; + } + catch (IOException e) + { + throw new IllegalStateException("Error parsing datastream", e); + } + } + + @Override + public Spliterator trySplit() + { + return null; + } + + }, false); + } + catch (IOException e) + { + e.printStackTrace(); + throw new CompletionException(e); + } + }); + + } + + public CompletableFuture getDataStreamSchema(String id, ResourceFormat obsFormat, ResourceFormat format) { var obsFormatStr = urlQueryEncode(obsFormat.getMimeType()); return sendGetRequest(endpoint.resolve(DATASTREAMS_COLLECTION + "/" + urlPathEncode(id) + "/schema?obsFormat="+obsFormatStr), format, body -> { @@ -837,7 +912,133 @@ protected void endJsonCollection(JsonWriter writer, Collection lin throw new IllegalStateException("Error initializing binding", e); } } + public CompletableFuture>> getSystemDataStreams(String systemId, ResourceFormat format) + { + return getSystemDataStreams(systemId, format, 100); + } + + public CompletableFuture>> getSystemDataStreams(String systemId, ResourceFormat format, int maxPageSize) + { + return getResourcesWithPaging((pageSize, offset) -> { + try { + return getSystemDataStreams(systemId, format, pageSize, offset).get(); + } + catch (Exception e) { + throw new IOException("Error loading datastreams", e); + } + }, maxPageSize); + } + + protected CompletableFuture>> getSystemDataStreams(String systemId, ResourceFormat format, int pageSize, int offset) + { + var request = SYSTEMS_COLLECTION + "/" + systemId + "/" + DATASTREAMS_COLLECTION + "?f=" + format + "&limit=" + pageSize + "&offset=" + offset; + log.debug("{}", request); + + return sendGetRequest(endpoint.resolve(request), format, body -> { + try + { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + body.transferTo(baos); + InputStream firstClone = new ByteArrayInputStream(baos.toByteArray()); + InputStream secondClone = new ByteArrayInputStream(baos.toByteArray()); + + var ids = new ArrayDeque<>(); + var bytes = firstClone.readAllBytes(); + + var jsonString = new String(bytes, StandardCharsets.UTF_8); + + JsonObject jsonObj = JsonParser.parseString(jsonString).getAsJsonObject(); + var items = jsonObj.get("items").getAsJsonArray(); + for (var item: items) { + ids.add(item.getAsJsonObject().get("id").getAsString()); + } + + var ctx = new RequestContext(secondClone); + + var binding = new DataStreamBindingJson(ctx, new IdEncodersBase32(), null, true, Collections.emptyMap()) { + protected JsonReader getJsonReader(InputStream is) throws IOException + { + var reader = super.getJsonReader(is); + skipToCollectionItems(reader); + return reader; + } + }; + + + return StreamSupport.stream(new Spliterator>() { + + @Override + public int characteristics() + { + return Spliterator.ORDERED | Spliterator.DISTINCT; + } + + @Override + public long estimateSize() + { + return Long.MAX_VALUE; + } + + @Override + public boolean tryAdvance(Consumer> consumer) + { + try + { + var f = binding.deserialize(); + if (f != null) + { + consumer.accept(Map.entry((String) ids.pop(), f)); + return true; + } + + return false; + } + catch (IOException e) + { + throw new IllegalStateException("Error parsing datastream", e); + } + } + + @Override + public Spliterator> trySplit() + { + return null; + } + + }, false); + + + } + catch (IOException e) + { + e.printStackTrace(); + throw new CompletionException(e); + } + }); + } + + public CompletableFuture updateDataStream(String dataStreamId, IDataStreamInfo dataStreamInfo) + { + try + { + var buffer = new ByteArrayOutputStream(); + var ctx = new RequestContext(buffer); + + var binding = new DataStreamBindingJson(ctx, new IdEncodersBase32(), null,false, Collections.emptyMap()); + binding.serialize(null, dataStreamInfo, false); + + return sendPutRequest( + endpoint.resolve(DATASTREAMS_COLLECTION + "/" + dataStreamId), + ResourceFormat.JSON, + buffer.toByteArray()); + } + catch (IOException e) + { + throw new IllegalStateException("Error initializing binding", e); + } + } /*-----------------*/ /* Control Streams */ @@ -965,8 +1166,13 @@ public CompletableFuture getControlStreamSchema(String id, R /*--------------*/ /* Observations */ /*--------------*/ + public CompletableFuture pushObs(String dataStreamId, IDataStreamInfo dataStream, IObsData obs) + { + return pushObs(dataStreamId, dataStream, obs, null); + } + // TODO: Be able to push different kinds of observations such as video - public CompletableFuture pushObs(String dsId, IDataStreamInfo dataStream, IObsData obs) + public CompletableFuture pushObs(String dsId, IDataStreamInfo dataStream, IObsData obs, String foiId) { try { @@ -986,6 +1192,12 @@ public CompletableFuture pushObs(String dsId, IDataStreamInfo dataStream ctx.setFormat(ResourceFormat.OM_JSON); var binding = new ObsBindingOmJson(ctx, new IdEncodersBase32(), false, null); binding.serialize(null, obs, false); + if (foiId != null) { + JsonObject payload = JsonParser.parseString(buffer.toString()).getAsJsonObject(); + payload.addProperty("foi@id", foiId); + buffer.reset(); + buffer.write(payload.toString().getBytes()); + } } return sendPostRequest( @@ -1177,336 +1389,31 @@ public CompletableFuture sendCommand(String csId, ICommandStream protected CompletableFuture sendGetRequest(URI collectionUri, ResourceFormat format, Function bodyMapper) { - if (!isHttpClientAvailable) - return sendGetRequestFallback(collectionUri, format, bodyMapper); - - var builder = HttpRequest.newBuilder() - .uri(collectionUri) - .GET() - .header(HttpHeaders.ACCEPT, format.getMimeType()); - - if (token != null) - builder.header(HttpHeaders.AUTHORIZATION, "Bearer " + token); - - var req = builder.build(); - BodyHandler bodyHandler = resp -> { - BodySubscriber upstream = BodySubscribers.ofByteArray(); - return BodySubscribers.mapping(upstream, body -> { - if (resp.statusCode() == 200) { - var is = new ByteArrayInputStream(body); - return bodyMapper.apply(is); - } else { - var error = new String(body); - throw new CompletionException("HTTP error " + resp.statusCode() + ": " + error, null); - } - }); - }; - - return http.sendAsync(req, bodyHandler) - .thenApply(resp -> { - if (resp.statusCode() == 200) - return resp.body(); - else - throw new CompletionException("HTTP error " + resp.statusCode(), null); - }); - } - - - /** - * Fallback method for sending requests using HttpURLConnection. - * This is used when HttpClient is not available (e.g., on Android). - */ - protected CompletableFuture sendGetRequestFallback(URI collectionUri, ResourceFormat format, Function bodyMapper) - { - return CompletableFuture.supplyAsync(() -> { - HttpURLConnection connection = null; - try { - if (authenticator != null) - Authenticator.setDefault(authenticator); - - URL url = collectionUri.toURL(); - connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("GET"); - connection.setRequestProperty(HttpHeaders.ACCEPT, format.getMimeType()); - if (token != null) - connection.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + token); - - int responseCode = connection.getResponseCode(); - if (responseCode == 200) { - try (InputStream is = connection.getInputStream()) { - return bodyMapper.apply(is); - } - } else { - throw new CompletionException("HTTP error " + responseCode, null); - } - } catch (IOException e) { - throw new CompletionException(e); - } finally { - if (connection != null) { - connection.disconnect(); - } - } - }); + return httpAdapter.sendGetRequest(collectionUri, format, bodyMapper); } protected CompletableFuture sendPostRequest(URI collectionUri, ResourceFormat format, byte[] body) { - if (!isHttpClientAvailable) - return sendPostRequestFallback(collectionUri, format, body); - - var builder = HttpRequest.newBuilder() - .uri(collectionUri) - .POST(HttpRequest.BodyPublishers.ofByteArray(body)) - .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) - .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()); - - if (token != null) - builder.header(HttpHeaders.AUTHORIZATION, "Bearer " + token); - - var req = builder.build(); - return http.sendAsync(req, BodyHandlers.ofString()) - .thenApply(resp -> { - if (resp.statusCode() == 201 || resp.statusCode() == 303) { - var location = resp.headers() - .firstValue(HttpHeaders.LOCATION) - .orElseThrow(() -> new IllegalStateException("Missing Location header in response")); - return location.substring(location.lastIndexOf('/') + 1); - } else - throw new CompletionException(resp.body(), null); - }); + return httpAdapter.sendPostRequest(collectionUri, format, body); } protected CompletableFuture sendPostRequestAndReadResponse(URI collectionUri, ResourceFormat format, byte[] requestBody, Function responseBodyMapper) { - //if (!isHttpClientAvailable) - // return sendPostRequestFallback(collectionUri, format, body); - - var builder = HttpRequest.newBuilder() - .uri(collectionUri) - .POST(HttpRequest.BodyPublishers.ofByteArray(requestBody)) - .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) - .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()); - - if (token != null) - builder.header(HttpHeaders.AUTHORIZATION, "Bearer " + token); - - - var req = builder.build(); - BodyHandler bodyHandler = resp -> { - BodySubscriber upstream = BodySubscribers.ofByteArray(); - return BodySubscribers.mapping(upstream, body -> { - if (resp.statusCode() == 200) { - var is = new ByteArrayInputStream(body); - return responseBodyMapper.apply(is); - } else { - var bodyStr = new String(body); - try { - var jsonError = (JsonObject)JsonParser.parseString(bodyStr); - throw new CompletionException(jsonError.get("message").getAsString(), null); - } catch (JsonSyntaxException e) { - throw new CompletionException("HTTP error " + resp.statusCode() + ": " + bodyStr, null); - } - } - }); - }; - - return http.sendAsync(req, bodyHandler) - .thenApply(resp -> { - if (resp.statusCode() == 200) - return resp.body(); - else - throw new CompletionException("HTTP error " + resp.statusCode(), null); - }); - } - - - /** - * Fallback method for sending requests using HttpURLConnection. - * This is used when HttpClient is not available (e.g., on Android). - */ - protected CompletableFuture sendPostRequestFallback(URI collectionUri, ResourceFormat format, byte[] body) - { - return CompletableFuture.supplyAsync(() -> { - HttpURLConnection connection = null; - try { - if (authenticator != null) - Authenticator.setDefault(authenticator); - - URL url = collectionUri.toURL(); - connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setRequestProperty(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()); - connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, format.getMimeType()); - if (token != null) - connection.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + token); - connection.setDoOutput(true); - - try (OutputStream os = connection.getOutputStream()) { - os.write(body); - } - - int responseCode = connection.getResponseCode(); - if (responseCode == 201 || responseCode == 303) { - String location = connection.getHeaderField(HttpHeaders.LOCATION); - if (location == null) { - throw new IllegalStateException("Missing Location header in response."); - } - return location.substring(location.lastIndexOf('/') + 1); - } else { - throw new CompletionException(connection.getResponseMessage(), null); - } - } catch (IOException e) { - throw new CompletionException(e); - } finally { - if (connection != null) { - connection.disconnect(); - } - } - }); + return httpAdapter.sendPostRequestAndReadResponse(collectionUri, format, requestBody, responseBodyMapper); } protected CompletableFuture sendPutRequest(URI collectionUri, ResourceFormat format, byte[] body) { - if (!isHttpClientAvailable) - return sendPutRequestFallback(collectionUri, format, body); - - var builder = HttpRequest.newBuilder() - .uri(collectionUri) - .PUT(HttpRequest.BodyPublishers.ofByteArray(body)) - .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) - .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()); - - if (token != null) - builder.header(HttpHeaders.AUTHORIZATION, "Bearer " + token); - - var req = builder.build(); - return http.sendAsync(req, BodyHandlers.ofString()) - .thenApply(HttpResponse::statusCode); - } - - - /** - * Fallback method for sending requests using HttpURLConnection. - * This is used when HttpClient is not available (e.g., on Android). - */ - protected CompletableFuture sendPutRequestFallback(URI collectionUri, ResourceFormat format, byte[] body) - { - return CompletableFuture.supplyAsync(() -> { - HttpURLConnection connection = null; - try { - if (authenticator != null) - Authenticator.setDefault(authenticator); - - URL url = collectionUri.toURL(); - connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("PUT"); - connection.setRequestProperty(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()); - connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, format.getMimeType()); - if (token != null) - connection.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + token); - connection.setDoOutput(true); - - try (OutputStream os = connection.getOutputStream()) { - os.write(body); - } - - return connection.getResponseCode(); - } catch (IOException e) { - throw new CompletionException(e); - } finally { - if (connection != null) { - connection.disconnect(); - } - } - }); + return httpAdapter.sendPutRequest(collectionUri, format, body); } protected CompletableFuture> sendBatchPostRequest(URI collectionUri, ResourceFormat format, byte[] body) { - if (!isHttpClientAvailable) - return sendBatchPostRequestFallback(collectionUri, format, body); - - var builder = HttpRequest.newBuilder() - .uri(collectionUri) - .POST(HttpRequest.BodyPublishers.ofByteArray(body)) - .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()); - - if (token != null) - builder.header(HttpHeaders.AUTHORIZATION, "Bearer " + token); - - var req = builder.build(); - return http.sendAsync(req, BodyHandlers.ofString()) - .thenApply(Lambdas.checked(resp -> { - if (resp.statusCode() == 201 || resp.statusCode() == 303) { - var idList = new LinkedHashSet(); - try (JsonReader reader = new JsonReader(new StringReader(resp.body()))) { - reader.beginArray(); - while (reader.hasNext()) { - var uri = reader.nextString(); - idList.add(uri.substring(uri.lastIndexOf('/') + 1)); - } - reader.endArray(); - } - return idList; - } else - throw new ResourceParseException(resp.body()); - })); - } - - - /** - * Fallback method for sending requests using HttpURLConnection. - * This is used when HttpClient is not available (e.g., on Android). - */ - protected CompletableFuture> sendBatchPostRequestFallback(URI collectionUri, ResourceFormat format, byte[] body) - { - return CompletableFuture.supplyAsync(() -> { - HttpURLConnection connection = null; - try { - if (authenticator != null) { - Authenticator.setDefault(authenticator); - } - - URL url = collectionUri.toURL(); - connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, format.getMimeType()); - if (token != null) - connection.setRequestProperty(HttpHeaders.AUTHORIZATION, "Bearer " + token); - connection.setDoOutput(true); - - try (OutputStream os = connection.getOutputStream()) { - os.write(body); - } - - int responseCode = connection.getResponseCode(); - if (responseCode == 201 || responseCode == 303) { - Set idList = new LinkedHashSet<>(); - try (InputStream is = connection.getInputStream(); - JsonReader reader = new JsonReader(new InputStreamReader(is))) { - reader.beginArray(); - while (reader.hasNext()) { - String uri = reader.nextString(); - idList.add(uri.substring(uri.lastIndexOf('/') + 1)); - } - reader.endArray(); - } - return idList; - } else { - throw new ResourceParseException(connection.getResponseMessage()); - } - } catch (IOException e) { - throw new CompletionException(e); - } finally { - if (connection != null) { - connection.disconnect(); - } - } - }); + return httpAdapter.sendBatchPostRequest(collectionUri, format, body); } @@ -1599,14 +1506,11 @@ protected String urlQueryEncode(String value) { return URLEncoder.encode(value, StandardCharsets.UTF_8); } - - - public void setAuthToken(String token) - { + + @Deprecated + public void setAuthToken(String token) { this.token = token; } - - /* Builder stuff */ @@ -1620,12 +1524,10 @@ public static ConSysApiClientBuilder newBuilder(String endpoint) public static class ConSysApiClientBuilder extends BaseBuilder { HttpClient.Builder httpClientBuilder; - ConSysApiClientBuilder(String endpoint) { this.instance = new ConSysApiClient(); - if (isHttpClientAvailable) - this.httpClientBuilder = HttpClient.newBuilder(); + this.httpClientBuilder = HttpClient.newBuilder(); try { @@ -1640,29 +1542,17 @@ public static class ConSysApiClientBuilder extends BaseBuilder } - public ConSysApiClientBuilder useHttpClient(HttpClient http) + public ConSysApiClientBuilder useHttpClient(IHttpClient http) { - instance.http = http; + instance.httpAdapter = http; return this; } public ConSysApiClientBuilder simpleAuth(String user, char[] password) { - if (!Strings.isNullOrEmpty(user)) - { - var finalPwd = password != null ? password : new char[0]; - instance.authenticator = new Authenticator() { - @Override - protected PasswordAuthentication getPasswordAuthentication() { - return new PasswordAuthentication(user, finalPwd); - } - }; - - if (isHttpClientAvailable) - httpClientBuilder.authenticator(instance.authenticator); - } - + instance.user = user; + instance.password = password != null ? password : new char[0]; return this; } @@ -1670,9 +1560,14 @@ protected PasswordAuthentication getPasswordAuthentication() { @Override public ConSysApiClient build() { - if (isHttpClientAvailable && instance.http == null) - instance.http = httpClientBuilder.build(); + if (instance.httpAdapter == null) + instance.httpAdapter = new JavaHttpClient(instance.user, instance.password, instance.tokenHandler); return instance; } + + public ConSysApiClientBuilder tokenHandler(ITokenHandler tokenHandler) { + instance.tokenHandler = tokenHandler; + return this; + } } } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java index 8dc01a237..525eb1e76 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientConfig.java @@ -1,3 +1,17 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2025 GeoRobotix. All Rights Reserved. + + ******************************* END LICENSE BLOCK ***************************/ + package org.sensorhub.impl.service.consys.client; import org.sensorhub.api.client.ClientConfig; @@ -5,6 +19,7 @@ import org.sensorhub.impl.comm.HTTPConfig; import org.sensorhub.impl.comm.RobustIPConnectionConfig; import org.sensorhub.impl.datastore.view.ObsSystemDatabaseViewConfig; +import org.sensorhub.impl.service.consys.client.http.JavaHttpClient; public class ConSysApiClientConfig extends ClientConfig { @@ -20,26 +35,17 @@ public class ConSysApiClientConfig extends ClientConfig { @DisplayInfo(label="Connection Options") public RobustIPConnectionConfig connection = new RobustIPConnectionConfig(); + @DisplayInfo(label="OAuth Options", desc="Allows for the usage of OAuth Client Credentials (\"bearer\") tokens for instead of basic authentication") + public ConSysOAuthConfig conSysOAuth = new ConSysOAuthConfig(); -// public static class ConSysConnectionConfig extends RobustIPConnectionConfig -// { -// @DisplayInfo(desc="Enable to use a persistent HTTP connection for InsertResult") -// public boolean usePersistentConnection; -// -// -// @DisplayInfo(desc="Maximum number of records in upload queue (used to compensate for variable bandwidth)") -// public int maxQueueSize = 10; -// -// -// @DisplayInfo(desc="Maximum number of stream errors before we try to reconnect to remote server") -// public int maxConnectErrors = 10; -// } - + @DisplayInfo(label="Http Client Implementation", desc="Fully qualified class name of the HTTP client implementation to use") + public String httpClientImplClass; public ConSysApiClientConfig() { this.moduleClass = ConSysApiClientModule.class.getCanonicalName(); this.conSys.resourcePath = "/sensorhub/api"; + this.httpClientImplClass = JavaHttpClient.class.getCanonicalName(); } } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java index 14397d48b..d183b09ae 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientDescriptor.java @@ -1,3 +1,17 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2025 GeoRobotix. All Rights Reserved. + + ******************************* END LICENSE BLOCK ***************************/ + package org.sensorhub.impl.service.consys.client; import org.sensorhub.api.module.IModule; diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java index f34de8ac9..2f0b41898 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysApiClientModule.java @@ -1,3 +1,17 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2025 GeoRobotix. All Rights Reserved. + + ******************************* END LICENSE BLOCK ***************************/ + package org.sensorhub.impl.service.consys.client; import com.google.common.base.Strings; @@ -61,6 +75,9 @@ public static class StreamInfo private BigId internalID; private String sysUID; private Flow.Subscription subscription; + public long lastEventTime = Long.MIN_VALUE; + public int measPeriodMs = 1000; + public int errorCount = 0; } public ConSysApiClientModule() @@ -86,11 +103,13 @@ public void setConfiguration(ConSysApiClientConfig config) if (config.conSys.enableTLS) scheme += "s"; apiEndpointUrl = scheme + "://" + config.conSys.remoteHost + ":" + config.conSys.remotePort; - if (config.conSys.resourcePath != null) - { - if (config.conSys.resourcePath.charAt(0) != '/') - apiEndpointUrl += '/'; - apiEndpointUrl += config.conSys.resourcePath; + if (config.conSys.resourcePath != null && !config.conSys.resourcePath.isEmpty()) { + String path = config.conSys.resourcePath; + if (!path.startsWith("/")) + path = "/" + path; + if (!path.endsWith("/")) + path = path + "/"; + apiEndpointUrl += path; } } @@ -98,11 +117,7 @@ public void setConfiguration(ConSysApiClientConfig config) protected void doInit() throws SensorHubException { this.dataBaseView = config.dataSourceSelector.getFilteredView(getParentHub()); - - this.client = ConSysApiClient. - newBuilder(apiEndpointUrl) - .simpleAuth(config.conSys.user, !config.conSys.password.isEmpty() ? config.conSys.password.toCharArray() : null) - .build(); + this.client = new ConSysApiClient(config); } @Override @@ -198,7 +213,7 @@ private String tryUpdateSystem(ISystemWithDesc system) var responseCode = client.updateSystem(systemID, system).get(); boolean successful = responseCode == 204; if(!successful) - throw new ClientException("Failed to update resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID); + throw new ClientException("Failed to update resource: " + apiEndpointUrl + ConSysApiClient.SYSTEMS_COLLECTION + "/" + systemID); return systemID; } } catch (ExecutionException | InterruptedException | ClientException e) { @@ -316,19 +331,75 @@ protected List registerSystemDataStreams(SystemRegInfo system) { List addedStreams = new ArrayList<>(); - dataBaseView.getDataStreamStore().selectEntries( + var dataStreamEntries = dataBaseView.getDataStreamStore().selectEntries( new DataStreamFilter.Builder() .withSystems(new SystemFilter.Builder() - .withUniqueIDs(system.system.getUniqueIdentifier()) - .build()) - .build()) - .forEach((entry) -> { - if(Objects.equals(entry.getValue().getSystemID().getUniqueID(), system.system.getUniqueIdentifier())) - addedStreams.add(registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue())); - }); + .withUniqueIDs(system.system.getUniqueIdentifier()) + .build()) + .build()).toList(); + + var systemDataStreamList = new ArrayList>(); + try { + var systemDataStream = client.getSystemDataStreams(system.systemID, ResourceFormat.JSON).get().toList(); + + systemDataStreamList.addAll(systemDataStream); + } catch (InterruptedException | ExecutionException e) { + reportError("Error getting system datastreams", new Throwable(e.getMessage())); + } + + for (var entry : dataStreamEntries) { + // check each entry against the systemDataStreamList with outputName and systemUID + var filteredListByUniqueId = systemDataStreamList.stream().filter(dataStreamInfo -> + dataStreamInfo.getValue().getSystemID().getUniqueID().equals(entry.getValue().getSystemID().getUniqueID())); + + var filterListByOutputName = filteredListByUniqueId.filter(dataStreamInfo -> + dataStreamInfo.getValue().getOutputName().equals(entry.getValue().getOutputName())).toList(); + + if (filterListByOutputName.isEmpty()) { + if(Objects.equals(entry.getValue().getSystemID().getUniqueID(), system.system.getUniqueIdentifier())) + addedStreams.add(registerDataStream(entry.getKey().getInternalID(), system.systemID, entry.getValue())); + } else { + // get id and update the existing datastream + var datastream = filterListByOutputName.get(0); + + var newEntry = new DataStreamInfo.Builder() + .withName(entry.getValue().getName()) + .withDescription(entry.getValue().getDescription()) + .withSystem(entry.getValue().getSystemID()) + .withRecordDescription(entry.getValue().getRecordStructure()) + .withRecordEncoding(entry.getValue().getRecordEncoding()) + .withDeployment(entry.getValue().getDeploymentID()) + .withProcedure(entry.getValue().getProcedureID()) + .withFeatureOfInterest(datastream.getValue().getFeatureOfInterestID()) + .withSamplingFeature(datastream.getValue().getSamplingFeatureID()) + .withPhenomenonTimeInterval(entry.getValue().getPhenomenonTimeInterval()) + .withResultTimeInterval(entry.getValue().getResultTimeInterval()) + .withValidTime(entry.getValue().getValidTime()) + .build(); + + addedStreams.add(registerUpdatedDataStream(entry.getKey().getInternalID(), datastream.getKey(), newEntry)); + } + } + return addedStreams; } + protected StreamInfo registerUpdatedDataStream(BigId dsId, String dataStreamId, IDataStreamInfo dataStream) + { + var dsTopicId = EventUtils.getDataStreamDataTopicID(dataStream); + client.updateDataStream(dataStreamId, dataStream); + + StreamInfo streamInfo = new StreamInfo(); + streamInfo.dataStreamID = dataStreamId; + streamInfo.dataStream = dataStream; + streamInfo.topicID = dsTopicId; + streamInfo.sysUID = dataStream.getSystemID().getUniqueID(); + streamInfo.internalID = dsId; + + dataStreams.put(dsTopicId, streamInfo); + return streamInfo; + } + protected StreamInfo registerDataStream(BigId dsId, String systemID, IDataStreamInfo dataStream) { var dsTopicId = EventUtils.getDataStreamDataTopicID(dataStream); @@ -403,7 +474,11 @@ protected synchronized void startStream(StreamInfo streamInfo) throws ClientExce .withLatestResult() .build()) .forEach(obs -> - client.pushObs(streamInfo.dataStreamID, streamInfo.dataStream, obs)); + client.pushObs( + streamInfo.dataStreamID, + streamInfo.dataStream, + obs + )); getLogger().info("Starting Connected Systems data push for stream {} with UID {} to Connected Systems endpoint {}", streamInfo.dataStreamID, streamInfo.sysUID, apiEndpointUrl); @@ -432,10 +507,19 @@ public boolean isConnected() protected void handleEvent(final ObsEvent e, StreamInfo streamInfo) { for(var obs : e.getObservations()) { + String foiID = null; + if (obs.hasFoi()) { + var registeredFoiID = registeredFeatureIDs.get(obs.getFoiID()); + if (registeredFoiID != null) + foiID = registeredFoiID; + } client.pushObs( streamInfo.dataStreamID, streamInfo.dataStream, - obs); + obs, + foiID + ); + streamInfo.lastEventTime = e.getTimeStamp(); } } @@ -589,4 +673,9 @@ else if (e instanceof FoiAddedEvent foiAddedEvent) } } + public Map getDataStreams() + { + return dataStreams; + } + } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysOAuthConfig.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysOAuthConfig.java new file mode 100644 index 000000000..db6051f6d --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ConSysOAuthConfig.java @@ -0,0 +1,33 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2025 GeoRobotix. All Rights Reserved. + + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.service.consys.client; + +import org.sensorhub.api.config.DisplayInfo; + +public class ConSysOAuthConfig { + + @DisplayInfo(label = "Use OAuth Authentication") + public boolean oAuthEnabled = true; + + @DisplayInfo(label="Token Endpoint", desc="URL of OAuth provider's token endpoint") + public String tokenEndpoint; + + @DisplayInfo(desc="Client ID as provided by your OAuth provider") + public String clientID; + + + @DisplayInfo(desc="Client Secret as provided by your OAuth provider") + public String clientSecret; +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ITokenHandler.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ITokenHandler.java new file mode 100644 index 000000000..a815da306 --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/ITokenHandler.java @@ -0,0 +1,33 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2025 GeoRobotix. All Rights Reserved. + + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.service.consys.client; + +public interface ITokenHandler { + + /** + * gets the current access token + */ + String getToken(); + + /** + * refreshes a new access token + */ + void refreshAccessToken(); + + /** + * checks if the current token has expired + */ + boolean isExpired(); +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/OAuthTokenHandler.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/OAuthTokenHandler.java new file mode 100644 index 000000000..43937f25e --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/OAuthTokenHandler.java @@ -0,0 +1,98 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2025 GeoRobotix. All Rights Reserved. + + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.service.consys.client; + +import com.google.common.net.HttpHeaders; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.text.MessageFormat; + +/** + * Token handler impl for OAuth 2.0 Client + */ +public class OAuthTokenHandler implements ITokenHandler { + + private String token; + + private static final Logger logger = LoggerFactory.getLogger(OAuthTokenHandler.class); + + private long expirationTime; + + private final ConSysOAuthConfig config; + + public OAuthTokenHandler(ConSysOAuthConfig config) { + + this.config = config; + } + + public String getToken() { + return token; + } + + + public void refreshAccessToken() { + + try { + + String data = MessageFormat.format("grant_type=client_credentials&client_id={0}&client_secret={1}", + config.clientID, config.clientSecret); + + URL url = URI.create(config.tokenEndpoint).toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded"); + connection.setDoOutput(true); + + try (OutputStream os = connection.getOutputStream()) { + os.write(data.getBytes()); + } + + int responseCode = connection.getResponseCode(); + if (responseCode != 200) { + + logger.error("Failed to retrieve access token: {}", responseCode); + + } else { + + try (InputStream is = connection.getInputStream()) { + + JsonObject jsonObject = JsonParser.parseReader(new InputStreamReader(is)).getAsJsonObject(); + + token = jsonObject.get("access_token").getAsString(); + expirationTime = jsonObject.get("expires_in").getAsInt() * 1000L + System.currentTimeMillis(); + } + } + } catch (IOException e) { + + logger.error("Failed to retrieve access token due to exception: {}", e.getMessage()); + } + } + + public boolean isExpired() { + + return System.currentTimeMillis() > expirationTime; + } +} \ No newline at end of file diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/HttpURLConnectionClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/HttpURLConnectionClient.java new file mode 100644 index 000000000..f82a0dd95 --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/HttpURLConnectionClient.java @@ -0,0 +1,243 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + + The contents of this file are subject to the Mozilla Public License, v. 2.0. + If a copy of the MPL was not distributed with this file, You can obtain one + at http://mozilla.org/MPL/2.0/. + + Software distributed under the License is distributed on an "AS IS" basis, + WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + for the specific language governing rights and limitations under the License. + + Copyright (C) 2025 GeoRobotix. All Rights Reserved. + + ******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.service.consys.client.http; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.Authenticator; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Function; + +import org.sensorhub.impl.service.consys.ResourceParseException; +import org.sensorhub.impl.service.consys.client.ConSysApiClientConfig; +import org.sensorhub.impl.service.consys.resource.ResourceFormat; + +import com.google.common.net.HttpHeaders; +import com.google.gson.stream.JsonReader; + +public class HttpURLConnectionClient implements IHttpClient +{ + protected Authenticator authenticator; + + public HttpURLConnectionClient() + { + } + + @Override + public void setConfig(ConSysApiClientConfig config) { + + } + + + @Override + public CompletableFuture sendGetRequest(URI uri, ResourceFormat format, Function bodyMapper) + { + return CompletableFuture.supplyAsync(() -> { + HttpURLConnection connection = null; + try { + if (authenticator != null) + Authenticator.setDefault(authenticator); + + URL url = uri.toURL(); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty(HttpHeaders.ACCEPT, format.getMimeType()); + + int responseCode = connection.getResponseCode(); + if (responseCode == 200) { + try (InputStream is = connection.getInputStream()) { + return bodyMapper.apply(is); + } + } else { + throw new CompletionException("HTTP error " + responseCode, null); + } + } catch (IOException e) { + throw new CompletionException(e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + }); + } + + @Override + public CompletableFuture sendPostRequest(URI uri, ResourceFormat format, byte[] body) + { + return CompletableFuture.supplyAsync(() -> { + HttpURLConnection connection = null; + try { + if (authenticator != null) + Authenticator.setDefault(authenticator); + + URL url = uri.toURL(); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()); + connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, format.getMimeType()); + + connection.setDoOutput(true); + + try (OutputStream os = connection.getOutputStream()) { + os.write(body); + } + + int responseCode = connection.getResponseCode(); + if (responseCode == 201 || responseCode == 303) { + String location = connection.getHeaderField(HttpHeaders.LOCATION); + if (location == null) { + throw new IllegalStateException("Missing Location header in response."); + } + return location.substring(location.lastIndexOf('/') + 1); + } else { + throw new CompletionException(connection.getResponseMessage(), null); + } + } catch (IOException e) { + throw new CompletionException(e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + }); + } + + @Override + public CompletableFuture sendPostRequestAndReadResponse(URI uri, ResourceFormat format, byte[] body, Function responseBodyMapper) + { + return CompletableFuture.supplyAsync(() -> { + HttpURLConnection connection = null; + try { + if (authenticator != null) + Authenticator.setDefault(authenticator); + + URL url = uri.toURL(); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()); + connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, format.getMimeType()); + + connection.setDoOutput(true); + + try (OutputStream os = connection.getOutputStream()) { + os.write(body); + } + + int responseCode = connection.getResponseCode(); + if (responseCode == 200) { + try (InputStream is = connection.getInputStream()) { + return responseBodyMapper.apply(is); + } + } else { + throw new CompletionException("HTTP error " + responseCode + ": " + connection.getResponseMessage(), null); + } + } catch (IOException e) { + throw new CompletionException(e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + }); + } + + @Override + public CompletableFuture sendPutRequest(URI uri, ResourceFormat format, byte[] body) + { + return CompletableFuture.supplyAsync(() -> { + HttpURLConnection connection = null; + try { + if (authenticator != null) + Authenticator.setDefault(authenticator); + + URL url = uri.toURL(); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("PUT"); + connection.setRequestProperty(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()); + connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, format.getMimeType()); + + connection.setDoOutput(true); + + try (OutputStream os = connection.getOutputStream()) { + os.write(body); + } + + return connection.getResponseCode(); + } catch (IOException e) { + throw new CompletionException(e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + }); + } + + @Override + public CompletableFuture> sendBatchPostRequest(URI uri, ResourceFormat format, byte[] body) + { + return CompletableFuture.supplyAsync(() -> { + HttpURLConnection connection = null; + try { + if (authenticator != null) { + Authenticator.setDefault(authenticator); + } + + URL url = uri.toURL(); + connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setRequestProperty(HttpHeaders.CONTENT_TYPE, format.getMimeType()); + + connection.setDoOutput(true); + + try (OutputStream os = connection.getOutputStream()) { + os.write(body); + } + + int responseCode = connection.getResponseCode(); + if (responseCode == 201 || responseCode == 303) { + Set idList = new LinkedHashSet<>(); + try (InputStream is = connection.getInputStream(); + JsonReader reader = new JsonReader(new InputStreamReader(is))) { + reader.beginArray(); + while (reader.hasNext()) { + String uri2 = reader.nextString(); + idList.add(uri2.substring(uri2.lastIndexOf('/') + 1)); + } + reader.endArray(); + } + return idList; + } else { + throw new ResourceParseException(connection.getResponseMessage()); + } + } catch (IOException e) { + throw new CompletionException(e); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + }); + } + +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/IHttpClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/IHttpClient.java new file mode 100644 index 000000000..d4a55a2e5 --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/IHttpClient.java @@ -0,0 +1,36 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2025 GeoRobotix. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.service.consys.client.http; + +import java.io.InputStream; +import java.net.URI; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import org.sensorhub.impl.service.consys.client.ConSysApiClientConfig; +import org.sensorhub.impl.service.consys.resource.ResourceFormat; + +public interface IHttpClient +{ + CompletableFuture sendGetRequest(URI uri, ResourceFormat format, Function bodyMapper); + CompletableFuture sendPostRequest(URI uri, ResourceFormat format, byte[] body); + CompletableFuture sendPostRequestAndReadResponse(URI uri, ResourceFormat format, byte[] body, Function responseBodyMapper); + CompletableFuture sendPutRequest(URI uri, ResourceFormat format, byte[] body); + CompletableFuture> sendBatchPostRequest(URI uri, ResourceFormat format, byte[] body); + + void setConfig(ConSysApiClientConfig config); + +} diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/JavaHttpClient.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/JavaHttpClient.java new file mode 100644 index 000000000..acc8b44a6 --- /dev/null +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/client/http/JavaHttpClient.java @@ -0,0 +1,243 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2025 GeoRobotix. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.sensorhub.impl.service.consys.client.http; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.StringReader; +import java.net.Authenticator; +import java.net.PasswordAuthentication; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandler; +import java.net.http.HttpResponse.BodyHandlers; +import java.net.http.HttpResponse.BodySubscriber; +import java.net.http.HttpResponse.BodySubscribers; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Function; +import org.sensorhub.impl.service.consys.ResourceParseException; +import org.sensorhub.impl.service.consys.client.ConSysApiClientConfig; +import org.sensorhub.impl.service.consys.client.ITokenHandler; +import org.sensorhub.impl.service.consys.client.OAuthTokenHandler; +import org.sensorhub.impl.service.consys.resource.ResourceFormat; +import org.sensorhub.utils.Lambdas; +import com.google.common.net.HttpHeaders; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; +import com.google.gson.stream.JsonReader; + + +public class JavaHttpClient implements IHttpClient +{ + protected HttpClient http; + protected ITokenHandler tokenHandler; + + public JavaHttpClient() {} + + public JavaHttpClient(String user, char[] password, ITokenHandler tokenHandler) { + this.tokenHandler = tokenHandler; + var builder = HttpClient.newBuilder(); + if (user != null && !user.isEmpty()) { + var finalPwd = password != null ? password : new char[0]; + builder.authenticator(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(user, finalPwd); + } + }); + } + this.http = builder.build(); + } + + @Override + public void setConfig(ConSysApiClientConfig config) { + if (config.conSysOAuth.oAuthEnabled) { + this.tokenHandler = new OAuthTokenHandler(config.conSysOAuth); + } + + this.http = HttpClient.newBuilder() + .authenticator(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + char[] finalPwd = config.conSys.password != null ? config.conSys.password.toCharArray() : new char[0]; + + return new PasswordAuthentication(config.conSys.user, finalPwd); + } + }) + .build(); + } + + @Override + public CompletableFuture sendGetRequest(URI uri, ResourceFormat format, Function bodyMapper) + { + var builder = HttpRequest.newBuilder() + .uri(uri) + .GET() + .header(HttpHeaders.ACCEPT, format.getMimeType()); + + addAuthHeader(builder); + + + var req = builder.build(); + BodyHandler bodyHandler = resp -> { + BodySubscriber upstream = BodySubscribers.ofByteArray(); + return BodySubscribers.mapping(upstream, body -> { + if (resp.statusCode() == 200) { + var is = new ByteArrayInputStream(body); + return bodyMapper.apply(is); + } else { + var error = new String(body); + throw new CompletionException("HTTP error " + resp.statusCode() + ": " + error, null); + } + }); + }; + + return http.sendAsync(req, bodyHandler) + .thenApply(resp -> { + if (resp.statusCode() == 200) + return resp.body(); + else + throw new CompletionException("HTTP error " + resp.statusCode(), null); + }); + } + + + @Override + public CompletableFuture sendPostRequest(URI uri, ResourceFormat format, byte[] body) + { + var builder = HttpRequest.newBuilder() + .uri(uri) + .POST(HttpRequest.BodyPublishers.ofByteArray(body)) + .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) + .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()); + + addAuthHeader(builder); + + var req = builder.build(); + return http.sendAsync(req, BodyHandlers.ofString()) + .thenApply(resp -> { + if (resp.statusCode() == 201 || resp.statusCode() == 303) { + var location = resp.headers() + .firstValue(HttpHeaders.LOCATION) + .orElseThrow(() -> new IllegalStateException("Missing Location header in response")); + return location.substring(location.lastIndexOf('/') + 1); + } else + throw new CompletionException(resp.body(), null); + }); + } + + + @Override + public CompletableFuture sendPostRequestAndReadResponse(URI uri, ResourceFormat format, byte[] requestBody, Function responseBodyMapper) + { + var builder = HttpRequest.newBuilder() + .uri(uri) + .POST(HttpRequest.BodyPublishers.ofByteArray(requestBody)) + .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) + .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()); + + addAuthHeader(builder); + + var req = builder.build(); + BodyHandler bodyHandler = resp -> { + BodySubscriber upstream = BodySubscribers.ofByteArray(); + return BodySubscribers.mapping(upstream, body -> { + if (resp.statusCode() == 200) { + var is = new ByteArrayInputStream(body); + return responseBodyMapper.apply(is); + } else { + var bodyStr = new String(body); + try { + var jsonError = (JsonObject) JsonParser.parseString(bodyStr); + throw new CompletionException(jsonError.get("message").getAsString(), null); + } catch (JsonSyntaxException e) { + throw new CompletionException("HTTP error " + resp.statusCode() + ": " + bodyStr, null); + } + } + }); + }; + + return http.sendAsync(req, bodyHandler) + .thenApply(resp -> { + if (resp.statusCode() == 200) + return resp.body(); + else + throw new CompletionException("HTTP error " + resp.statusCode(), null); + }); + } + + + @Override + public CompletableFuture sendPutRequest(URI uri, ResourceFormat format, byte[] body) + { + var builder = HttpRequest.newBuilder() + .uri(uri) + .PUT(HttpRequest.BodyPublishers.ofByteArray(body)) + .header(HttpHeaders.ACCEPT, ResourceFormat.JSON.getMimeType()) + .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()); + + addAuthHeader(builder); + + var req = builder.build(); + return http.sendAsync(req, BodyHandlers.ofString()) + .thenApply(HttpResponse::statusCode); + } + + + @Override + public CompletableFuture> sendBatchPostRequest(URI uri, ResourceFormat format, byte[] body) + { + var builder = HttpRequest.newBuilder() + .uri(uri) + .POST(HttpRequest.BodyPublishers.ofByteArray(body)) + .header(HttpHeaders.CONTENT_TYPE, format.getMimeType()); + + addAuthHeader(builder); + + var req = builder.build(); + return http.sendAsync(req, BodyHandlers.ofString()) + .thenApply(Lambdas.checked(resp -> { + if (resp.statusCode() == 201 || resp.statusCode() == 303) { + var idList = new LinkedHashSet(); + try (JsonReader reader = new JsonReader(new StringReader(resp.body()))) { + reader.beginArray(); + while (reader.hasNext()) { + var uri2 = reader.nextString(); + idList.add(uri2.substring(uri2.lastIndexOf('/') + 1)); + } + reader.endArray(); + } + return idList; + } else + throw new ResourceParseException(resp.body()); + })); + } + + protected void addAuthHeader(HttpRequest.Builder requestBuilder) + { + if (tokenHandler != null) { + if (tokenHandler.isExpired()) { + tokenHandler.refreshAccessToken(); + } + requestBuilder.header("Authorization", "Bearer " + tokenHandler.getToken()); + } + } +} diff --git a/sensorhub-service-consys/src/test/java/org/sensorhub/impl/service/consys/client/TestClientDataStreams.java b/sensorhub-service-consys/src/test/java/org/sensorhub/impl/service/consys/client/TestClientDataStreams.java index d29a437d6..659b5e763 100644 --- a/sensorhub-service-consys/src/test/java/org/sensorhub/impl/service/consys/client/TestClientDataStreams.java +++ b/sensorhub-service-consys/src/test/java/org/sensorhub/impl/service/consys/client/TestClientDataStreams.java @@ -101,8 +101,8 @@ public void testAddDataStreamAndGetById() throws Exception .newBuilder(apiRootUrl) .build(); - var dsInfo = client.getDatastreamById(dsId, ResourceFormat.JSON, false).get(); + var dsInfo = client.getDataStreamById(dsId, ResourceFormat.JSON, false).get(); assertEquals(recordStruct.getName(), dsInfo.getOutputName()); assertEquals(systemTests.getSystemUid(1), dsInfo.getSystemID().getUniqueID()); assertTrue(((FeatureLink)dsInfo.getSystemID()).getLink().getHref().contains(sysId)); @@ -110,6 +110,38 @@ public void testAddDataStreamAndGetById() throws Exception + + @Test + public void testGetDataStreams() throws Exception + { + var sysId = systemTests.addSystem(1, true); + + var swe = new GeoPosHelper(); + var recordStruct = swe.createLocationVectorLLA() + .name("pos") + .build(); + + int totalDataStreams = 5; + var addedIds = new ArrayList(); + for (int i = 0; i < totalDataStreams; i++) + { + var struct = recordStruct.copy(); + struct.setName("output" + i); + var dsId = addDataStream(sysId, i, struct, false); + addedIds.add(dsId); + } + + var client = ConSysApiClient + .newBuilder(apiRootUrl) + .build(); + + var allDataStreams = client.getDataStreams(ResourceFormat.JSON, 10, 0).get(); + var allDataStreamList = allDataStreams.toList(); + assertEquals(totalDataStreams, allDataStreamList.size()); + } + + + protected String addDataStream(String sysId, int num, DataComponent recordStruct, boolean checkHead) throws Exception { // insert one datastream diff --git a/sensorhub-service-consys/src/test/java/org/sensorhub/impl/service/consys/client/TestClientObs.java b/sensorhub-service-consys/src/test/java/org/sensorhub/impl/service/consys/client/TestClientObs.java index 0d55b3edf..ede5cc99a 100644 --- a/sensorhub-service-consys/src/test/java/org/sensorhub/impl/service/consys/client/TestClientObs.java +++ b/sensorhub-service-consys/src/test/java/org/sensorhub/impl/service/consys/client/TestClientObs.java @@ -67,7 +67,7 @@ protected void addObs(String dsId, int start, int count, boolean checkHead) thro .newBuilder(apiRootUrl) .build(); - var dsInfo = client.getDatastreamSchema(dsId, ResourceFormat.SWE_JSON, ResourceFormat.JSON).get(); + var dsInfo = client.getDataStreamSchema(dsId, ResourceFormat.SWE_JSON, ResourceFormat.JSON).get(); var recordStruct = dsInfo.getRecordStructure(); var now = Instant.now().truncatedTo(ChronoUnit.SECONDS).minusSeconds(3600);