From c40795b7c798380920db63ec4500b2d9d8a34192 Mon Sep 17 00:00:00 2001 From: Timos Ampelikiotis Date: Thu, 10 Oct 2024 11:42:55 +0300 Subject: [PATCH 1/3] Update vhost-device-console backend Eliminate the use of select and 'nix' package. This is done by registering the input events (stdin or tcplistener) onto the main worker's epoll. Signed-off-by: Timos Ampelikiotis --- staging/vhost-device-console/Cargo.toml | 1 - staging/vhost-device-console/src/backend.rs | 39 +- staging/vhost-device-console/src/console.rs | 2 - .../vhost-device-console/src/vhu_console.rs | 456 ++++++++++-------- 4 files changed, 264 insertions(+), 234 deletions(-) diff --git a/staging/vhost-device-console/Cargo.toml b/staging/vhost-device-console/Cargo.toml index f35c4360..76a373b8 100644 --- a/staging/vhost-device-console/Cargo.toml +++ b/staging/vhost-device-console/Cargo.toml @@ -17,7 +17,6 @@ xen = ["vm-memory/xen", "vhost/xen", "vhost-user-backend/xen"] [dependencies] console = "0.15.7" crossterm = "0.27.0" -nix = "0.26.4" queues = "1.0.2" clap = { version = "4.5", features = ["derive"] } env_logger = "0.10" diff --git a/staging/vhost-device-console/src/backend.rs b/staging/vhost-device-console/src/backend.rs index 62d6745c..61f03a19 100644 --- a/staging/vhost-device-console/src/backend.rs +++ b/staging/vhost-device-console/src/backend.rs @@ -5,7 +5,7 @@ // // SPDX-License-Identifier: Apache-2.0 or BSD-3-Clause -use log::{error, info, warn}; +use log::{error, info}; use std::any::Any; use std::collections::HashMap; use std::path::PathBuf; @@ -28,6 +28,8 @@ pub(crate) enum Error { SocketCountInvalid(usize), #[error("Could not create console backend: {0}")] CouldNotCreateBackend(crate::vhu_console::Error), + #[error("Could not create console backend: {0}")] + CouldNotInitBackend(crate::vhu_console::Error), #[error("Could not create daemon: {0}")] CouldNotCreateDaemon(vhost_user_backend::Error), #[error("Fatal error: {0}")] @@ -92,6 +94,12 @@ pub(crate) fn start_backend_server( VhostUserConsoleBackend::new(arc_controller).map_err(Error::CouldNotCreateBackend)?, )); + vu_console_backend + .write() + .unwrap() + .assign_input_method(tcp_addr.clone()) + .map_err(Error::CouldNotInitBackend)?; + let mut daemon = VhostUserDaemon::new( String::from("vhost-device-console-backend"), vu_console_backend.clone(), @@ -102,26 +110,15 @@ pub(crate) fn start_backend_server( let vring_workers = daemon.get_epoll_handlers(); vu_console_backend .read() - .unwrap() - .set_vring_worker(&vring_workers[0]); - - // Start the corresponding console thread - let read_handle = if backend == BackendType::Nested { - VhostUserConsoleBackend::start_console_thread(&vu_console_backend) - } else { - VhostUserConsoleBackend::start_tcp_console_thread(&vu_console_backend, tcp_addr.clone()) - }; - - daemon.serve(&socket).map_err(Error::ServeFailed)?; - - // Kill console input thread - vu_console_backend.read().unwrap().kill_console_thread(); - - // Wait for read thread to exit - match read_handle.join() { - Ok(_) => info!("The read thread returned successfully"), - Err(e) => warn!("The read thread returned the error: {:?}", e), - } + .expect("Cannot open as write\n") + .set_vring_worker(vring_workers[0].clone()); + + daemon.serve(&socket).map_err(|e| { + // Even if daemon stops unexpectedly, the backend should + // be terminated properly (disable raw mode). + vu_console_backend.read().unwrap().prepare_exit(); + Error::ServeFailed(e) + })?; } } diff --git a/staging/vhost-device-console/src/console.rs b/staging/vhost-device-console/src/console.rs index 079fe0cb..b02b5d04 100644 --- a/staging/vhost-device-console/src/console.rs +++ b/staging/vhost-device-console/src/console.rs @@ -20,7 +20,6 @@ pub enum BackendType { pub(crate) struct ConsoleController { config: VirtioConsoleConfig, pub backend: BackendType, - pub exit: bool, } impl ConsoleController { @@ -33,7 +32,6 @@ impl ConsoleController { emerg_wr: 64.into(), }, backend, - exit: false, } } diff --git a/staging/vhost-device-console/src/vhu_console.rs b/staging/vhost-device-console/src/vhu_console.rs index d5cca74a..f33e2d19 100644 --- a/staging/vhost-device-console/src/vhu_console.rs +++ b/staging/vhost-device-console/src/vhu_console.rs @@ -11,22 +11,20 @@ use crate::virtio_console::{ VIRTIO_CONSOLE_F_MULTIPORT, VIRTIO_CONSOLE_PORT_ADD, VIRTIO_CONSOLE_PORT_NAME, VIRTIO_CONSOLE_PORT_OPEN, VIRTIO_CONSOLE_PORT_READY, }; -use console::Key; use crossterm::terminal::{disable_raw_mode, enable_raw_mode}; -use log::{error, trace}; -use nix::sys::select::{select, FdSet}; -use std::os::fd::AsRawFd; +use log::{error, trace, warn}; +use queues::{IsQueue, Queue}; +use std::net::TcpListener; +use std::os::fd::{AsRawFd, RawFd}; use std::slice::from_raw_parts; use std::sync::{Arc, RwLock}; -use std::thread::JoinHandle; use std::{ convert, - io::{self, Result as IoResult}, + io::{self, Read, Result as IoResult, Write}, }; use thiserror::Error as ThisError; use vhost::vhost_user::message::{VhostUserProtocolFeatures, VhostUserVirtioFeatures}; -use vhost_user_backend::VringEpollHandler; -use vhost_user_backend::{VhostUserBackendMut, VringRwLock, VringT}; +use vhost_user_backend::{VhostUserBackendMut, VringEpollHandler, VringRwLock, VringT}; use virtio_bindings::bindings::virtio_config::{VIRTIO_F_NOTIFY_ON_EMPTY, VIRTIO_F_VERSION_1}; use virtio_bindings::bindings::virtio_ring::{ VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC, @@ -38,13 +36,6 @@ use vm_memory::{ use vmm_sys_util::epoll::EventSet; use vmm_sys_util::eventfd::{EventFd, EFD_NONBLOCK}; -use console::Term; -use queues::{IsQueue, Queue}; -use std::io::Read; -use std::io::Write; -use std::net::TcpListener; -use std::thread::spawn; - /// Virtio configuration const QUEUE_SIZE: usize = 128; const NUM_QUEUES: usize = 4; @@ -63,6 +54,9 @@ const CTRL_TX_QUEUE: u16 = 3; /// needs to write to the RX control queue. const BACKEND_RX_EFD: u16 = (NUM_QUEUES + 1) as u16; const BACKEND_CTRL_RX_EFD: u16 = (NUM_QUEUES + 2) as u16; +const KEY_EFD: u16 = (NUM_QUEUES + 3) as u16; +const LISTENER_EFD: u16 = (NUM_QUEUES + 4) as u16; +const EXIT_EFD: u16 = (NUM_QUEUES + 5) as u16; /// Port name - Need to be updated when MULTIPORT feature /// is supported for more than one devices. @@ -88,6 +82,12 @@ pub(crate) enum Error { EventFdFailed, #[error("Failed to add control message in the internal queue")] RxCtrlQueueAddFailed, + #[error("Error adding epoll")] + EpollAdd, + #[error("Error removing epoll")] + EpollRemove, + #[error("Error creating epoll")] + EpollFdCreate, } impl convert::From for io::Error { @@ -96,6 +96,10 @@ impl convert::From for io::Error { } } +// Define a new trait that combines Read and Write +pub trait ReadWrite: Read + Write {} +impl ReadWrite for T {} + // SAFETY: The layout of the structure is fixed and can be initialized by // reading its content from byte array. unsafe impl ByteValued for VirtioConsoleControl {} @@ -105,10 +109,15 @@ pub(crate) struct VhostUserConsoleBackend { acked_features: u64, event_idx: bool, rx_ctrl_fifo: Queue, - rx_data_fifo: Queue, + rx_data_fifo: Queue, + epoll_fd: i32, + stream_fd: Option, pub(crate) ready: bool, pub(crate) ready_to_write: bool, pub(crate) output_queue: Queue, + pub(crate) stdin: Option>, + pub(crate) listener: Option, + pub(crate) stream: Option>, pub(crate) rx_event: EventFd, pub(crate) rx_ctrl_event: EventFd, pub(crate) exit_event: EventFd, @@ -120,14 +129,19 @@ type ConsoleDescriptorChain = DescriptorChain>) -> Result { Ok(VhostUserConsoleBackend { - controller, + controller: controller.clone(), event_idx: false, rx_ctrl_fifo: Queue::new(), rx_data_fifo: Queue::new(), + epoll_fd: epoll::create(false).map_err(|_| Error::EpollFdCreate)?, + stream_fd: None, acked_features: 0x0, ready: false, ready_to_write: false, output_queue: Queue::new(), + stdin: None, + stream: None, + listener: None, rx_event: EventFd::new(EFD_NONBLOCK).map_err(|_| Error::EventFdFailed)?, rx_ctrl_event: EventFd::new(EFD_NONBLOCK).map_err(|_| Error::EventFdFailed)?, exit_event: EventFd::new(EFD_NONBLOCK).map_err(|_| Error::EventFdFailed)?, @@ -135,6 +149,24 @@ impl VhostUserConsoleBackend { }) } + pub fn assign_input_method(&mut self, tcpaddr_str: String) -> Result<()> { + if self.controller.read().unwrap().backend == BackendType::Nested { + // Enable raw mode for local terminal if backend is nested + enable_raw_mode().expect("Raw mode error"); + + let stdin_fd = io::stdin().as_raw_fd(); + let stdin: Box = Box::new(io::stdin()); + self.stdin = Some(stdin); + + Self::epoll_register(self.epoll_fd.as_raw_fd(), stdin_fd, epoll::Events::EPOLLIN) + .map_err(|_| Error::EpollAdd)?; + } else { + let listener = TcpListener::bind(tcpaddr_str.clone()).expect("asdasd"); + self.listener = Some(listener); + } + Ok(()) + } + fn print_console_frame(&self, control_msg: VirtioConsoleControl) { trace!("id 0x{:x}", control_msg.id.to_native()); trace!("event 0x{:x}", control_msg.event.to_native()); @@ -157,22 +189,26 @@ impl VhostUserConsoleBackend { .writer(&atomic_mem) .map_err(|_| Error::DescriptorWriteFailed)?; - let response: String = match self.rx_data_fifo.remove() { - Ok(item) => item, - _ => { - return Ok(false); - } - }; + let avail_data_len = writer.available_bytes(); + let queue_len = self.rx_data_fifo.size(); + let min_limit = std::cmp::min(queue_len, avail_data_len); + + for _i in 0..min_limit { + let response: u8 = match self.rx_data_fifo.remove() { + Ok(item) => item, + _ => { + return Ok(true); + } + }; - for b in response.bytes() { writer - .write_obj::(b) + .write_obj::(response) .map_err(|_| Error::DescriptorWriteFailed)?; - } - vring - .add_used(desc_chain.head_index(), writer.bytes_written() as u32) - .map_err(|_| Error::AddUsedElemFailed(RX_QUEUE))?; + vring + .add_used(desc_chain.head_index(), writer.bytes_written() as u32) + .map_err(|_| Error::AddUsedElemFailed(RX_QUEUE))?; + } } Ok(true) @@ -210,8 +246,8 @@ impl VhostUserConsoleBackend { } else { self.output_queue .add(my_string) - .expect("Failed to add element in the output queue"); - //.map_err(|_| Error::RxCtrlQueueAddFailed)?; + .map_err(|_| Error::RxCtrlQueueAddFailed)?; + self.write_tcp_stream(); } vring @@ -438,7 +474,7 @@ impl VhostUserConsoleBackend { /// Set self's VringWorker. pub(crate) fn set_vring_worker( &self, - vring_worker: &Arc>>>, + vring_worker: Arc>>>, ) { let rx_event_fd = self.rx_event.as_raw_fd(); vring_worker @@ -453,187 +489,170 @@ impl VhostUserConsoleBackend { u64::from(BACKEND_CTRL_RX_EFD), ) .unwrap(); + + let exit_event_fd = self.exit_event.as_raw_fd(); + vring_worker + .register_listener(exit_event_fd, EventSet::IN, u64::from(EXIT_EFD)) + .unwrap(); + + let epoll_fd = self.epoll_fd.as_raw_fd(); + vring_worker + .register_listener(epoll_fd, EventSet::IN, u64::from(KEY_EFD)) + .unwrap(); + + if self.controller.read().unwrap().backend == BackendType::Network { + let listener_fd = self.listener.as_ref().expect("asd").as_raw_fd(); + vring_worker + .register_listener(listener_fd, EventSet::IN, u64::from(LISTENER_EFD)) + .unwrap(); + } } - pub(crate) fn start_tcp_console_thread( - vhu_console: &Arc>, - tcplisener_str: String, - ) -> JoinHandle> { - let vhu_console = Arc::clone(vhu_console); - spawn(move || { - loop { - let ready = vhu_console.read().unwrap().ready_to_write; - let exit = vhu_console.read().unwrap().controller.read().unwrap().exit; + /// Register a file with an epoll to listen for events in evset. + pub fn epoll_register(epoll_fd: RawFd, fd: RawFd, evset: epoll::Events) -> Result<()> { + epoll::ctl( + epoll_fd, + epoll::ControlOptions::EPOLL_CTL_ADD, + fd, + epoll::Event::new(evset, fd as u64), + ) + .map_err(|_| Error::EpollAdd)?; + Ok(()) + } - if exit { - trace!("Thread exits!"); - break; - } else if ready { - let listener = match TcpListener::bind(tcplisener_str.clone()) { - Ok(listener) => listener, - Err(e) => { - eprintln!("Failed to bind to {}: {}", tcplisener_str, e); - return Ok(()); - } - }; - listener.set_nonblocking(true).expect("Non-blocking error"); - - println!("Server listening on address: {}", tcplisener_str.clone()); - for stream in listener.incoming() { - match stream { - Ok(mut stream) => { - trace!("New connection"); - stream.set_nonblocking(true).expect("Non-blocking error"); - - let mut buffer = [0; 1024]; - loop { - let exit = - vhu_console.read().unwrap().controller.read().unwrap().exit; - if exit { - trace!("Thread exits!"); - return Ok(()); - } - // Write to the stream - if vhu_console.read().unwrap().output_queue.size() > 0 { - let byte_stream = vhu_console - .write() - .unwrap() - .output_queue - .remove() - .expect("Error removing element from output queue") - .into_bytes(); - if let Err(e) = stream.write_all(&byte_stream) { - eprintln!("Error writing to stream: {}", e); - } - } - match stream.read(&mut buffer) { - Ok(bytes_read) => { - if bytes_read == 0 { - println!("Close connection"); - break; - } - trace!( - "Received: {}", - String::from_utf8_lossy(&buffer[..bytes_read]) - ); - let input_buffer = - String::from_utf8_lossy(&buffer[..bytes_read]) - .to_string(); - vhu_console - .write() - .unwrap() - .rx_data_fifo - .add(input_buffer) - .unwrap(); - vhu_console.write().unwrap().rx_event.write(1).unwrap(); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(ref e) - if e.kind() == io::ErrorKind::BrokenPipe - || e.kind() == io::ErrorKind::ConnectionReset => - { - trace!("Stream has been closed."); - break; - } - Err(e) => { - eprintln!("Error reading from socket: {}", e); - } - } - } - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - let exit = - vhu_console.read().unwrap().controller.read().unwrap().exit; - if exit { - trace!("Thread exits!"); - return Ok(()); - } - continue; - } - Err(e) => { - eprintln!("Error accepting connection: {}", e); - break; - } - } + /// Remove a file from the epoll. + pub fn epoll_unregister(epoll_fd: RawFd, fd: RawFd) -> Result<()> { + epoll::ctl( + epoll_fd, + epoll::ControlOptions::EPOLL_CTL_DEL, + fd, + epoll::Event::new(epoll::Events::empty(), 0), + ) + .map_err(|_| Error::EpollRemove)?; + + Ok(()) + } + + fn create_new_stream_thread(&mut self) { + // Accept only one incoming connection + if let Some(stream) = self.listener.as_ref().expect("asd").incoming().next() { + match stream { + Ok(stream) => { + let local_addr = self + .listener + .as_ref() + .expect("No listener") + .local_addr() + .unwrap(); + println!("New connection on: {}", local_addr); + let stream_raw_fd = stream.as_raw_fd(); + self.stream_fd = Some(stream_raw_fd); + if let Err(err) = Self::epoll_register( + self.epoll_fd.as_raw_fd(), + stream_raw_fd, + epoll::Events::EPOLLIN, + ) { + warn!("Failed to register with epoll: {:?}", err); } + + let stream: Box = Box::new(stream); + self.stream = Some(stream); + self.write_tcp_stream(); + } + Err(e) => { + eprintln!("Stream error: {}", e); } } - Ok(()) - }) + } } - /// Start console thread. - pub(crate) fn start_console_thread( - vhu_console: &Arc>, - ) -> JoinHandle> { - let vhu_console = Arc::clone(vhu_console); - - let exit_eventfd = vhu_console.read().unwrap().exit_event.as_raw_fd(); - // Spawn a new thread to handle input. - spawn(move || { - let term = Term::stdout(); - let mut fdset = FdSet::new(); - fdset.insert(term.as_raw_fd()); - fdset.insert(exit_eventfd); - let max_fd = fdset.highest().expect("Failed to read fdset!") + 1; + fn write_tcp_stream(&mut self) { + if self.stream.is_some() { + while self.output_queue.size() > 0 { + let byte_stream = self + .output_queue + .remove() + .expect("Error removing element from output queue") + .into_bytes(); + + if let Err(e) = self + .stream + .as_mut() + .expect("Stream not found") + .write_all(&byte_stream) + { + eprintln!("Error writing to stream: {}", e); + } + } + } + } - loop { - let ready = vhu_console.read().unwrap().ready_to_write; - let exit = vhu_console.read().unwrap().controller.read().unwrap().exit; + fn read_tcp_stream(&mut self) { + let mut buffer = [0; 1024]; + match self.stream.as_mut().expect("No stream").read(&mut buffer) { + Ok(bytes_read) => { + if bytes_read == 0 { + let local_addr = self + .listener + .as_ref() + .expect("No listener") + .local_addr() + .unwrap(); + println!("Close connection on: {}", local_addr); + if let Err(err) = Self::epoll_unregister( + self.epoll_fd.as_raw_fd(), + self.stream_fd.expect("No stream fd"), + ) { + warn!("Failed to register with epoll: {:?}", err); + } + return; + } + if self.ready_to_write { + for byte in buffer.iter().take(bytes_read) { + self.rx_data_fifo.add(*byte).unwrap(); + } + self.rx_event.write(1).unwrap(); + } + } + Err(e) => { + eprintln!("Error reading from socket: {}", e); + } + } + } - if exit { - trace!("Exit!"); - break; - } else if ready { - let mut fdset_clone = fdset; - enable_raw_mode().expect("Raw mode error"); - - match select(Some(max_fd), Some(&mut fdset_clone), None, None, None) { - Ok(_num_fds) => { - let exit = vhu_console.read().unwrap().controller.read().unwrap().exit; - if (fdset_clone.contains(exit_eventfd)) && exit { - trace!("Exit!"); - break; - } - - if fdset_clone.contains(term.as_raw_fd()) { - if let Some(character) = match term.read_key().unwrap() { - Key::Char(character) => Some(character), - Key::Enter => Some('\n'), - Key::Tab => Some('\t'), - Key::Backspace => Some('\u{8}'), - _ => None, - } { - // Pass the data to vhu_console and trigger an EventFd - let input_buffer = character.to_string(); - vhu_console - .write() - .unwrap() - .rx_data_fifo - .add(input_buffer) - .unwrap(); - vhu_console.write().unwrap().rx_event.write(1).unwrap(); - } - } - } - Err(e) => { - eprintln!("Error in select: {}", e); - break; - } + fn read_char_thread(&mut self) -> IoResult<()> { + let mut bytes = [0; 1]; + match self.stdin.as_mut().expect("No stdin").read(&mut bytes) { + Ok(read_len) => { + if read_len > 0 { + // If the user presses ^C then exit + if bytes[0] == 3 { + disable_raw_mode().expect("Raw mode error"); + trace!("Termination!\n"); + std::process::exit(0); + } + + // If backend is ready pass the data to vhu_console + // and trigger an EventFd. + if self.ready_to_write { + self.rx_data_fifo.add(bytes[0]).unwrap(); + self.rx_event.write(1).unwrap(); } } + Ok(()) + } + Err(e) => { + eprintln!("Read stdin error: {}", e); + Err(e) } + } + } + pub fn prepare_exit(&self) { + /* For the nested backend */ + if self.controller.read().unwrap().backend == BackendType::Nested { disable_raw_mode().expect("Raw mode error"); - Ok(()) - }) - } - pub fn kill_console_thread(&self) { - trace!("Kill thread"); - self.controller.write().unwrap().exit = true; - self.exit_event.write(1).unwrap(); + } } } @@ -703,19 +722,24 @@ impl VhostUserBackendMut for VhostUserConsoleBackend { vrings: &[VringRwLock], _thread_id: usize, ) -> IoResult<()> { - if device_event == RX_QUEUE { - // Check if there are any available data - if self.rx_data_fifo.size() == 0 { - return Ok(()); - } - }; + if device_event == EXIT_EFD { + self.prepare_exit(); + return Ok(()); + } + + if device_event == LISTENER_EFD { + self.create_new_stream_thread(); + return Ok(()); + } - if device_event == CTRL_RX_QUEUE { - // Check if there are any available data and the device is ready - if (!self.ready) || (self.rx_ctrl_fifo.size() == 0) { + if device_event == KEY_EFD { + if self.controller.read().unwrap().backend == BackendType::Nested { + return self.read_char_thread(); + } else { + self.read_tcp_stream(); return Ok(()); } - }; + } let vring = if device_event == BACKEND_RX_EFD { &vrings[RX_QUEUE as usize] @@ -729,12 +753,24 @@ impl VhostUserBackendMut for VhostUserConsoleBackend { loop { vring.disable_notification().unwrap(); match device_event { - RX_QUEUE => self.process_rx_queue(vring), + RX_QUEUE => { + if self.rx_data_fifo.size() != 0 { + self.process_rx_queue(vring) + } else { + break; + } + } TX_QUEUE => { self.ready_to_write = true; self.process_tx_queue(vring) } - CTRL_RX_QUEUE => self.process_ctrl_rx_queue(vring), + CTRL_RX_QUEUE => { + if self.ready && (self.rx_ctrl_fifo.size() != 0) { + self.process_ctrl_rx_queue(vring) + } else { + break; + } + } CTRL_TX_QUEUE => self.process_ctrl_tx_queue(vring), BACKEND_RX_EFD => { let _ = self.rx_event.read(); From 33e7700f80c565ae272880984e18d89b40fe313f Mon Sep 17 00:00:00 2001 From: Timos Ampelikiotis Date: Thu, 10 Oct 2024 11:46:52 +0300 Subject: [PATCH 2/3] Fix and populate the tests of vhost-device-console Signed-off-by: Timos Ampelikiotis --- staging/vhost-device-console/src/backend.rs | 57 ++------ .../vhost-device-console/src/vhu_console.rs | 138 ++++++++++++------ 2 files changed, 108 insertions(+), 87 deletions(-) diff --git a/staging/vhost-device-console/src/backend.rs b/staging/vhost-device-console/src/backend.rs index 61f03a19..ab931ec0 100644 --- a/staging/vhost-device-console/src/backend.rs +++ b/staging/vhost-device-console/src/backend.rs @@ -240,71 +240,44 @@ mod tests { assert!(VuConsoleConfig::try_from(args).is_ok()); } - fn test_backend_start_and_stop(args: ConsoleArgs) { + fn test_backend_start_and_stop(args: ConsoleArgs) -> Result<()> { let config = VuConsoleConfig::try_from(args).expect("Wrong config"); let tcp_addrs = config.generate_tcp_addrs(); let backend = config.backend; - for (_socket, tcp_addr) in config + for (socket, tcp_addr) in config .generate_socket_paths() .into_iter() .zip(tcp_addrs.iter()) { - let controller = ConsoleController::new(backend); - let arc_controller = Arc::new(RwLock::new(controller)); - let vu_console_backend = Arc::new(RwLock::new( - VhostUserConsoleBackend::new(arc_controller) - .map_err(Error::CouldNotCreateBackend) - .expect("Fail create vhuconsole backend"), - )); - - let mut _daemon = VhostUserDaemon::new( - String::from("vhost-device-console-backend"), - vu_console_backend.clone(), - GuestMemoryAtomic::new(GuestMemoryMmap::new()), - ) - .map_err(Error::CouldNotCreateDaemon) - .expect("Failed create daemon"); - - // Start the corresponinding console thread - let read_handle = if backend == BackendType::Nested { - VhostUserConsoleBackend::start_console_thread(&vu_console_backend) - } else { - VhostUserConsoleBackend::start_tcp_console_thread( - &vu_console_backend, - tcp_addr.clone(), - ) - }; - - // Kill console input thread - vu_console_backend.read().unwrap().kill_console_thread(); - - // Wait for read thread to exit - assert_matches!(read_handle.join(), Ok(_)); + start_backend_server(socket, tcp_addr.to_string(), backend)?; } + Ok(()) } + #[test] - fn test_start_net_backend_success() { + fn test_start_backend_server_success() { let args = ConsoleArgs { - socket_path: String::from("/tmp/vhost.sock").into(), + socket_path: String::from("/not_a_dir/vhost.sock").into(), + //socket_path: String::from("/tmp/vhost.sock").into(), backend: BackendType::Network, tcp_port: String::from("12345"), socket_count: 1, }; - test_backend_start_and_stop(args); + assert!(test_backend_start_and_stop(args).is_err()); } #[test] - fn test_start_nested_backend_success() { - let args = ConsoleArgs { - socket_path: String::from("/tmp/vhost.sock").into(), - backend: BackendType::Nested, - tcp_port: String::from("12345"), + fn test_start_backend_success() { + let config = VuConsoleConfig { + socket_path: String::from("/not_a_dir/vhost.sock").into(), + backend: BackendType::Network, + tcp_port: String::from("12346"), socket_count: 1, }; - test_backend_start_and_stop(args); + assert!(start_backend(config).is_err()); } } diff --git a/staging/vhost-device-console/src/vhu_console.rs b/staging/vhost-device-console/src/vhu_console.rs index f33e2d19..3246c96f 100644 --- a/staging/vhost-device-console/src/vhu_console.rs +++ b/staging/vhost-device-console/src/vhu_console.rs @@ -817,6 +817,7 @@ impl VhostUserBackendMut for VhostUserConsoleBackend { #[cfg(test)] mod tests { use super::*; + use std::io::Cursor; use virtio_bindings::virtio_ring::{VRING_DESC_F_NEXT, VRING_DESC_F_WRITE}; use virtio_queue::{mock::MockSplitQueue, Descriptor, Queue}; use vm_memory::{Bytes, GuestAddress, GuestMemoryAtomic, GuestMemoryMmap}; @@ -839,15 +840,12 @@ mod tests { let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) .expect("Failed create vhuconsole backend"); - // Artificial memory let mem = GuestMemoryAtomic::new( GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(), ); - // Update memory vu_console_backend.update_memory(mem.clone()).unwrap(); - // Artificial Vring let vring = VringRwLock::new(mem, 0x1000).unwrap(); vring.set_queue_info(0x100, 0x200, 0x300).unwrap(); vring.set_queue_ready(true); @@ -884,12 +882,10 @@ mod tests { let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) .expect("Failed create vhuconsole backend"); - // Artificial memory let mem = GuestMemoryAtomic::new( GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(), ); - // Artificial Vring let vring = VringRwLock::new(mem.clone(), 0x1000).unwrap(); // Empty descriptor chain should be ignored @@ -945,7 +941,6 @@ mod tests { let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) .expect("Failed create vhuconsole backend"); - // Artificial memory let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); // Test 1: Empty queue @@ -1020,7 +1015,6 @@ mod tests { let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) .expect("Failed create vhuconsole backend"); - // Artificial memory let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); // Test 1: Empty queue @@ -1071,7 +1065,6 @@ mod tests { let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) .expect("Failed create vhuconsole backend"); - // Artificial memory & update device's memory let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); let mem_1 = GuestMemoryAtomic::new(mem.clone()); vu_console_backend.update_memory(mem_1.clone()).unwrap(); @@ -1125,7 +1118,6 @@ mod tests { let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) .expect("Failed create vhuconsole backend"); - // Artificial memory let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); // Test 1: Empty queue @@ -1181,7 +1173,6 @@ mod tests { let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) .expect("Failed create vhuconsole backend"); - // Artificial memory let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); let desc_chain = build_desc_chain(&mem, 1, vec![0], 0x200); let mem1 = GuestMemoryAtomic::new(mem.clone()); @@ -1223,7 +1214,6 @@ mod tests { let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) .expect("Failed create vhuconsole backend"); - // Artificial memory let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); // Test 1: Empty queue @@ -1238,7 +1228,8 @@ mod tests { let mem1 = GuestMemoryAtomic::new(mem.clone()); let vring = VringRwLock::new(mem1.clone(), 0x1000).unwrap(); vu_console_backend.update_memory(mem1).unwrap(); - assert!(!vu_console_backend + + assert!(vu_console_backend .process_rx_requests(vec![desc_chain], &vring) .unwrap()); @@ -1248,14 +1239,16 @@ mod tests { let vring = VringRwLock::new(mem1.clone(), 0x1000).unwrap(); vu_console_backend.update_memory(mem1).unwrap(); - let input_buffer = "Hello!".to_string(); - let _ = vu_console_backend.rx_data_fifo.add(input_buffer.clone()); - assert_eq!( - vu_console_backend - .process_rx_requests(vec![desc_chain], &vring) - .unwrap_err(), - Error::DescriptorWriteFailed - ); + let input_buffer = b"Hello!"; + // Add each byte individually to the rx_data_fifo + for &byte in input_buffer.clone().iter() { + let _ = vu_console_backend.rx_data_fifo.add(byte); + } + + // available_data are 0 so min_limit is 0 too + assert!(vu_console_backend + .process_rx_requests(vec![desc_chain], &vring) + .unwrap()); // Test 4: Fill message to the buffer. Everything should work! let desc_chain = build_desc_chain(&mem, 1, vec![VRING_DESC_F_WRITE as u16], 0x200); @@ -1263,8 +1256,10 @@ mod tests { let vring = VringRwLock::new(mem1.clone(), 0x1000).unwrap(); vu_console_backend.update_memory(mem1).unwrap(); - let input_buffer = "Hello!".to_string(); - let _ = vu_console_backend.rx_data_fifo.add(input_buffer.clone()); + let input_buffer = b"Hello!"; + for &byte in input_buffer.clone().iter() { + let _ = vu_console_backend.rx_data_fifo.add(byte); + } assert!(vu_console_backend .process_rx_requests(vec![desc_chain.clone()], &vring) .unwrap()); @@ -1283,37 +1278,90 @@ mod tests { .copied() .collect(); - assert_eq!(String::from_utf8(read_buffer).unwrap(), input_buffer); + assert_eq!(read_buffer, input_buffer); } #[test] - fn test_virtio_console_start_tcp_console_thread() { + fn test_virtio_console_start_nested_console_thread() { let console_controller = Arc::new(RwLock::new(ConsoleController::new(BackendType::Nested))); - let vu_console_backend = Arc::new(RwLock::new( - VhostUserConsoleBackend::new(console_controller) - .expect("Failed create vhuconsole backend"), - )); - let tcp_addr = "127.0.0.1:12345".to_string(); - - let read_handle = VhostUserConsoleBackend::start_tcp_console_thread( - &vu_console_backend, - tcp_addr.clone(), - ); - vu_console_backend.read().unwrap().kill_console_thread(); - assert!(read_handle.join().is_ok()); + let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) + .expect("Failed create vhuconsole backend"); + + let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); + let mem = GuestMemoryAtomic::new(mem); + vu_console_backend.update_memory(mem.clone()).unwrap(); + let vring = VringRwLock::new(mem, 0x1000).unwrap(); + + let input_data = b"H"; + let cursor = Cursor::new(input_data.clone().to_vec()); + + // Replace stdin with a cursor for testing + vu_console_backend.stdin = Some(Box::new(cursor)); + + vu_console_backend.ready_to_write = true; + assert!(vu_console_backend + .handle_event(KEY_EFD, EventSet::IN, &[vring], 0) + .is_ok()); + + let received_byte = vu_console_backend.rx_data_fifo.peek(); + + // verify that the character has been received and is the one we sent + assert!(received_byte.clone().is_ok()); + assert_eq!(received_byte.unwrap(), input_data[0]); } #[test] - fn test_virtio_console_start_nested_console_thread() { - let console_controller = Arc::new(RwLock::new(ConsoleController::new(BackendType::Nested))); - let vu_console_backend = Arc::new(RwLock::new( - VhostUserConsoleBackend::new(console_controller) - .expect("Failed create vhuconsole backend"), - )); + fn test_virtio_console_tcp_console_read_func() { + let console_controller = + Arc::new(RwLock::new(ConsoleController::new(BackendType::Network))); + let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) + .expect("Failed create vhuconsole backend"); + + let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); + let mem = GuestMemoryAtomic::new(mem); + vu_console_backend.update_memory(mem.clone()).unwrap(); + let vring = VringRwLock::new(mem, 0x1000).unwrap(); + + let input_data = b"H"; + let cursor = Cursor::new(input_data.clone().to_vec()); + + // Replace stream with a cursor for testing + vu_console_backend.stream = Some(Box::new(cursor)); + + vu_console_backend.ready_to_write = true; + assert!(vu_console_backend + .handle_event(KEY_EFD, EventSet::IN, &[vring], 0) + .is_ok()); + + let received_byte = vu_console_backend.rx_data_fifo.peek(); - let read_handle = VhostUserConsoleBackend::start_console_thread(&vu_console_backend); + // verify that the character has been received and is the one we sent + assert!(received_byte.clone().is_ok()); + assert_eq!(received_byte.unwrap(), input_data[0]); + } + + #[test] + fn test_virtio_console_tcp_console_write_func() { + let console_controller = + Arc::new(RwLock::new(ConsoleController::new(BackendType::Network))); + let mut vu_console_backend = VhostUserConsoleBackend::new(console_controller) + .expect("Failed create vhuconsole backend"); + + let mem = GuestMemoryMmap::<()>::from_ranges(&[(GuestAddress(0), 0x1000)]).unwrap(); + let mem = GuestMemoryAtomic::new(mem); + vu_console_backend.update_memory(mem.clone()).unwrap(); + + // Test 1: Call the actual read function + let cursor = Cursor::new(Vec::new()); + + vu_console_backend.stream = Some(Box::new(cursor)); + vu_console_backend + .output_queue + .add("Test".to_string()) + .unwrap(); + vu_console_backend.write_tcp_stream(); - vu_console_backend.read().unwrap().kill_console_thread(); - assert!(read_handle.join().is_ok()); + // All data has been consumed by the cursor + assert_eq!(vu_console_backend.output_queue.size(), 0); } } From a35f354a956b63801cd4be09a496c3a2888ef7b8 Mon Sep 17 00:00:00 2001 From: Timos Ampelikiotis Date: Thu, 10 Oct 2024 11:50:11 +0300 Subject: [PATCH 3/3] Move the vhost-device-console in the main worksapce Signed-off-by: Timos Ampelikiotis --- Cargo.toml | 1 + README.md | 2 +- staging/Cargo.toml | 1 - .../CHANGELOG.md | 0 .../Cargo.toml | 0 .../LICENSE-APACHE | 0 .../LICENSE-BSD-3-Clause | 0 .../vhost-device-console => vhost-device-console}/README.md | 6 ------ .../src/backend.rs | 0 .../src/console.rs | 0 .../src/main.rs | 0 .../src/vhu_console.rs | 0 .../src/virtio_console.rs | 0 13 files changed, 2 insertions(+), 8 deletions(-) rename {staging/vhost-device-console => vhost-device-console}/CHANGELOG.md (100%) rename {staging/vhost-device-console => vhost-device-console}/Cargo.toml (100%) rename {staging/vhost-device-console => vhost-device-console}/LICENSE-APACHE (100%) rename {staging/vhost-device-console => vhost-device-console}/LICENSE-BSD-3-Clause (100%) rename {staging/vhost-device-console => vhost-device-console}/README.md (95%) rename {staging/vhost-device-console => vhost-device-console}/src/backend.rs (100%) rename {staging/vhost-device-console => vhost-device-console}/src/console.rs (100%) rename {staging/vhost-device-console => vhost-device-console}/src/main.rs (100%) rename {staging/vhost-device-console => vhost-device-console}/src/vhu_console.rs (100%) rename {staging/vhost-device-console => vhost-device-console}/src/virtio_console.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index b0d010dd..33924eb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ + "vhost-device-console", "vhost-device-gpio", "vhost-device-i2c", "vhost-device-input", diff --git a/README.md b/README.md index b4cfe977..e2b04294 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ To be included here device backends must: Here is the list of device backends that we support: +- [Console](https://github.com/rust-vmm/vhost-device/blob/main/vhost-device-console/README.md) - [GPIO](https://github.com/rust-vmm/vhost-device/blob/main/vhost-device-gpio/README.md) - [I2C](https://github.com/rust-vmm/vhost-device/blob/main/vhost-device-i2c/README.md) - [Input](https://github.com/rust-vmm/vhost-device/blob/main/vhost-device-input/README.md) @@ -48,7 +49,6 @@ Here is the list of device backends in **staging**: - [Video](https://github.com/rust-vmm/vhost-device/blob/main/staging/vhost-device-video/README.md) - [Can](https://github.com/rust-vmm/vhost-device/blob/main/staging/vhost-device-can/README.md) -- [Console](https://github.com/rust-vmm/vhost-device/blob/main/staging/vhost-device-console/README.md)