diff options
Diffstat (limited to 'src/run/mod.rs')
| -rw-r--r-- | src/run/mod.rs | 209 |
1 files changed, 113 insertions, 96 deletions
diff --git a/src/run/mod.rs b/src/run/mod.rs index 6af1717..6046a12 100644 --- a/src/run/mod.rs +++ b/src/run/mod.rs @@ -1,9 +1,10 @@ +use nix::spawn; + +use crate::rw::*; use std::collections::HashMap; -use std::fs::File; -use std::io::{PipeReader, PipeWriter}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::thread::JoinHandle; +use std::thread::Thread; use crate::parse::{self, Ast, PostExpansion, PreExpansion}; use crate::wait::{ChildWaiter, ThreadWaiter}; @@ -86,64 +87,9 @@ pub struct Executor { expand_commands: bool, } -pub enum Input { - Stdin, - Pipe(PipeReader), - File(File), -} - -pub enum Output { - Stdout, - Pipe(PipeWriter), - File(File), -} - -impl From<Input> for Stdio { - fn from(value: Input) -> Self { - match value { - Input::Stdin => Stdio::inherit(), - Input::Pipe(reader) => reader.into(), - Input::File(file) => file.into(), - } - } -} - -impl From<Output> for Stdio { - fn from(value: Output) -> Stdio { - match value { - Output::Stdout => Stdio::inherit(), - Output::Pipe(writer) => writer.into(), - Output::File(file) => file.into(), - } - } -} - -impl Read for Input { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - match self { - Input::Stdin => io::stdin().read(buf), - Input::Pipe(reader) => reader.read(buf), - Input::File(file) => file.read(buf), - } - } -} - -impl Write for Output { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - match self { - Output::Stdout => io::stdout().write(buf), - Output::Pipe(writer) => writer.write(buf), - Output::File(file) => file.write(buf), - } - } - - fn flush(&mut self) -> io::Result<()> { - match self { - Output::Stdout => io::stdout().flush(), - Output::Pipe(writer) => writer.flush(), - Output::File(file) => file.flush(), - } - } +struct SpawnedPipeline { + executors: Vec<SpawnedCmd>, + cancel: Vec<Canceler>, } enum SpawnedCmd { @@ -152,6 +98,7 @@ enum SpawnedCmd { Child(ChildWaiter), SpawnError(io::Error), Joined(Result<(), ExecError>), + Pipeline(SpawnedPipeline), } impl SpawnedCmd { @@ -174,6 +121,18 @@ impl SpawnedCmd { } SpawnedCmd::SpawnError(err) => Err(err)?, SpawnedCmd::Joined(res) => res?, + SpawnedCmd::Pipeline(pipes) => { + let joined: Vec<_> = pipes + .executors + .into_iter() + .filter_map(|e| e.join().err()) + .collect(); + match joined.len() { + 0 => (), + 1 => return Err(joined.into_iter().next().unwrap()), + _ => return Err(ExecError::ErrorStack(joined)), + } + } } Ok(()) } @@ -189,11 +148,45 @@ impl SpawnedCmd { }, SpawnedCmd::SpawnError(_) => true, SpawnedCmd::Joined(_) => true, + SpawnedCmd::Pipeline(pipes) => pipes + .executors + .iter_mut() + .all(|e| e.join_timeout(timeout_ms)), + } + } + + fn cancel(&mut self) { + match self { + SpawnedCmd::Pipeline(pipes) => { + for c in pipes.cancel.iter_mut() { + c.cancel(); + } + } + SpawnedCmd::Builtin(_) + | SpawnedCmd::Fun(_) + | SpawnedCmd::Child(_) + | SpawnedCmd::SpawnError(_) + | SpawnedCmd::Joined(_) => (), } } } impl Executor { + fn exec_loop(&mut self, mut s: SpawnedCmd, cs: &mut [Canceler]) -> Result<(), ExecError> { + while !s.join_timeout(50) { + if let Ok(mut se) = self.se.try_lock() + && ctrlc::pop(&mut se) + { + for c in cs.iter_mut() { + c.cancel(); + } + s.cancel(); + break; + } + } + s.join() + } + pub fn new_for_completion(se: Arc<Mutex<Session>>) -> Self { Self { se, @@ -206,8 +199,8 @@ impl Executor { &mut self, cmd: CommandKind, args: Vec<BString>, - mut stdin: Input, - mut stdout: Output, + mut stdin: InputReader, + mut stdout: OutputWriter, ) -> SpawnedCmd { match cmd { CommandKind::Builtin(builtin) => { @@ -236,8 +229,8 @@ impl Executor { command.arg(OsStr::from_bytes(arg)); } - command.stdin(stdin); - command.stdout(stdout); + command.stdin(Input::from(stdin)); + command.stdout(Output::from(stdout)); crate::export_fun::prepare_command(self.se.clone(), &mut command); @@ -252,18 +245,23 @@ impl Executor { fn execute_pipeline( &mut self, pipes: parse::Pipes<parse::PostExpansion>, - stdin: Input, - stdout: Output, - ) -> Result<(), ExecError> { + stdin: InputReader, + stdout: OutputWriter, + ) -> SpawnedCmd { let mut inputs = Vec::with_capacity(pipes.cmds.len()); let mut outputs = Vec::with_capacity(pipes.cmds.len()); + let mut cancel = Vec::with_capacity(pipes.cmds.len() * 2); let mut executors = Vec::with_capacity(pipes.cmds.len()); inputs.push(stdin); for _ in 1..pipes.cmds.len() { let (r, w) = io::pipe().unwrap(); - inputs.push(Input::Pipe(r)); - outputs.push(Output::Pipe(w)); + let (i, c1) = InputReader::new(Input::Pipe(r)); + let (o, c2) = OutputWriter::new(Output::Pipe(w)); + inputs.push(i); + outputs.push(o); + cancel.push(c1); + cancel.push(c2); } outputs.push(stdout); @@ -280,26 +278,15 @@ impl Executor { executors.push(self.spawn_cmd(dc, cmd.args, stdin, stdout)); } - let mut code = None; - for e in executors { - if let Some(err) = e.join().err() { - code = Some(err); - } - } - - if let Some(err) = code { - Err(err) - } else { - Ok(()) - } + SpawnedCmd::Pipeline(SpawnedPipeline { executors, cancel }) } - fn execute_var_assign(&mut self, va: parse::VarAssign<PostExpansion>) -> Result<(), ExecError> { + fn execute_var_assign(&mut self, va: parse::VarAssign<PostExpansion>) -> SpawnedCmd { self.se.lock().unwrap().vars.insert(va.var, va.val); - Ok(()) + SpawnedCmd::Joined(Ok(())) } - fn execute_fun_decl(&mut self, fd: parse::FunDecl<PostExpansion>) -> Result<(), ExecError> { + fn execute_fun_decl(&mut self, fd: parse::FunDecl<PostExpansion>) -> SpawnedCmd { self.se .lock() .unwrap() @@ -307,15 +294,15 @@ impl Executor { .insert(fd.name.clone(), *fd.body.body); crate::export_fun::create_function_hook(self.se.clone(), &fd.name); // TODO: very ugly to ad-hoc keep export stuff & session data in sync here - Ok(()) + SpawnedCmd::Joined(Ok(())) } fn execute( &mut self, ast: Ast<parse::PostExpansion>, - stdin: Input, - stdout: Output, - ) -> Result<(), ExecError> { + stdin: InputReader, + stdout: OutputWriter, + ) -> SpawnedCmd { match ast { Ast::FunDecl(fd) => self.execute_fun_decl(fd), Ast::VarAssign(va) => self.execute_var_assign(va), @@ -326,9 +313,9 @@ impl Executor { pub fn execute_fun( session: Arc<Mutex<Session>>, invoke: Vec<BString>, - stdin: Input, - stdout: Output, - ) -> Result<(), ExecError> { + stdin: InputReader, + stdout: OutputWriter, + ) -> SpawnedCmd { let mut this = Self { se: session, args: None, @@ -389,10 +376,26 @@ impl parse::Expander for Executor { } let (stdin, _) = io::pipe().unwrap(); + let (stdin, mut c1) = InputReader::new(Input::Pipe(stdin)); let (mut expansion, stdout) = io::pipe().unwrap(); + let (stdout, mut c2) = OutputWriter::new(Output::Pipe(stdout)); let mut this = self.clone(); - let t = - std::thread::spawn(move || this.execute(ast, Input::Pipe(stdin), Output::Pipe(stdout))); + + let se = self.se.clone(); + let t = std::thread::spawn(move || { + let mut spawned = this.execute(ast, stdin, stdout); + while !spawned.join_timeout(50) { + if let Ok(mut se) = se.try_lock() + && ctrlc::pop(&mut se) + { + c1.cancel(); + c2.cancel(); + spawned.cancel(); + break; + } + } + spawned.join() + }); let mut buf = Vec::new(); expansion.read_to_end(&mut buf).unwrap(); t.join().unwrap()?; @@ -405,12 +408,26 @@ impl parse::Expander for Executor { fn exec(se: Arc<Mutex<Session>>, ast: Ast<PreExpansion>) -> Result<(), ExecError> { let mut exec = Executor { - se, + se: se.clone(), args: None, expand_commands: true, }; let ast = ast.expand(&mut exec)?; - exec.execute(ast, Input::Stdin, Output::Stdout) + let (stdin, mut c1) = InputReader::new(Input::Stdin); + let (stdout, mut c2) = OutputWriter::new(Output::Stdout); + let mut spawned = exec.execute(ast, stdin, stdout); + ctrlc::pop(&mut *se.lock().unwrap()); + while !spawned.join_timeout(50) { + if let Ok(mut se) = se.try_lock() + && ctrlc::pop(&mut se) + { + c1.cancel(); + c2.cancel(); + spawned.cancel(); + break; + } + } + spawned.join() } pub fn run(se: Arc<Mutex<Session>>, cmd: Vec<u8>) { |
