use std::{
panic,
path::{Path, PathBuf},
thread::{self, JoinHandle},
time::Instant,
};
#[cfg(feature = "ipc")]
use std::time::Duration;
use zng_txt::Txt;
use crate::{ipc, AnyResult, Event, Request, Response, ViewConfig, ViewProcessGen, ViewProcessOffline, VpResult};
type EventListenerJoin = JoinHandle<Box<dyn FnMut(Event) + Send>>;
#[cfg(feature = "ipc")]
type DuctHandle = duct::Handle;
#[cfg(not(feature = "ipc"))]
struct DuctHandle;
pub(crate) const VIEW_VERSION: &str = "ZNG_VIEW_VERSION";
pub(crate) const VIEW_SERVER: &str = "ZNG_VIEW_SERVER";
pub(crate) const VIEW_MODE: &str = "ZNG_VIEW_MODE";
#[cfg_attr(not(feature = "ipc"), allow(unused))]
pub struct Controller {
process: Option<DuctHandle>,
online: bool,
generation: ViewProcessGen,
is_respawn: bool,
view_process_exe: PathBuf,
request_sender: ipc::RequestSender,
response_receiver: ipc::ResponseReceiver,
event_listener: Option<EventListenerJoin>,
headless: bool,
same_process: bool,
device_events: bool,
last_respawn: Option<Instant>,
fast_respawn_count: u8,
}
#[cfg(test)]
fn _assert_sync(x: Controller) -> impl Send + Sync {
x
}
impl Controller {
pub fn start<F>(view_process_exe: Option<PathBuf>, device_events: bool, headless: bool, on_event: F) -> Self
where
F: FnMut(Event) + Send + 'static,
{
Self::start_impl(view_process_exe, device_events, headless, Box::new(on_event))
}
fn start_impl(
view_process_exe: Option<PathBuf>,
device_events: bool,
headless: bool,
mut on_event: Box<dyn FnMut(Event) + Send>,
) -> Self {
if ViewConfig::from_env().is_some() {
panic!("cannot start Controller in process configured to be view-process");
}
let view_process_exe = view_process_exe.unwrap_or_else(|| {
std::env::current_exe()
.and_then(|p| p.canonicalize())
.expect("failed to get the current exe")
});
let (process, request_sender, response_receiver, mut event_receiver) =
Self::spawn_view_process(&view_process_exe, headless).expect("failed to spawn or connect to view-process");
let ev = thread::spawn(move || {
while let Ok(ev) = event_receiver.recv() {
on_event(ev);
}
on_event(Event::Disconnected(ViewProcessGen::first()));
on_event
});
let mut c = Controller {
same_process: process.is_none(),
online: false,
process,
view_process_exe,
request_sender,
response_receiver,
event_listener: Some(ev),
headless,
device_events,
generation: ViewProcessGen::first(),
is_respawn: false,
last_respawn: None,
fast_respawn_count: 0,
};
if let Err(ViewProcessOffline) = c.try_init() {
panic!("respawn on init");
}
c
}
fn try_init(&mut self) -> VpResult<()> {
self.init(self.generation, self.is_respawn, self.device_events, self.headless)?;
Ok(())
}
pub fn online(&self) -> bool {
self.online
}
pub fn generation(&self) -> ViewProcessGen {
self.generation
}
pub fn headless(&self) -> bool {
self.headless
}
pub fn device_events(&self) -> bool {
self.device_events
}
pub fn same_process(&self) -> bool {
self.same_process
}
fn offline_err(&self) -> Result<(), ViewProcessOffline> {
if self.online {
Ok(())
} else {
Err(ViewProcessOffline)
}
}
fn try_talk(&mut self, req: Request) -> ipc::IpcResult<Response> {
self.request_sender.send(req)?;
self.response_receiver.recv()
}
pub(crate) fn talk(&mut self, req: Request) -> VpResult<Response> {
debug_assert!(req.expect_response());
if req.must_be_online() {
self.offline_err()?;
}
match self.try_talk(req) {
Ok(r) => Ok(r),
Err(ipc::Disconnected) => {
self.handle_disconnect(self.generation);
Err(ViewProcessOffline)
}
}
}
pub(crate) fn command(&mut self, req: Request) -> VpResult<()> {
debug_assert!(!req.expect_response());
if req.must_be_online() {
self.offline_err()?;
}
match self.request_sender.send(req) {
Ok(_) => Ok(()),
Err(ipc::Disconnected) => {
self.handle_disconnect(self.generation);
Err(ViewProcessOffline)
}
}
}
fn spawn_view_process(
view_process_exe: &Path,
headless: bool,
) -> AnyResult<(Option<DuctHandle>, ipc::RequestSender, ipc::ResponseReceiver, ipc::EventReceiver)> {
let _span = tracing::trace_span!("spawn_view_process").entered();
let init = ipc::AppInit::new();
let process = if ViewConfig::is_awaiting_same_process() {
ViewConfig::set_same_process(ViewConfig {
version: crate::VERSION.into(),
server_name: Txt::from_str(init.name()),
headless,
});
None
} else {
#[cfg(not(feature = "ipc"))]
{
let _ = view_process_exe;
panic!("expected only same_process mode with `ipc` feature disabled");
}
#[cfg(feature = "ipc")]
{
let process = duct::cmd!(view_process_exe)
.env(VIEW_VERSION, crate::VERSION)
.env(VIEW_SERVER, init.name())
.env(VIEW_MODE, if headless { "headless" } else { "headed" })
.env("RUST_BACKTRACE", "full")
.stdin_null()
.stdout_capture()
.stderr_capture()
.unchecked()
.start()?;
Some(process)
}
};
let (req, rsp, ev) = match init.connect() {
Ok(r) => r,
Err(e) => {
#[cfg(feature = "ipc")]
if let Some(p) = process {
if let Err(ke) = p.kill() {
tracing::error!(
"failed to kill new view-process after failing to connect to it\n connection error: {e:?}\n kill error: {ke:?}",
);
}
}
return Err(e);
}
};
Ok((process, req, rsp, ev))
}
pub fn handle_inited(&mut self, gen: ViewProcessGen) {
if gen == self.generation {
self.online = true;
}
}
pub fn handle_disconnect(&mut self, gen: ViewProcessGen) {
if gen == self.generation {
#[cfg(not(feature = "ipc"))]
{
tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
}
#[cfg(feature = "ipc")]
{
self.respawn_impl(true)
}
}
}
pub fn respawn(&mut self) {
#[cfg(not(feature = "ipc"))]
{
tracing::error!(target: "vp_respawn", "cannot recover in same_process mode (no ipc)");
}
#[cfg(feature = "ipc")]
self.respawn_impl(false);
}
#[cfg(feature = "ipc")]
fn respawn_impl(&mut self, is_crash: bool) {
use zng_unit::TimeUnits;
self.online = false;
self.is_respawn = true;
let process = if let Some(p) = self.process.take() {
p
} else {
if self.same_process {
tracing::error!(target: "vp_respawn", "cannot recover in same_process mode");
}
return;
};
if is_crash {
tracing::error!(target: "vp_respawn", "channel disconnect, will try respawn");
}
if is_crash {
let t = Instant::now();
if let Some(last_respawn) = self.last_respawn {
if t - last_respawn < Duration::from_secs(60) {
self.fast_respawn_count += 1;
if self.fast_respawn_count == 2 {
panic!("disconnect respawn happened 2 times less than 1 minute apart");
}
} else {
self.fast_respawn_count = 0;
}
}
self.last_respawn = Some(t);
} else {
self.last_respawn = None;
}
let mut killed_by_us = false;
if !is_crash {
let _ = process.kill();
killed_by_us = true;
} else if !matches!(process.try_wait(), Ok(Some(_))) {
thread::sleep(300.ms());
if !matches!(process.try_wait(), Ok(Some(_))) {
killed_by_us = true;
let _ = process.kill();
}
}
let code_and_output = match process.into_output() {
Ok(c) => Some(c),
Err(e) => {
tracing::error!(target: "vp_respawn", "view-process could not be killed, will abandon running, {e:?}");
None
}
};
if let Some(c) = code_and_output {
tracing::info!(target: "vp_respawn", "view-process killed");
let code = c.status.code();
#[allow(unused_mut)]
let mut signal = None::<i32>;
if !killed_by_us {
#[cfg(windows)]
if code == Some(1) {
tracing::warn!(target: "vp_respawn", "view-process exit code (1), probably killed by the system, \
will exit app-process with the same code");
std::process::exit(1);
}
#[cfg(unix)]
if code.is_none() {
use std::os::unix::process::ExitStatusExt as _;
signal = c.status.signal();
if let Some(sig) = signal {
if [2, 9, 17, 19, 23].contains(&sig) {
tracing::warn!(target: "vp_respawn", "view-process exited by signal ({sig}), \
will exit app-process with code 1");
std::process::exit(1);
}
}
}
}
if !killed_by_us {
let code = code.unwrap_or(0);
let signal = signal.unwrap_or(0);
tracing::error!(target: "vp_respawn", "view-process exit code: {code:#X}, signal: {signal}");
}
let stderr = match String::from_utf8(c.stderr) {
Ok(s) => {
if !s.is_empty() {
tracing::error!(target: "vp_respawn", "view-process stderr:\n```stderr\n{s}\n```")
}
Some(s)
}
Err(e) => {
tracing::error!(target: "vp_respawn", "failed to read view-process stderr: {e}");
None
}
};
if ViewConfig::is_version_err(code, stderr.as_deref()) {
let code = code.unwrap_or(1);
tracing::error!(target: "vp_respawn", "view-process API version don't match, \
will exit app-process with code 0x{code:x}");
std::process::exit(code);
}
match String::from_utf8(c.stdout) {
Ok(s) => {
if !s.is_empty() {
tracing::info!(target: "vp_respawn", "view-process stdout:\n```stdout\n{s}\n```")
}
}
Err(e) => tracing::error!(target: "vp_respawn", "failed to read view-process stdout: {e}"),
}
} else {
tracing::error!(target: "vp_respawn", "failed to kill view-process, will abandon it running and spawn a new one");
}
let mut on_event = match self.event_listener.take().unwrap().join() {
Ok(fn_) => fn_,
Err(p) => panic::resume_unwind(p),
};
let mut retries = 3;
let (new_process, request, response, mut event) = loop {
match Self::spawn_view_process(&self.view_process_exe, self.headless) {
Ok(r) => break r,
Err(e) => {
tracing::error!(target: "vp_respawn", "failed to respawn, {e:?}");
retries -= 1;
if retries == 0 {
panic!("failed to respawn `view-process` after 3 retries");
}
tracing::info!(target: "vp_respawn", "retrying respawn");
}
}
};
self.process = new_process;
self.request_sender = request;
self.response_receiver = response;
let next_id = self.generation.next();
self.generation = next_id;
if let Err(ViewProcessOffline) = self.try_init() {
panic!("respawn on respawn startup");
}
let ev = thread::spawn(move || {
while let Ok(ev) = event.recv() {
on_event(ev);
}
on_event(Event::Disconnected(next_id));
on_event
});
self.event_listener = Some(ev);
}
}
impl Drop for Controller {
fn drop(&mut self) {
let _ = self.exit();
#[cfg(feature = "ipc")]
if let Some(process) = self.process.take() {
let _ = process.kill();
}
}
}