Skip to content

Commit cc9282c

Browse files
committed
stfsender,tfbuilder: implement datadist controller rpcs
1 parent 3b3936b commit cc9282c

15 files changed

+231
-114
lines changed

src/StfSender/StfSenderDevice.cxx

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,16 @@ void StfSenderDevice::InitTask()
5858

5959
auto& lStatus = mDiscoveryConfig->status();
6060
lStatus.mutable_info()->set_type(StfSender);
61-
lStatus.mutable_info()->set_process_id(Config::getIdOption(*GetConfig()));
61+
lStatus.mutable_info()->set_process_id(Config::getIdOption(StfSender, *GetConfig()));
6262
lStatus.mutable_info()->set_ip_address(Config::getNetworkIfAddressOption(*GetConfig()));
63-
lStatus.mutable_partition()->set_partition_id(Config::getPartitionOption(*GetConfig()));
63+
64+
// wait for "partition-id"
65+
while (!Config::getPartitionOption(*GetConfig())) {
66+
WDDLOG("TfBuilder waiting on 'discovery-partition' config parameter.");
67+
std::this_thread::sleep_for(1s);
68+
}
69+
70+
lStatus.mutable_partition()->set_partition_id(*Config::getPartitionOption(*GetConfig()));
6471
mDiscoveryConfig->write();
6572
}
6673

@@ -94,13 +101,12 @@ void StfSenderDevice::InitTask()
94101
if (mStandalone && !mFileSink.enabled()) {
95102
WDDLOG("Running in standalone mode and with STF file sink disabled. Data will be lost.");
96103
}
97-
98-
// Info thread
99-
mInfoThread = create_thread_member("stfs_info", &StfSenderDevice::InfoThread, this);
100104
}
101105

