-
Notifications
You must be signed in to change notification settings - Fork 412
New actor hierarchy #1652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New actor hierarchy #1652
Conversation
📝 WalkthroughWalkthroughThe PR updates the ractor dependency to 0.14.3, adds ractor-supervisor 0.1.9, introduces audio metadata and sample-rate tracking throughout the audio stack, refactors the listener plugin's actor architecture from a flat session model to a supervisor-based hierarchy with controller and context actors, and adds stream utilities for push-based audio processing. Changes
Sequence DiagramsequenceDiagram
participant App as Tauri App
participant SS as SessionSupervisor
participant CA as ControllerActor
participant LS as LiveSupervisor
participant SA as SourceActor
participant RA as RecorderActor
participant LA as ListenerActor
App->>SS: start_session_supervisor(params)
SS->>CA: spawn ControllerActor
SS->>LS: spawn LiveSupervisor
LS->>SA: spawn SourceActor
LS->>RA: spawn RecorderActor
LS->>LA: spawn ListenerActor
SA->>SA: read sample_rate from context
SA->>RA: push audio chunks {samples, sample_rate}
SA->>LA: push audio chunks {samples, sample_rate}
RA->>RA: resample via PushSource + ResampledAsyncSource
LA->>LA: process with metadata-aware sample_rate
App->>CA: SetMicMute / ChangeMicDevice
CA->>SA: propagate device/mode changes
alt RestForOne triggered
LS->>SA: restart on failure
LS->>RA: restart on failure
LS->>LA: restart on failure
end
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes
Possibly related PRs
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
owhisper/owhisper-interface/src/lib.rs (1)
135-157: Keep ListenParams backward-compatibleAdding
pub sample_rate: u32without#[serde(default)]breaks every client payload that was serialized before this PR; deserialization now requires callers to send the new field. Please either add a serde default (e.g. 16000) or make the field optional so existing requests keep working.pub struct ListenParams { #[serde(default)] pub model: Option<String>, pub channels: u8, - pub sample_rate: u32, + #[serde(default = "ListenParams::default_sample_rate")] + pub sample_rate: u32, @@ impl ListenParams { + fn default_sample_rate() -> u32 { + 16000 + } }
🧹 Nitpick comments (3)
owhisper/owhisper-client/src/lib.rs (1)
66-71: Remove the deadsample_ratebindingLine 66 introduces
sample_rate, but after commenting out the append we never read it.cargo clippy -- -D warningswill flag this as unused. Please drop the binding (and the commented append) or hook the parameter back up before landing.- let sample_rate = params.sample_rate.to_string(); - query_pairs.append_pair("model", model); query_pairs.append_pair("encoding", "linear16"); - // query_pairs.append_pair("sample_rate", &sample_rate);crates/audio/src/speaker/windows.rs (2)
18-20: Consider tracking the actual negotiated sample rate.The hardcoded
44100matches the desired format at line 78. However, sinceautoconvert: trueis set at line 83, WASAPI may negotiate a different sample rate. Consider storing and returning the actual negotiated rate from the initialized audio client for accuracy.
65-67: Same consideration asSpeakerInput::sample_rate().This returns the same hardcoded
44100. If you implement the optional improvement suggested forSpeakerInput::sample_rate()(tracking actual negotiated rate), update this method accordingly to maintain consistency.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (2)
.cursor/rules/simple.mdcis excluded by!**/.cursor/**Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (32)
Cargo.toml(1 hunks)Taskfile.yaml(1 hunks)apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/hooks.ts(1 hunks)apps/desktop/src/hooks/useAutoEnhance.ts(1 hunks)crates/audio-utils/src/lib.rs(4 hunks)crates/audio-utils/src/stream.rs(1 hunks)crates/audio/src/mic.rs(1 hunks)crates/audio/src/speaker/linux.rs(1 hunks)crates/audio/src/speaker/macos.rs(1 hunks)crates/audio/src/speaker/mod.rs(1 hunks)crates/audio/src/speaker/windows.rs(1 hunks)owhisper/owhisper-client/src/lib.rs(2 hunks)owhisper/owhisper-interface/src/lib.rs(2 hunks)packages/tiptap/src/editor/index.tsx(1 hunks)packages/tiptap/src/shared/custom-list-keymap.ts(2 hunks)plugins/listener/Cargo.toml(1 hunks)plugins/listener/src/actors/batch.rs(4 hunks)plugins/listener/src/actors/context.rs(1 hunks)plugins/listener/src/actors/controller.rs(1 hunks)plugins/listener/src/actors/listener.rs(14 hunks)plugins/listener/src/actors/live_supervisor.rs(1 hunks)plugins/listener/src/actors/mod.rs(1 hunks)plugins/listener/src/actors/processor.rs(0 hunks)plugins/listener/src/actors/recorder.rs(8 hunks)plugins/listener/src/actors/session.rs(0 hunks)plugins/listener/src/actors/session_supervisor.rs(1 hunks)plugins/listener/src/actors/source.rs(7 hunks)plugins/listener/src/events.rs(1 hunks)plugins/listener/src/ext.rs(7 hunks)plugins/listener/src/lib.rs(1 hunks)plugins/local-stt/src/server/external.rs(2 hunks)plugins/local-stt/src/server/internal.rs(1 hunks)
💤 Files with no reviewable changes (2)
- plugins/listener/src/actors/processor.rs
- plugins/listener/src/actors/session.rs
🧰 Additional context used
🧬 Code graph analysis (18)
crates/audio/src/speaker/windows.rs (4)
crates/audio/src/mic.rs (2)
sample_rate(69-71)sample_rate(193-195)crates/audio/src/speaker/linux.rs (2)
sample_rate(10-12)sample_rate(26-28)crates/audio/src/speaker/macos.rs (2)
sample_rate(36-38)sample_rate(94-96)crates/audio/src/speaker/mod.rs (4)
sample_rate(46-48)sample_rate(51-53)sample_rate(99-101)sample_rate(104-106)
crates/audio/src/speaker/macos.rs (7)
crates/audio/src/mic.rs (2)
sample_rate(69-71)sample_rate(193-195)crates/audio/src/speaker/linux.rs (2)
sample_rate(10-12)sample_rate(26-28)crates/audio/src/speaker/mod.rs (4)
sample_rate(46-48)sample_rate(51-53)sample_rate(99-101)sample_rate(104-106)crates/audio/src/speaker/windows.rs (2)
sample_rate(18-20)sample_rate(65-67)crates/audio/src/lib.rs (1)
sample_rate(195-201)crates/audio/src/resampler.rs (2)
sample_rate(96-98)sample_rate(143-149)crates/audio/src/norm.rs (1)
sample_rate(118-120)
crates/audio/src/mic.rs (5)
crates/audio/src/speaker/linux.rs (2)
sample_rate(10-12)sample_rate(26-28)crates/audio/src/speaker/macos.rs (2)
sample_rate(36-38)sample_rate(94-96)crates/audio/src/speaker/mod.rs (4)
sample_rate(46-48)sample_rate(51-53)sample_rate(99-101)sample_rate(104-106)crates/audio/src/speaker/windows.rs (2)
sample_rate(18-20)sample_rate(65-67)crates/audio/src/lib.rs (1)
sample_rate(195-201)
plugins/local-stt/src/server/external.rs (1)
owhisper/owhisper-interface/src/stream.rs (1)
text(141-148)
apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/hooks.ts (1)
apps/desktop/src/store/tinybase/schema-external.ts (1)
Word(118-118)
plugins/listener/src/actors/batch.rs (1)
crates/audio-utils/src/lib.rs (1)
chunk_audio_file(187-227)
plugins/listener/src/actors/session_supervisor.rs (2)
plugins/listener/src/actors/live_supervisor.rs (1)
live_supervisor_spec(33-69)plugins/listener/src/actors/controller.rs (3)
app(53-55)new(43-51)name(92-94)
crates/audio/src/speaker/linux.rs (4)
crates/audio/src/mic.rs (2)
sample_rate(69-71)sample_rate(193-195)crates/audio/src/speaker/macos.rs (2)
sample_rate(36-38)sample_rate(94-96)crates/audio/src/speaker/mod.rs (4)
sample_rate(46-48)sample_rate(51-53)sample_rate(99-101)sample_rate(104-106)crates/audio/src/speaker/windows.rs (2)
sample_rate(18-20)sample_rate(65-67)
plugins/listener/src/actors/live_supervisor.rs (5)
plugins/listener/src/actors/listener.rs (1)
name(56-58)plugins/listener/src/actors/context.rs (1)
new(13-18)plugins/listener/src/actors/controller.rs (3)
new(43-51)app(53-55)name(92-94)plugins/listener/src/actors/source.rs (2)
new(482-488)name(73-75)plugins/listener/src/actors/recorder.rs (1)
name(38-40)
owhisper/owhisper-client/src/lib.rs (2)
crates/audio-utils/src/stream.rs (1)
sample_rate(180-190)crates/ws-utils/src/lib.rs (2)
sample_rate(117-119)sample_rate(145-147)
crates/audio/src/speaker/mod.rs (5)
crates/audio/src/mic.rs (2)
sample_rate(69-71)sample_rate(193-195)crates/audio/src/speaker/linux.rs (2)
sample_rate(10-12)sample_rate(26-28)crates/audio/src/speaker/macos.rs (2)
sample_rate(36-38)sample_rate(94-96)crates/audio/src/speaker/windows.rs (2)
sample_rate(18-20)sample_rate(65-67)crates/audio/src/lib.rs (1)
sample_rate(195-201)
owhisper/owhisper-interface/src/lib.rs (1)
crates/ws-utils/src/lib.rs (2)
sample_rate(117-119)sample_rate(145-147)
plugins/listener/src/actors/listener.rs (4)
crates/audio-utils/src/stream.rs (1)
sample_rate(180-190)plugins/listener/src/actors/source.rs (1)
request_rest_for_one(453-470)plugins/listener/src/actors/context.rs (2)
default(34-36)default(47-53)owhisper/owhisper-interface/src/lib.rs (2)
default(130-132)default(152-161)
plugins/listener/src/actors/context.rs (1)
plugins/listener/src/actors/controller.rs (1)
new(43-51)
plugins/listener/src/actors/recorder.rs (3)
crates/audio-utils/src/vorbis.rs (2)
decode_vorbis_to_wav_file(111-150)encode_wav_to_vorbis_file(152-175)crates/audio-utils/src/stream.rs (3)
drain_stream(17-46)new(86-98)sample_rate(180-190)crates/audio/src/resampler.rs (1)
source(117-117)
plugins/listener/src/ext.rs (4)
plugins/listener/src/actors/batch.rs (2)
spawn_batch_actor(92-95)name(87-89)plugins/listener/src/actors/session_supervisor.rs (1)
start_session_supervisor(16-42)plugins/listener/src/actors/controller.rs (2)
name(92-94)app(53-55)crates/audio-utils/src/lib.rs (1)
audio_file_metadata(121-126)
plugins/listener/src/actors/controller.rs (2)
plugins/listener/src/actors/context.rs (1)
new(13-18)plugins/listener/src/actors/source.rs (5)
new(482-488)name(73-75)pre_start(84-183)handle(185-228)post_stop(230-241)
plugins/listener/src/actors/source.rs (6)
crates/audio/src/speaker/macos.rs (5)
std(132-132)sample_rate(36-38)sample_rate(94-96)new(51-92)stream(152-188)plugins/listener/src/actors/listener.rs (3)
request_rest_for_one(181-207)name(56-58)handle(105-159)crates/audio/src/mic.rs (6)
mpsc(76-76)sample_rate(69-71)sample_rate(193-195)new(34-67)device(89-89)stream(75-154)plugins/listener/src/actors/context.rs (3)
default(34-36)default(47-53)new(13-18)plugins/listener/src/actors/recorder.rs (2)
name(38-40)handle(92-108)crates/audio-utils/src/lib.rs (1)
f32_to_i16_bytes(65-75)
🔇 Additional comments (18)
apps/desktop/src/components/main/body/sessions/note-input/transcript/shared/hooks.ts (1)
16-20: Intermediate variable is acceptable; inline return would also work.Extracting the computed array into
retis a matter of style. The previous inline return was equally readable. If debugging intermediate states is the intent, this works well; otherwise, consider reverting for brevity.plugins/local-stt/src/server/external.rs (2)
40-40: LGTM! Required for ractor 0.14.3 compatibility.The
#[ractor::async_trait]attribute is necessary for the updated ractor dependency mentioned in the PR objectives.
65-66: LGTM! Log noise reduction.The additional filters for interim transcription messages and handler logs are consistent with the existing filtering pattern and appropriately reduce log verbosity.
plugins/local-stt/src/server/internal.rs (2)
39-39: LGTM! Required for ractor 0.14.3 compatibility.The
#[ractor::async_trait]attribute aligns with the ractor dependency update.
45-53: LGTM! Correct Actor initialization pattern.The
pre_startimplementation correctly accepts and destructures the startup arguments, following the standard ractor Actor pattern.packages/tiptap/src/editor/index.tsx (1)
83-86: LGTM! Tab handling now delegates to extensions.The change from
return truetoreturn falseis correct and necessary for the new list indentation feature. ThepreventDefault()still blocks the browser's default Tab behavior (moving focus out of the editor), but returningfalsenow allows TipTap's extension chain to process the Tab key—specifically enabling theCustomListKeymapextension incustom-list-keymap.tsto handle list indentation via Tab.packages/tiptap/src/shared/custom-list-keymap.ts (4)
9-15: LGTM! Safe schema lookup with proper null handling.The helper correctly maps list types to schema node names with appropriate optional chaining and type guards to handle missing nodes gracefully.
16-36: LGTM! Well-structured command execution with appropriate fallback.The helper correctly iterates through supported list types, checks for active state, and executes the indent/dedent command. The
scrollIntoView: falseoption prevents unwanted scrolling during indentation, which is good UX.
79-87: LGTM! Tab indentation with appropriate fallback.The Tab handler correctly attempts list indentation first, then falls back to the original shortcut. This works in coordination with the Editor component's Tab handler (which prevents default browser behavior while allowing extension processing).
89-97: LGTM! Shift-Tab dedentation mirrors Tab indentation pattern.The Shift-Tab handler correctly attempts list dedentation with appropriate fallback, maintaining consistency with the Tab handler implementation.
owhisper/owhisper-client/src/lib.rs (1)
105-115: Confirm the default sample-rate fallbackLines 105-115 now forward
params.sample_ratedirectly into the query. Callers—including the tests further down—still rely onListenParams { ..Default::default() }, so ifListenParams::default().sample_ratewasn’t updated to a non-zero (e.g., 16 kHz) value we’ll start emittingsample_rate=0to the backend. Please double-check the default or add an explicit fallback here to preserve the previous behavior before shipping.crates/audio/src/mic.rs (1)
69-71: LGTM!The
sample_rate()accessor provides a clean public API for querying the microphone's sample rate, consistent with theAsyncSourcetrait implementation at lines 193-195.crates/audio/src/speaker/macos.rs (1)
94-96: Verify thatasbd()never fails after construction.The
.unwrap()at line 95 could panic ifasbd()returns an error. While this pattern appears safe (the same call is used successfully at line 153), ensure thatasbd()always succeeds once theTapGuardis constructed.crates/audio/src/speaker/linux.rs (1)
10-12: LGTM!The hardcoded sample rate of 16000 is appropriate for the Linux stub implementation, matching the pattern used for recorded audio in
crates/audio/src/lib.rsat line 199.plugins/listener/src/events.rs (1)
38-58: Excellent defensive fix for non-finite audio samples.The filter is added to exclude non-finite entries from mic and speaker magnitude calculations. This prevents NaN or infinity values from propagating through
max_by()(which could fail or produce unexpected results with non-finite values) and ultimately reaching the UI as invalid amplitude values.crates/audio/src/speaker/mod.rs (1)
45-53: LGTM! Clean platform abstraction.The dual
cfgblocks provide a uniform API across platforms while returning a safe default (0) for unsupported platforms, consistent with the error handling patterns used innew()andstream()methods.plugins/listener/src/lib.rs (1)
25-25: Actor migration is complete and correctly implemented.The registry lookup at line 25 properly uses
SESSION_SUPERVISOR_NAME, which is defined as"session_supervisor"in the actors module and used consistently across all registry operations. The supervisor is registered with this exact name viaSupervisor::spawn(), and all oldSessionActorreferences have been removed.Cargo.toml (1)
135-136: No issues found. The ractor version downgrade is intentional and required.ractor-supervisor 0.1.9's default feature "async-trait" requires ractor ^0.14.3, which explains why the dependency was pinned to 0.14.3. The Cargo.lock file confirms ractor resolves to 0.14.7, which satisfies this constraint. This is a necessary dependency requirement, not a rollback or bug fix, and the versions are properly compatible.
| if (listenerJustStopped) { | ||
| startEnhance(); | ||
| } | ||
| }, [listenerStatus, prevListenerStatus, startEnhance]); | ||
|
|
||
| useEffect(() => { | ||
| if (enhanceTask.status === "generating" && tab.state.editor !== "enhanced") { | ||
| updateSessionTabState(tab, { editor: "enhanced" }); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
Verify that setting editor state to "enhanced" before enhancement completes is intentional.
The editor state is updated to "enhanced" immediately when the listener stops (line 62), but startEnhance() (line 61) runs asynchronously and may not even start due to guards at line 46. This creates several potential issues:
- If
startEnhancereturns early (no model, no transcript, or already generating), the UI shows "enhanced" when no enhancement occurred - The UI indicates "enhanced" before the enhancement task actually completes
- If the enhancement task fails, the editor remains in "enhanced" state without actual enhanced content
Consider moving updateSessionTabState into the onSuccess callback (lines 38-42) to ensure the editor state only updates when enhancement actually succeeds.
I'll examine the codebase to verify the review comment's concerns about the timing of the editor state update.
---
🏁 Script executed:
```shell
#!/bin/bash
# Retrieve the useAutoEnhance.ts file to examine the full context
cat apps/desktop/src/hooks/useAutoEnhance.ts
Length of output: 2002
Move updateSessionTabState to the onSuccess callback to ensure editor state only updates when enhancement succeeds.
The review comment's concerns are valid:
-
Guards at lines 46-48 confirm early return risk: The condition
if (!model || !hasTranscript || enhanceTask.status === "generating")can causestartEnhance()to exit without starting the task. -
State update happens immediately (line 62): Calling
updateSessionTabState(tab, { editor: "enhanced" })right afterstartEnhance()means the editor state is set to "enhanced" even if the enhancement never starts. -
onSuccess callback incomplete (lines 38-42): The callback only updates
updateEnhancedMd(text)but doesn't update the editor state indicator. The state update should be added here to guarantee it only happens when the task succeeds. -
Missing failure handling: If the enhancement task fails, the editor remains stuck in "enhanced" state without actual enhanced content.
The fix requires modifying the onSuccess callback to include updateSessionTabState(tab, { editor: "enhanced" }) and removing it from the synchronous effect, ensuring the UI accurately reflects the actual enhancement status.
🤖 Prompt for AI Agents
In apps/desktop/src/hooks/useAutoEnhance.ts around lines 60 to 63, the code
updates the session tab state to "enhanced" immediately after calling
startEnhance(), which can set the UI to enhanced even if enhancement never
starts or fails; move the call updateSessionTabState(tab, { editor: "enhanced"
}) out of the synchronous branch and into the enhanceTask's onSuccess callback
(add it alongside updateEnhancedMd(text)), and remove the immediate call so the
editor state only flips when the enhancement actually succeeds; also ensure
onFailure/onError handling resets or clears the editor state to avoid leaving it
stuck on "enhanced" after a failed enhancement.
| updateSessionTabState(tab, { editor: "enhanced" }); | ||
| } | ||
| }, [enhanceTask.status, tab, updateSessionTabState]); | ||
| }, [listenerStatus, prevListenerStatus, startEnhance]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add missing dependencies to the useEffect dependency array.
The effect uses tab (line 62) and updateSessionTabState (line 62) but neither is included in the dependency array. This violates the Rules of Hooks and can lead to stale closures where the effect references outdated values.
Apply this diff:
- }, [listenerStatus, prevListenerStatus, startEnhance]);
+ }, [listenerStatus, prevListenerStatus, startEnhance, tab, updateSessionTabState]);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| }, [listenerStatus, prevListenerStatus, startEnhance]); | |
| }, [listenerStatus, prevListenerStatus, startEnhance, tab, updateSessionTabState]); |
🤖 Prompt for AI Agents
In apps/desktop/src/hooks/useAutoEnhance.ts around line 64, the useEffect
dependency array currently lists [listenerStatus, prevListenerStatus,
startEnhance] but the effect also reads `tab` and `updateSessionTabState`; add
`tab` and `updateSessionTabState` to the dependency array (or if
`updateSessionTabState` is recreated every render, memoize it with useCallback
where it's defined) so the effect re-runs with the correct values and avoids
stale closures.
| #[derive(Debug, Default)] | ||
| struct Shared { | ||
| queue: VecDeque<Chunk>, | ||
| current: Option<Chunk>, | ||
| index: usize, | ||
| sample_rate: u32, | ||
| closed: bool, | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| struct Chunk { | ||
| samples: Vec<f32>, | ||
| sample_rate: u32, | ||
| } | ||
|
|
||
| impl PushSource { | ||
| /// Create a new push source with an initial sample rate. | ||
| pub fn new(initial_sample_rate: u32) -> (Self, PushSourceHandle) { | ||
| let shared = Arc::new(Mutex::new(Shared { | ||
| sample_rate: initial_sample_rate, | ||
| ..Default::default() | ||
| })); | ||
|
|
||
| ( | ||
| Self { | ||
| shared: shared.clone(), | ||
| }, | ||
| PushSourceHandle { shared }, | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| impl PushSourceHandle { | ||
| /// Queue a chunk of samples produced at the provided sample rate. | ||
| pub fn push(&self, samples: Vec<f32>, sample_rate: u32) { | ||
| if samples.is_empty() || sample_rate == 0 { | ||
| return; | ||
| } | ||
|
|
||
| let mut shared = self.shared.lock().unwrap(); | ||
| shared.queue.push_back(Chunk { | ||
| samples, | ||
| sample_rate, | ||
| }); | ||
| } | ||
|
|
||
| /// Signal that no additional data will arrive. | ||
| pub fn close(&self) { | ||
| let mut shared = self.shared.lock().unwrap(); | ||
| shared.closed = true; | ||
| } | ||
| } | ||
|
|
||
| struct PushSourceStream<'a> { | ||
| source: &'a mut PushSource, | ||
| } | ||
|
|
||
| impl<'a> Stream for PushSourceStream<'a> { | ||
| type Item = f32; | ||
|
|
||
| fn poll_next( | ||
| self: std::pin::Pin<&mut Self>, | ||
| _cx: &mut Context<'_>, | ||
| ) -> Poll<Option<Self::Item>> { | ||
| let mut shared = self.source.shared.lock().unwrap(); | ||
|
|
||
| loop { | ||
| if let Some(len) = shared.current.as_ref().map(|chunk| chunk.samples.len()) { | ||
| if shared.index < len { | ||
| let sample = shared.current.as_ref().unwrap().samples[shared.index]; | ||
| shared.index += 1; | ||
|
|
||
| if shared.index == len { | ||
| shared.current = None; | ||
| shared.index = 0; | ||
| } | ||
|
|
||
| return Poll::Ready(Some(sample)); | ||
| } | ||
|
|
||
| shared.current = None; | ||
| shared.index = 0; | ||
| continue; | ||
| } | ||
|
|
||
| if let Some(next) = shared.queue.pop_front() { | ||
| shared.sample_rate = next.sample_rate; | ||
|
|
||
| if next.samples.is_empty() { | ||
| continue; | ||
| } | ||
|
|
||
| shared.current = Some(next); | ||
| shared.index = 0; | ||
| continue; | ||
| } | ||
|
|
||
| return if shared.closed { | ||
| Poll::Ready(None) | ||
| } else { | ||
| Poll::Pending | ||
| }; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wake consumers when new audio arrives
PushSourceStream::poll_next never stores the caller’s waker, and push/close never wake one. After the first Poll::Pending, the runtime has no reason to poll again, so downstream awaits will hang even though new samples are queued. Persist the waker in Shared and wake it from push/close before returning pending.
Apply this diff to make the stream wake correctly:
#[derive(Debug, Default)]
struct Shared {
queue: VecDeque<Chunk>,
current: Option<Chunk>,
index: usize,
sample_rate: u32,
closed: bool,
+ waker: Option<Waker>,
}
@@
pub fn push(&self, samples: Vec<f32>, sample_rate: u32) {
if samples.is_empty() || sample_rate == 0 {
return;
}
let mut shared = self.shared.lock().unwrap();
shared.queue.push_back(Chunk {
samples,
sample_rate,
});
+ if let Some(waker) = shared.waker.take() {
+ waker.wake();
+ }
}
@@
pub fn close(&self) {
let mut shared = self.shared.lock().unwrap();
shared.closed = true;
+ if let Some(waker) = shared.waker.take() {
+ waker.wake();
+ }
}
@@
- fn poll_next(
- self: std::pin::Pin<&mut Self>,
- _cx: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
+ fn poll_next(
+ self: std::pin::Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<Option<Self::Item>> {
let mut shared = self.source.shared.lock().unwrap();
@@
- return if shared.closed {
- Poll::Ready(None)
- } else {
- Poll::Pending
- };
+ if shared.closed {
+ return Poll::Ready(None);
+ }
+
+ shared.waker = Some(cx.waker().clone());
+ return Poll::Pending;
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| #[derive(Debug, Default)] | |
| struct Shared { | |
| queue: VecDeque<Chunk>, | |
| current: Option<Chunk>, | |
| index: usize, | |
| sample_rate: u32, | |
| closed: bool, | |
| } | |
| #[derive(Debug)] | |
| struct Chunk { | |
| samples: Vec<f32>, | |
| sample_rate: u32, | |
| } | |
| impl PushSource { | |
| /// Create a new push source with an initial sample rate. | |
| pub fn new(initial_sample_rate: u32) -> (Self, PushSourceHandle) { | |
| let shared = Arc::new(Mutex::new(Shared { | |
| sample_rate: initial_sample_rate, | |
| ..Default::default() | |
| })); | |
| ( | |
| Self { | |
| shared: shared.clone(), | |
| }, | |
| PushSourceHandle { shared }, | |
| ) | |
| } | |
| } | |
| impl PushSourceHandle { | |
| /// Queue a chunk of samples produced at the provided sample rate. | |
| pub fn push(&self, samples: Vec<f32>, sample_rate: u32) { | |
| if samples.is_empty() || sample_rate == 0 { | |
| return; | |
| } | |
| let mut shared = self.shared.lock().unwrap(); | |
| shared.queue.push_back(Chunk { | |
| samples, | |
| sample_rate, | |
| }); | |
| } | |
| /// Signal that no additional data will arrive. | |
| pub fn close(&self) { | |
| let mut shared = self.shared.lock().unwrap(); | |
| shared.closed = true; | |
| } | |
| } | |
| struct PushSourceStream<'a> { | |
| source: &'a mut PushSource, | |
| } | |
| impl<'a> Stream for PushSourceStream<'a> { | |
| type Item = f32; | |
| fn poll_next( | |
| self: std::pin::Pin<&mut Self>, | |
| _cx: &mut Context<'_>, | |
| ) -> Poll<Option<Self::Item>> { | |
| let mut shared = self.source.shared.lock().unwrap(); | |
| loop { | |
| if let Some(len) = shared.current.as_ref().map(|chunk| chunk.samples.len()) { | |
| if shared.index < len { | |
| let sample = shared.current.as_ref().unwrap().samples[shared.index]; | |
| shared.index += 1; | |
| if shared.index == len { | |
| shared.current = None; | |
| shared.index = 0; | |
| } | |
| return Poll::Ready(Some(sample)); | |
| } | |
| shared.current = None; | |
| shared.index = 0; | |
| continue; | |
| } | |
| if let Some(next) = shared.queue.pop_front() { | |
| shared.sample_rate = next.sample_rate; | |
| if next.samples.is_empty() { | |
| continue; | |
| } | |
| shared.current = Some(next); | |
| shared.index = 0; | |
| continue; | |
| } | |
| return if shared.closed { | |
| Poll::Ready(None) | |
| } else { | |
| Poll::Pending | |
| }; | |
| } | |
| #[derive(Debug, Default)] | |
| struct Shared { | |
| queue: VecDeque<Chunk>, | |
| current: Option<Chunk>, | |
| index: usize, | |
| sample_rate: u32, | |
| closed: bool, | |
| waker: Option<Waker>, | |
| } | |
| #[derive(Debug)] | |
| struct Chunk { | |
| samples: Vec<f32>, | |
| sample_rate: u32, | |
| } | |
| impl PushSource { | |
| /// Create a new push source with an initial sample rate. | |
| pub fn new(initial_sample_rate: u32) -> (Self, PushSourceHandle) { | |
| let shared = Arc::new(Mutex::new(Shared { | |
| sample_rate: initial_sample_rate, | |
| ..Default::default() | |
| })); | |
| ( | |
| Self { | |
| shared: shared.clone(), | |
| }, | |
| PushSourceHandle { shared }, | |
| ) | |
| } | |
| } | |
| impl PushSourceHandle { | |
| /// Queue a chunk of samples produced at the provided sample rate. | |
| pub fn push(&self, samples: Vec<f32>, sample_rate: u32) { | |
| if samples.is_empty() || sample_rate == 0 { | |
| return; | |
| } | |
| let mut shared = self.shared.lock().unwrap(); | |
| shared.queue.push_back(Chunk { | |
| samples, | |
| sample_rate, | |
| }); | |
| if let Some(waker) = shared.waker.take() { | |
| waker.wake(); | |
| } | |
| } | |
| /// Signal that no additional data will arrive. | |
| pub fn close(&self) { | |
| let mut shared = self.shared.lock().unwrap(); | |
| shared.closed = true; | |
| if let Some(waker) = shared.waker.take() { | |
| waker.wake(); | |
| } | |
| } | |
| } | |
| struct PushSourceStream<'a> { | |
| source: &'a mut PushSource, | |
| } | |
| impl<'a> Stream for PushSourceStream<'a> { | |
| type Item = f32; | |
| fn poll_next( | |
| self: std::pin::Pin<&mut Self>, | |
| cx: &mut Context<'_>, | |
| ) -> Poll<Option<Self::Item>> { | |
| let mut shared = self.source.shared.lock().unwrap(); | |
| loop { | |
| if let Some(len) = shared.current.as_ref().map(|chunk| chunk.samples.len()) { | |
| if shared.index < len { | |
| let sample = shared.current.as_ref().unwrap().samples[shared.index]; | |
| shared.index += 1; | |
| if shared.index == len { | |
| shared.current = None; | |
| shared.index = 0; | |
| } | |
| return Poll::Ready(Some(sample)); | |
| } | |
| shared.current = None; | |
| shared.index = 0; | |
| continue; | |
| } | |
| if let Some(next) = shared.queue.pop_front() { | |
| shared.sample_rate = next.sample_rate; | |
| if next.samples.is_empty() { | |
| continue; | |
| } | |
| shared.current = Some(next); | |
| shared.index = 0; | |
| continue; | |
| } | |
| if shared.closed { | |
| return Poll::Ready(None); | |
| } | |
| shared.waker = Some(cx.waker().clone()); | |
| return Poll::Pending; | |
| } | |
| } | |
| } |
| tracing::debug!("batch task: creating listen client"); | ||
| let channel_count = args.listen_params.channels.clamp(1, 2); | ||
| let channel_count = metadata.channels.clamp(1, 2); | ||
| let listen_params = owhisper_interface::ListenParams { | ||
| channels: metadata.channels, | ||
| sample_rate: metadata.sample_rate, | ||
| ..args.listen_params.clone() | ||
| }; | ||
| let client = owhisper_client::ListenClient::builder() | ||
| .api_base(args.base_url) | ||
| .api_key(args.api_key) | ||
| .params(args.listen_params.clone()) | ||
| .params(listen_params) | ||
| .build_with_channels(channel_count); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reject multi-channel audio (>2) before streaming.
hypr_audio_utils::chunk_audio_file preserves the original channel count, so for surround files we still hold >2 interleaved channels here. We clamp channel_count to 1–2 for build_with_channels, but we continue streaming the full-width PCM payload and tell the service it’s only 1–2 channels. That desynchronizes frame parsing and produces nonsense audio on the remote side. Please fail fast (or downmix) when metadata.channels > 2, and make the listen params reflect the actual channel_count used for streaming.
- let channel_count = metadata.channels.clamp(1, 2);
- let listen_params = owhisper_interface::ListenParams {
- channels: metadata.channels,
+ let channel_count = metadata.channels.clamp(1, 2);
+ if metadata.channels > 2 {
+ let error = format!("unsupported channel count {}", metadata.channels);
+ tracing::error!(channels = metadata.channels, "batch task: {error}");
+ notify_start_result(&start_notifier, Err(error.clone()));
+ let _ = myself.send_message(BatchMsg::StreamStartFailed(error));
+ return;
+ }
+
+ let listen_params = owhisper_interface::ListenParams {
+ channels: channel_count,
sample_rate: metadata.sample_rate,
..args.listen_params.clone()
};📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| tracing::debug!("batch task: creating listen client"); | |
| let channel_count = args.listen_params.channels.clamp(1, 2); | |
| let channel_count = metadata.channels.clamp(1, 2); | |
| let listen_params = owhisper_interface::ListenParams { | |
| channels: metadata.channels, | |
| sample_rate: metadata.sample_rate, | |
| ..args.listen_params.clone() | |
| }; | |
| let client = owhisper_client::ListenClient::builder() | |
| .api_base(args.base_url) | |
| .api_key(args.api_key) | |
| .params(args.listen_params.clone()) | |
| .params(listen_params) | |
| .build_with_channels(channel_count); | |
| tracing::debug!("batch task: creating listen client"); | |
| let channel_count = metadata.channels.clamp(1, 2); | |
| if metadata.channels > 2 { | |
| let error = format!("unsupported channel count {}", metadata.channels); | |
| tracing::error!(channels = metadata.channels, "batch task: {error}"); | |
| notify_start_result(&start_notifier, Err(error.clone())); | |
| let _ = myself.send_message(BatchMsg::StreamStartFailed(error)); | |
| return; | |
| } | |
| let listen_params = owhisper_interface::ListenParams { | |
| channels: channel_count, | |
| sample_rate: metadata.sample_rate, | |
| ..args.listen_params.clone() | |
| }; | |
| let client = owhisper_client::ListenClient::builder() | |
| .api_base(args.base_url) | |
| .api_key(args.api_key) | |
| .params(listen_params) | |
| .build_with_channels(channel_count); |
| SessionEvent::RunningActive {} | ||
| .emit(args.shared.app()) | ||
| .unwrap(); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid panicking on event emission failure
SessionEvent::RunningActive {}.emit(...) can fail (window gone, JS state not ready). Using .unwrap() will panic the controller during pre_start, collapsing the whole supervisor tree. Propagate the error instead, just like post_stop already does.
Apply this diff to propagate the error safely:
- SessionEvent::RunningActive {}
- .emit(args.shared.app())
- .unwrap();
+ SessionEvent::RunningActive {}
+ .emit(args.shared.app())?;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| SessionEvent::RunningActive {} | |
| .emit(args.shared.app()) | |
| .unwrap(); | |
| SessionEvent::RunningActive {} | |
| .emit(args.shared.app())?; |
🤖 Prompt for AI Agents
In plugins/listener/src/actors/controller.rs around lines 113 to 116, the call
SessionEvent::RunningActive {}.emit(args.shared.app()).unwrap() can panic if
emission fails; replace the unwrap with proper error propagation (e.g. use ? or
map_err to convert the emit error into the function's error type) so pre_start
returns the error instead of panicking, matching how post_stop propagates emit
failures.
| let outbound = tokio_stream::StreamExt::map( | ||
| tokio_stream::wrappers::ReceiverStream::new(rx), | ||
| |msg| match msg { | ||
| MixedMessage::Audio((mic, spk)) => { | ||
| MixedMessage::Audio((spk, bytes::Bytes::from(vec![0; mic.len()]))) | ||
| } | ||
| MixedMessage::Control(c) => MixedMessage::Control(c), | ||
| }, | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Restore mic channel forwarding in dual mode
The dual-mode outbound map now drops the microphone payload entirely by sending (spk, zeroes). ListenerMsg::Audio is populated with (mic, spk) from SourceActor, so this regression means the remote service never receives any actual mic samples and final transcripts will miss user speech. Please keep the original mic bytes (and only pad if you truly need to match lengths) so both channels survive the hand-off.
Apply this diff to restore the mic channel:
|msg| match msg {
- MixedMessage::Audio((mic, spk)) => {
- MixedMessage::Audio((spk, bytes::Bytes::from(vec![0; mic.len()])))
- }
+ MixedMessage::Audio((mic, spk)) => MixedMessage::Audio((mic, spk)),📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let outbound = tokio_stream::StreamExt::map( | |
| tokio_stream::wrappers::ReceiverStream::new(rx), | |
| |msg| match msg { | |
| MixedMessage::Audio((mic, spk)) => { | |
| MixedMessage::Audio((spk, bytes::Bytes::from(vec![0; mic.len()]))) | |
| } | |
| MixedMessage::Control(c) => MixedMessage::Control(c), | |
| }, | |
| ); | |
| let outbound = tokio_stream::StreamExt::map( | |
| tokio_stream::wrappers::ReceiverStream::new(rx), | |
| |msg| match msg { | |
| MixedMessage::Audio((mic, spk)) => MixedMessage::Audio((mic, spk)), | |
| MixedMessage::Control(c) => MixedMessage::Control(c), | |
| }, | |
| ); |
🤖 Prompt for AI Agents
In plugins/listener/src/actors/listener.rs around lines 295 to 303, the outbound
Stream map currently replaces the mic payload with zero bytes (sending (spk,
zeroes)) which drops the original mic samples; change the mapping to preserve
the original mic bytes and only pad the other channel if necessary to match
lengths. Specifically, when matching MixedMessage::Audio((mic, spk)) return
MixedMessage::Audio((mic, spk_or_padded)) where mic is the original bytes and
spk_or_padded is the speaker bytes padded to the mic length only if required,
and leave MixedMessage::Control unchanged.
| Ok(mic_input) => mic_input.stream().chunks(AEC_BLOCK_SIZE), | ||
| Err(err) => { | ||
| tracing::error!( | ||
| error = ?err, | ||
| mic_device = ?mic_device_name, | ||
| "mic_stream_init_failed" | ||
| ); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; | ||
|
|
||
| let spk_stream = match SpeakerInput::new().and_then(|input| input.stream()) { | ||
| Ok(stream) => stream.chunks(AEC_BLOCK_SIZE), | ||
| Err(err) => { | ||
| tracing::error!(error = ?err, "speaker_stream_init_failed"); | ||
| return; | ||
| } | ||
| }; | ||
|
|
||
| tokio::pin!(mic_stream); | ||
| tokio::pin!(spk_stream); | ||
|
|
||
| let mut agc_m = hypr_agc::Agc::default(); | ||
| let mut agc_s = hypr_agc::Agc::default(); | ||
| let mut joiner = Joiner::new(); | ||
| let mut last_sent_mic: Option<Arc<[f32]>> = None; | ||
| let mut last_sent_spk: Option<Arc<[f32]>> = None; | ||
| let mut last_amp_emit = Instant::now(); | ||
|
|
||
| loop { | ||
| tokio::select! { | ||
| mic_next = mic_stream.next() => { | ||
| if let Some(mut data) = mic_next { | ||
| let output_data = if mic_muted.load(Ordering::Relaxed) { | ||
| vec![0.0; data.len()] | ||
| } else { | ||
| agc_m.process(&mut data); | ||
| data | ||
| }; | ||
| let arc = Arc::<[f32]>::from(output_data); | ||
| joiner.push_mic(arc); | ||
| process_ready_inline(&mut joiner, mode, sample_rate, &mut last_sent_mic, &mut last_sent_spk, &mut last_amp_emit, &app).await; | ||
| } else { | ||
| break; | ||
| } | ||
| } | ||
| spk_next = spk_stream.next() => { | ||
| if let Some(mut data) = spk_next { | ||
| agc_s.process(&mut data); | ||
| let arc = Arc::<[f32]>::from(data); | ||
| joiner.push_spk(arc); | ||
| process_ready_inline(&mut joiner, mode, sample_rate, &mut last_sent_mic, &mut last_sent_spk, &mut last_amp_emit, &app).await; | ||
| } else { | ||
| break; | ||
| } | ||
| } | ||
| }) | ||
| } | ||
| } | ||
| #[cfg(not(target_os = "macos"))] | ||
| { | ||
| tokio::spawn(async move {}) | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t abort capture when speaker taps aren’t available
SpeakerInput::new() regularly fails on platforms without loopback support (e.g. Linux), so returning early here leaves the mic path completely silent. Likewise, if the speaker stream ends, the else { break; } stops the loop even though the mic could keep flowing. Please fall back to silence (the joiner already handles it) instead of returning, and keep the mic loop alive when the speaker side drops out.
- let spk_stream = match SpeakerInput::new().and_then(|input| input.stream()) {
- Ok(stream) => stream.chunks(AEC_BLOCK_SIZE),
- Err(err) => {
- tracing::error!(error = ?err, "speaker_stream_init_failed");
- return;
- }
- };
+ let spk_stream = SpeakerInput::new()
+ .and_then(|input| input.stream())
+ .map(|stream| stream.chunks(AEC_BLOCK_SIZE))
+ .ok();
@@
- spk_next = spk_stream.next() => {
- if let Some(mut data) = spk_next {
+ spk_next = async { spk_stream.as_ref().and_then(|s| s.next().await) } => {
+ if let Some(mut data) = spk_next {
@@
- } else {
- break;
- }
+ }
}Committable suggestion skipped: line range outside the PR's diff.
| db: | ||
| env: | ||
| DB: /Users/yujonglee/Library/Application Support/com.hyprnote.nightly/db.sqlite | ||
| cmds: | ||
| - | | ||
| sqlite3 -json "$DB" 'SELECT store FROM main LIMIT 1;' | | ||
| jq -r '.[0].store' | | ||
| jless |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid hard-coding a user-specific DB path
DB points at /Users/yujonglee/..., which will fail for everyone else. Please parameterize this (e.g. pick it up from env or default to $HOME/Library/...) or document how to override it before landing.
🤖 Prompt for AI Agents
In Taskfile.yaml around lines 76 to 83 the DB value is hard-coded to a single
user path (/Users/yujonglee/...), which will break for other developers; change
it to read from an environment variable with a sensible default that uses $HOME
(e.g. DB from env or fallback to $HOME/Library/... ), update the task command to
reference that variable rather than a literal path, and add a short note in
README or Taskfile comments describing how to override DB via environment for
other users.
No description provided.