From 18486a02fae5966e61f16ea7bc5c33c6c7c69487 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Mon, 13 Aug 2018 13:46:05 +0300 Subject: indexing infra --- crates/server/Cargo.toml | 1 + crates/server/src/main.rs | 26 +++++++++++-- crates/server/src/main_loop/mod.rs | 61 ++++++++++++++++++++++++----- crates/server/src/vfs.rs | 79 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 12 deletions(-) create mode 100644 crates/server/src/vfs.rs (limited to 'crates/server') diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 8e077ecf0..0ad193b8a 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -15,6 +15,7 @@ flexi_logger = "0.9.1" log = "0.4.3" url_serde = "0.2.0" languageserver-types = "0.49.0" +walkdir = "2.2.0" text_unit = { version = "0.1.2", features = ["serde"] } libsyntax2 = { path = "../libsyntax2" } diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index c2952465e..8dca32183 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -13,6 +13,7 @@ extern crate threadpool; extern crate log; extern crate url_serde; extern crate flexi_logger; +extern crate walkdir; extern crate libeditor; extern crate libanalysis; extern crate libsyntax2; @@ -24,6 +25,9 @@ mod dispatch; mod util; mod conv; mod main_loop; +mod vfs; + +use std::path::PathBuf; use threadpool::ThreadPool; use crossbeam_channel::bounded; @@ -114,13 +118,29 @@ fn initialized(io: &mut Io) -> Result<()> { { let mut world = WorldState::new(); let mut pool = ThreadPool::new(4); - let (sender, receiver) = bounded::(16); + let (task_sender, task_receiver) = bounded::(16); + let (fs_events_receiver, watcher) = vfs::watch(vec![ + PathBuf::from("./") + ]); info!("lifecycle: handshake finished, server ready to serve requests"); - let res = main_loop::main_loop(io, &mut world, &mut pool, sender, receiver.clone()); + let res = main_loop::main_loop( + io, + &mut world, + &mut pool, + task_sender, + task_receiver.clone(), + fs_events_receiver, + ); + info!("waiting for background jobs to finish..."); - receiver.for_each(drop); + task_receiver.for_each(drop); pool.join(); info!("...background jobs have finished"); + + info!("waiting for file watcher to finish..."); + watcher.stop()?; + info!("...file watcher has finished"); + res }?; diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index 5b7093ad7..f954e632c 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs @@ -1,6 +1,9 @@ mod handlers; -use std::collections::HashSet; +use std::{ + path::PathBuf, + collections::{HashSet, HashMap}, +}; use threadpool::ThreadPool; use crossbeam_channel::{Sender, Receiver}; @@ -13,6 +16,7 @@ use { Task, Result, io::{Io, RawMsg, RawRequest, RawNotification}, util::FilePath, + vfs::{FileEvent, FileEventKind}, main_loop::handlers::{ handle_syntax_tree, handle_extend_selection, @@ -28,24 +32,33 @@ pub(super) fn main_loop( io: &mut Io, world: &mut WorldState, pool: &mut ThreadPool, - sender: Sender, - receiver: Receiver, + task_sender: Sender, + task_receiver: Receiver, + fs_events_receiver: Receiver>, ) -> Result<()> { info!("server initialized, serving requests"); let mut next_request_id = 0; let mut pending_requests: HashSet = HashSet::new(); + let mut mem_map: HashMap> = HashMap::new(); + let mut fs_events_receiver = Some(&fs_events_receiver); loop { enum Event { Msg(RawMsg), Task(Task), + Fs(Vec), ReceiverDead, + FsWatcherDead, } let event = select! { recv(io.receiver(), msg) => match msg { Some(msg) => Event::Msg(msg), None => Event::ReceiverDead, }, - recv(receiver, task) => Event::Task(task.unwrap()), + recv(task_receiver, task) => Event::Task(task.unwrap()), + recv(fs_events_receiver, events) => match events { + Some(events) => Event::Fs(events), + None => Event::FsWatcherDead, + } }; match event { @@ -53,6 +66,9 @@ pub(super) fn main_loop( io.cleanup_receiver()?; unreachable!(); } + Event::FsWatcherDead => { + fs_events_receiver = None; + } Event::Task(task) => { match task { Task::Request(mut request) => { @@ -70,15 +86,36 @@ pub(super) fn main_loop( } continue; } + Event::Fs(events) => { + trace!("fs change, {} events", events.len()); + let changes = events.into_iter() + .map(|event| { + let text = match event.kind { + FileEventKind::Add(text) => Some(text), + FileEventKind::Remove => None, + }; + (event.path, text) + }) + .filter_map(|(path, text)| { + if mem_map.contains_key(path.as_path()) { + mem_map.insert(path, text); + None + } else { + Some((path, text)) + } + }); + + world.change_files(changes); + } Event::Msg(msg) => { match msg { RawMsg::Request(req) => { - if !on_request(io, world, pool, &sender, req)? { + if !on_request(io, world, pool, &task_sender, req)? { return Ok(()); } } RawMsg::Notification(not) => { - on_notification(io, world, pool, &sender, not)? + on_notification(io, world, pool, &task_sender, not, &mut mem_map)? } RawMsg::Response(resp) => { if !pending_requests.remove(&resp.id) { @@ -160,11 +197,13 @@ fn on_notification( pool: &ThreadPool, sender: &Sender, not: RawNotification, + mem_map: &mut HashMap>, ) -> Result<()> { let mut not = Some(not); dispatch::handle_notification::(&mut not, |params| { let path = params.text_document.file_path()?; - world.change_overlay(path, Some(params.text_document.text)); + mem_map.insert(path.clone(), None); + world.change_file(path, Some(params.text_document.text)); update_file_notifications_on_threadpool( pool, world.snapshot(), sender.clone(), params.text_document.uri, ); @@ -175,7 +214,7 @@ fn on_notification( let text = params.content_changes.pop() .ok_or_else(|| format_err!("empty changes"))? .text; - world.change_overlay(path, Some(text)); + world.change_file(path, Some(text)); update_file_notifications_on_threadpool( pool, world.snapshot(), sender.clone(), params.text_document.uri, ); @@ -183,7 +222,11 @@ fn on_notification( })?; dispatch::handle_notification::(&mut not, |params| { let path = params.text_document.file_path()?; - world.change_overlay(path, None); + let text = match mem_map.remove(&path) { + Some(text) => text, + None => bail!("unmatched close notification"), + }; + world.change_file(path, text); let not = req::PublishDiagnosticsParams { uri: params.text_document.uri, diagnostics: Vec::new(), diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs new file mode 100644 index 000000000..a5c367494 --- /dev/null +++ b/crates/server/src/vfs.rs @@ -0,0 +1,79 @@ +use std::{ + path::PathBuf, + thread, + fs, +}; + +use crossbeam_channel::{Sender, Receiver, bounded}; +use drop_bomb::DropBomb; +use walkdir::WalkDir; + +use Result; + + +pub struct FileEvent { + pub path: PathBuf, + pub kind: FileEventKind, +} + +pub enum FileEventKind { + Add(String), + #[allow(unused)] + Remove, +} + +pub struct Watcher { + thread: thread::JoinHandle<()>, + bomb: DropBomb, +} + +impl Watcher { + pub fn stop(mut self) -> Result<()> { + self.bomb.defuse(); + self.thread.join() + .map_err(|_| format_err!("file watcher died")) + } +} + +pub fn watch(roots: Vec) -> (Receiver>, Watcher) { + let (sender, receiver) = bounded(16); + let thread = thread::spawn(move || run(roots, sender)); + (receiver, Watcher { + thread, + bomb: DropBomb::new("Watcher should be stopped explicitly"), + }) +} + +fn run(roots: Vec, sender: Sender>) { + for root in roots { + let mut events = Vec::new(); + for entry in WalkDir::new(root.as_path()) { + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + warn!("watcher error: {}", e); + continue; + } + }; + if !entry.file_type().is_file() { + continue; + } + let path = entry.path(); + if path.extension().and_then(|os| os.to_str()) != Some("rs") { + continue; + } + let text = match fs::read_to_string(path) { + Ok(text) => text, + Err(e) => { + warn!("watcher error: {}", e); + continue; + } + }; + events.push(FileEvent { + path: path.to_owned(), + kind: FileEventKind::Add(text), + }) + } + sender.send(events) + } +} -- cgit v1.2.3