aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorJonas Maier <>2026-03-10 14:47:03 +0100
committerJonas Maier <>2026-03-10 14:47:03 +0100
commit956e67aafd0ca9bf49782e308644590406d11989 (patch)
tree28129ced71c282a67948046ec8a08d0a7ca72587 /src
parent7728b844958bb7882ddf384b40e5711d7a9316ad (diff)
downloadpish-956e67aafd0ca9bf49782e308644590406d11989.tar.gz
correct session directory cleanup
Diffstat (limited to 'src')
-rw-r--r--src/completion.rs1
-rw-r--r--src/export_fun.rs61
2 files changed, 49 insertions, 13 deletions
diff --git a/src/completion.rs b/src/completion.rs
index 3adce90..237e956 100644
--- a/src/completion.rs
+++ b/src/completion.rs
@@ -47,7 +47,6 @@ fn _path_completion(mut prefix: BString) -> io::Result<Vec<Suggestion>> {
}
pub fn path_completion(prefix: BString) -> Vec<Suggestion> {
- eprintln!("path completion request for {}\r\n", String::from_utf8_lossy(&prefix));
match _path_completion(prefix) {
Ok(suggestions) => suggestions,
Err(err) => {
diff --git a/src/export_fun.rs b/src/export_fun.rs
index 3c0f1c9..ecbdf46 100644
--- a/src/export_fun.rs
+++ b/src/export_fun.rs
@@ -1,5 +1,9 @@
//! allow sub-programs to invoke arbitrary user-defined functions via a unix socket
+use nix::poll::PollFd;
+use nix::poll::PollFlags;
+use nix::poll::poll;
+
use crate::Session;
use crate::run::Input;
use crate::run::Output;
@@ -12,6 +16,7 @@ use std::io;
use std::io::IoSliceMut;
use std::io::Read;
use std::io::Write;
+use std::os::fd::AsFd;
use std::os::fd::FromRawFd;
use std::os::fd::OwnedFd;
use std::os::unix::ffi::OsStrExt;
@@ -25,6 +30,9 @@ use std::path::PathBuf;
use std::process::exit;
use std::sync::Arc;
use std::sync::Mutex;
+use std::sync::mpsc;
+use std::sync::mpsc::Receiver;
+use std::sync::mpsc::Sender;
use std::thread;
use std::time::Duration;
@@ -165,6 +173,7 @@ fn unique_string() -> String {
pub struct SocketRunning {
bin_dir: PathBuf,
path: PathBuf,
+ recv: Receiver<()>,
}
impl SocketRunning {
@@ -190,10 +199,13 @@ impl Drop for SocketDropper {
return;
};
- // connect to the socket for a brief moment to make it check that it should terminate
- if let Ok(mut con) = UnixStream::connect(&sr.path) {
- let _ = write!(con, "please shut down :))");
- };
+ // need to unlock since background thread also accesses session
+ drop(se);
+
+ // wait 1s for background to exit
+ if let Err(e) = sr.recv.recv_timeout(Duration::from_secs(1)) {
+ eprintln!("background thread is still running({e:?}, session might not be cleaned up\r");
+ }
}
}
@@ -205,35 +217,60 @@ pub fn listen(session: Arc<Mutex<Session>>) -> impl Drop {
std::fs::create_dir_all(&bin_dir).unwrap();
let socket_path = session_dir.join("cmd.sock");
+ let (send, recv) = mpsc::channel();
+
{
let mut se = session.lock().unwrap();
assert!(se.socket_running.is_none());
se.socket_running = Some(SocketRunning {
bin_dir,
path: socket_path.clone(),
+ recv,
});
}
let se = session.clone();
thread::spawn(move || {
- struct SessionRemover(PathBuf);
+ struct SessionRemover {
+ send: Sender<()>,
+ path: PathBuf,
+ }
impl Drop for SessionRemover {
fn drop(&mut self) {
- let _ = fs::remove_dir_all(&self.0);
+ let _ = fs::remove_dir_all(&self.path);
+ let _ = self.send.send(());
}
}
- let _session_remover = SessionRemover(session_dir);
+ let _session_remover = SessionRemover {
+ path: session_dir,
+ send,
+ };
+
let listener = UnixListener::bind(socket_path).unwrap();
- let mut it = listener.incoming();
- while let Some(stream) = it.next() {
+ listener.set_nonblocking(true).unwrap();
+ let timeout_ms: u16 = 200;
+
+ loop {
+ // poll socket with timeout
+ let mut poll_fds = [PollFd::new(listener.as_fd(), PollFlags::POLLIN)];
+ let is_ready = match poll(&mut poll_fds, timeout_ms) {
+ Ok(0) => false,
+ Ok(_) => true,
+ Err(_) => false,
+ };
+
+ // check if we should terminate
match se.lock() {
Err(_) => break,
Ok(se) if se.socket_running.is_none() => break,
_ => (),
}
- if let Ok(stream) = stream {
- let se = se.clone();
- thread::spawn(move || handle_server(se, stream));
+
+ if is_ready {
+ if let Ok((stream, _addr)) = listener.accept() {
+ let se = se.clone();
+ thread::spawn(move || handle_server(se, stream));
+ }
}
}
});