-
-
Notifications
You must be signed in to change notification settings - Fork 240
Add StateTracker::wait_state() and converted linux semaphore timed wait into monotonic clock domain #814
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Add StateTracker::wait_state() and converted linux semaphore timed wait into monotonic clock domain #814
Changes from all commits
2ade702
dc83b76
fa6fcde
c17dc43
6ed45bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| #include "roc_core/semaphore.h" | ||
| #include "roc_core/cpu_instructions.h" | ||
| #include "roc_core/errno_to_str.h" | ||
| #include "roc_core/log.h" | ||
| #include "roc_core/panic.h" | ||
|
|
||
| #include <errno.h> | ||
|
|
@@ -38,14 +39,33 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { | |
| roc_panic("semaphore: unexpected negative deadline"); | ||
| } | ||
|
|
||
| for (;;) { | ||
| timespec ts; | ||
| ts.tv_sec = long(deadline / Second); | ||
| ts.tv_nsec = long(deadline % Second); | ||
| nanoseconds_t converted_deadline = deadline; | ||
|
|
||
| // convert deadline's domain into CLOCK_REALTIME if sem_clockwait is not available | ||
| #ifndef ROC_HAVE_SEM_CLOCKWAIT | ||
| converted_deadline += | ||
| (core::timestamp(core::ClockUnix) - core::timestamp(core::ClockMonotonic)); | ||
| #endif | ||
|
|
||
| roc_log(roc::LogDebug, "origin time is %" PRId64 "\n", deadline); | ||
| roc_log(roc::LogDebug, "time is %" PRId64 "\n", converted_deadline); | ||
| roc_log(roc::LogDebug, "now time is %" PRId64 "\n", | ||
| core::timestamp(core::ClockMonotonic)); | ||
|
Comment on lines
+50
to
+53
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method can be in hot path and called very frequently, so we shouldn't use logging here. |
||
|
|
||
| timespec ts; | ||
| ts.tv_sec = long(converted_deadline / Second); | ||
| ts.tv_nsec = long(converted_deadline % Second); | ||
|
|
||
| for (;;) { | ||
| #ifdef ROC_HAVE_SEM_CLOCKWAIT | ||
| if (sem_clockwait(&sem_, CLOCK_MONOTONIC, &ts) == 0) { | ||
| return true; | ||
| } | ||
| #else | ||
| if (sem_timedwait(&sem_, &ts) == 0) { | ||
| return true; | ||
| } | ||
|
Comment on lines
+64
to
67
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After giving this second thought, it seems that using
Then I checked what is the actual availability of sem_clockwait() on other POSIX OSes:
This means that we're going to use sem_timedwait on quite a few platforms and our code will potentially hang on system time change. An alternative is to implement semaphore using a cond variable on platforms without The good news are that Long story short, I think when (Sorry that I haven't realized it earlier). So I suggest to create two separate semaphore implementations in two target directories:
(and we also have We can do it like this: diff --git a/SConstruct b/SConstruct
index f01da593..9904c913 100644
--- a/SConstruct
+++ b/SConstruct
@@ -794,9 +794,15 @@ else:
])
if meta.platform in ['linux', 'android', 'unix']:
- env.Append(ROC_TARGETS=[
- 'target_posix_ext',
- ])
+ if 'ROC_HAVE_SEM_CLOCKWAIT' in env['CPPDEFINES']:
+ env.Append(ROC_TARGETS=[
+ 'target_posix_sem',
+ ])
+ else:
+ env.Append(ROC_TARGETS=[
+ 'target_nosem',
+ ])
if meta.platform in ['linux', 'android', 'darwin'] or meta.gnu_toolchain:
env.Append(ROC_TARGETS=[We can rename The fallback semaphore implementation can be placed into We already have What do you think, are you willing to implement this? |
||
| #endif | ||
|
|
||
| if (errno == ETIMEDOUT) { | ||
| return false; | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,15 +7,73 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| #include "roc_pipeline/state_tracker.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| #include "roc_core/log.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| #include "roc_core/panic.h" | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace roc { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace pipeline { | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| StateTracker::StateTracker() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| : halt_state_(-1) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| : sem_(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| , halt_state_(-1) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| , active_sessions_(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| , pending_packets_(0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| , pending_packets_(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| , sem_is_occupied_(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| , waiting_mask_(0) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| , mutex_() | ||||||||||||||||||||||||||||||||||||||||||||||||||
| , waiting_con_(mutex_) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // StateTracker::~StateTracker() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // mutex_.unlock(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+27
to
+29
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove commented out/dead code. |
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // This method should block until the state becomes any of the states specified by the | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // states will be needed). Deadline should be an absolute timestamp. | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // Questions: | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // - When should the function return true vs false | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+31
to
+37
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's expand this comment and move it to
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| waiting_mask_ = state_mask; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that |
||||||||||||||||||||||||||||||||||||||||||||||||||
| for (;;) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // If no state is specified in state_mask, return immediately | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (state_mask == 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+41
to
+44
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This check can be done before the loop. |
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| if (static_cast<unsigned>(get_state()) & state_mask) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| waiting_mask_ = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return true; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| waiting_mask_ = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return false; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| if (sem_is_occupied_.compare_exchange(0, 1)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (deadline >= 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)sem_.timed_wait(deadline); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| sem_.wait(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+46
to
+62
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a race condition between Let's assume that T1 and T2 are two threads that called Imagine the following sequence of events.
The problem is that we're checking state before doing CAS, but we're guaranteed to be notified on state change only after successful CAS. |
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| sem_is_occupied_ = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| waiting_con_.broadcast(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| core::Mutex::Lock lock(mutex_); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| if (deadline >= 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)waiting_con_.timed_wait(deadline); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| waiting_con_.wait(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+65
to
+75
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is another race, between semaphore branch and mutex branches of concurrent Let's assume three threads: W1 and W2 call In this scenario, W1 and W2 initially entered
The problem is that semaphore branch triggers cond var without holding the mutex, and that both branches check state before acquiring the mutex.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another thing I don't quite like in current implementation are unnecessary wake-ups, also related to how mutex is handled currently. Imagine 3 threads all invoke Then the state changes, but not in the way that these threads are waiting for:
Here, thread (C) goes to sleep in (9) and wakes up in (11), only to go to sleep again in (12). And if there are more waiting threads (D, E, F ...), each of them will have that extra wake-up.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both race condition and extra wake up can be both eliminated by revising when we lock and unlock the mutex. Something like this: This is just rough pseudo-code, feel free to play with it as you want. Also I guess my comments are too verbose for real code. And regarding unnecessary wake-ups: not releasing the mutex across loop iterations helps OS to eliminate unnecessary wake-ups. E.g. on glibc, when we call In other words, on each state change, each waiter will wake-up exactly once, check state, and either return or go to sleep until next state change.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks a lot for the review! I shifted my focus elsewhere, getting back on this now.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a bug? T1 wait at point P2.A sem.wait() for state A 1: T1: sem.wait() I am seeing if I can think of a better flow. Please let me know if I misunderstood something. Thanks!
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh nvm it seems to be not a big deal, I can just move the CAS inside the loop. |
||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| sndio::DeviceState StateTracker::get_state() const { | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -65,22 +123,50 @@ size_t StateTracker::num_sessions() const { | |||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| void StateTracker::register_session() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| active_sessions_++; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (active_sessions_++ == 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| signal_state_change(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| void StateTracker::unregister_session() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (--active_sessions_ < 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int prev_sessions = active_sessions_--; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (prev_sessions == 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| roc_panic("state tracker: unpaired register/unregister session"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (prev_sessions == 1 && pending_packets_ == 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| signal_state_change(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // if (--active_sessions_ < 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // roc_panic("state tracker: unpaired register/unregister session"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| void StateTracker::register_packet() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pending_packets_++; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (pending_packets_++ == 0 && active_sessions_ == 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| signal_state_change(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| void StateTracker::unregister_packet() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (--pending_packets_ < 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int prev_packets = pending_packets_--; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (prev_packets == 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| roc_panic("state tracker: unpaired register/unregister packet"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else if (prev_packets == 1 && active_sessions_ == 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| signal_state_change(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // if (--pending_packets_ < 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // roc_panic("state tracker: unpaired register/unregister packet"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| void StateTracker::signal_state_change() { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // if (waiting_mask_ != 0 && (static_cast<unsigned>(get_state()) & waiting_mask_)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // sem_.post(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (sem_is_occupied_) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| roc_log(LogDebug, "signaling"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| sem_.post(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why you defined the function manually instead of including
semaphore.h?If no, I suggest to use include. Also let's move it to the section where we do other checks and already have
conf: