Skip to content

Commit

Permalink
LS: Calculate diagnostics on background
Browse files Browse the repository at this point in the history
  • Loading branch information
Draggu committed Oct 10, 2024
1 parent c70f3e4 commit 10d589d
Show file tree
Hide file tree
Showing 17 changed files with 798 additions and 323 deletions.
8 changes: 2 additions & 6 deletions crates/cairo-lang-language-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use tracing::{debug, error, warn};

use crate::lsp::capabilities::client::ClientCapabilitiesExt;
use crate::lsp::result::{LSPResult, LSPResultEx};
use crate::server::client::{Notifier, Requester};
use crate::server::client::Requester;
use crate::server::schedule::Task;
use crate::state::State;

// TODO(mkaput): Write a macro that will auto-generate this struct and the `reload` logic.
// TODO(mkaput): Write a test that checks that fields in this struct are sorted alphabetically.
Expand Down Expand Up @@ -48,7 +47,6 @@ impl Config {
&mut self,
requester: &mut Requester<'_>,
client_capabilities: &ClientCapabilities,
on_reloaded: fn(&mut State, &Notifier),
) -> LSPResult<()> {
if !client_capabilities.workspace_configuration_support() {
warn!(
Expand Down Expand Up @@ -80,7 +78,7 @@ impl Config {
// This conversion is O(1), and makes popping from front also O(1).
let mut response = VecDeque::from(response);

Task::local(move |state, notifier, _, _| {
Task::local(move |state, _, _, _| {
state.config.unmanaged_core_path = response
.pop_front()
.as_ref()
Expand All @@ -91,8 +89,6 @@ impl Config {
response.pop_front().as_ref().and_then(Value::as_bool).unwrap_or_default();

debug!("reloaded configuration: {:#?}", state.config);

on_reloaded(state, &notifier);
})
};

Expand Down
68 changes: 68 additions & 0 deletions crates/cairo-lang-language-server/src/lang/diagnostics/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use cairo_lang_defs::db::DefsGroup;
use cairo_lang_filesystem::ids::FileId;
use cairo_lang_lowering::db::LoweringGroup;
use cairo_lang_lowering::diagnostic::LoweringDiagnostic;
use cairo_lang_parser::db::ParserGroup;
use cairo_lang_semantic::db::SemanticGroup;
use cairo_lang_semantic::SemanticDiagnostic;
use lsp_types::Url;
use tracing::{error, trace_span};

use super::state::FileDiagnostics;
use crate::lang::db::AnalysisDatabase;
use crate::server::panic::catch_cancellation;

/// A file referenced in both of the formats used across the server: the LSP
/// [`Url`] and the [`salsa`]-interned [`FileId`].
///
/// Both representations are stored because converting a [`Url`] to a
/// [`FileId`] requires a [`salsa`] query, which is fallible — the snapshot may
/// already be cancelled. Keeping the pair avoids redoing that conversion.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FileBothFormats {
    /// Interned file id used for [`salsa`] queries.
    pub id: FileId,
    /// LSP url of the same file, used when communicating with the client.
    pub url: Url,
}

impl FileBothFormats {
    /// Creates a pair from an already-resolved [`FileId`] and its [`Url`].
    pub fn new(id: FileId, url: Url) -> Self {
        Self { id, url }
    }

    /// Refresh diagnostics for a single file.
    ///
    /// Runs the parser, semantic and lowering diagnostics queries for every
    /// module rooted in this file and bundles their results into a single
    /// [`FileDiagnostics`].
    pub fn refresh_diagnostics(&self, db: &AnalysisDatabase) -> FileDiagnostics {
        // A file can map to several modules; if the lookup fails we fall back
        // to an empty list, so only parser diagnostics are produced below.
        let modules_ids = db.file_modules(self.id).unwrap_or_default();
        let mut semantic_file_diagnostics: Vec<SemanticDiagnostic> = vec![];
        let mut lowering_file_diagnostics: Vec<LoweringDiagnostic> = vec![];

        // Runs one diagnostics query inside a tracing span, mapping its value
        // with `$f`. Panics are caught by `catch_cancellation`:
        // `resume_if_canceled` presumably re-raises the unwind when it is a
        // salsa cancellation (TODO confirm against `server::panic`); any other
        // panic is logged and treated as "no diagnostics" via
        // `unwrap_or_default`.
        macro_rules! diags {
            ($db:ident. $query:ident($file_id:expr), $f:expr) => {
                trace_span!(stringify!($query)).in_scope(|| {
                    catch_cancellation(|| $db.$query($file_id))
                        .map($f)
                        .map_err(|err| {
                            err.resume_if_canceled();

                            error!("caught panic when computing diagnostics for file {}", self.url);
                        })
                        .unwrap_or_default()
                })
            };
        }

        for module_id in modules_ids.iter() {
            semantic_file_diagnostics.extend(
                diags!(db.module_semantic_diagnostics(*module_id), Result::unwrap_or_default)
                    .get_all(),
            );
            lowering_file_diagnostics.extend(
                diags!(db.module_lowering_diagnostics(*module_id), Result::unwrap_or_default)
                    .get_all(),
            );
        }

        // Parser diagnostics are per-file (not per-module); keep the value as-is.
        let parser_file_diagnostics = diags!(db.file_syntax_diagnostics(self.id), |r| r);

        FileDiagnostics::new(
            parser_file_diagnostics,
            semantic_file_diagnostics,
            lowering_file_diagnostics,
        )
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::collections::{HashMap, HashSet};

use cairo_lang_defs::db::DefsGroup;
use cairo_lang_filesystem::db::FilesGroup;
use cairo_lang_filesystem::ids::FileLongId;
use cairo_lang_utils::LookupIntern;
use lsp_types::Url;
use tracing::trace_span;

use super::file::FileBothFormats;
use super::state::FileDiagnostics;
use crate::lang::db::AnalysisDatabase;
use crate::lang::lsp::LsProtoGroup;

/// An ordered queue of files scheduled for diagnostics computation, each kept
/// in both [`Url`] and [`FileId`] form. Open files are placed first so workers
/// process them before the rest of the workspace.
pub struct FilesQueue(Vec<FileBothFormats>);

/// Consumes the queue, yielding files in priority order (open files first).
impl IntoIterator for FilesQueue {
    type IntoIter = std::vec::IntoIter<Self::Item>;
    type Item = FileBothFormats;

    fn into_iter(self) -> Self::IntoIter {
        self.0.into_iter()
    }
}

impl FilesQueue {
pub fn new(db: &AnalysisDatabase, open_files: HashSet<Url>) -> Self {
let mut rest_of_files = HashSet::new();
let mut open_files = trace_span!("get_open_files_ids").in_scope(|| {
open_files
.into_iter()
.filter_map(|url| db.file_for_url(&url).map(|file| FileBothFormats::new(file, url)))
.collect::<Vec<_>>()
});

for crate_id in db.crates() {
for module_id in db.crate_modules(crate_id).iter() {
if let Ok(module_files) = db.module_files(*module_id) {
let unprocessed_files = module_files
.iter()
.copied()
.filter(|file_id| {
matches!(file_id.lookup_intern(db), FileLongId::OnDisk(_))
})
.map(|file| FileBothFormats::new(file, db.url_for_file(file)));

rest_of_files.extend(unprocessed_files);
}
}
}

// Remove open files from rest of files.
for file in &open_files {
rest_of_files.remove(file);
}

// Important: keep open files first so workers execute them at first too.
open_files.extend(rest_of_files);

Self(open_files)
}

pub fn urls(&self) -> impl Iterator<Item = Url> + '_ {
self.0.iter().map(|file| file.url.clone())
}

pub fn worker_files_partition(&self, worker: usize, jobs_number: usize) -> Self {
Self(
self.0
.iter()
.enumerate()
.filter(move |(i, _file)| i % jobs_number == worker)
.map(|(_, file)| file)
.cloned()
.collect(),
)
}

pub fn previous_generation_file_diagnostics(
&self,
diagnostics_for_file: &HashMap<Url, FileDiagnostics>,
) -> HashMap<Url, FileDiagnostics> {
self.0
.iter()
.filter_map(|file| {
diagnostics_for_file.get(&file.url).map(|diags| (file.url.clone(), diags.clone()))
})
.collect()
}
}
190 changes: 189 additions & 1 deletion crates/cairo-lang-language-server/src/lang/diagnostics/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,189 @@
pub mod lsp;
use std::ops::Add;

use crossbeam::channel::Sender;
use tracing::{error, warn};

use self::files_queue::FilesQueue;
use self::notifier::NotifierExt;
use self::slot::slot;
use self::state::{
DiagnosticsState, FileDiagnosticsChange, StateDiff, StateSnapshotForDiagnostics,
};
use crate::lang::db::AnalysisDatabase;
use crate::server::client::{Notifier, Responder};
use crate::server::panic::{catch_cancellation, UnwindErrorKind};
use crate::server::schedule::thread;
use crate::server::schedule::thread::ThreadPriority;
use crate::state::State;

mod file;
mod files_queue;
mod lsp;
mod notifier;
mod slot;
mod state;

/// Long-lived state of the background diagnostics job.
pub struct Diagnostics {
    // Per-file diagnostics published in the previous generation, used to
    // avoid re-sending unchanged diagnostics.
    state: DiagnosticsState,
    // Channel for pushing publish/clear diagnostics notifications to the client.
    notifier: Notifier,
    // Pool on which per-file diagnostics computation is fanned out.
    thread_pool: thread::SharedPool,
}

impl Diagnostics {
pub fn tasks(
thread_pool: thread::SharedPool,
) -> (impl Fn(&mut State, Notifier), impl FnOnce(Notifier, Responder)) {
let (slot_reader, slot_writer) = slot(None);
let jobs_number = Self::jobs_number(&thread_pool);

let diagnostics_main_job = move |notifier, _responder| {
let mut diagnostics = Self::new(notifier, thread_pool);

while let Some(message) = slot_reader.pop() {
diagnostics.refresh(message);
}
};

let diagnostics_post_hook = move |state: &mut State, _notifier| {
let message = StateSnapshotForDiagnostics::from_state(state, jobs_number);

// TODO check if server is closing and send None to allow thread pool to be dropped.

slot_writer.set(Some(message));
};

(diagnostics_post_hook, diagnostics_main_job)
}

fn new(notifier: Notifier, thread_pool: thread::SharedPool) -> Self {
Self { state: Default::default(), thread_pool, notifier }
}

fn jobs_number(thread_pool: &thread::SharedPool) -> usize {
let size = thread_pool.size();

(size / 2).max(size - 2)
}

/// Refresh diagnostics and send diffs to the client.
#[tracing::instrument(level = "debug", skip_all)]
fn refresh(&mut self, message: StateSnapshotForDiagnostics) {
let Ok(files) = catch_cancellation(|| {
FilesQueue::new(message.db_snapshots.db_ref(), message.open_files.owned())
}) else {
// [`salsa`] failure while preparing state for workers.
// Probably very fast cancellation, skip further work as it will fail anyway.
return;
};

let jobs_number = Self::jobs_number(&self.thread_pool);
let (sender, receiver) = crossbeam::channel::bounded(jobs_number);

for (worker, db) in message.db_snapshots.into_iter().enumerate() {
let files = files.worker_files_partition(worker, jobs_number);

self.spawn_worker_tasks(files, db, message.trace_macro_diagnostics, sender.clone());
}

// For some reason rx is not disconnected after all threads completed.
let state_diff = receiver.into_iter().take(jobs_number).reduce(Add::add).unwrap();

// All db snapshots should be dropped at this point.

self.apply_state_diff(state_diff);
}

fn spawn_worker_tasks(
&mut self,
files: FilesQueue,
db: salsa::Snapshot<AnalysisDatabase>,
trace_macro_diagnostics: bool,
sender: Sender<StateDiff>,
) {
let notifier = self.notifier.clone();
let file_diagnostics =
files.previous_generation_file_diagnostics(&self.state.diagnostics_for_file);

self.thread_pool.spawn(ThreadPriority::Worker, move || {
let mut diff = StateDiff::new(files.urls());

for file in files {
// Anything using salsa should be done in catch.
let result = catch_cancellation(|| {
let new_file_diagnostics = file.refresh_diagnostics(&db);

if new_file_diagnostics.is_empty() {
diff.update_for(&file.url, FileDiagnosticsChange::Unset);

notifier.clear_diagnostics(file.url);
} else if file_diagnostics.get(&file.url) == Some(&new_file_diagnostics) {
diff.update_for(&file.url, FileDiagnosticsChange::Unchanged);

// No need to send same diagnostics twice.
} else {
notifier.publish_diagnostics(
file.url.clone(),
new_file_diagnostics.to_lsp(&db, trace_macro_diagnostics),
);

diff.update_for(
&file.url,
FileDiagnosticsChange::Replaced(new_file_diagnostics),
);
}
});

if let Err(err) = result {
diff.calculating_all_failed();

match err {
UnwindErrorKind::Canceled(_) => {
// Any further iteration will fail to this branch anyway.
// So no need to execute them.
break;
}
UnwindErrorKind::Other => {
error!("caught error while calculating diagnostics");
}
}
}
}

sender.send(diff).unwrap();
});
}

#[tracing::instrument(level = "trace", skip_all)]
fn apply_state_diff(&mut self, mut state_diff: StateDiff) {
self.state.diagnostics_for_file.retain(|url, old_diags| {
match state_diff.remove(url) {
Some(FileDiagnosticsChange::Replaced(diags)) => {
*old_diags = diags;

true
}
Some(FileDiagnosticsChange::Unchanged) => true,
Some(FileDiagnosticsChange::Unset) => false,
None => {
// Unset diagnostics for files that are no longer relevant only if we calculated
// all diagnostics (ie. no cancellation during this work
// happened) to make UX better.
if state_diff.calculated_all() {
self.notifier.clear_diagnostics(url.clone());
}

!state_diff.calculated_all()
}
}
});

// Files that were not previously tracked.
for (file, diags) in state_diff {
let FileDiagnosticsChange::Replaced(diags) = diags else {
continue;
};

self.state.diagnostics_for_file.insert(file, diags);
}
}
}
Loading

0 comments on commit 10d589d

Please sign in to comment.