lua.rs (6624B)
1 use std::{ 2 io::Write, 3 process::Command, 4 sync::{ 5 Arc, RwLock, 6 atomic::{AtomicBool, AtomicI32, Ordering}, 7 }, 8 }; 9 10 use mlua::prelude::*; 11 use nix::{ 12 sys::signal::{Signal, kill}, 13 unistd::Pid, 14 }; 15 use rexpect::session::{PtySession, spawn_command}; 16 use send_wrapper::SendWrapper; 17 use tempfile::NamedTempFile; 18 use thiserror::Error; 19 20 use crate::inspect::format_string_bytes; 21 22 pub trait LuaExecutor: Send + Sync { 23 fn exec(&self, code: &str) -> LuaResult<LuaValue>; 24 fn globals(&self) -> LuaResult<LuaTable>; 25 fn cancel(&self); 26 } 27 28 pub struct MluaExecutor { 29 lua: Lua, 30 cancelled: Arc<AtomicBool>, 31 } 32 33 impl MluaExecutor { 34 pub fn new() -> Self { 35 let lua = Lua::new(); 36 let cancelled = Arc::new(AtomicBool::new(false)); 37 38 let inner_cancelled = cancelled.clone(); 39 lua.set_hook(LuaHookTriggers::EVERY_LINE, move |_lua, _debug| { 40 if inner_cancelled.load(Ordering::Relaxed) { 41 inner_cancelled.store(false, Ordering::Relaxed); 42 43 return Err(LuaError::runtime("cancelled")); 44 } 45 46 Ok(LuaVmState::Continue) 47 }); 48 49 Self { lua, cancelled } 50 } 51 } 52 53 impl LuaExecutor for MluaExecutor { 54 fn exec(&self, code: &str) -> LuaResult<LuaValue> { 55 self.lua.load(code).set_name("=repl").eval() 56 } 57 58 fn globals(&self) -> LuaResult<LuaTable> { 59 Ok(self.lua.globals()) 60 } 61 62 fn cancel(&self) { 63 self.cancelled.store(true, Ordering::Relaxed); 64 } 65 } 66 67 pub struct SystemLuaExecutor { 68 session: RwLock<SendWrapper<PtySession>>, 69 program: String, 70 lua: Lua, 71 72 cancellation_file: RwLock<Option<NamedTempFile>>, 73 pid: AtomicI32, 74 is_stopping: AtomicBool, 75 } 76 77 #[derive(Debug, Error)] 78 pub enum SystemLuaError { 79 #[error("lua error: {0}")] 80 Lua(#[from] LuaError), 81 #[error("expect error: {0}")] 82 Expect(#[from] rexpect::error::Error), 83 #[error("io error: {0}")] 84 Io(#[from] std::io::Error), 85 #[error("restarted system Lua")] 86 Restarted, 87 #[error("runtime error")] 88 RuntimeError(String), 89 } 90 91 enum RpcCommand { 92 Globals, 93 Exec(String), 94 Prepare(String), 95 } 96 97 impl RpcCommand { 98 pub fn to_lua(&self) -> String { 99 match self { 100 Self::Globals => String::from("globals"), 101 Self::Exec(code) => format!("exec:{}", format_string_bytes(code.as_bytes(), false)), 102 Self::Prepare(file) => format!("prepare:{file}"), 103 } 104 } 105 } 106 107 const RPC_CODE: &str = include_str!("../lua/rpc.lua"); 108 109 impl SystemLuaExecutor { 110 pub fn new(program: &str) -> Result<Self, SystemLuaError> { 111 let (session, file) = Self::obtain_session(program)?; 112 let pid = session.process.child_pid.as_raw(); 113 114 Ok(Self { 115 session: RwLock::new(SendWrapper::new(session)), 116 program: program.to_string(), 117 lua: unsafe { Lua::unsafe_new() }, 118 cancellation_file: RwLock::new(file), 119 pid: AtomicI32::new(pid), 120 is_stopping: AtomicBool::new(false), 121 }) 122 } 123 124 fn obtain_session( 125 program: &str, 126 ) -> Result<(PtySession, Option<NamedTempFile>), SystemLuaError> { 127 let mut cmd = Command::new(program); 128 129 cmd.arg("-e"); 130 cmd.arg(RPC_CODE); 131 132 let mut session = spawn_command(cmd, None)?; 133 134 // TODO; should this be in our cache/run dir? 135 let file = NamedTempFile::new()?; 136 137 let prepare = RpcCommand::Prepare(file.path().to_string_lossy().to_string()); 138 139 let cmd = prepare.to_lua(); 140 session.send_line(&cmd)?; 141 142 let lua = Lua::new(); 143 144 loop { 145 let code = session.read_line()?; 146 147 if let Ok(prepare_result) = lua.load(&code).eval::<LuaTable>() { 148 if prepare_result.get::<bool>("data")? { 149 return Ok((session, Some(file))); 150 } else { 151 return Ok((session, None)); 152 } 153 } 154 } 155 } 156 157 fn restart_process(&self, session: &mut SendWrapper<PtySession>) -> Result<(), SystemLuaError> { 158 let (pty, file) = Self::obtain_session(&self.program)?; 159 self.pid 160 .store(pty.process.child_pid.as_raw(), Ordering::Relaxed); 161 162 *session = SendWrapper::new(pty); 163 164 let mut cancellation_file = self 165 .cancellation_file 166 .write() 167 .expect("write cancellation_file"); 168 *cancellation_file = file; 169 170 Ok(()) 171 } 172 173 fn request(&self, command: RpcCommand) -> Result<LuaTable, SystemLuaError> { 174 self.is_stopping.store(false, Ordering::Relaxed); 175 176 let mut session = self.session.write().expect("write process"); 177 178 let cmd = command.to_lua(); 179 180 if session.send_line(&cmd).is_err() { 181 // killed 182 self.restart_process(&mut session)?; 183 184 return Err(SystemLuaError::Restarted); 185 } 186 187 loop { 188 let code = match session.read_line() { 189 Ok(code) => code, 190 Err(rexpect::error::Error::EOF { .. }) => { 191 self.restart_process(&mut session)?; 192 193 return Err(SystemLuaError::Restarted); 194 } 195 x => x?, 196 }; 197 198 if let Ok(res) = self.lua.load(&code).eval::<LuaTable>() { 199 if res.get::<String>("ty")? == "error" { 200 return Err(SystemLuaError::RuntimeError(res.get("data")?)); 201 }; 202 203 return Ok(res); 204 } else { 205 println!("{}", &code); 206 } 207 } 208 } 209 } 210 211 impl LuaExecutor for SystemLuaExecutor { 212 fn exec(&self, code: &str) -> LuaResult<LuaValue> { 213 self.request(RpcCommand::Exec(code.to_string())) 214 .map_err(LuaError::external)? 215 .get("data") 216 } 217 218 fn globals(&self) -> LuaResult<LuaTable> { 219 self.request(RpcCommand::Globals) 220 .map_err(LuaError::external)? 221 .get("data") 222 } 223 224 fn cancel(&self) { 225 let mut cancellation_file = self 226 .cancellation_file 227 .write() 228 .expect("write cancellation_file"); 229 230 if !self.is_stopping.load(Ordering::Relaxed) { 231 self.is_stopping.store(true, Ordering::Relaxed); 232 233 if let Some(file) = cancellation_file.as_mut() { 234 if file.write_all(b"stop").is_ok() && file.flush().is_ok() { 235 return; 236 } 237 } 238 } 239 240 // Restart process 241 let pid = self.pid.load(Ordering::Relaxed); 242 let _ = kill(Pid::from_raw(pid), Signal::SIGKILL); 243 } 244 }