Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 51 additions & 13 deletions pylight/src/index/symbol_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,20 @@ impl SymbolIndex {
Ok(())
}

/// Parse and index a list of Python files in parallel
/// Parse and index a list of Python files in parallel with optional progress tracking
/// Returns (number of files parsed, total symbols, elapsed time)
pub fn parse_and_index_files(
pub fn parse_and_index_files_with_progress<F>(
self: Arc<Self>,
python_files: Vec<PathBuf>,
) -> Result<(usize, usize, std::time::Duration)> {
progress_callback: F,
) -> Result<(usize, usize, std::time::Duration)>
where
F: Fn(usize, usize) + Send + Sync,
{
let start_time = std::time::Instant::now();
let total_files = python_files.len();
let processed_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let progress_callback = Arc::new(progress_callback);

// Process files in parallel and collect all results
let all_file_symbols: Vec<(PathBuf, Vec<Symbol>)> = python_files
Expand All @@ -256,7 +263,7 @@ impl SymbolIndex {
};

// Read and parse the file
match std::fs::read_to_string(path) {
let result = match std::fs::read_to_string(path) {
Ok(content) => match parser.parse_file(path, &content) {
Ok(symbols) => {
tracing::debug!(
Expand All @@ -273,16 +280,18 @@ impl SymbolIndex {
}
},
Err(e) => {
// Only warn for errors other than "file not found" since that's expected
// during rapid file system changes (e.g., git operations)
if e.kind() != std::io::ErrorKind::NotFound {
tracing::warn!("Failed to read {}: {}", path.display(), e);
} else {
if e.kind() == std::io::ErrorKind::NotFound {
tracing::debug!("File no longer exists: {}", path.display());
}
None
}
}
};

// Update progress counter and call callback
let current = processed_count.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
progress_callback(current, total_files);

result
})
.collect();

Expand All @@ -301,29 +310,58 @@ impl SymbolIndex {
Ok((file_count, total_symbols, elapsed))
}

/// Parse and index a list of Python files in parallel
/// Returns (number of files parsed, total symbols, elapsed time)
pub fn parse_and_index_files(
self: Arc<Self>,
python_files: Vec<PathBuf>,
) -> Result<(usize, usize, std::time::Duration)> {
// Call the progress version with a no-op callback
self.parse_and_index_files_with_progress(python_files, |_, _| {})
}

/// Index all Python files in a workspace directory
pub fn index_workspace(self: Arc<Self>, root: &PathBuf) -> Result<()> {
// Call the progress version with a no-op callback
self.index_workspace_with_progress(root, |_, _| {})
}

/// Index all Python files in a workspace directory with progress callback
pub fn index_workspace_with_progress<F>(
self: Arc<Self>,
root: &PathBuf,
progress_callback: F,
) -> Result<()>
where
F: Fn(usize, usize) + Send + Sync,
{
tracing::info!("Starting workspace indexing for: {}", root.display());

// Collect all Python files first
let file_collection_start = std::time::Instant::now();
let python_files = files::collect_python_files(root);
let file_collection_elapsed = file_collection_start.elapsed();
let total_files = python_files.len();

tracing::info!(
"Found {} Python files to index in {:.2}s (using {} threads)",
python_files.len(),
total_files,
file_collection_elapsed.as_secs_f64(),
num_cpus::get().saturating_sub(1).max(1)
);

// Call progress callback for file discovery phase
progress_callback(0, total_files);

// Log thread pool info
tracing::info!(
"Starting parallel processing with {} concurrent tasks",
rayon::current_num_threads()
);

// Parse and index files
let (file_count, total_symbols, elapsed) = self.parse_and_index_files(python_files)?;
// Parse and index files with progress tracking
let (file_count, total_symbols, elapsed) =
self.parse_and_index_files_with_progress(python_files, progress_callback)?;

tracing::info!(
"Parallel parsing completed in {:.2}s ({:.0} files/sec)",
Expand Down
1 change: 1 addition & 0 deletions pylight/src/lsp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! LSP server implementation

pub mod handlers;
pub mod progress;
pub mod server;

pub use server::LspServer;
187 changes: 187 additions & 0 deletions pylight/src/lsp/progress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
//! Progress reporting utilities for LSP

use lsp_server::{Connection, Message, Notification};
use lsp_types::{
notification::{Notification as NotificationTrait, Progress},
request::{Request as LspRequest, WorkDoneProgressCreate},
ProgressParams, ProgressParamsValue, ProgressToken, WorkDoneProgress, WorkDoneProgressBegin,
WorkDoneProgressCreateParams, WorkDoneProgressEnd, WorkDoneProgressReport,
};
use std::sync::atomic::{AtomicU32, Ordering};

/// Unique token generator for progress reporting
static PROGRESS_TOKEN_COUNTER: AtomicU32 = AtomicU32::new(0);

/// Helper for managing LSP progress reporting
pub struct ProgressReporter {
connection: Connection,
token: ProgressToken,
}

impl ProgressReporter {
/// Create a new progress reporter
pub fn new(connection: Connection) -> Result<Self, Box<dyn std::error::Error>> {
let token_id = PROGRESS_TOKEN_COUNTER.fetch_add(1, Ordering::SeqCst);
let token = ProgressToken::Number(token_id as i32);

// Request permission to create progress
let create_params = WorkDoneProgressCreateParams {
token: token.clone(),
};

let request = lsp_server::Request {
id: lsp_server::RequestId::from(token_id as i32),
method: WorkDoneProgressCreate::METHOD.to_string(),
params: serde_json::to_value(create_params)?,
};

connection.sender.send(Message::Request(request))?;

// Wait for response (with timeout)
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
if std::time::Instant::now() > deadline {
return Err("Timeout waiting for progress create response".into());
}

if let Ok(Message::Response(resp)) = connection
.receiver
.recv_timeout(std::time::Duration::from_millis(100))
{
if resp.id == lsp_server::RequestId::from(token_id as i32) {
if resp.error.is_some() {
return Err("Client rejected progress creation".into());
}
break;
}
}
}

Ok(Self { connection, token })
}

/// Begin progress reporting
pub fn begin(
&self,
title: impl Into<String>,
message: Option<String>,
percentage: Option<u32>,
cancellable: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let begin = WorkDoneProgressBegin {
title: title.into(),
cancellable: Some(cancellable),
message,
percentage,
};

self.send_progress(WorkDoneProgress::Begin(begin))
}

/// Report progress update
pub fn report(
&self,
message: Option<String>,
percentage: Option<u32>,
) -> Result<(), Box<dyn std::error::Error>> {
let report = WorkDoneProgressReport {
cancellable: None,
message,
percentage,
};

self.send_progress(WorkDoneProgress::Report(report))
}

/// End progress reporting
pub fn end(&self, message: Option<String>) -> Result<(), Box<dyn std::error::Error>> {
let end = WorkDoneProgressEnd { message };
self.send_progress(WorkDoneProgress::End(end))
}

/// Send a progress notification
fn send_progress(&self, progress: WorkDoneProgress) -> Result<(), Box<dyn std::error::Error>> {
let params = ProgressParams {
token: self.token.clone(),
value: ProgressParamsValue::WorkDone(progress),
};

let notification = Notification {
method: <Progress as NotificationTrait>::METHOD.to_string(),
params: serde_json::to_value(params)?,
};

self.connection
.sender
.send(Message::Notification(notification))?;

Ok(())
}
}

/// Simple progress reporter that sends notifications without waiting for client permission
pub struct SimpleProgressReporter {
sender: crossbeam_channel::Sender<Message>,
token: ProgressToken,
}

impl SimpleProgressReporter {
/// Create a new simple progress reporter
pub fn new(sender: crossbeam_channel::Sender<Message>, token: ProgressToken) -> Self {
Self { sender, token }
}

/// Begin progress reporting
pub fn begin(
&self,
title: impl Into<String>,
message: Option<String>,
percentage: Option<u32>,
) -> Result<(), Box<dyn std::error::Error>> {
let begin = WorkDoneProgressBegin {
title: title.into(),
cancellable: Some(false),
message,
percentage,
};

self.send_progress(WorkDoneProgress::Begin(begin))
}

/// Report progress update
pub fn report(
&self,
message: Option<String>,
percentage: Option<u32>,
) -> Result<(), Box<dyn std::error::Error>> {
let report = WorkDoneProgressReport {
cancellable: None,
message,
percentage,
};

self.send_progress(WorkDoneProgress::Report(report))
}

/// End progress reporting
pub fn end(&self, message: Option<String>) -> Result<(), Box<dyn std::error::Error>> {
let end = WorkDoneProgressEnd { message };
self.send_progress(WorkDoneProgress::End(end))
}

/// Send a progress notification
fn send_progress(&self, progress: WorkDoneProgress) -> Result<(), Box<dyn std::error::Error>> {
let params = ProgressParams {
token: self.token.clone(),
value: ProgressParamsValue::WorkDone(progress),
};

let notification = Notification {
method: <Progress as NotificationTrait>::METHOD.to_string(),
params: serde_json::to_value(params)?,
};

self.sender.send(Message::Notification(notification))?;
Ok(())
}
}
Loading
Loading