aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJonas Maier <jonas@x77.dev>2026-03-10 10:00:59 +0100
committerJonas Maier <jonas@x77.dev>2026-03-10 10:00:59 +0100
commitd10a1b087458bad2b5dbcc7ee96518c3b3f674ec (patch)
treed5c3981fe669919bdbcba9c49325209171810f17
parent5d8d9f07669cdc8ac17c866edb6a8c07bbe4221c (diff)
downloadpish-d10a1b087458bad2b5dbcc7ee96518c3b3f674ec.tar.gz
rework command executor
-rw-r--r--src/export_fun.rs2
-rw-r--r--src/run/mod.rs326
2 files changed, 172 insertions, 156 deletions
diff --git a/src/export_fun.rs b/src/export_fun.rs
index 0e0f33a..3c0f1c9 100644
--- a/src/export_fun.rs
+++ b/src/export_fun.rs
@@ -90,8 +90,8 @@ fn handle_server(session: Arc<Mutex<Session>>, mut stream: UnixStream) -> io::Re
let exit_code = match res {
Ok(_) => 0,
Err(e) => match e {
- crate::run::ExecError::UnknownVariable(_) => -3,
crate::run::ExecError::ExecError(x) => x,
+ _ => -3,
},
};
diff --git a/src/run/mod.rs b/src/run/mod.rs
index e041066..757f4cc 100644
--- a/src/run/mod.rs
+++ b/src/run/mod.rs
@@ -2,7 +2,9 @@ use std::collections::HashMap;
use std::fs::File;
use std::io::{PipeReader, PipeWriter};
use std::path::PathBuf;
+use std::process::Child;
use std::sync::{Arc, Mutex};
+use std::thread::JoinHandle;
use crate::parse::{self, Ast, PostExpansion, PreExpansion};
use crate::*;
@@ -13,6 +15,68 @@ mod builtin;
pub enum ExecError {
UnknownVariable(BString),
ExecError(i32),
+ SpawnIO(String, std::io::Error),
+ IO(std::io::Error),
+ ErrorStack(Vec<ExecError>),
+ Panic,
+}
+
+impl ExecError {
+ pub fn error_message(&self) -> String {
+ match self {
+ ExecError::UnknownVariable(items) => {
+ format!("unknown variable: {}", String::from_utf8_lossy(&items))
+ }
+ ExecError::ExecError(exit_code) => format!("{exit_code}"),
+ ExecError::SpawnIO(cmd, error) => match error.kind() {
+ io::ErrorKind::NotFound => format!("{cmd} not found"),
+ io::ErrorKind::PermissionDenied => {
+ format!("{cmd} is not executable")
+ }
+ io::ErrorKind::FileTooLarge => format!("{cmd} is too massive"),
+ io::ErrorKind::ResourceBusy | io::ErrorKind::ExecutableFileBusy => {
+ format!("{cmd} is busy")
+ }
+ io::ErrorKind::TooManyLinks => {
+ format!("{cmd} could not be resolved")
+ }
+ io::ErrorKind::InvalidFilename => {
+ format!("{cmd} is not a valid file name")
+ }
+ io::ErrorKind::ArgumentListTooLong => format!("too many arguments"),
+ io::ErrorKind::Interrupted => format!("got interrupted"),
+ io::ErrorKind::Unsupported => format!("{cmd} is not supported"),
+ e => format!("I am surprised you can get this error here: {e:?}"),
+ },
+ ExecError::IO(error) => format!("{error:?}"),
+ ExecError::Panic => format!("worker thread panicked"),
+ ExecError::ErrorStack(stack) => {
+ let mut out = String::new();
+ for e in stack.iter() {
+ if !out.is_empty() {
+ out += "\n";
+ }
+ out += &e.error_message();
+ }
+ out
+ }
+ }
+ }
+}
+
+impl From<BuiltinError> for ExecError {
+ fn from(value: BuiltinError) -> Self {
+ match value {
+ BuiltinError::IO(error) => Self::IO(error),
+ BuiltinError::Exit(code) => Self::ExecError(code),
+ }
+ }
+}
+
+impl From<std::io::Error> for ExecError {
+ fn from(value: std::io::Error) -> Self {
+ Self::IO(value)
+ }
}
#[derive(Clone)]
@@ -81,175 +145,130 @@ impl Write for Output {
}
}
-impl Executor {
- fn execute_pipeline(
- &mut self,
- pipes: parse::Pipes<parse::PostExpansion>,
- stdin: Input,
- stdout: Output,
- ) -> Result<(), ExecError> {
- let mut children = Vec::new();
- let mut threads = Vec::new();
- let mut prev_reader = None;
- let mut spawn_error = false;
-
- let mut stdin = Some(stdin);
- let mut stdout = Some(stdout);
-
- let pipelen = pipes.cmds.len();
- for (i, cmd) in pipes.cmds.into_iter().enumerate() {
- let (reader, writer) = if i < pipelen - 1 {
- let (r, w) = io::pipe().unwrap();
- (Some(r), Some(w))
- } else {
- (None, None)
- };
-
- let dc = get_command_kind(&self.se.lock().unwrap(), &cmd.cmd[..]);
-
- match dc {
- CommandKind::Path(path) => {
- let mut command = Command::new(&path);
- for arg in cmd.args.iter() {
- command.arg(OsStr::from_bytes(arg));
- }
-
- if let Some(r) = prev_reader.take() {
- command.stdin(Stdio::from(r));
- } else {
- command.stdin(stdin.take().unwrap());
- }
-
- if let Some(w) = writer {
- command.stdout(Stdio::from(w));
- } else {
- command.stdout(stdout.take().unwrap());
- }
-
- if let Some(sr) = self.se.lock().unwrap().socket_running.as_ref() {
- let my_path = std::env::var_os("PATH").expect("no PATH - seriously?");
- let mut new_path = sr.path().as_os_str().as_bytes().to_vec();
- new_path.push(b':');
- new_path.extend_from_slice(my_path.as_bytes());
- command.env("PATH", OsStr::from_bytes(&new_path));
- command.env("PISH_SOCKET", sr.socket_path().as_os_str());
- }
+enum SpawnedCmd {
+ Builtin(JoinHandle<Result<(), BuiltinError>>),
+ Fun(JoinHandle<Result<(), ExecError>>),
+ Child(Child),
+ SpawnError(io::Error),
+}
- let child = match command.spawn() {
- Ok(c) => c,
- Err(e) => {
- let cmd = path.to_string_lossy();
- let msg = match e.kind() {
- io::ErrorKind::NotFound => format!("{cmd} not found"),
- io::ErrorKind::PermissionDenied => {
- format!("{cmd} is not executable")
- }
- io::ErrorKind::FileTooLarge => format!("{cmd} is too massive"),
- io::ErrorKind::ResourceBusy | io::ErrorKind::ExecutableFileBusy => {
- format!("{cmd} is busy")
- }
- io::ErrorKind::TooManyLinks => {
- format!("{cmd} could not be resolved")
- }
- io::ErrorKind::InvalidFilename => {
- format!("{cmd} is not a valid file name")
- }
- io::ErrorKind::ArgumentListTooLong => format!("too many arguments"),
- io::ErrorKind::Interrupted => format!("got interrupted"),
- io::ErrorKind::Unsupported => format!("{cmd} is not supported"),
- e => format!("I am surprised you can get this error here: {e:?}"),
- };
- println!("pish: {msg}");
- spawn_error = true;
- break;
- }
- };
-
- children.push(child);
+impl SpawnedCmd {
+ fn join(self) -> Result<(), ExecError> {
+ match self {
+ SpawnedCmd::Builtin(handle) => handle.join().map_err(|_| ExecError::Panic)??,
+ SpawnedCmd::Fun(handle) => handle.join().map_err(|_| ExecError::Panic)??,
+ SpawnedCmd::Child(mut child) => {
+ let exit_code = child.wait()?;
+ match exit_code.code() {
+ Some(0) => (),
+ Some(x) => Err(ExecError::ExecError(x))?,
+ None => Err(ExecError::ExecError(-1))?,
}
+ }
+ SpawnedCmd::SpawnError(err) => Err(err)?,
+ }
+ Ok(())
+ }
+}
- CommandKind::Builtin(builtin) => {
- let mut input: Box<dyn io::Read + Send> = if let Some(r) = prev_reader.take() {
- Box::new(r)
- } else {
- Box::new(stdin.take().unwrap())
- };
-
- let mut output: Box<dyn io::Write + Send> = if let Some(w) = writer {
- Box::new(w)
- } else {
- Box::new(stdout.take().unwrap())
- };
-
- // currently only required for `re`, cannot happen in background thread
- builtin.special(self.se.clone(), &cmd.args);
+impl Executor {
+ fn spawn_cmd(
+ &mut self,
+ cmd: CommandKind,
+ args: Vec<BString>,
+ mut stdin: Input,
+ mut stdout: Output,
+ ) -> SpawnedCmd {
+ match cmd {
+ CommandKind::Builtin(builtin) => {
+ builtin.special(self.se.clone(), &args[1..]);
+ let cloned_session = self.se.clone();
+ let handle = std::thread::spawn(move || {
+ builtin.io(cloned_session, &args[1..], &mut stdin, &mut stdout)
+ });
+ SpawnedCmd::Builtin(handle)
+ }
+ CommandKind::Fun(ast) => {
+ let mut this = self.clone();
+ this.args = Some(args);
- let cloned_session = self.se.clone();
- let handle = std::thread::spawn(move || {
- builtin.io(cloned_session, &cmd.args, &mut input, &mut output)
- });
+ let handle = std::thread::spawn(move || {
+ let ast = ast.expand(&mut this)?;
+ this.execute(ast, stdin, stdout)?;
+ Ok(())
+ });
- threads.push(handle);
+ SpawnedCmd::Fun(handle)
+ }
+ CommandKind::Path(path) => {
+ let mut command = Command::new(&path);
+ for arg in args.iter().skip(1) {
+ command.arg(OsStr::from_bytes(arg));
}
- CommandKind::Fun(ast) => {
- let mut this = self.clone();
-
- let mut args = Vec::with_capacity(cmd.args.len() + 1);
- args.push(cmd.cmd.clone());
- args.extend_from_slice(&cmd.args);
- this.args = Some(args);
+ command.stdin(stdin);
+ command.stdout(stdout);
+
+ // TODO: move to export_fun.rs?
+ if let Some(sr) = self.se.lock().unwrap().socket_running.as_ref() {
+ let my_path = std::env::var_os("PATH").expect("no PATH - seriously?");
+ let mut new_path = sr.path().as_os_str().as_bytes().to_vec();
+ new_path.push(b':');
+ new_path.extend_from_slice(my_path.as_bytes());
+ command.env("PATH", OsStr::from_bytes(&new_path));
+ command.env("PISH_SOCKET", sr.socket_path().as_os_str());
+ }
- let ast = ast.expand(&mut this)?;
- this.execute(
- ast,
- prev_reader.map(|r| Input::Pipe(r)).unwrap_or(Input::Stdin),
- writer.map(|w| Output::Pipe(w)).unwrap_or(Output::Stdout),
- )
- .unwrap();
+ match command.spawn() {
+ Ok(c) => SpawnedCmd::Child(c),
+ Err(e) => SpawnedCmd::SpawnError(e),
}
}
-
- prev_reader = reader;
}
+ }
- if spawn_error {
- for child in children.iter_mut() {
- if let Err(e) = child.kill() {
- println!("failed to kill child - {e:?}");
- }
- }
- return Err(ExecError::ExecError(127));
+ fn execute_pipeline(
+ &mut self,
+ pipes: parse::Pipes<parse::PostExpansion>,
+ stdin: Input,
+ stdout: Output,
+ ) -> Result<(), ExecError> {
+ let mut inputs = Vec::with_capacity(pipes.cmds.len());
+ let mut outputs = Vec::with_capacity(pipes.cmds.len());
+ let mut executors = Vec::with_capacity(pipes.cmds.len());
+
+ inputs.push(stdin);
+ for _ in 1..pipes.cmds.len() {
+ let (r, w) = io::pipe().unwrap();
+ inputs.push(Input::Pipe(r));
+ outputs.push(Output::Pipe(w));
}
+ outputs.push(stdout);
- let mut code = 0;
- for jh in threads {
- match jh.join() {
- Ok(Ok(())) => (),
- Ok(Err(e)) => match e {
- BuiltinError::IO(_) => code = -1,
- BuiltinError::Exit(c) => code = c,
- },
- Err(_) => code = 127,
- }
+ assert_eq!(pipes.cmds.len(), inputs.len());
+ assert_eq!(pipes.cmds.len(), outputs.len());
+
+ for (mut cmd, (stdin, stdout)) in pipes
+ .cmds
+ .into_iter()
+ .zip(inputs.into_iter().zip(outputs.into_iter()))
+ {
+ let dc = get_command_kind(&self.se.lock().unwrap(), &cmd.cmd[..]);
+ cmd.args.insert(0, cmd.cmd.clone());
+ executors.push(self.spawn_cmd(dc, cmd.args, stdin, stdout));
}
- for child in children.iter_mut() {
- match child.wait() {
- Ok(ec) => {
- if let Some(c) = ec.code() {
- code = c;
- }
- }
- Err(e) => {
- println!("failed to wait for child - {e:?}")
- }
+
+ let mut code = None;
+ for e in executors {
+ if let Some(err) = e.join().err() {
+ code = Some(err);
}
}
- if code == 0 {
- Ok(())
+ if let Some(err) = code {
+ Err(err)
} else {
- Err(ExecError::ExecError(code))
+ Ok(())
}
}
@@ -384,13 +403,10 @@ pub fn run(se: Arc<Mutex<Session>>, cmd: Vec<u8>) {
let status_string = match result {
Ok(_) => String::new(),
- Err(ExecError::UnknownVariable(var)) => {
- format!("unbound variable: {}", String::from_utf8_lossy(&var))
- }
- Err(ExecError::ExecError(i)) => i.to_string(),
+ Err(e) => format!("{}\r\n", e.error_message()),
};
- print!("\r{status_string}\r\n{}", se.lock().unwrap().prompt());
+ print!("\r{status_string}{}", se.lock().unwrap().prompt());
let _ = std::io::stdout().lock().flush();
}