From 2ade7023f89d44f752144acb17f731ae197e34cc Mon Sep 17 00:00:00 2001 From: flw5469 Date: Fri, 16 May 2025 15:29:01 +0800 Subject: [PATCH 1/5] add: state tracker using semaphore and condition variable, and its test cases --- .../roc_pipeline/state_tracker.cpp | 98 ++++++++++- .../roc_pipeline/state_tracker.h | 16 +- src/tests/roc_pipeline/test_state_tracker.cpp | 156 ++++++++++++++++++ 3 files changed, 263 insertions(+), 7 deletions(-) create mode 100644 src/tests/roc_pipeline/test_state_tracker.cpp diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index bc2b6ef6f..9f0032638 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -7,15 +7,73 @@ */ #include "roc_pipeline/state_tracker.h" +#include "roc_core/log.h" #include "roc_core/panic.h" namespace roc { namespace pipeline { StateTracker::StateTracker() - : halt_state_(-1) + : sem_(0) + , halt_state_(-1) , active_sessions_(0) - , pending_packets_(0) { + , pending_packets_(0) + , sem_is_occupied_(0) + , waiting_mask_(0) + , mutex_() + , waiting_con_(mutex_) { +} + +// StateTracker::~StateTracker() { +// mutex_.unlock(); +// } + +// This method should block until the state becomes any of the states specified by the +// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state +// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more +// states will be needed). Deadline should be an absolute timestamp. + +// Questions: +// - When should the function return true vs false +bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { + waiting_mask_ = state_mask; + for (;;) { + // If no state is specified in state_mask, return immediately + if (state_mask == 0) { + return true; + } + + if (static_cast(get_state()) & state_mask) { + waiting_mask_ = 0; + return true; + } + + if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { + waiting_mask_ = 0; + return false; + } + + if (sem_is_occupied_.compare_exchange(0, 1)) { + if (deadline >= 0) { + (void)sem_.timed_wait(deadline); + + } else { + sem_.wait(); + } + + sem_is_occupied_ = 0; + waiting_con_.broadcast(); + + } else { + core::Mutex::Lock lock(mutex_); + + if (deadline >= 0) { + (void)waiting_con_.timed_wait(deadline); + } else { + waiting_con_.wait(); + } + } + } } sndio::DeviceState StateTracker::get_state() const { @@ -65,22 +123,50 @@ size_t StateTracker::num_sessions() const { } void StateTracker::register_session() { - active_sessions_++; + if (active_sessions_++ == 0) { + signal_state_change(); + } } void StateTracker::unregister_session() { - if (--active_sessions_ < 0) { + int prev_sessions = active_sessions_--; + if (prev_sessions == 0) { roc_panic("state tracker: unpaired register/unregister session"); + } else if (prev_sessions == 1 && pending_packets_ == 0) { + signal_state_change(); } + + // if (--active_sessions_ < 0) { + // roc_panic("state tracker: unpaired register/unregister session"); + // } } void StateTracker::register_packet() { - pending_packets_++; + if (pending_packets_++ == 0 && active_sessions_ == 0) { + signal_state_change(); + } } void StateTracker::unregister_packet() { - if (--pending_packets_ < 0) { + int prev_packets = pending_packets_--; + if (prev_packets == 0) { roc_panic("state tracker: unpaired register/unregister packet"); + } else if (prev_packets == 1 && active_sessions_ == 0) { + signal_state_change(); + } + + // if (--pending_packets_ < 0) { + // roc_panic("state tracker: unpaired register/unregister packet"); + // } +} + +void StateTracker::signal_state_change() { + // if (waiting_mask_ != 0 && (static_cast(get_state()) & waiting_mask_)) { + // sem_.post(); + // } + if (sem_is_occupied_) { + roc_log(LogDebug, "signaling"); + sem_.post(); } } diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 981ed0274..fd99fd79c 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -13,9 +13,13 @@ #define ROC_PIPELINE_STATE_TRACKER_H_ #include "roc_core/atomic.h" +#include "roc_core/cond.h" +#include "roc_core/mutex.h" #include "roc_core/noncopyable.h" +#include "roc_core/semaphore.h" #include "roc_core/stddefs.h" -#include "roc_sndio/device_defs.h" +#include "roc_core/time.h" +#include "roc_sndio/device_state.h" namespace roc { namespace pipeline { @@ -32,6 +36,9 @@ class StateTracker : public core::NonCopyable<> { //! Initialize all counters to zero. StateTracker(); + //! Block until state becomes any of the ones specified by state_mask. + bool wait_state(unsigned state_mask, core::nanoseconds_t deadline); + //! Compute current state. sndio::DeviceState get_state() const; @@ -63,9 +70,16 @@ class StateTracker : public core::NonCopyable<> { void unregister_packet(); private: + core::Semaphore sem_; core::Atomic halt_state_; core::Atomic active_sessions_; core::Atomic pending_packets_; + core::Atomic sem_is_occupied_; + core::Atomic waiting_mask_; + core::Mutex mutex_; + core::Cond waiting_con_; + + void signal_state_change(); }; } // namespace pipeline diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp new file mode 100644 index 000000000..f406fe699 --- /dev/null +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "test_harness.h" + +#include "roc_address/protocol.h" +#include "roc_audio/mixer.h" +#include "roc_audio/sample.h" +#include "roc_core/atomic.h" +#include "roc_core/heap_arena.h" +#include "roc_core/noop_arena.h" +#include "roc_core/thread.h" +#include "roc_core/time.h" +#include "roc_pipeline/config.h" +#include "roc_pipeline/receiver_endpoint.h" +#include "roc_pipeline/receiver_session_group.h" +#include "roc_core/semaphore.h" + +namespace roc { +namespace pipeline { + +namespace { + +enum { PacketSz = 512 }; + +core::HeapArena arena; + +packet::PacketFactory packet_factory(arena, PacketSz); +audio::FrameFactory frame_factory(arena, PacketSz * sizeof(audio::sample_t)); + +audio::ProcessorMap processor_map(arena); +rtp::EncodingMap encoding_map(arena); + +class TestThread : public core::Thread { +public: + TestThread(StateTracker& st, unsigned int state_mask, core::nanoseconds_t deadline) + : t_(st) + , r_(0) + , state_mask_(state_mask) + , deadline_(deadline) { + } + + bool running() const { + return r_; + } + + void wait_running() { + while (!r_) { + core::sleep_for(core::ClockMonotonic, core::Microsecond); + } + } + +private: + virtual void run() { + r_ = true; + t_.wait_state(state_mask_, deadline_); + r_ = false; + } + + StateTracker& t_; + core::Atomic r_; + unsigned int state_mask_; + core::nanoseconds_t deadline_; +}; + +} // namespace + +TEST_GROUP(state_tracker) {}; + +TEST(state_tracker, simple_timeout) { + StateTracker state_tracker; + TestThread thr(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + core::Millisecond * 500); + + CHECK(thr.start()); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); + CHECK(!(thr.running())); + thr.join(); +} + +TEST(state_tracker, multiple_timeout) { + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + for (int i = 0; i < 10; i++) { + threads_ptr[i] = + new TestThread(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + core::Millisecond * 1000); + } + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->start()); + } + + roc_log(LogDebug, "started running"); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 2000); + + for (int i = 0; i < 10; i++) { + CHECK(!threads_ptr[i]->running()); + } + + roc_log(LogDebug, "started joining"); + + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } + + roc_log(LogDebug, "finished joining"); +} + +TEST(state_tracker, multiple_switch) { + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + for (int i = 0; i < 10; i++) { + threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); + } + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->start()); + } + + roc_log(LogDebug, "started running"); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->running()); + } + + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + state_tracker.register_packet(); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + + for (int i = 0; i < 10; i++) { + CHECK(!(threads_ptr[i]->running())); + } + + roc_log(LogDebug, "started joining"); + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } + roc_log(LogDebug, "finished joining"); +} + +TEST(state_tracker, semaphore_test) { + core::Semaphore sem(0); + if (sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic))) + roc_log(LogDebug, "true"); + else + roc_log(LogDebug, "false"); +} +} // namespace pipeline +} // namespace roc From dc83b76c37c21b67dafc6a0f575eb24466c93c9a Mon Sep 17 00:00:00 2001 From: flw5469 Date: Tue, 20 May 2025 20:02:51 +0800 Subject: [PATCH 2/5] Updated test cases for state_tracker, convert time domain in semaphore and add checking of sem_clockwait in compile time. --- SConstruct | 14 ++++++++++ .../target_posix_ext/roc_core/semaphore.cpp | 27 ++++++++++++++++--- .../roc_pipeline/state_tracker.h | 2 +- src/tests/roc_pipeline/test_state_tracker.cpp | 26 ++++++++++++++++-- 4 files changed, 62 insertions(+), 7 deletions(-) diff --git a/SConstruct b/SConstruct index 0e3e020bc..f01da593e 100644 --- a/SConstruct +++ b/SConstruct @@ -955,6 +955,20 @@ if meta.compiler in ['gcc', 'clang']: if meta.platform in ['linux', 'darwin']: env.AddManualDependency(libs=['pthread']) +#Check for existence of function sem_clockwait + temp_conf = Configure(env) + header = """ +#ifdef __cplusplus +extern "C" +#endif +char sem_timedwait(void); +""" + + if temp_conf.CheckFunc('sem_clockwait', header): + env.Append(CPPDEFINES=['ROC_HAVE_SEM_CLOCKWAIT']) + + + if meta.platform in ['linux', 'android'] or meta.gnu_toolchain: if not GetOption('disable_soversion'): subenvs.public_libs['SHLIBSUFFIX'] = '{}.{}'.format( diff --git a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp index 0d9685390..53102f449 100644 --- a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp @@ -10,6 +10,7 @@ #include "roc_core/cpu_instructions.h" #include "roc_core/errno_to_str.h" #include "roc_core/panic.h" +#include "roc_core/log.h" #include #include @@ -38,14 +39,32 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { roc_panic("semaphore: unexpected negative deadline"); } + nanoseconds_t converted_deadline = deadline; + + // convert deadline's domain into CLOCK_REALTIME if sem_clockwait is not available + #ifndef ROC_HAVE_SEM_CLOCKWAIT + converted_deadline += (core::timestamp(core::ClockUnix)-core::timestamp(core::ClockMonotonic)); + #endif + + roc_log(roc::LogDebug,"origin time is %" PRId64"\n", deadline); + roc_log(roc::LogDebug,"time is %" PRId64"\n", converted_deadline); + roc_log(roc::LogDebug,"now time is %" PRId64"\n", core::timestamp(core::ClockMonotonic)); + + timespec ts; + ts.tv_sec = long(converted_deadline / Second); + ts.tv_nsec = long(converted_deadline % Second); + for (;;) { - timespec ts; - ts.tv_sec = long(deadline / Second); - ts.tv_nsec = long(deadline % Second); - if (sem_timedwait(&sem_, &ts) == 0) { + #ifdef ROC_HAVE_SEM_CLOCKWAIT + if (sem_clockwait(&sem_, CLOCK_MONOTONIC , &ts) == 0) { return true; } + #else + if (sem_timedwait(&sem_, &ts) == 0) { + return true; + } + #endif if (errno == ETIMEDOUT) { return false; diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index fd99fd79c..8b21faed3 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -19,7 +19,7 @@ #include "roc_core/semaphore.h" #include "roc_core/stddefs.h" #include "roc_core/time.h" -#include "roc_sndio/device_state.h" +#include "roc_sndio/device_defs.h" namespace roc { namespace pipeline { diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index f406fe699..91811d636 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -72,6 +72,7 @@ class TestThread : public core::Thread { TEST_GROUP(state_tracker) {}; +// set a thread that last for 0.5 seconds, wait for 1 second to make it timeout. TEST(state_tracker, simple_timeout) { StateTracker state_tracker; TestThread thr(state_tracker, sndio::DeviceState_Active, @@ -86,19 +87,32 @@ TEST(state_tracker, simple_timeout) { TEST(state_tracker, multiple_timeout) { StateTracker state_tracker; TestThread** threads_ptr = new TestThread*[10]; + + //set threads that last for 1 second for (int i = 0; i < 10; i++) { threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, core::timestamp(core::ClockMonotonic) + core::Millisecond * 1000); } + //wait for start, then check if threads are running for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->start()); + // CHECK(threads_ptr[i]->running()); + // roc_log(LogDebug, "check running %d\n", i); + + } + core::sleep_for(core::ClockMonotonic, core::Millisecond * 10); + for (int i = 0; i < 10; i++) { + //roc_log(LogDebug, "check running %d\n", i); + CHECK(threads_ptr[i]->running()); } + //sleep for 2 seconds, making the threads timeout roc_log(LogDebug, "started running"); core::sleep_for(core::ClockMonotonic, core::Millisecond * 2000); + //check if threads are stopped for (int i = 0; i < 10; i++) { CHECK(!threads_ptr[i]->running()); } @@ -115,6 +129,8 @@ TEST(state_tracker, multiple_timeout) { TEST(state_tracker, multiple_switch) { StateTracker state_tracker; TestThread** threads_ptr = new TestThread*[10]; + + //set threads without waiting time for (int i = 0; i < 10; i++) { threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); } @@ -124,16 +140,21 @@ TEST(state_tracker, multiple_switch) { } roc_log(LogDebug, "started running"); + + //wait for threads starting core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + //check if the threads have started for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->running()); } + //register a packet core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); state_tracker.register_packet(); core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + //check if the threads have been stopped for (int i = 0; i < 10; i++) { CHECK(!(threads_ptr[i]->running())); } @@ -147,10 +168,11 @@ TEST(state_tracker, multiple_switch) { TEST(state_tracker, semaphore_test) { core::Semaphore sem(0); + roc_log(LogDebug, "ready"); if (sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic))) - roc_log(LogDebug, "true"); + roc_log(LogDebug, "true, unlocked by other threads"); else - roc_log(LogDebug, "false"); + roc_log(LogDebug, "false, timeout"); } } // namespace pipeline } // namespace roc From fa6fcde421c5ef25c6bdd38cda838fe64c25d040 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Wed, 21 May 2025 16:24:04 +0800 Subject: [PATCH 3/5] update --- .../target_posix_ext/roc_core/semaphore.cpp | 31 ++++++++++--------- src/tests/roc_pipeline/test_state_tracker.cpp | 31 +++++++++---------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp index 53102f449..8aea87c91 100644 --- a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp @@ -9,8 +9,8 @@ #include "roc_core/semaphore.h" #include "roc_core/cpu_instructions.h" #include "roc_core/errno_to_str.h" -#include "roc_core/panic.h" #include "roc_core/log.h" +#include "roc_core/panic.h" #include #include @@ -41,30 +41,31 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { nanoseconds_t converted_deadline = deadline; - // convert deadline's domain into CLOCK_REALTIME if sem_clockwait is not available - #ifndef ROC_HAVE_SEM_CLOCKWAIT - converted_deadline += (core::timestamp(core::ClockUnix)-core::timestamp(core::ClockMonotonic)); - #endif +// convert deadline's domain into CLOCK_REALTIME if sem_clockwait is not available +#ifndef ROC_HAVE_SEM_CLOCKWAIT + converted_deadline += + (core::timestamp(core::ClockUnix) - core::timestamp(core::ClockMonotonic)); +#endif - roc_log(roc::LogDebug,"origin time is %" PRId64"\n", deadline); - roc_log(roc::LogDebug,"time is %" PRId64"\n", converted_deadline); - roc_log(roc::LogDebug,"now time is %" PRId64"\n", core::timestamp(core::ClockMonotonic)); + roc_log(roc::LogDebug, "origin time is %" PRId64 "\n", deadline); + roc_log(roc::LogDebug, "time is %" PRId64 "\n", converted_deadline); + roc_log(roc::LogDebug, "now time is %" PRId64 "\n", + core::timestamp(core::ClockMonotonic)); timespec ts; ts.tv_sec = long(converted_deadline / Second); ts.tv_nsec = long(converted_deadline % Second); - - for (;;) { - #ifdef ROC_HAVE_SEM_CLOCKWAIT - if (sem_clockwait(&sem_, CLOCK_MONOTONIC , &ts) == 0) { + for (;;) { +#ifdef ROC_HAVE_SEM_CLOCKWAIT + if (sem_clockwait(&sem_, CLOCK_MONOTONIC, &ts) == 0) { return true; } - #else +#else if (sem_timedwait(&sem_, &ts) == 0) { - return true; + return true; } - #endif +#endif if (errno == ETIMEDOUT) { return false; diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 91811d636..761a16071 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -14,12 +14,12 @@ #include "roc_core/atomic.h" #include "roc_core/heap_arena.h" #include "roc_core/noop_arena.h" +#include "roc_core/semaphore.h" #include "roc_core/thread.h" #include "roc_core/time.h" #include "roc_pipeline/config.h" #include "roc_pipeline/receiver_endpoint.h" #include "roc_pipeline/receiver_session_group.h" -#include "roc_core/semaphore.h" namespace roc { namespace pipeline { @@ -88,31 +88,30 @@ TEST(state_tracker, multiple_timeout) { StateTracker state_tracker; TestThread** threads_ptr = new TestThread*[10]; - //set threads that last for 1 second + // set threads that last for 1 second for (int i = 0; i < 10; i++) { - threads_ptr[i] = - new TestThread(state_tracker, sndio::DeviceState_Active, - core::timestamp(core::ClockMonotonic) + core::Millisecond * 1000); + threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + + core::Millisecond * 1000); } - //wait for start, then check if threads are running + // wait for start, then check if threads are running for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->start()); // CHECK(threads_ptr[i]->running()); // roc_log(LogDebug, "check running %d\n", i); - } core::sleep_for(core::ClockMonotonic, core::Millisecond * 10); for (int i = 0; i < 10; i++) { - //roc_log(LogDebug, "check running %d\n", i); - CHECK(threads_ptr[i]->running()); + // roc_log(LogDebug, "check running %d\n", i); + CHECK(threads_ptr[i]->running()); } - //sleep for 2 seconds, making the threads timeout + // sleep for 2 seconds, making the threads timeout roc_log(LogDebug, "started running"); core::sleep_for(core::ClockMonotonic, core::Millisecond * 2000); - //check if threads are stopped + // check if threads are stopped for (int i = 0; i < 10; i++) { CHECK(!threads_ptr[i]->running()); } @@ -130,7 +129,7 @@ TEST(state_tracker, multiple_switch) { StateTracker state_tracker; TestThread** threads_ptr = new TestThread*[10]; - //set threads without waiting time + // set threads without waiting time for (int i = 0; i < 10; i++) { threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); } @@ -141,20 +140,20 @@ TEST(state_tracker, multiple_switch) { roc_log(LogDebug, "started running"); - //wait for threads starting + // wait for threads starting core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); - //check if the threads have started + // check if the threads have started for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->running()); } - //register a packet + // register a packet core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); state_tracker.register_packet(); core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); - //check if the threads have been stopped + // check if the threads have been stopped for (int i = 0; i < 10; i++) { CHECK(!(threads_ptr[i]->running())); } From c17dc43a35a35146cdaf40494ac3ba9a20921186 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Wed, 21 May 2025 20:48:25 +0800 Subject: [PATCH 4/5] fix memory leak by adding delete of test case threads --- src/tests/roc_pipeline/test_state_tracker.cpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 761a16071..8db1334f0 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -123,6 +123,11 @@ TEST(state_tracker, multiple_timeout) { } roc_log(LogDebug, "finished joining"); + + for (int i = 0; i < 10; ++i) { + delete threads_ptr[i]; + } + delete[] threads_ptr; } TEST(state_tracker, multiple_switch) { @@ -163,6 +168,11 @@ TEST(state_tracker, multiple_switch) { threads_ptr[i]->join(); } roc_log(LogDebug, "finished joining"); + + for (int i = 0; i < 10; ++i) { + delete threads_ptr[i]; + } + delete[] threads_ptr; } TEST(state_tracker, semaphore_test) { From 6ed45bd869233c27e411c0851870117510accd56 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Wed, 21 May 2025 20:50:55 +0800 Subject: [PATCH 5/5] format code --- src/tests/roc_pipeline/test_state_tracker.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 8db1334f0..5c7a6bad8 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -125,7 +125,7 @@ TEST(state_tracker, multiple_timeout) { roc_log(LogDebug, "finished joining"); for (int i = 0; i < 10; ++i) { - delete threads_ptr[i]; + delete threads_ptr[i]; } delete[] threads_ptr; } @@ -170,7 +170,7 @@ TEST(state_tracker, multiple_switch) { roc_log(LogDebug, "finished joining"); for (int i = 0; i < 10; ++i) { - delete threads_ptr[i]; + delete threads_ptr[i]; } delete[] threads_ptr; }