From 18ad2173816bf455c2dabece9009aae29133b3d3 Mon Sep 17 00:00:00 2001 From: Jonas Maier <> Date: Wed, 11 Mar 2026 15:32:51 +0100 Subject: first draft of cancellable builtins, kinda shit --- src/completion.rs | 8 +- src/export_fun.rs | 3 +- src/main.rs | 1 + src/run/mod.rs | 209 +++++++++++++++++++++++++----------------------- src/rw.rs | 231 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 352 insertions(+), 100 deletions(-) create mode 100644 src/rw.rs (limited to 'src') diff --git a/src/completion.rs b/src/completion.rs index 80a0ebb..3e2565b 100644 --- a/src/completion.rs +++ b/src/completion.rs @@ -1,5 +1,9 @@ -use crate::*; +use crate::{BString, Session}; +use std::collections::HashMap; +use std::ffi::OsStr; +use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; use std::{env, fs}; pub struct Suggestion { @@ -7,7 +11,7 @@ pub struct Suggestion { pub delta: BString, } -fn _path_completion(mut prefix: BString) -> io::Result> { +fn _path_completion(mut prefix: BString) -> std::io::Result> { let mut partial_entry = BString::new(); while let Some(c) = prefix.last().cloned() { if c == b'/' { diff --git a/src/export_fun.rs b/src/export_fun.rs index 8576cce..d8722b4 100644 --- a/src/export_fun.rs +++ b/src/export_fun.rs @@ -8,8 +8,7 @@ use nix::poll::poll; use crate::Session; use crate::defer; -use crate::run::Input; -use crate::run::Output; +use crate::rw::*; use crate::run::get_command_kind; use std::env::current_exe; use std::ffi::OsStr; diff --git a/src/main.rs b/src/main.rs index a7138f7..222a5c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,6 +29,7 @@ pub mod reload; pub mod run; pub mod serialization; pub mod wait; +pub mod rw; use linebuf::LineBuf; use raw::*; diff --git a/src/run/mod.rs b/src/run/mod.rs index 6af1717..6046a12 100644 --- a/src/run/mod.rs +++ b/src/run/mod.rs @@ -1,9 +1,10 @@ +use nix::spawn; + +use crate::rw::*; use std::collections::HashMap; -use std::fs::File; -use std::io::{PipeReader, PipeWriter}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::thread::JoinHandle; +use std::thread::Thread; use crate::parse::{self, Ast, PostExpansion, PreExpansion}; use crate::wait::{ChildWaiter, ThreadWaiter}; @@ -86,64 +87,9 @@ pub struct Executor { expand_commands: bool, } -pub enum Input { - Stdin, - Pipe(PipeReader), - File(File), -} - -pub enum Output { - Stdout, - Pipe(PipeWriter), - File(File), -} - -impl From for Stdio { - fn from(value: Input) -> Self { - match value { - Input::Stdin => Stdio::inherit(), - Input::Pipe(reader) => reader.into(), - Input::File(file) => file.into(), - } - } -} - -impl From for Stdio { - fn from(value: Output) -> Stdio { - match value { - Output::Stdout => Stdio::inherit(), - Output::Pipe(writer) => writer.into(), - Output::File(file) => file.into(), - } - } -} - -impl Read for Input { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match self { - Input::Stdin => io::stdin().read(buf), - Input::Pipe(reader) => reader.read(buf), - Input::File(file) => file.read(buf), - } - } -} - -impl Write for Output { - fn write(&mut self, buf: &[u8]) -> io::Result { - match self { - Output::Stdout => io::stdout().write(buf), - Output::Pipe(writer) => writer.write(buf), - Output::File(file) => file.write(buf), - } - } - - fn flush(&mut self) -> io::Result<()> { - match self { - Output::Stdout => io::stdout().flush(), - Output::Pipe(writer) => writer.flush(), - Output::File(file) => file.flush(), - } - } +struct SpawnedPipeline { + executors: Vec, + cancel: Vec, } enum SpawnedCmd { @@ -152,6 +98,7 @@ enum SpawnedCmd { Child(ChildWaiter), SpawnError(io::Error), Joined(Result<(), ExecError>), + Pipeline(SpawnedPipeline), } impl SpawnedCmd { @@ -174,6 +121,18 @@ impl SpawnedCmd { } SpawnedCmd::SpawnError(err) => Err(err)?, SpawnedCmd::Joined(res) => res?, + SpawnedCmd::Pipeline(pipes) => { + let joined: Vec<_> = pipes + .executors + .into_iter() + .filter_map(|e| e.join().err()) + .collect(); + match joined.len() { + 0 => (), + 1 => return Err(joined.into_iter().next().unwrap()), + _ => return Err(ExecError::ErrorStack(joined)), + } + } } Ok(()) } @@ -189,11 +148,45 @@ impl SpawnedCmd { }, SpawnedCmd::SpawnError(_) => true, SpawnedCmd::Joined(_) => true, + SpawnedCmd::Pipeline(pipes) => pipes + .executors + .iter_mut() + .all(|e| e.join_timeout(timeout_ms)), + } + } + + fn cancel(&mut self) { + match self { + SpawnedCmd::Pipeline(pipes) => { + for c in pipes.cancel.iter_mut() { + c.cancel(); + } + } + SpawnedCmd::Builtin(_) + | SpawnedCmd::Fun(_) + | SpawnedCmd::Child(_) + | SpawnedCmd::SpawnError(_) + | SpawnedCmd::Joined(_) => (), } } } impl Executor { + fn exec_loop(&mut self, mut s: SpawnedCmd, cs: &mut [Canceler]) -> Result<(), ExecError> { + while !s.join_timeout(50) { + if let Ok(mut se) = self.se.try_lock() + && ctrlc::pop(&mut se) + { + for c in cs.iter_mut() { + c.cancel(); + } + s.cancel(); + break; + } + } + s.join() + } + pub fn new_for_completion(se: Arc>) -> Self { Self { se, @@ -206,8 +199,8 @@ impl Executor { &mut self, cmd: CommandKind, args: Vec, - mut stdin: Input, - mut stdout: Output, + mut stdin: InputReader, + mut stdout: OutputWriter, ) -> SpawnedCmd { match cmd { CommandKind::Builtin(builtin) => { @@ -236,8 +229,8 @@ impl Executor { command.arg(OsStr::from_bytes(arg)); } - command.stdin(stdin); - command.stdout(stdout); + command.stdin(Input::from(stdin)); + command.stdout(Output::from(stdout)); crate::export_fun::prepare_command(self.se.clone(), &mut command); @@ -252,18 +245,23 @@ impl Executor { fn execute_pipeline( &mut self, pipes: parse::Pipes, - stdin: Input, - stdout: Output, - ) -> Result<(), ExecError> { + stdin: InputReader, + stdout: OutputWriter, + ) -> SpawnedCmd { let mut inputs = Vec::with_capacity(pipes.cmds.len()); let mut outputs = Vec::with_capacity(pipes.cmds.len()); + let mut cancel = Vec::with_capacity(pipes.cmds.len() * 2); let mut executors = Vec::with_capacity(pipes.cmds.len()); inputs.push(stdin); for _ in 1..pipes.cmds.len() { let (r, w) = io::pipe().unwrap(); - inputs.push(Input::Pipe(r)); - outputs.push(Output::Pipe(w)); + let (i, c1) = InputReader::new(Input::Pipe(r)); + let (o, c2) = OutputWriter::new(Output::Pipe(w)); + inputs.push(i); + outputs.push(o); + cancel.push(c1); + cancel.push(c2); } outputs.push(stdout); @@ -280,26 +278,15 @@ impl Executor { executors.push(self.spawn_cmd(dc, cmd.args, stdin, stdout)); } - let mut code = None; - for e in executors { - if let Some(err) = e.join().err() { - code = Some(err); - } - } - - if let Some(err) = code { - Err(err) - } else { - Ok(()) - } + SpawnedCmd::Pipeline(SpawnedPipeline { executors, cancel }) } - fn execute_var_assign(&mut self, va: parse::VarAssign) -> Result<(), ExecError> { + fn execute_var_assign(&mut self, va: parse::VarAssign) -> SpawnedCmd { self.se.lock().unwrap().vars.insert(va.var, va.val); - Ok(()) + SpawnedCmd::Joined(Ok(())) } - fn execute_fun_decl(&mut self, fd: parse::FunDecl) -> Result<(), ExecError> { + fn execute_fun_decl(&mut self, fd: parse::FunDecl) -> SpawnedCmd { self.se .lock() .unwrap() @@ -307,15 +294,15 @@ impl Executor { .insert(fd.name.clone(), *fd.body.body); crate::export_fun::create_function_hook(self.se.clone(), &fd.name); // TODO: very ugly to ad-hoc keep export stuff & session data in sync here - Ok(()) + SpawnedCmd::Joined(Ok(())) } fn execute( &mut self, ast: Ast, - stdin: Input, - stdout: Output, - ) -> Result<(), ExecError> { + stdin: InputReader, + stdout: OutputWriter, + ) -> SpawnedCmd { match ast { Ast::FunDecl(fd) => self.execute_fun_decl(fd), Ast::VarAssign(va) => self.execute_var_assign(va), @@ -326,9 +313,9 @@ impl Executor { pub fn execute_fun( session: Arc>, invoke: Vec, - stdin: Input, - stdout: Output, - ) -> Result<(), ExecError> { + stdin: InputReader, + stdout: OutputWriter, + ) -> SpawnedCmd { let mut this = Self { se: session, args: None, @@ -389,10 +376,26 @@ impl parse::Expander for Executor { } let (stdin, _) = io::pipe().unwrap(); + let (stdin, mut c1) = InputReader::new(Input::Pipe(stdin)); let (mut expansion, stdout) = io::pipe().unwrap(); + let (stdout, mut c2) = OutputWriter::new(Output::Pipe(stdout)); let mut this = self.clone(); - let t = - std::thread::spawn(move || this.execute(ast, Input::Pipe(stdin), Output::Pipe(stdout))); + + let se = self.se.clone(); + let t = std::thread::spawn(move || { + let mut spawned = this.execute(ast, stdin, stdout); + while !spawned.join_timeout(50) { + if let Ok(mut se) = se.try_lock() + && ctrlc::pop(&mut se) + { + c1.cancel(); + c2.cancel(); + spawned.cancel(); + break; + } + } + spawned.join() + }); let mut buf = Vec::new(); expansion.read_to_end(&mut buf).unwrap(); t.join().unwrap()?; @@ -405,12 +408,26 @@ impl parse::Expander for Executor { fn exec(se: Arc>, ast: Ast) -> Result<(), ExecError> { let mut exec = Executor { - se, + se: se.clone(), args: None, expand_commands: true, }; let ast = ast.expand(&mut exec)?; - exec.execute(ast, Input::Stdin, Output::Stdout) + let (stdin, mut c1) = InputReader::new(Input::Stdin); + let (stdout, mut c2) = OutputWriter::new(Output::Stdout); + let mut spawned = exec.execute(ast, stdin, stdout); + ctrlc::pop(&mut *se.lock().unwrap()); + while !spawned.join_timeout(50) { + if let Ok(mut se) = se.try_lock() + && ctrlc::pop(&mut se) + { + c1.cancel(); + c2.cancel(); + spawned.cancel(); + break; + } + } + spawned.join() } pub fn run(se: Arc>, cmd: Vec) { diff --git a/src/rw.rs b/src/rw.rs new file mode 100644 index 0000000..31808fb --- /dev/null +++ b/src/rw.rs @@ -0,0 +1,231 @@ +use core::fmt; +use std::{ + fs::File, + io::{self, PipeReader, PipeWriter, Read, Write}, + os::fd::{AsFd, BorrowedFd}, + process::Stdio, +}; + +use nix::poll::{PollFd, PollFlags}; + +pub enum Input { + Stdin, + Pipe(PipeReader), + File(File), +} + +pub enum Output { + Stdout, + Pipe(PipeWriter), + File(File), +} + +impl From for Stdio { + fn from(value: Input) -> Self { + match value { + Input::Stdin => Stdio::inherit(), + Input::Pipe(reader) => reader.into(), + Input::File(file) => file.into(), + } + } +} + +impl From for Stdio { + fn from(value: Output) -> Stdio { + match value { + Output::Stdout => Stdio::inherit(), + Output::Pipe(writer) => writer.into(), + Output::File(file) => file.into(), + } + } +} + +pub struct Canceler { + tx: PipeWriter, +} + +impl Canceler { + pub fn cancel(&mut self) { + let _ = self.tx.write(b"."); + } +} + +pub struct InputReader { + input: Input, + cancel: PipeReader, + canceled: bool, +} + +impl InputReader { + pub fn new(input: Input) -> (InputReader, Canceler) { + let (cancel, tx) = std::io::pipe().unwrap(); + ( + Self { + input, + cancel, + canceled: false, + }, + Canceler { tx }, + ) + } +} + +const TIMEOUT_MS: u16 = 20; + +enum PollStatus { + Cancel, + Ready, + Wait, +} + +fn check<'a>( + canceled: &mut bool, + cancel: &PipeReader, + fd: BorrowedFd<'a>, + flags: PollFlags, +) -> PollStatus { + if *canceled { + return PollStatus::Cancel; + } + + let mut poll_fds = [ + PollFd::new(cancel.as_fd(), PollFlags::POLLIN), + PollFd::new(fd, flags), + ]; + + if nix::poll::poll(&mut poll_fds, TIMEOUT_MS).is_err() { + *canceled = true; + return PollStatus::Cancel; + }; + + if let Some(event) = poll_fds[0].revents() { + if event.contains(PollFlags::POLLIN) { + *canceled = true; + return PollStatus::Cancel; + } + } + + if let Some(event) = poll_fds[1].revents() { + if event.contains(flags) { + return PollStatus::Ready; + } + } + + PollStatus::Wait +} + +impl InputReader { + fn cancel(&mut self) -> PollStatus { + self.canceled = true; + PollStatus::Cancel + } + + fn poll(&mut self) -> PollStatus { + let stdin = io::stdin(); + let read_fd = match &self.input { + Input::Stdin => stdin.as_fd(), + Input::Pipe(pipe) => pipe.as_fd(), + Input::File(file) => file.as_fd(), + }; + check(&mut self.canceled, &self.cancel, read_fd, PollFlags::POLLIN) + } +} + +#[derive(Debug, Clone, Copy)] +struct Canceled; + +impl fmt::Display for Canceled { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "canceled") + } +} + +impl std::error::Error for Canceled {} + +impl Read for InputReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + loop { + match self.poll() { + PollStatus::Cancel => return Err(io::Error::new(io::ErrorKind::Other, Canceled)), + PollStatus::Ready => (), + PollStatus::Wait => continue, + } + return match &mut self.input { + Input::Stdin => io::stdin().read(buf), + Input::Pipe(reader) => reader.read(buf), + Input::File(file) => file.read(buf), + }; + } + } +} + +pub struct OutputWriter { + output: Output, + cancel: PipeReader, + canceled: bool, +} + +impl OutputWriter { + pub fn new(output: Output) -> (Self, Canceler) { + let (cancel, tx) = std::io::pipe().unwrap(); + ( + Self { + output, + cancel, + canceled: false, + }, + Canceler { tx }, + ) + } + fn poll(&mut self) -> PollStatus { + let stdout = io::stdout(); + let write_fd = match &self.output { + Output::Stdout => stdout.as_fd(), + Output::Pipe(pipe) => pipe.as_fd(), + Output::File(file) => file.as_fd(), + }; + check( + &mut self.canceled, + &self.cancel, + write_fd, + PollFlags::POLLOUT, + ) + } +} + +impl Write for OutputWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + loop { + match self.poll() { + PollStatus::Cancel => return Err(io::Error::new(io::ErrorKind::Other, Canceled)), + PollStatus::Ready => (), + PollStatus::Wait => continue, + } + return match &mut self.output { + Output::Stdout => io::stdout().write(buf), + Output::Pipe(writer) => writer.write(buf), + Output::File(file) => file.write(buf), + }; + } + } + + fn flush(&mut self) -> io::Result<()> { + match &mut self.output { + Output::Stdout => io::stdout().flush(), + Output::Pipe(writer) => writer.flush(), + Output::File(file) => file.flush(), + } + } +} + +impl From for Input { + fn from(value: InputReader) -> Self { + value.input + } +} + +impl From for Output { + fn from(value: OutputWriter) -> Self { + value.output + } +} -- cgit v1.2.3