102106
void StfSenderDevice::PreRun()
103107
{
108+
mRunning = true;
109+
104110
if (!mStandalone) {
105111
// Start output handler
106112
mOutputHandler.start(mDiscoveryConfig);
@@ -126,15 +132,21 @@ void StfSenderDevice::PreRun()
126132
if (mFileSink.enabled()) {
127133
mFileSink.start();
128134
}
135+
136+
// Info thread
137+
mInfoThread = create_thread_member("stfs_info", &StfSenderDevice::InfoThread, this);
138+
129139
// start the receiver thread
130140
mReceiverThread = create_thread_member("stfs_recv", &StfSenderDevice::StfReceiverThread, this);
131141
}
132142

133-
void StfSenderDevice::ResetTask()
143+
void StfSenderDevice::PostRun()
134144
{
135145
// Stop the pipeline
136146
stopPipeline();
137147

148+
mRunning = false;
149+
138150
// stop the receiver thread
139151
if (mReceiverThread.joinable()) {
140152
mReceiverThread.join();
@@ -161,6 +173,11 @@ void StfSenderDevice::ResetTask()
161173
mInfoThread.join();
162174
}
163175

176+
DDDLOG("PostRun() done.");
177+
}
178+
179+
void StfSenderDevice::ResetTask()
180+
{
164181
DDDLOG("ResetTask() done.");
165182
}
166183

@@ -175,12 +192,9 @@ void StfSenderDevice::StfReceiverThread()
175192
DplToStfAdapter lStfReceiver;
176193
std::unique_ptr<SubTimeFrame> lStf;
177194

178-
// wait for the device to go into RUNNING state
179-
WaitForRunningState();
180-
181195
const auto lStfStartTime = hres_clock::now();
182196

183-
while (IsRunningState()) {
197+
while (mRunning) {
184198
lStf = lStfReceiver.deserialize(lInputChan);
185199

186200
if (!lStf) {
@@ -211,10 +225,7 @@ void StfSenderDevice::StfReceiverThread()
211225

212226
void StfSenderDevice::InfoThread()
213227
{
214-
// wait for the device to go into RUNNING state
215-
WaitForRunningState();
216-
217-
while (IsRunningState()) {
228+
while (mRunning) {
218229
IDDLOG("SubTimeFrame size_mean={} in_frequency_mean={:.4} queued_stf={}",
219230
mStfSizeSamples.Mean(), mStfTimeSamples.MeanStepFreq(), mNumStfs);
220231

@@ -225,8 +236,12 @@ void StfSenderDevice::InfoThread()
225236

226237
bool StfSenderDevice::ConditionalRun()
227238
{
239+
if (mRpcServer.isTerminateRequested()) {
240+
IDDLOG_RL(10000, "DataDistribution partition is terminated.");
241+
return false; // trigger PostRun()
242+
}
228243
// nothing to do here sleep for awhile
229-
std::this_thread::sleep_for(1000ms);
244+
std::this_thread::sleep_for(300ms);
230245
return true;
231246
}
232247
}

src/StfSender/StfSenderDevice.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class StfSenderDevice : public DataDistDevice,
7676

7777
protected:
7878
void PreRun() final;
79-
void PostRun() final {};
79+
void PostRun() final;
8080
bool ConditionalRun() final;
8181

8282
void StfReceiverThread();
@@ -156,6 +156,7 @@ class StfSenderDevice : public DataDistDevice,
156156
TfSchedulerRpcClient mTfSchedulerRpcClient;
157157

158158
/// Receiver threads
159+
bool mRunning = false;
159160
std::thread mReceiverThread;
160161

161162
/// File sink

src/StfSender/StfSenderRpc.cxx

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ ::grpc::Status StfSenderRpcImpl::ConnectTfBuilderRequest(::grpc::ServerContext*
5656
const std::string lTfBuilderId = request->tf_builder_id();
5757
const std::string lTfBuilderEndpoint = request->endpoint();
5858

59+
if (mTerminateRequested) {
60+
response->set_status(TfBuilderConnectionStatus::ERROR_PARTITION_TERMINATING);
61+
return Status::OK;
62+
}
63+
5964
// handle the request
6065
DDDLOG("Requested to connect to TfBuilder. tfb_id={} tfb_ep={}", lTfBuilderId, lTfBuilderEndpoint);
6166
response->set_status(OK);
@@ -80,7 +85,6 @@ ::grpc::Status StfSenderRpcImpl::DisconnectTfBuilderRequest(::grpc::ServerContex
8085
const TfBuilderEndpoint* request,
8186
StatusResponse* response)
8287
{
83-
8488
const std::string lTfBuilderId = request->tf_builder_id();
8589
const std::string lTfBuilderEndpoint = request->endpoint();
8690

@@ -99,12 +103,24 @@ ::grpc::Status StfSenderRpcImpl::StfDataRequest(::grpc::ServerContext* /*context
99103
const StfDataRequestMessage* request,
100104
StfDataResponse* response)
101105
{
102-
103106
mOutput->sendStfToTfBuilder(request->stf_id(), request->tf_builder_id(), *response/*out*/);
104107

105108
return Status::OK;
106109
}
107110

111+
// rpc TerminatePartition(PartitionInfo) returns (PartitionResponse) { }
112+
::grpc::Status StfSenderRpcImpl::TerminatePartition(::grpc::ServerContext* /*context*/,
113+
const PartitionInfo* /*request*/, PartitionResponse* response)
114+
{
115+
DDDLOG("TerminatePartition request received.");
116+
// TODO: verify partition id
117+
response->set_partition_state(PartitionState::PARTITION_TERMINATING);
118+
119+
mTerminateRequested = true;
120+
121+
return Status::OK;
122+
}
123+
108124

109125
}
110126
} /* o2::DataDistribution */

src/StfSender/StfSenderRpc.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,18 @@ class StfSenderRpcImpl final : public StfSenderRpc::Service
6262
const StfDataRequestMessage* request,
6363
StfDataResponse* response) override;
6464

65+
// rpc TerminatePartition(PartitionInfo) returns (PartitionResponse) { }
66+
::grpc::Status TerminatePartition(::grpc::ServerContext* context,
67+
const PartitionInfo* request,
68+
PartitionResponse* response) override;
69+
6570
void start(StfSenderOutput *pOutput, const std::string pRpcSrvBindIp, int& lRealPort /*[out]*/);
6671
void stop();
6772

73+
bool isTerminateRequested() const { return mTerminateRequested; }
74+
6875
private:
76+
bool mTerminateRequested = false;
6977
std::unique_ptr<Server> mServer = nullptr;
7078
StfSenderOutput *mOutput = nullptr;
7179

src/StfSender/runStfSenderDevice.cxx

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,19 @@ int main(int argc, char* argv[])
7070

7171
// Install listener for Logging options
7272
o2::DataDistribution::impl::DataDistLoggerCtx::HandleFMQOptions(r);
73+
74+
// Install listener for discovery partition key
75+
r.fConfig.Subscribe<std::string>("discovery-partition", [&](const std::string& pKey, std::string pValue) {
76+
77+
if (pKey == "partition_id" || pKey == "partition-id" || pKey == "environment-id" || pKey == "environment_id") {
78+
79+
if (r.fConfig.GetProperty<std::string>("discovery-partition") == "") {
80+
r.fConfig.SetProperty<std::string>("discovery-partition", pValue);
81+
IDDLOG("Config::Subscribe received key-value pair. {}=<{}>", pKey, pValue);
82+
}
83+
}
84+
});
85+
7386
// reset unsupported options
7487
r.fConfig.SetProperty<int>("io-threads", (int) std::min(std::thread::hardware_concurrency(), 16u));
7588
r.fConfig.SetProperty<float>("rate", 0.f);

src/TfBuilder/TfBuilderDevice.cxx

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ void TfBuilderDevice::Init()
5151

5252
void TfBuilderDevice::Reset()
5353
{
54+
mMemI->stop();
5455
mMemI.reset();
5556
}
5657

@@ -69,10 +70,15 @@ void TfBuilderDevice::InitTask()
6970

7071
auto &lStatus = mDiscoveryConfig->status();
7172
lStatus.mutable_info()->set_type(TfBuilder);
72-
lStatus.mutable_info()->set_process_id(Config::getIdOption(*GetConfig()));
73+
lStatus.mutable_info()->set_process_id(Config::getIdOption(TfBuilder, *GetConfig()));
7374
lStatus.mutable_info()->set_ip_address(Config::getNetworkIfAddressOption(*GetConfig()));
7475

75-
mPartitionId = Config::getPartitionOption(*GetConfig());
76+
// wait for "partition-id"
77+
while (!Config::getPartitionOption(*GetConfig())) {
78+
WDDLOG("TfBuilder waiting on 'discovery-partition' config parameter.");
79+
std::this_thread::sleep_for(1s);
80+
}
81+
mPartitionId = *Config::getPartitionOption(*GetConfig());
7682
lStatus.mutable_partition()->set_partition_id(mPartitionId);
7783

7884
// File sink
@@ -110,15 +116,14 @@ void TfBuilderDevice::InitTask()
110116
mStandalone = true;
111117
IDDLOG("Not sending to DPL.");
112118
}
113-
114-
// start the info thread
115-
mInfoThread = create_thread_member("tfb_info", &TfBuilderDevice::InfoThread, this);
116119
}
117120
}
118121

119122
void TfBuilderDevice::PreRun()
120123
{
121-
start();
124+
if (!start()) {
125+
mShouldExit = true;
126+
}
122127
}
123128

124129
bool TfBuilderDevice::start()
@@ -136,7 +141,6 @@ bool TfBuilderDevice::start()
136141

137142
// we reached the scheduler instance, initialize everything else
138143
mRunning = true;
139-
auto lShmTransport = this->AddTransport(fair::mq::Transport::SHM);
140144

141145
mTfBuilder = std::make_unique<TimeFrameBuilder>(MemI(), mTfBufferSize, 512 << 20 /* config */, dplEnabled());
142146

@@ -154,19 +158,23 @@ bool TfBuilderDevice::start()
154158
if (!mFlpInputHandler->start(mDiscoveryConfig)) {
155159
mShouldExit = true;
156160
EDDLOG("Could not initialize input connections. Exiting.");
157-
throw "Input connection error";
158161
return false;
159162
}
160163

161164
// start file source
162165
mFileSource.start(MemI(), mStandalone ? false : mDplEnabled);
163166

167+
// start the info thread
168+
mInfoThread = create_thread_member("tfb_info", &TfBuilderDevice::InfoThread, this);
169+
164170
return true;
165171
}
166172

