Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ indicatif = "0.17.12"
cfg_aliases = "0.2.1"

[features]
default = ["pipewire"]
Comment thread
mbernat marked this conversation as resolved.
pipewire = ["dep:pipewire", "dep:libspa", "dep:libspa-sys", "dep:zerocopy"]

[target.'cfg(any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd", target_os = "netbsd"))'.dependencies]
Expand All @@ -33,7 +34,7 @@ libc = "0.2.174"
libspa = { version = "0.8.0", optional = true }
libspa-sys = { version = "0.8.0", optional = true }
nix = "0.30.1"
pipewire = { version = "0.8.0", optional = true }
pipewire = { version = "0.8.0", features = ["v0_3_49"], optional = true }

[target.'cfg(any(target_os = "macos", target_os = "ios"))'.dependencies]
coreaudio-rs = "0.13.0"
Expand Down
27 changes: 25 additions & 2 deletions examples/enumerate_pipewire.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,34 @@
use interflow::{prelude::pipewire::driver::PipewireDriver, AudioDevice, AudioDriver};

mod util;

type Result = std::result::Result<(), Box<dyn std::error::Error>>;

#[cfg(all(os_pipewire, feature = "pipewire"))]
fn main() -> Result<(), Box<dyn std::error::Error>> {
fn main() -> Result {
use crate::util::enumerate::enumerate_devices;
use interflow::backends::pipewire::driver::PipewireDriver;
env_logger::init();
enumerate_devices(PipewireDriver::new()?)?;
let driver = PipewireDriver::new()?;
enumerate_props(&driver)?;
enumerate_devices(driver)?;
Ok(())
}

fn enumerate_props(driver: &PipewireDriver) -> Result {
eprintln!("Props:");

for device in driver.list_devices()? {
let Some(props) = device.props()? else {
continue;
};

eprintln!("\t{:?}", device.device_type());
eprintln!("\t\tdescription: {}", props.description);
eprintln!("\t\tname: {}", props.name);
eprintln!("\t\tnick: {}", props.nick);
}

Ok(())
}

Expand Down
52 changes: 45 additions & 7 deletions src/backends/pipewire/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,29 @@ use std::rc::Rc;
pub struct PipewireDevice {
pub(super) target_node: Option<u32>,
pub device_type: DeviceType,
pub object_serial: Option<String>,
pub stream_name: Cow<'static, str>,
}

impl PipewireDevice {
pub fn props(&self) -> Result<Option<NodeProps>, PipewireError> {
let Some(node_id) = self.target_node else {
return Ok(None);
};

get_node_props(node_id)
}
}

impl AudioDevice for PipewireDevice {
type Error = PipewireError;

fn name(&self) -> Cow<str> {
let Some(node_id) = self.target_node else {
return Cow::Borrowed("Default");
};
match get_node_name(node_id) {
Ok(Some(name)) => Cow::Owned(name),
match get_node_props(node_id) {
Ok(Some(props)) => Cow::Owned(props.name),
Ok(None) => Cow::Borrowed("Unknown"),
Err(e) => {
log::error!("Failed to get device name: {}", e);
Expand Down Expand Up @@ -67,7 +78,12 @@ impl AudioInputDevice for PipewireDevice {
stream_config: StreamConfig,
callback: Callback,
) -> Result<Self::StreamHandle<Callback>, Self::Error> {
StreamHandle::new_input(&self.stream_name, stream_config, callback)
StreamHandle::new_input(
self.object_serial.clone(),
&self.stream_name,
stream_config,
callback,
)
}
}

Expand All @@ -88,7 +104,12 @@ impl AudioOutputDevice for PipewireDevice {
stream_config: StreamConfig,
callback: Callback,
) -> Result<Self::StreamHandle<Callback>, Self::Error> {
StreamHandle::new_output(&self.stream_name, stream_config, callback)
StreamHandle::new_output(
self.object_serial.clone(),
&self.stream_name,
stream_config,
callback,
)
}
}

Expand All @@ -98,7 +119,7 @@ impl PipewireDevice {
}
}

fn get_node_name(node_id: u32) -> Result<Option<String>, PipewireError> {
fn get_node_props(node_id: u32) -> Result<Option<NodeProps>, PipewireError> {
let mainloop = MainLoop::new(None)?;
let context = Context::new(&mainloop)?;
let core = context.connect(None)?;
Expand Down Expand Up @@ -134,8 +155,19 @@ fn get_node_name(node_id: u32) -> Result<Option<String>, PipewireError> {
move |global| {
if node_id == global.id {
if let Some(props) = global.props {
if let Some(name) = props.get("node.name") {
data.borrow_mut().replace(name.to_string());
let description = props.get("node.description");
let name = props.get("node.name");
let nick = props.get("node.nick");

if let (Some(description), Some(name), Some(nick)) =
(description, name, nick)
{
let props = NodeProps {
description: description.to_string(),
name: name.to_string(),
nick: nick.to_string(),
};
data.borrow_mut().replace(props);
}
}
}
Expand All @@ -150,3 +182,9 @@ fn get_node_name(node_id: u32) -> Result<Option<String>, PipewireError> {
drop(_listener_reg);
Ok(Rc::into_inner(data).unwrap().into_inner())
}

pub struct NodeProps {
pub description: String,
pub name: String,
pub nick: String,
}
4 changes: 3 additions & 1 deletion src/backends/pipewire/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ impl AudioDriver for PipewireDriver {
Ok(Some(PipewireDevice {
target_node: None,
device_type,
object_serial: None,
stream_name: Cow::Borrowed("Interflow stream"),
}))
}

fn list_devices(&self) -> Result<impl IntoIterator<Item = Self::Device>, Self::Error> {
Ok(utils::get_devices()?
.into_iter()
.map(|(id, device_type)| PipewireDevice {
.map(|(id, device_type, object_serial)| PipewireDevice {
target_node: Some(id),
device_type,
object_serial: Some(object_serial),
stream_name: Cow::Borrowed("Interflow stream"),
}))
}
Expand Down
120 changes: 75 additions & 45 deletions src/backends/pipewire/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
use libspa::buffer::Data;
use libspa::param::audio::{AudioFormat, AudioInfoRaw};
use libspa::pod::Pod;
use libspa::utils::Direction;
use libspa_sys::{SPA_PARAM_EnumFormat, SPA_TYPE_OBJECT_Format};
use pipewire::context::Context;
use pipewire::keys;
Expand Down Expand Up @@ -74,11 +75,9 @@ impl<Callback> StreamInner<Callback> {

impl<Callback: AudioOutputCallback> StreamInner<Callback> {
fn process_output(&mut self, channels: usize, frames: usize) -> usize {
let buffer = AudioMut::from_noninterleaved_mut(
&mut self.scratch_buffer[..channels * frames],
channels,
)
.unwrap();
let buffer =
AudioMut::from_interleaved_mut(&mut self.scratch_buffer[..channels * frames], channels)
.unwrap();
if let Some(callback) = self.callback.as_mut() {
let context = AudioCallbackContext {
stream_config: self.config,
Expand All @@ -101,7 +100,7 @@ impl<Callback: AudioOutputCallback> StreamInner<Callback> {
impl<Callback: AudioInputCallback> StreamInner<Callback> {
fn process_input(&mut self, channels: usize, frames: usize) -> usize {
let buffer =
AudioRef::from_noninterleaved(&self.scratch_buffer[..channels * frames], channels)
AudioRef::from_interleaved(&self.scratch_buffer[..channels * frames], channels)
.unwrap();
if let Some(callback) = self.callback.as_mut() {
let context = AudioCallbackContext {
Expand Down Expand Up @@ -143,6 +142,7 @@ impl<Callback> AudioStreamHandle<Callback> for StreamHandle<Callback> {

impl<Callback: 'static + Send> StreamHandle<Callback> {
fn create_stream(
device_object_serial: Option<String>,
name: String,
mut config: StreamConfig,
callback: Callback,
Expand All @@ -159,16 +159,25 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {

let channels = config.channels.count();
let channels_str = channels.to_string();
let stream = Stream::new(
&core,
&name,
properties! {
*keys::MEDIA_TYPE => "Audio",
*keys::MEDIA_ROLE => "Music",
*keys::MEDIA_CATEGORY => get_category(direction),
*keys::AUDIO_CHANNELS => channels_str,
},
)?;

let mut props = properties! {
*keys::MEDIA_TYPE => "Audio",
*keys::MEDIA_ROLE => "Music",
*keys::MEDIA_CATEGORY => get_category(direction),
*keys::AUDIO_CHANNELS => channels_str,
};

if let Some(device_object_serial) = device_object_serial {
props.insert(*keys::TARGET_OBJECT, device_object_serial);
}

if let (Some(min), Some(max)) = config.buffer_size_range {
if min == max {
props.insert(*keys::NODE_FORCE_QUANTUM, min.to_string());
}
}

let stream = Stream::new(&core, &name, props)?;
config.samplerate = config.samplerate.round();
let _listener = stream
.add_local_listener_with_user_data(StreamInner {
Expand All @@ -186,26 +195,26 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
return;
}
if let Some(mut buffer) = stream.dequeue_buffer() {
let requested_frames = if direction == Direction::Output {
let requested_frames = buffer.requested() as usize;
if requested_frames == 0 {
log::warn!("0 frames were requested");
return;
}
requested_frames
} else {
0
};

let datas = buffer.datas_mut();
log::debug!("Datas: len={}", datas.len());
let Some(min_frames) = datas
.iter_mut()
.filter_map(|d| d.data().map(|d| d.len() / size_of::<f32>()))
.min()
else {

if datas.is_empty() {
log::warn!("No datas available");
return;
};
let frames = min_frames.min(MAX_FRAMES);

let frames = process_frames(datas, inner, channels, frames);

for data in datas.iter_mut() {
let chunk = data.chunk_mut();
*chunk.offset_mut() = 0;
*chunk.stride_mut() = size_of::<f32>() as _;
*chunk.size_mut() = (size_of::<f32>() * frames) as _;
}
process_frames(datas, inner, channels, requested_frames);
} else {
log::warn!("No buffer available");
}
Expand All @@ -218,7 +227,7 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
id: SPA_PARAM_EnumFormat,
properties: {
let mut info = AudioInfoRaw::new();
info.set_format(AudioFormat::F32P);
info.set_format(AudioFormat::F32LE);
info.set_rate(config.samplerate as u32);
info.set_channels(channels as u32);
info.into()
Expand Down Expand Up @@ -250,26 +259,36 @@ impl<Callback: 'static + Send> StreamHandle<Callback> {
impl<Callback: 'static + Send + AudioInputCallback> StreamHandle<Callback> {
/// Create an input Pipewire stream
pub fn new_input(
device_object_serial: Option<String>,
name: impl ToString,
config: StreamConfig,
callback: Callback,
) -> Result<Self, PipewireError> {
Self::create_stream(
device_object_serial,
name.to_string(),
config,
callback,
pipewire::spa::utils::Direction::Input,
|datas, inner, channels, frames| {
for (i, data) in datas.iter_mut().enumerate() {
if let Some(data) = data.data() {
let slice: &[f32] = zerocopy::FromBytes::ref_from_bytes(data)
|datas, inner, channels, _frames| {
// TODO: also take chunk offset into account to index into the data?
let mut frames_total = 0;

for (chunk, data) in datas.iter_mut().enumerate() {
let samples = data.chunk().size() as usize / size_of::<f32>() as usize;
if let Some(bytes) = data.data() {
let frames = samples / channels;
frames_total += frames;

let slice: &[f32] = zerocopy::FromBytes::ref_from_bytes(bytes)
.inspect_err(|e| log::error!("Cannot cast to f32 slice: {e}"))
.unwrap();
let target = &mut inner.scratch_buffer[i * frames..][..frames];
target.copy_from_slice(&slice[..frames]);
let target = &mut inner.scratch_buffer[chunk * samples..][..samples];
target.copy_from_slice(&slice[..samples]);
}
}
inner.process_input(channels, frames)

inner.process_input(channels, frames_total)
},
)
}
Expand All @@ -288,27 +307,38 @@ fn get_category(direction: pipewire::spa::utils::Direction) -> &'static str {
impl<Callback: 'static + Send + AudioOutputCallback> StreamHandle<Callback> {
/// Create an output Pipewire stream
pub fn new_output(
device_object_serial: Option<String>,
name: impl ToString,
config: StreamConfig,
callback: Callback,
) -> Result<Self, PipewireError> {
Self::create_stream(
device_object_serial,
name.to_string(),
config,
callback,
pipewire::spa::utils::Direction::Output,
|datas, inner, channels, frames| {
let frames = inner.process_output(channels, frames);
|datas, inner, channels, requested_frames| {
let provided_frames = inner.process_output(channels, requested_frames);
// TODO handle frames_total not being a multiple of datas.len()
let frames_per_data = provided_frames / datas.len();
let samples = frames_per_data * channels;

for (i, data) in datas.iter_mut().enumerate() {
let processed_slice = &inner.scratch_buffer[i * frames..][..frames];
if let Some(data) = data.data() {
let slice: &mut [f32] = zerocopy::FromBytes::mut_from_bytes(data)
let processed_slice = &inner.scratch_buffer[i * samples..][..samples];
if let Some(bytes) = data.data() {
let slice: &mut [f32] = zerocopy::FromBytes::mut_from_bytes(bytes)
.inspect_err(|e| log::error!("Cannot cast to f32 slice: {e}"))
.unwrap();
slice[..frames].copy_from_slice(processed_slice);
slice[..samples].copy_from_slice(processed_slice);
let chunk = data.chunk_mut();
*chunk.offset_mut() = 0;
*chunk.stride_mut() = size_of::<f32>() as _;
*chunk.size_mut() = (size_of::<f32>() * samples) as _;
}
}
frames

provided_frames
},
)
}
Expand Down
Loading