diff options
| author | Jonas Maier <jonas@x77.dev> | 2026-03-10 10:00:59 +0100 |
|---|---|---|
| committer | Jonas Maier <jonas@x77.dev> | 2026-03-10 10:00:59 +0100 |
| commit | d10a1b087458bad2b5dbcc7ee96518c3b3f674ec (patch) | |
| tree | d5c3981fe669919bdbcba9c49325209171810f17 | |
| parent | 5d8d9f07669cdc8ac17c866edb6a8c07bbe4221c (diff) | |
| download | pish-d10a1b087458bad2b5dbcc7ee96518c3b3f674ec.tar.gz | |
rework command executor
| -rw-r--r-- | src/export_fun.rs | 2 | ||||
| -rw-r--r-- | src/run/mod.rs | 326 |
2 files changed, 172 insertions, 156 deletions
diff --git a/src/export_fun.rs b/src/export_fun.rs index 0e0f33a..3c0f1c9 100644 --- a/src/export_fun.rs +++ b/src/export_fun.rs @@ -90,8 +90,8 @@ fn handle_server(session: Arc<Mutex<Session>>, mut stream: UnixStream) -> io::Re let exit_code = match res { Ok(_) => 0, Err(e) => match e { - crate::run::ExecError::UnknownVariable(_) => -3, crate::run::ExecError::ExecError(x) => x, + _ => -3, }, }; diff --git a/src/run/mod.rs b/src/run/mod.rs index e041066..757f4cc 100644 --- a/src/run/mod.rs +++ b/src/run/mod.rs @@ -2,7 +2,9 @@ use std::collections::HashMap; use std::fs::File; use std::io::{PipeReader, PipeWriter}; use std::path::PathBuf; +use std::process::Child; use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; use crate::parse::{self, Ast, PostExpansion, PreExpansion}; use crate::*; @@ -13,6 +15,68 @@ mod builtin; pub enum ExecError { UnknownVariable(BString), ExecError(i32), + SpawnIO(String, std::io::Error), + IO(std::io::Error), + ErrorStack(Vec<ExecError>), + Panic, +} + +impl ExecError { + pub fn error_message(&self) -> String { + match self { + ExecError::UnknownVariable(items) => { + format!("unknown variable: {}", String::from_utf8_lossy(&items)) + } + ExecError::ExecError(exit_code) => format!("{exit_code}"), + ExecError::SpawnIO(cmd, error) => match error.kind() { + io::ErrorKind::NotFound => format!("{cmd} not found"), + io::ErrorKind::PermissionDenied => { + format!("{cmd} is not executable") + } + io::ErrorKind::FileTooLarge => format!("{cmd} is too massive"), + io::ErrorKind::ResourceBusy | io::ErrorKind::ExecutableFileBusy => { + format!("{cmd} is busy") + } + io::ErrorKind::TooManyLinks => { + format!("{cmd} could not be resolved") + } + io::ErrorKind::InvalidFilename => { + format!("{cmd} is not a valid file name") + } + io::ErrorKind::ArgumentListTooLong => format!("too many arguments"), + io::ErrorKind::Interrupted => format!("got interrupted"), + io::ErrorKind::Unsupported => format!("{cmd} is not supported"), + e => format!("I am surprised you can get this error here: {e:?}"), + }, + ExecError::IO(error) => format!("{error:?}"), + ExecError::Panic => format!("worker thread panicked"), + ExecError::ErrorStack(stack) => { + let mut out = String::new(); + for e in stack.iter() { + if !out.is_empty() { + out += "\n"; + } + out += &e.error_message(); + } + out + } + } + } +} + +impl From<BuiltinError> for ExecError { + fn from(value: BuiltinError) -> Self { + match value { + BuiltinError::IO(error) => Self::IO(error), + BuiltinError::Exit(code) => Self::ExecError(code), + } + } +} + +impl From<std::io::Error> for ExecError { + fn from(value: std::io::Error) -> Self { + Self::IO(value) + } } #[derive(Clone)] @@ -81,175 +145,130 @@ impl Write for Output { } } -impl Executor { - fn execute_pipeline( - &mut self, - pipes: parse::Pipes<parse::PostExpansion>, - stdin: Input, - stdout: Output, - ) -> Result<(), ExecError> { - let mut children = Vec::new(); - let mut threads = Vec::new(); - let mut prev_reader = None; - let mut spawn_error = false; - - let mut stdin = Some(stdin); - let mut stdout = Some(stdout); - - let pipelen = pipes.cmds.len(); - for (i, cmd) in pipes.cmds.into_iter().enumerate() { - let (reader, writer) = if i < pipelen - 1 { - let (r, w) = io::pipe().unwrap(); - (Some(r), Some(w)) - } else { - (None, None) - }; - - let dc = get_command_kind(&self.se.lock().unwrap(), &cmd.cmd[..]); - - match dc { - CommandKind::Path(path) => { - let mut command = Command::new(&path); - for arg in cmd.args.iter() { - command.arg(OsStr::from_bytes(arg)); - } - - if let Some(r) = prev_reader.take() { - command.stdin(Stdio::from(r)); - } else { - command.stdin(stdin.take().unwrap()); - } - - if let Some(w) = writer { - command.stdout(Stdio::from(w)); - } else { - command.stdout(stdout.take().unwrap()); - } - - if let Some(sr) = self.se.lock().unwrap().socket_running.as_ref() { - let my_path = std::env::var_os("PATH").expect("no PATH - seriously?"); - let mut new_path = sr.path().as_os_str().as_bytes().to_vec(); - new_path.push(b':'); - new_path.extend_from_slice(my_path.as_bytes()); - command.env("PATH", OsStr::from_bytes(&new_path)); - command.env("PISH_SOCKET", sr.socket_path().as_os_str()); - } +enum SpawnedCmd { + Builtin(JoinHandle<Result<(), BuiltinError>>), + Fun(JoinHandle<Result<(), ExecError>>), + Child(Child), + SpawnError(io::Error), +} - let child = match command.spawn() { - Ok(c) => c, - Err(e) => { - let cmd = path.to_string_lossy(); - let msg = match e.kind() { - io::ErrorKind::NotFound => format!("{cmd} not found"), - io::ErrorKind::PermissionDenied => { - format!("{cmd} is not executable") - } - io::ErrorKind::FileTooLarge => format!("{cmd} is too massive"), - io::ErrorKind::ResourceBusy | io::ErrorKind::ExecutableFileBusy => { - format!("{cmd} is busy") - } - io::ErrorKind::TooManyLinks => { - format!("{cmd} could not be resolved") - } - io::ErrorKind::InvalidFilename => { - format!("{cmd} is not a valid file name") - } - io::ErrorKind::ArgumentListTooLong => format!("too many arguments"), - io::ErrorKind::Interrupted => format!("got interrupted"), - io::ErrorKind::Unsupported => format!("{cmd} is not supported"), - e => format!("I am surprised you can get this error here: {e:?}"), - }; - println!("pish: {msg}"); - spawn_error = true; - break; - } - }; - - children.push(child); +impl SpawnedCmd { + fn join(self) -> Result<(), ExecError> { + match self { + SpawnedCmd::Builtin(handle) => handle.join().map_err(|_| ExecError::Panic)??, + SpawnedCmd::Fun(handle) => handle.join().map_err(|_| ExecError::Panic)??, + SpawnedCmd::Child(mut child) => { + let exit_code = child.wait()?; + match exit_code.code() { + Some(0) => (), + Some(x) => Err(ExecError::ExecError(x))?, + None => Err(ExecError::ExecError(-1))?, } + } + SpawnedCmd::SpawnError(err) => Err(err)?, + } + Ok(()) + } +} - CommandKind::Builtin(builtin) => { - let mut input: Box<dyn io::Read + Send> = if let Some(r) = prev_reader.take() { - Box::new(r) - } else { - Box::new(stdin.take().unwrap()) - }; - - let mut output: Box<dyn io::Write + Send> = if let Some(w) = writer { - Box::new(w) - } else { - Box::new(stdout.take().unwrap()) - }; - - // currently only required for `re`, cannot happen in background thread - builtin.special(self.se.clone(), &cmd.args); +impl Executor { + fn spawn_cmd( + &mut self, + cmd: CommandKind, + args: Vec<BString>, + mut stdin: Input, + mut stdout: Output, + ) -> SpawnedCmd { + match cmd { + CommandKind::Builtin(builtin) => { + builtin.special(self.se.clone(), &args[1..]); + let cloned_session = self.se.clone(); + let handle = std::thread::spawn(move || { + builtin.io(cloned_session, &args[1..], &mut stdin, &mut stdout) + }); + SpawnedCmd::Builtin(handle) + } + CommandKind::Fun(ast) => { + let mut this = self.clone(); + this.args = Some(args); - let cloned_session = self.se.clone(); - let handle = std::thread::spawn(move || { - builtin.io(cloned_session, &cmd.args, &mut input, &mut output) - }); + let handle = std::thread::spawn(move || { + let ast = ast.expand(&mut this)?; + this.execute(ast, stdin, stdout)?; + Ok(()) + }); - threads.push(handle); + SpawnedCmd::Fun(handle) + } + CommandKind::Path(path) => { + let mut command = Command::new(&path); + for arg in args.iter().skip(1) { + command.arg(OsStr::from_bytes(arg)); } - CommandKind::Fun(ast) => { - let mut this = self.clone(); - - let mut args = Vec::with_capacity(cmd.args.len() + 1); - args.push(cmd.cmd.clone()); - args.extend_from_slice(&cmd.args); - this.args = Some(args); + command.stdin(stdin); + command.stdout(stdout); + + // TODO: move to export_fun.rs? + if let Some(sr) = self.se.lock().unwrap().socket_running.as_ref() { + let my_path = std::env::var_os("PATH").expect("no PATH - seriously?"); + let mut new_path = sr.path().as_os_str().as_bytes().to_vec(); + new_path.push(b':'); + new_path.extend_from_slice(my_path.as_bytes()); + command.env("PATH", OsStr::from_bytes(&new_path)); + command.env("PISH_SOCKET", sr.socket_path().as_os_str()); + } - let ast = ast.expand(&mut this)?; - this.execute( - ast, - prev_reader.map(|r| Input::Pipe(r)).unwrap_or(Input::Stdin), - writer.map(|w| Output::Pipe(w)).unwrap_or(Output::Stdout), - ) - .unwrap(); + match command.spawn() { + Ok(c) => SpawnedCmd::Child(c), + Err(e) => SpawnedCmd::SpawnError(e), } } - - prev_reader = reader; } + } - if spawn_error { - for child in children.iter_mut() { - if let Err(e) = child.kill() { - println!("failed to kill child - {e:?}"); - } - } - return Err(ExecError::ExecError(127)); + fn execute_pipeline( + &mut self, + pipes: parse::Pipes<parse::PostExpansion>, + stdin: Input, + stdout: Output, + ) -> Result<(), ExecError> { + let mut inputs = Vec::with_capacity(pipes.cmds.len()); + let mut outputs = Vec::with_capacity(pipes.cmds.len()); + let mut executors = Vec::with_capacity(pipes.cmds.len()); + + inputs.push(stdin); + for _ in 1..pipes.cmds.len() { + let (r, w) = io::pipe().unwrap(); + inputs.push(Input::Pipe(r)); + outputs.push(Output::Pipe(w)); } + outputs.push(stdout); - let mut code = 0; - for jh in threads { - match jh.join() { - Ok(Ok(())) => (), - Ok(Err(e)) => match e { - BuiltinError::IO(_) => code = -1, - BuiltinError::Exit(c) => code = c, - }, - Err(_) => code = 127, - } + assert_eq!(pipes.cmds.len(), inputs.len()); + assert_eq!(pipes.cmds.len(), outputs.len()); + + for (mut cmd, (stdin, stdout)) in pipes + .cmds + .into_iter() + .zip(inputs.into_iter().zip(outputs.into_iter())) + { + let dc = get_command_kind(&self.se.lock().unwrap(), &cmd.cmd[..]); + cmd.args.insert(0, cmd.cmd.clone()); + executors.push(self.spawn_cmd(dc, cmd.args, stdin, stdout)); } - for child in children.iter_mut() { - match child.wait() { - Ok(ec) => { - if let Some(c) = ec.code() { - code = c; - } - } - Err(e) => { - println!("failed to wait for child - {e:?}") - } + + let mut code = None; + for e in executors { + if let Some(err) = e.join().err() { + code = Some(err); } } - if code == 0 { - Ok(()) + if let Some(err) = code { + Err(err) } else { - Err(ExecError::ExecError(code)) + Ok(()) } } @@ -384,13 +403,10 @@ pub fn run(se: Arc<Mutex<Session>>, cmd: Vec<u8>) { let status_string = match result { Ok(_) => String::new(), - Err(ExecError::UnknownVariable(var)) => { - format!("unbound variable: {}", String::from_utf8_lossy(&var)) - } - Err(ExecError::ExecError(i)) => i.to_string(), + Err(e) => format!("{}\r\n", e.error_message()), }; - print!("\r{status_string}\r\n{}", se.lock().unwrap().prompt()); + print!("\r{status_string}{}", se.lock().unwrap().prompt()); let _ = std::io::stdout().lock().flush(); } |