167173
void TfBuilderDevice::stop()
168174
{
169-
mRpc->stopAcceptingTfs();
175+
if (mRpc) {
176+
mRpc->stopAcceptingTfs();
177+
}
170178

171179
if (mTfDplAdapter) {
172180
mTfDplAdapter->stop();
@@ -177,41 +185,58 @@ void TfBuilderDevice::stop()
177185
}
178186

179187
mRunning = false;
188+
DDDLOG("TfBuilderDevice::stop(): mRunning is false.");
180189

181190
// Stop the pipeline
182191
stopPipeline();
183192

184193
// stop output handlers
185-
mFlpInputHandler->stop(mDiscoveryConfig);
194+
if (mFlpInputHandler) {
195+
mFlpInputHandler->stop(mDiscoveryConfig);
196+
}
197+
DDDLOG("TfBuilderDevice::stop(): Input handler stopped.");
186198
// signal and wait for the output thread
187199
mFileSink.stop();
188200
// join on fwd thread
189201
if (mTfFwdThread.joinable()) {
190202
mTfFwdThread.join();
191203
}
204+
DDDLOG("TfBuilderDevice::stop(): Forward thread stopped.");
192205

193206
//wait for the info thread
194207
if (mInfoThread.joinable()) {
195208
mInfoThread.join();
196209
}
210+
DDDLOG("TfBuilderDevice::stop(): Info thread stopped.");
197211

198212
// stop the RPCs
199-
mRpc->stop();
213+
if (mRpc) {
214+
mRpc->stop();
215+
}
216+
DDDLOG("TfBuilderDevice::stop(): RPC clients stopped.");
200217

201218
mDiscoveryConfig.reset();
202219

203-
DDDLOG("Reset() done... ");
220+
DDDLOG("TfBuilderDevice() stopped... ");
204221
}
205222

