Skip to content

Commit 48e6811

Browse files
committed
Work on subscriptions, can subscribe successfully, now needs state machine for processing replies
1 parent 6706944 commit 48e6811

File tree

2 files changed

+56
-43
lines changed

2 files changed

+56
-43
lines changed

src/client/include/CmwLightClient.hpp

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,13 @@ struct CmwLightConnectBody {
4848
std::string &clientInfo() { return x_9; }
4949
};
5050
struct CmwLightRequestContext {
51-
std::string x_8; // SELECTOR
52-
std::map<std::string, std::string> c; // FILTERS // todo: support arbitrary filter data
53-
std::map<std::string, std::string> x; // DATA // todo: support arbitrary filter data
51+
std::string x_8; // SELECTOR
52+
std::map<std::string, std::variant<bool, int, long, std::string>> c; // FILTERS
53+
std::map<std::string, std::variant<bool, int, long, std::string>> x; // DATA
5454
// accessors to make code more readable
55-
std::string &selector() { return x_8; };
56-
std::map<std::string, std::string> &filters() { return c; }
57-
std::map<std::string, std::string> &data() { return x; }
55+
std::string &selector() { return x_8; };
56+
std::map<std::string, std::variant<bool, int, long, std::string>> &filters() { return c; }
57+
std::map<std::string, std::variant<bool, int, long, std::string>> &data() { return x; }
5858
};
5959
struct CmwLightDataContext {
6060
std::string x_4; // CYCLE_NAME
@@ -191,7 +191,7 @@ struct OpenSubscription {
191191
std::chrono::milliseconds backOff = 20ms;
192192
long updateId;
193193
long reqId = 0L;
194-
std::string replyId;
194+
long replyId;
195195
SubscriptionState state = SubscriptionState::SUBSCRIBING;
196196
std::chrono::system_clock::time_point nextTry;
197197
std::string uri;
@@ -407,6 +407,7 @@ class CMWLightClient : public CMWLightClientBase {
407407

408408
if (con._connectionState == detail::Connection::ConnectionState::CONNECTING2) {
409409
if (header.requestType() == static_cast<uint8_t>(detail::RequestType::REPLY)) {
410+
fmt::print("connected successfully\n");
410411
con._connectionState = detail::Connection::ConnectionState::CONNECTED;
411412
con._lastHeartbeatReceived = currentTime;
412413
return true;
@@ -418,21 +419,24 @@ class CMWLightClient : public CMWLightClientBase {
418419
using enum detail::RequestType;
419420
switch (detail::RequestType{ header.requestType() }) {
420421
case REPLY: {
421-
// auto request = con._pendingRequests[fmt::format("{}", header.id())];
422+
auto request = con._pendingRequests[fmt::format("{}", header.id())];
422423
// con._pendingRequests.erase(header.id());
423-
output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client
424-
output.command = opencmw::mdp::Command::Final; /// Command < command type (GET, SET, SUBSCRIBE, UNSUBSCRIBE, PARTIAL, FINAL, NOTIFY, READY, DISCONNECT, HEARTBEAT)
425-
output.id = 0; /// std::size_t
424+
output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client
425+
output.command = opencmw::mdp::Command::Final; /// Command < command type (GET, SET, SUBSCRIBE, UNSUBSCRIBE, PARTIAL, FINAL, NOTIFY, READY, DISCONNECT, HEARTBEAT)
426+
char *end = &request.reqId.back();
427+
output.id = std::strtoul(request.reqId.data(), &end, 10); /// std::size_t
426428
output.protocolName = "RDA3"; /// std::string < unique protocol name including version (e.g. 'MDPC03' or 'MDPW03')
427429
output.serviceName = "/"; /// std::string < service endpoint name (normally the URI path only), or client source ID (for broker <-> worker messages)
428430
output.clientRequestID; /// IoBuffer < stateful: worker mirrors clientRequestID; stateless: worker generates unique increasing IDs (to detect packet loss)
429431
output.topic = URI{ "/" }; /// URI < URI containing at least <path> and optionally <query> parameters
430432
output.data = IoBuffer{ con._frames[2].data().data(), con._frames[2].size() }; /// IoBuffer < request/reply body -- opaque binary, e.g. YaS-, CmwLight-, JSON-, or HTML-based
431433
output.error = ""; /// std::string < UTF-8 strings containing error code and/or stack-trace (e.g. "404 Not Found")
432434
// output.rbac; ///IoBuffer < optional RBAC meta-info -- may contain token, role, signed message hash (implementation dependent)
435+
// con._pendingRequests.erase("{}", header.id());
433436
return true;
434437
}
435438
case EXCEPTION: {
439+
fmt::print("exception\n");
436440
auto request = con._pendingRequests[fmt::format("{}", header.id())];
437441
// con._pendingRequests.erase(header.id());
438442
output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client
@@ -447,22 +451,31 @@ class CMWLightClient : public CMWLightClientBase {
447451
return true;
448452
}
449453
case SUBSCRIBE: {
450-
auto sub = con._subscriptions[fmt::format("{}", header.id())];
451-
// sub.replyId = header.options()->sourceId();
454+
fmt::print("subscription sucessful: request id: {}, update id: {}\n", header.id(), header.options()->sourceId());
455+
std::cout << header << std::endl;
456+
;
457+
auto &sub = con._subscriptions[fmt::format("{}", header.id())];
458+
sub.replyId = header.options()->sourceId();
452459
sub.state = detail::OpenSubscription::SubscriptionState::SUBSCRIBED;
453460
sub.backOff = 20ms; // reset back-off
454461
return false;
455462
}
456463
case UNSUBSCRIBE: {
464+
fmt::print("unsubscribe\n");
457465
// successfully removed subscription
458466
auto subscriptionForUnsub = con._subscriptions[fmt::format("{}", header.id())];
459467
subscriptionForUnsub.state = detail::OpenSubscription::SubscriptionState::UNSUBSCRIBED;
460468
// con._subscriptions.erase(subscriptionForUnsub.updateId);
461469
return false;
462470
}
463471
case NOTIFICATION_DATA: {
472+
fmt::print("notification_data\n");
464473
std::string replyId;
465-
if (con._subscriptions.find(replyId) == con._subscriptions.end()) {
474+
std::cout << header << std::endl;
475+
;
476+
auto sub = std::find_if(con._subscriptions.begin(), con._subscriptions.end(), [&header](auto &pair) { return pair.second.replyId == header.id(); });
477+
if (sub == con._subscriptions.end()) {
478+
fmt::print("received unexpected subscription for replyId: {}\n", header.id());
466479
return false;
467480
}
468481
auto subscriptionForNotification = con._subscriptions[replyId];
@@ -474,7 +487,7 @@ class CMWLightClient : public CMWLightClientBase {
474487
// }
475488
output.arrivalTime = std::chrono::system_clock::now(); /// timePoint < UTC time when the message was sent/received by the client
476489
output.command = opencmw::mdp::Command::Notify; /// Command < command type (GET, SET, SUBSCRIBE, UNSUBSCRIBE, PARTIAL, FINAL, NOTIFY, READY, DISCONNECT, HEARTBEAT)
477-
output.id = 0; /// std::size_t
490+
output.id = static_cast<size_t>(sub->second.reqId); /// std::size_t
478491
output.protocolName = "RDA3"; /// std::string < unique protocol name including version (e.g. 'MDPC03' or 'MDPW03')
479492
output.serviceName = "/"; /// std::string < service endpoint name (normally the URI path only), or client source ID (for broker <-> worker messages)
480493
output.clientRequestID = IoBuffer{}; /// IoBuffer < stateful: worker mirrors clientRequestID; stateless: worker generates unique increasing IDs (to detect packet loss)
@@ -484,6 +497,7 @@ class CMWLightClient : public CMWLightClientBase {
484497
return true;
485498
}
486499
case NOTIFICATION_EXC: {
500+
fmt::print("notification exception\n");
487501
std::string replyId;
488502
if (con._subscriptions.find(replyId) == con._subscriptions.end()) {
489503
return false;
@@ -501,6 +515,7 @@ class CMWLightClient : public CMWLightClientBase {
501515
return true;
502516
}
503517
case SUBSCRIBE_EXCEPTION: {
518+
fmt::print("subscribe exception\n");
504519
auto subForSubExc = con._subscriptions[fmt::format("{}", header.id())];
505520
subForSubExc.state = detail::OpenSubscription::SubscriptionState::UNSUBSCRIBED;
506521
subForSubExc.nextTry = currentTime + subForSubExc.backOff;
@@ -517,13 +532,17 @@ class CMWLightClient : public CMWLightClientBase {
517532
output.error = ""; /// std::string < UTF-8 strings containing error code and/or stack-trace (e.g. "404 Not Found")
518533
return true;
519534
}
535+
case SESSION_CONFIRM: {
536+
fmt::print("received session confirm\n");
537+
return false;
538+
}
520539
// unsupported or non-actionable replies
521540
case GET:
522541
case SET:
523542
case CONNECT:
524543
case EVENT:
525-
case SESSION_CONFIRM:
526544
default:
545+
fmt::print("unsupported message: {}\n", header.requestType());
527546
return false;
528547
}
529548
}
@@ -550,14 +569,12 @@ class CMWLightClient : public CMWLightClientBase {
550569
break;
551570
case SERVER_HB:
552571
if (con._connectionState != CONNECTED && con._connectionState != CONNECTING2) {
553-
throw std::runtime_error("received a heart-beat message on an unconnected connection");
572+
fmt::print("received a heart-beat message on an unconnected connection!\n");
573+
return false;
554574
}
555575
con._lastHeartbeatReceived = currentTime;
556576
break;
557577
case SERVER_REP:
558-
if (con._connectionState != CONNECTED && con._connectionState != CONNECTING2) {
559-
throw std::runtime_error("received an update on an unconnected connection");
560-
}
561578
con._lastHeartbeatReceived = currentTime;
562579
return handleServerReply(output, con, currentTime);
563580
case CLIENT_CONNECT:
@@ -689,25 +706,20 @@ class CMWLightClient : public CMWLightClientBase {
689706
detail::send(con._socket, ZMQ_SNDMORE, "error sending get frame"sv, "\x21"); // 0x20 => detail::MessageType::CLIENT_REQ
690707
opencmw::URI uri{ sub.uri };
691708
CmwLightHeader header;
709+
header.id() = sub.reqId;
692710
header.device() = uri.path()->substr(1, uri.path()->find('/', 1) - 1);
693711
header.property() = uri.path()->substr(uri.path()->find('/', 1) + 1);
694712
header.requestType() = static_cast<uint8_t>(detail::RequestType::SUBSCRIBE);
695713
header.sessionId() = detail::createClientId();
696714
detail::send(con._socket, ZMQ_SNDMORE, "failed to send message header"sv, detail::serialiseCmwLight(header)); // send message header
697-
bool hasRequestCtx = true;
698-
if (hasRequestCtx) {
699-
CmwLightRequestContext ctx;
700-
ctx.selector() = ""; // todo: set correct ctx values
701-
ctx.filters() = { { "acquisitonModeFilter", "0" }, { "channelNameFilter", "GS01QS1F:Current@1Hz" } }; // todo: correct filters from query // acquisitino mode filter needs to be enum/int => needs map of variant
702-
// ctx.data() = {};
703-
IoBuffer buffer{};
704-
serialise<CmwLight>(buffer, ctx);
705-
detail::send(con._socket, ZMQ_SNDMORE, "failed to send context frame"sv, std::move(buffer)); // send requestContext
706-
detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER, BODY_REQUEST_CONTEXT));
707-
} else {
708-
detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER));
709-
}
710-
sub.state = SUBSCRIBING;
715+
CmwLightRequestContext ctx;
716+
ctx.filters() = { { "acquisitionModeFilter", 0 }, { "channelNameFilter", "GS01QS1F:Current@10Hz" } }; // todo: correct filters from query
717+
IoBuffer buffer{};
718+
serialise<CmwLight>(buffer, ctx);
719+
detail::send(con._socket, ZMQ_SNDMORE, "failed to send context frame"sv, std::move(buffer)); // send requestContext
720+
detail::send(con._socket, 0, "failed to send descriptor frame"sv, descriptorToString(HEADER, BODY_REQUEST_CONTEXT));
721+
con._lastHeartBeatSent = now;
722+
sub.state = SUBSCRIBING;
711723
} else if (sub.state == UNSUBSCRIBING) {
712724
}
713725
}

src/client/test/CmwLightTest.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,18 +69,19 @@ TEST_CASE("BasicCmwLight example", "[Client]") {
6969
// send some requests
7070
auto endpoint = URI<STRICT>::factory(URI<STRICT>(digitizerAddress)).scheme("rda3tcp").path("/GSCD002/Version").build();
7171

72-
std::atomic<int> received{ 0 };
73-
clientContext.get(endpoint, [&received](const mdp::Message &message) {
74-
fmt::print("{}", hexview(message.data.asString()));
75-
received++;
72+
std::atomic<int> getReceived{ 0 };
73+
clientContext.get(endpoint, [&getReceived](const mdp::Message &message) {
74+
fmt::print("get reply: {}", hexview(message.data.asString()));
75+
getReceived++;
7676
});
7777

78-
auto subscriptionEndpoint = URI<STRICT>::factory(URI<STRICT>(digitizerAddress)).scheme("rda3tcp").path("/GSCD002/AcquisitionDAQ").addQueryParameter("ctx", "FAIR.SELECTOR.ALL").addQueryParameter("filter", "acquisitonModeFilter=0;channelNameFilter=GS01QS1F:Current@1Hz").build();
79-
clientContext.subscribe(subscriptionEndpoint, [&received](const mdp::Message &message) {
80-
fmt::print("{}", hexview(message.data.asString()));
81-
received++;
78+
std::atomic<int> subscriptionUpdatesReceived{ 0 };
79+
auto subscriptionEndpoint = URI<STRICT>::factory(URI<STRICT>(digitizerAddress)).scheme("rda3tcp").path("/GSCD002/AcquisitionDAQ").addQueryParameter("ctx", "FAIR.SELECTOR.ALL").addQueryParameter("filter", "acquisitonModeFilter=0;channelNameFilter=GS01QS1F:Current@1Hz").build();
80+
clientContext.subscribe(subscriptionEndpoint, [&subscriptionUpdatesReceived](const mdp::Message &message) {
81+
fmt::print("subscription update: {}", hexview(message.data.asString()));
82+
subscriptionUpdatesReceived++;
8283
});
8384

8485
std::this_thread::sleep_for(8000ms); // allow the request to reach the server
85-
REQUIRE(received == 1);
86+
REQUIRE(getReceived == 1);
8687
}

0 commit comments

Comments
 (0)