From b4799afced0271bfa4bc63252c3a28abe437abf1 Mon Sep 17 00:00:00 2001 From: Chuck Fan Date: Mon, 18 Sep 2023 22:57:03 +0800 Subject: [PATCH] fix: windowwithtimeorcount thread unsafe --- reactivex/operators/_windowwithtimeorcount.py | 68 ++++++++++--------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/reactivex/operators/_windowwithtimeorcount.py b/reactivex/operators/_windowwithtimeorcount.py index 5dc9581c5..05acb0dc1 100644 --- a/reactivex/operators/_windowwithtimeorcount.py +++ b/reactivex/operators/_windowwithtimeorcount.py @@ -39,17 +39,18 @@ def create_timer(_id: int): timer_d.disposable = m def action(scheduler: abc.SchedulerBase, state: Any = None): - nonlocal n, s, window_id - if _id != window_id: - return - - n = 0 - window_id += 1 - new_id = window_id - s.on_completed() - s = Subject() - observer.on_next(add_ref(s, ref_count_disposable)) - create_timer(new_id) + with source.lock: + nonlocal n, s, window_id + if _id != window_id: + return + + n = 0 + window_id += 1 + new_id = window_id + s.on_completed() + s = Subject() + observer.on_next(add_ref(s, ref_count_disposable)) + create_timer(new_id) m.disposable = _scheduler.schedule_relative(timespan, action) @@ -57,31 +58,34 @@ def action(scheduler: abc.SchedulerBase, state: Any = None): create_timer(0) def on_next(x: _T) -> None: - nonlocal n, s, window_id - new_window = False - new_id = 0 - - s.on_next(x) - n += 1 - if n == count: - new_window = True - n = 0 - window_id += 1 - new_id = window_id - s.on_completed() - s = Subject() - observer.on_next(add_ref(s, ref_count_disposable)) - - if new_window: - create_timer(new_id) + with source.lock: + nonlocal n, s, window_id + new_window = False + new_id = 0 + + s.on_next(x) + n += 1 + if n == count: + new_window = True + n = 0 + window_id += 1 + new_id = window_id + s.on_completed() + s = Subject() + observer.on_next(add_ref(s, ref_count_disposable)) + + if new_window: + create_timer(new_id) def on_error(e: Exception) -> None: - s.on_error(e) - observer.on_error(e) + with source.lock: + s.on_error(e) + observer.on_error(e) def on_completed() -> None: - s.on_completed() - observer.on_completed() + with source.lock: + s.on_completed() + observer.on_completed() group_disposable.add( source.subscribe(on_next, on_error, on_completed, scheduler=scheduler_)