206223
void TfBuilderDevice::ResetTask()
207224
{
208225
stop();
226+
DDDLOG("ResetTask()");
227+
}
228+
229+
// Get here when ConditionalRun returns false
230+
void TfBuilderDevice::PostRun()
231+
{
232+
stop();
233+
DDDLOG("PostRun()");
209234
}
210235

211236
bool TfBuilderDevice::ConditionalRun()
212237
{
213-
if (mShouldExit) {
214-
mRunning = false;
238+
if (mShouldExit || mRpc->isTerminateRequested()) {
239+
// mRunning = false;
215240
return false;
216241
}
217242

@@ -279,10 +304,7 @@ void TfBuilderDevice::TfForwardThread()
279304

280305
void TfBuilderDevice::InfoThread()
281306
{
282-
// wait for the device to go into RUNNING state
283-
WaitForRunningState();
284-
285-
while (IsRunningState()) {
307+
while (mRunning) {
286308

287309
IDDLOG("TimeFrame size_mean={} in_frequency_mean={:.4} queued_stf={}",
288310
mTfSizeSamples.Mean(), mTfTimeSamples.MeanStepFreq(), getPipelineSize());

src/TfBuilder/TfBuilderDevice.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class TfBuilderDevice : public DataDistDevice,
7070
bool start();
7171
void stop();
7272

73+
void PostRun() override final;
74+
7375
void Init() override final;
7476
void Reset() override final;
7577

src/TfBuilder/TfBuilderInput.cxx

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,12 @@ bool TfBuilderInput::start(std::shared_ptr<ConsulTfBuilder> pConfig)
120120
continue;
121121
}
122122

123-
if (lConnResult.status() != 0) {
123+
if (lConnResult.status() == ERROR_PARTITION_TERMINATING) {
124+
WDDLOG("Partition is terminating. Stopping.");
125+
return false;
126+
}
127+
128+
if (lConnResult.status() != OK) {
124129
EDDLOG("Request for StfSender connection failed. scheduler_error={}",
125130
TfBuilderConnectionStatus_Name(lConnResult.status()));
126131
return false;

0 commit comments

Comments
 (0)