diff --git a/SConstruct b/SConstruct index 0e3e020bc..f01da593e 100644 --- a/SConstruct +++ b/SConstruct @@ -955,6 +955,20 @@ if meta.compiler in ['gcc', 'clang']: if meta.platform in ['linux', 'darwin']: env.AddManualDependency(libs=['pthread']) +#Check for existence of function sem_clockwait + temp_conf = Configure(env) + header = """ +#ifdef __cplusplus +extern "C" +#endif +char sem_timedwait(void); +""" + + if temp_conf.CheckFunc('sem_clockwait', header): + env.Append(CPPDEFINES=['ROC_HAVE_SEM_CLOCKWAIT']) + + + if meta.platform in ['linux', 'android'] or meta.gnu_toolchain: if not GetOption('disable_soversion'): subenvs.public_libs['SHLIBSUFFIX'] = '{}.{}'.format( diff --git a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp index 0d9685390..8aea87c91 100644 --- a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp @@ -9,6 +9,7 @@ #include "roc_core/semaphore.h" #include "roc_core/cpu_instructions.h" #include "roc_core/errno_to_str.h" +#include "roc_core/log.h" #include "roc_core/panic.h" #include @@ -38,14 +39,33 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { roc_panic("semaphore: unexpected negative deadline"); } - for (;;) { - timespec ts; - ts.tv_sec = long(deadline / Second); - ts.tv_nsec = long(deadline % Second); + nanoseconds_t converted_deadline = deadline; + +// convert deadline's domain into CLOCK_REALTIME if sem_clockwait is not available +#ifndef ROC_HAVE_SEM_CLOCKWAIT + converted_deadline += + (core::timestamp(core::ClockUnix) - core::timestamp(core::ClockMonotonic)); +#endif + + roc_log(roc::LogDebug, "origin time is %" PRId64 "\n", deadline); + roc_log(roc::LogDebug, "time is %" PRId64 "\n", converted_deadline); + roc_log(roc::LogDebug, "now time is %" PRId64 "\n", + core::timestamp(core::ClockMonotonic)); + + timespec ts; + ts.tv_sec = long(converted_deadline / Second); + ts.tv_nsec = long(converted_deadline % Second); + for (;;) { +#ifdef ROC_HAVE_SEM_CLOCKWAIT + if (sem_clockwait(&sem_, CLOCK_MONOTONIC, &ts) == 0) { + return true; + } +#else if (sem_timedwait(&sem_, &ts) == 0) { return true; } +#endif if (errno == ETIMEDOUT) { return false; diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index bc2b6ef6f..9f0032638 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -7,15 +7,73 @@ */ #include "roc_pipeline/state_tracker.h" +#include "roc_core/log.h" #include "roc_core/panic.h" namespace roc { namespace pipeline { StateTracker::StateTracker() - : halt_state_(-1) + : sem_(0) + , halt_state_(-1) , active_sessions_(0) - , pending_packets_(0) { + , pending_packets_(0) + , sem_is_occupied_(0) + , waiting_mask_(0) + , mutex_() + , waiting_con_(mutex_) { +} + +// StateTracker::~StateTracker() { +// mutex_.unlock(); +// } + +// This method should block until the state becomes any of the states specified by the +// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state +// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more +// states will be needed). Deadline should be an absolute timestamp. + +// Questions: +// - When should the function return true vs false +bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { + waiting_mask_ = state_mask; + for (;;) { + // If no state is specified in state_mask, return immediately + if (state_mask == 0) { + return true; + } + + if (static_cast(get_state()) & state_mask) { + waiting_mask_ = 0; + return true; + } + + if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { + waiting_mask_ = 0; + return false; + } + + if (sem_is_occupied_.compare_exchange(0, 1)) { + if (deadline >= 0) { + (void)sem_.timed_wait(deadline); + + } else { + sem_.wait(); + } + + sem_is_occupied_ = 0; + waiting_con_.broadcast(); + + } else { + core::Mutex::Lock lock(mutex_); + + if (deadline >= 0) { + (void)waiting_con_.timed_wait(deadline); + } else { + waiting_con_.wait(); + } + } + } } sndio::DeviceState StateTracker::get_state() const { @@ -65,22 +123,50 @@ size_t StateTracker::num_sessions() const { } void StateTracker::register_session() { - active_sessions_++; + if (active_sessions_++ == 0) { + signal_state_change(); + } } void StateTracker::unregister_session() { - if (--active_sessions_ < 0) { + int prev_sessions = active_sessions_--; + if (prev_sessions == 0) { roc_panic("state tracker: unpaired register/unregister session"); + } else if (prev_sessions == 1 && pending_packets_ == 0) { + signal_state_change(); } + + // if (--active_sessions_ < 0) { + // roc_panic("state tracker: unpaired register/unregister session"); + // } } void StateTracker::register_packet() { - pending_packets_++; + if (pending_packets_++ == 0 && active_sessions_ == 0) { + signal_state_change(); + } } void StateTracker::unregister_packet() { - if (--pending_packets_ < 0) { + int prev_packets = pending_packets_--; + if (prev_packets == 0) { roc_panic("state tracker: unpaired register/unregister packet"); + } else if (prev_packets == 1 && active_sessions_ == 0) { + signal_state_change(); + } + + // if (--pending_packets_ < 0) { + // roc_panic("state tracker: unpaired register/unregister packet"); + // } +} + +void StateTracker::signal_state_change() { + // if (waiting_mask_ != 0 && (static_cast(get_state()) & waiting_mask_)) { + // sem_.post(); + // } + if (sem_is_occupied_) { + roc_log(LogDebug, "signaling"); + sem_.post(); } } diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 981ed0274..8b21faed3 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -13,8 +13,12 @@ #define ROC_PIPELINE_STATE_TRACKER_H_ #include "roc_core/atomic.h" +#include "roc_core/cond.h" +#include "roc_core/mutex.h" #include "roc_core/noncopyable.h" +#include "roc_core/semaphore.h" #include "roc_core/stddefs.h" +#include "roc_core/time.h" #include "roc_sndio/device_defs.h" namespace roc { @@ -32,6 +36,9 @@ class StateTracker : public core::NonCopyable<> { //! Initialize all counters to zero. StateTracker(); + //! Block until state becomes any of the ones specified by state_mask. + bool wait_state(unsigned state_mask, core::nanoseconds_t deadline); + //! Compute current state. sndio::DeviceState get_state() const; @@ -63,9 +70,16 @@ class StateTracker : public core::NonCopyable<> { void unregister_packet(); private: + core::Semaphore sem_; core::Atomic halt_state_; core::Atomic active_sessions_; core::Atomic pending_packets_; + core::Atomic sem_is_occupied_; + core::Atomic waiting_mask_; + core::Mutex mutex_; + core::Cond waiting_con_; + + void signal_state_change(); }; } // namespace pipeline diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp new file mode 100644 index 000000000..5c7a6bad8 --- /dev/null +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "test_harness.h" + +#include "roc_address/protocol.h" +#include "roc_audio/mixer.h" +#include "roc_audio/sample.h" +#include "roc_core/atomic.h" +#include "roc_core/heap_arena.h" +#include "roc_core/noop_arena.h" +#include "roc_core/semaphore.h" +#include "roc_core/thread.h" +#include "roc_core/time.h" +#include "roc_pipeline/config.h" +#include "roc_pipeline/receiver_endpoint.h" +#include "roc_pipeline/receiver_session_group.h" + +namespace roc { +namespace pipeline { + +namespace { + +enum { PacketSz = 512 }; + +core::HeapArena arena; + +packet::PacketFactory packet_factory(arena, PacketSz); +audio::FrameFactory frame_factory(arena, PacketSz * sizeof(audio::sample_t)); + +audio::ProcessorMap processor_map(arena); +rtp::EncodingMap encoding_map(arena); + +class TestThread : public core::Thread { +public: + TestThread(StateTracker& st, unsigned int state_mask, core::nanoseconds_t deadline) + : t_(st) + , r_(0) + , state_mask_(state_mask) + , deadline_(deadline) { + } + + bool running() const { + return r_; + } + + void wait_running() { + while (!r_) { + core::sleep_for(core::ClockMonotonic, core::Microsecond); + } + } + +private: + virtual void run() { + r_ = true; + t_.wait_state(state_mask_, deadline_); + r_ = false; + } + + StateTracker& t_; + core::Atomic r_; + unsigned int state_mask_; + core::nanoseconds_t deadline_; +}; + +} // namespace + +TEST_GROUP(state_tracker) {}; + +// set a thread that last for 0.5 seconds, wait for 1 second to make it timeout. +TEST(state_tracker, simple_timeout) { + StateTracker state_tracker; + TestThread thr(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + core::Millisecond * 500); + + CHECK(thr.start()); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); + CHECK(!(thr.running())); + thr.join(); +} + +TEST(state_tracker, multiple_timeout) { + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + + // set threads that last for 1 second + for (int i = 0; i < 10; i++) { + threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + + core::Millisecond * 1000); + } + + // wait for start, then check if threads are running + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->start()); + // CHECK(threads_ptr[i]->running()); + // roc_log(LogDebug, "check running %d\n", i); + } + core::sleep_for(core::ClockMonotonic, core::Millisecond * 10); + for (int i = 0; i < 10; i++) { + // roc_log(LogDebug, "check running %d\n", i); + CHECK(threads_ptr[i]->running()); + } + + // sleep for 2 seconds, making the threads timeout + roc_log(LogDebug, "started running"); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 2000); + + // check if threads are stopped + for (int i = 0; i < 10; i++) { + CHECK(!threads_ptr[i]->running()); + } + + roc_log(LogDebug, "started joining"); + + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } + + roc_log(LogDebug, "finished joining"); + + for (int i = 0; i < 10; ++i) { + delete threads_ptr[i]; + } + delete[] threads_ptr; +} + +TEST(state_tracker, multiple_switch) { + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + + // set threads without waiting time + for (int i = 0; i < 10; i++) { + threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); + } + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->start()); + } + + roc_log(LogDebug, "started running"); + + // wait for threads starting + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + + // check if the threads have started + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->running()); + } + + // register a packet + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + state_tracker.register_packet(); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + + // check if the threads have been stopped + for (int i = 0; i < 10; i++) { + CHECK(!(threads_ptr[i]->running())); + } + + roc_log(LogDebug, "started joining"); + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } + roc_log(LogDebug, "finished joining"); + + for (int i = 0; i < 10; ++i) { + delete threads_ptr[i]; + } + delete[] threads_ptr; +} + +TEST(state_tracker, semaphore_test) { + core::Semaphore sem(0); + roc_log(LogDebug, "ready"); + if (sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic))) + roc_log(LogDebug, "true, unlocked by other threads"); + else + roc_log(LogDebug, "false, timeout"); +} +} // namespace pipeline +} // namespace roc