aboutsummaryrefslogtreecommitdiffstats
path: root/src/run
diff options
context:
space:
mode:
Diffstat (limited to 'src/run')
-rw-r--r--src/run/mod.rs209
1 files changed, 113 insertions, 96 deletions
diff --git a/src/run/mod.rs b/src/run/mod.rs
index 6af1717..6046a12 100644
--- a/src/run/mod.rs
+++ b/src/run/mod.rs
@@ -1,9 +1,10 @@
+use nix::spawn;
+
+use crate::rw::*;
use std::collections::HashMap;
-use std::fs::File;
-use std::io::{PipeReader, PipeWriter};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
-use std::thread::JoinHandle;
+use std::thread::Thread;
use crate::parse::{self, Ast, PostExpansion, PreExpansion};
use crate::wait::{ChildWaiter, ThreadWaiter};
@@ -86,64 +87,9 @@ pub struct Executor {
expand_commands: bool,
}
-pub enum Input {
- Stdin,
- Pipe(PipeReader),
- File(File),
-}
-
-pub enum Output {
- Stdout,
- Pipe(PipeWriter),
- File(File),
-}
-
-impl From<Input> for Stdio {
- fn from(value: Input) -> Self {
- match value {
- Input::Stdin => Stdio::inherit(),
- Input::Pipe(reader) => reader.into(),
- Input::File(file) => file.into(),
- }
- }
-}
-
-impl From<Output> for Stdio {
- fn from(value: Output) -> Stdio {
- match value {
- Output::Stdout => Stdio::inherit(),
- Output::Pipe(writer) => writer.into(),
- Output::File(file) => file.into(),
- }
- }
-}
-
-impl Read for Input {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- match self {
- Input::Stdin => io::stdin().read(buf),
- Input::Pipe(reader) => reader.read(buf),
- Input::File(file) => file.read(buf),
- }
- }
-}
-
-impl Write for Output {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- match self {
- Output::Stdout => io::stdout().write(buf),
- Output::Pipe(writer) => writer.write(buf),
- Output::File(file) => file.write(buf),
- }
- }
-
- fn flush(&mut self) -> io::Result<()> {
- match self {
- Output::Stdout => io::stdout().flush(),
- Output::Pipe(writer) => writer.flush(),
- Output::File(file) => file.flush(),
- }
- }
+struct SpawnedPipeline {
+ executors: Vec<SpawnedCmd>,
+ cancel: Vec<Canceler>,
}
enum SpawnedCmd {
@@ -152,6 +98,7 @@ enum SpawnedCmd {
Child(ChildWaiter),
SpawnError(io::Error),
Joined(Result<(), ExecError>),
+ Pipeline(SpawnedPipeline),
}
impl SpawnedCmd {
@@ -174,6 +121,18 @@ impl SpawnedCmd {
}
SpawnedCmd::SpawnError(err) => Err(err)?,
SpawnedCmd::Joined(res) => res?,
+ SpawnedCmd::Pipeline(pipes) => {
+ let joined: Vec<_> = pipes
+ .executors
+ .into_iter()
+ .filter_map(|e| e.join().err())
+ .collect();
+ match joined.len() {
+ 0 => (),
+ 1 => return Err(joined.into_iter().next().unwrap()),
+ _ => return Err(ExecError::ErrorStack(joined)),
+ }
+ }
}
Ok(())
}
@@ -189,11 +148,45 @@ impl SpawnedCmd {
},
SpawnedCmd::SpawnError(_) => true,
SpawnedCmd::Joined(_) => true,
+ SpawnedCmd::Pipeline(pipes) => pipes
+ .executors
+ .iter_mut()
+ .all(|e| e.join_timeout(timeout_ms)),
+ }
+ }
+
+ fn cancel(&mut self) {
+ match self {
+ SpawnedCmd::Pipeline(pipes) => {
+ for c in pipes.cancel.iter_mut() {
+ c.cancel();
+ }
+ }
+ SpawnedCmd::Builtin(_)
+ | SpawnedCmd::Fun(_)
+ | SpawnedCmd::Child(_)
+ | SpawnedCmd::SpawnError(_)
+ | SpawnedCmd::Joined(_) => (),
}
}
}
impl Executor {
+ fn exec_loop(&mut self, mut s: SpawnedCmd, cs: &mut [Canceler]) -> Result<(), ExecError> {
+ while !s.join_timeout(50) {
+ if let Ok(mut se) = self.se.try_lock()
+ && ctrlc::pop(&mut se)
+ {
+ for c in cs.iter_mut() {
+ c.cancel();
+ }
+ s.cancel();
+ break;
+ }
+ }
+ s.join()
+ }
+
pub fn new_for_completion(se: Arc<Mutex<Session>>) -> Self {
Self {
se,
@@ -206,8 +199,8 @@ impl Executor {
&mut self,
cmd: CommandKind,
args: Vec<BString>,
- mut stdin: Input,
- mut stdout: Output,
+ mut stdin: InputReader,
+ mut stdout: OutputWriter,
) -> SpawnedCmd {
match cmd {
CommandKind::Builtin(builtin) => {
@@ -236,8 +229,8 @@ impl Executor {
command.arg(OsStr::from_bytes(arg));
}
- command.stdin(stdin);
- command.stdout(stdout);
+ command.stdin(Input::from(stdin));
+ command.stdout(Output::from(stdout));
crate::export_fun::prepare_command(self.se.clone(), &mut command);
@@ -252,18 +245,23 @@ impl Executor {
fn execute_pipeline(
&mut self,
pipes: parse::Pipes<parse::PostExpansion>,
- stdin: Input,
- stdout: Output,
- ) -> Result<(), ExecError> {
+ stdin: InputReader,
+ stdout: OutputWriter,
+ ) -> SpawnedCmd {
let mut inputs = Vec::with_capacity(pipes.cmds.len());
let mut outputs = Vec::with_capacity(pipes.cmds.len());
+ let mut cancel = Vec::with_capacity(pipes.cmds.len() * 2);
let mut executors = Vec::with_capacity(pipes.cmds.len());
inputs.push(stdin);
for _ in 1..pipes.cmds.len() {
let (r, w) = io::pipe().unwrap();
- inputs.push(Input::Pipe(r));
- outputs.push(Output::Pipe(w));
+ let (i, c1) = InputReader::new(Input::Pipe(r));
+ let (o, c2) = OutputWriter::new(Output::Pipe(w));
+ inputs.push(i);
+ outputs.push(o);
+ cancel.push(c1);
+ cancel.push(c2);
}
outputs.push(stdout);
@@ -280,26 +278,15 @@ impl Executor {
executors.push(self.spawn_cmd(dc, cmd.args, stdin, stdout));
}
- let mut code = None;
- for e in executors {
- if let Some(err) = e.join().err() {
- code = Some(err);
- }
- }
-
- if let Some(err) = code {
- Err(err)
- } else {
- Ok(())
- }
+ SpawnedCmd::Pipeline(SpawnedPipeline { executors, cancel })
}
- fn execute_var_assign(&mut self, va: parse::VarAssign<PostExpansion>) -> Result<(), ExecError> {
+ fn execute_var_assign(&mut self, va: parse::VarAssign<PostExpansion>) -> SpawnedCmd {
self.se.lock().unwrap().vars.insert(va.var, va.val);
- Ok(())
+ SpawnedCmd::Joined(Ok(()))
}
- fn execute_fun_decl(&mut self, fd: parse::FunDecl<PostExpansion>) -> Result<(), ExecError> {
+ fn execute_fun_decl(&mut self, fd: parse::FunDecl<PostExpansion>) -> SpawnedCmd {
self.se
.lock()
.unwrap()
@@ -307,15 +294,15 @@ impl Executor {
.insert(fd.name.clone(), *fd.body.body);
crate::export_fun::create_function_hook(self.se.clone(), &fd.name);
// TODO: very ugly to ad-hoc keep export stuff & session data in sync here
- Ok(())
+ SpawnedCmd::Joined(Ok(()))
}
fn execute(
&mut self,
ast: Ast<parse::PostExpansion>,
- stdin: Input,
- stdout: Output,
- ) -> Result<(), ExecError> {
+ stdin: InputReader,
+ stdout: OutputWriter,
+ ) -> SpawnedCmd {
match ast {
Ast::FunDecl(fd) => self.execute_fun_decl(fd),
Ast::VarAssign(va) => self.execute_var_assign(va),
@@ -326,9 +313,9 @@ impl Executor {
pub fn execute_fun(
session: Arc<Mutex<Session>>,
invoke: Vec<BString>,
- stdin: Input,
- stdout: Output,
- ) -> Result<(), ExecError> {
+ stdin: InputReader,
+ stdout: OutputWriter,
+ ) -> SpawnedCmd {
let mut this = Self {
se: session,
args: None,
@@ -389,10 +376,26 @@ impl parse::Expander for Executor {
}
let (stdin, _) = io::pipe().unwrap();
+ let (stdin, mut c1) = InputReader::new(Input::Pipe(stdin));
let (mut expansion, stdout) = io::pipe().unwrap();
+ let (stdout, mut c2) = OutputWriter::new(Output::Pipe(stdout));
let mut this = self.clone();
- let t =
- std::thread::spawn(move || this.execute(ast, Input::Pipe(stdin), Output::Pipe(stdout)));
+
+ let se = self.se.clone();
+ let t = std::thread::spawn(move || {
+ let mut spawned = this.execute(ast, stdin, stdout);
+ while !spawned.join_timeout(50) {
+ if let Ok(mut se) = se.try_lock()
+ && ctrlc::pop(&mut se)
+ {
+ c1.cancel();
+ c2.cancel();
+ spawned.cancel();
+ break;
+ }
+ }
+ spawned.join()
+ });
let mut buf = Vec::new();
expansion.read_to_end(&mut buf).unwrap();
t.join().unwrap()?;
@@ -405,12 +408,26 @@ impl parse::Expander for Executor {
fn exec(se: Arc<Mutex<Session>>, ast: Ast<PreExpansion>) -> Result<(), ExecError> {
let mut exec = Executor {
- se,
+ se: se.clone(),
args: None,
expand_commands: true,
};
let ast = ast.expand(&mut exec)?;
- exec.execute(ast, Input::Stdin, Output::Stdout)
+ let (stdin, mut c1) = InputReader::new(Input::Stdin);
+ let (stdout, mut c2) = OutputWriter::new(Output::Stdout);
+ let mut spawned = exec.execute(ast, stdin, stdout);
+ ctrlc::pop(&mut *se.lock().unwrap());
+ while !spawned.join_timeout(50) {
+ if let Ok(mut se) = se.try_lock()
+ && ctrlc::pop(&mut se)
+ {
+ c1.cancel();
+ c2.cancel();
+ spawned.cancel();
+ break;
+ }
+ }
+ spawned.join()
}
pub fn run(se: Arc<Mutex<Session>>, cmd: Vec<u8>) {