use core::panic;
use std::{
borrow::Borrow,
collections::HashMap,
env,
future::Future,
ops::Deref,
sync::{
atomic::{AtomicUsize, Ordering},
OnceLock,
},
time::Duration,
};
use lazy_static::lazy_static;
use serde::Deserialize;
use tokio::runtime::{Handle, Runtime, RuntimeFlavor};
use zenoh_macros::{GenericRuntimeParam, RegisterParam};
use zenoh_result::ZResult as Result;
/// Name of the `ZENOH_RUNTIME` environment variable.
///
/// NOTE(review): nothing in the visible part of this file reads this variable; it is
/// presumably consumed by the macro-generated parameter loading — confirm.
pub const ZENOH_RUNTIME_ENV: &str = "ZENOH_RUNTIME";
/// Settings used by [`RuntimeParam::build`] to create the Tokio runtime behind a [`ZRuntime`].
///
/// The struct is deserializable with serde: unknown fields are rejected and any field
/// that is missing from the input falls back to the value from the `Default` impl.
#[derive(Deserialize, Debug, GenericRuntimeParam)]
#[serde(deny_unknown_fields, default)]
pub struct RuntimeParam {
    /// Number of worker threads passed to the Tokio runtime builder.
    pub worker_threads: usize,
    /// Maximum number of blocking threads passed to the Tokio runtime builder.
    pub max_blocking_threads: usize,
    /// When set, `ZRuntimePool::get` resolves this runtime to the given one instead.
    pub handover: Option<ZRuntime>,
}
impl Default for RuntimeParam {
    /// Baseline settings: a single worker thread, up to 50 blocking threads and no
    /// handover to another runtime.
    fn default() -> Self {
        let worker_threads = 1;
        let max_blocking_threads = 50;
        Self {
            handover: None,
            max_blocking_threads,
            worker_threads,
        }
    }
}
impl RuntimeParam {
    /// Builds the multi-threaded Tokio runtime backing `zrt` from these settings.
    ///
    /// The I/O and time drivers are enabled, and every thread spawned by the runtime is
    /// named `<zrt>-<n>`, where `n` is taken from the per-runtime counter stored in
    /// `ZRUNTIME_INDEX`.
    ///
    /// # Errors
    ///
    /// Returns an error if `worker_threads` or `max_blocking_threads` is zero, or if
    /// Tokio fails to build the runtime.
    ///
    /// # Panics
    ///
    /// The thread-naming callback panics if `zrt` has no entry in `ZRUNTIME_INDEX`.
    pub fn build(&self, zrt: ZRuntime) -> Result<Runtime> {
        // Tokio's builder panics when either thread count is zero. These values can come
        // from deserialized configuration, so reject them with an error instead.
        for (field, value) in [
            ("worker_threads", self.worker_threads),
            ("max_blocking_threads", self.max_blocking_threads),
        ] {
            if value == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!("{zrt}: `{field}` must be greater than 0"),
                )
                .into());
            }
        }

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(self.worker_threads)
            .max_blocking_threads(self.max_blocking_threads)
            .enable_io()
            .enable_time()
            .thread_name_fn(move || {
                // Each spawned thread takes the next sequence number of its runtime.
                let id = ZRUNTIME_INDEX
                    .get(&zrt)
                    .unwrap()
                    .fetch_add(1, Ordering::SeqCst);
                format!("{zrt}-{id}")
            })
            .build()?;
        Ok(rt)
    }
}
/// Identifies one of the runtimes managed by `ZRUNTIME_POOL`.
///
/// Dereferencing a `ZRuntime` yields the Tokio `Handle` of its runtime (see the `Deref`
/// impl), so `Handle` methods can be called directly on a variant.
///
/// Each variant is deserialized from the short name given in its `serde(rename)` attribute.
/// NOTE(review): the `#[param(...)]` attributes are presumably turned into per-variant
/// `RuntimeParam` defaults by the `RegisterParam` derive — confirm against the macro.
#[derive(Hash, Eq, PartialEq, Clone, Copy, Debug, RegisterParam, Deserialize)]
#[param(RuntimeParam)]
pub enum ZRuntime {
    /// Deserialized from `app`; declared with `worker_threads = 1`.
    #[serde(rename = "app")]
    #[param(worker_threads = 1)]
    Application,
    /// Deserialized from `acc`; declared with `worker_threads = 1`.
    #[serde(rename = "acc")]
    #[param(worker_threads = 1)]
    Acceptor,
    /// Deserialized from `tx`; declared with `worker_threads = 1`.
    #[serde(rename = "tx")]
    #[param(worker_threads = 1)]
    TX,
    /// Deserialized from `rx`; declared with `worker_threads = 2`.
    #[serde(rename = "rx")]
    #[param(worker_threads = 2)]
    RX,
    /// Deserialized from `net`; declared with `worker_threads = 1`.
    #[serde(rename = "net")]
    #[param(worker_threads = 1)]
    Net,
}
impl ZRuntime {
    /// Drives the future `f` to completion on this runtime, blocking the calling thread,
    /// and returns its output.
    ///
    /// The call is wrapped in `tokio::task::block_in_place`, so it may be issued from a
    /// worker thread of a multi-thread Tokio runtime.
    ///
    /// # Panics
    ///
    /// Panics when invoked from inside a Tokio runtime that uses the current-thread
    /// scheduler.
    pub fn block_in_place<F, R>(&self, f: F) -> R
    where
        F: Future<Output = R>,
    {
        // Being outside of any Tokio runtime is fine; only the current-thread flavor is rejected.
        let inside_current_thread_rt = match Handle::try_current() {
            Ok(current) => current.runtime_flavor() == RuntimeFlavor::CurrentThread,
            Err(_) => false,
        };
        if inside_current_thread_rt {
            panic!("Zenoh runtime doesn't support Tokio's current thread scheduler. Please use multi thread scheduler instead, e.g. a multi thread scheduler with one worker thread: `#[tokio::main(flavor = \"multi_thread\", worker_threads = 1)]`");
        }
        tokio::task::block_in_place(move || self.block_on(f))
    }
}
impl Deref for ZRuntime {
type Target = Handle;
fn deref(&self) -> &Self::Target {
ZRUNTIME_POOL.get(self)
}
}
lazy_static! {
    /// Global pool holding one lazily initialized Tokio runtime slot per `ZRuntime` variant.
    pub static ref ZRUNTIME_POOL: ZRuntimePool = ZRuntimePool::new();
    /// Per-runtime counters, starting at 0, used by `RuntimeParam::build` to number the
    /// threads spawned by each runtime.
    pub static ref ZRUNTIME_INDEX: HashMap<ZRuntime, AtomicUsize> = ZRuntime::iter()
        .map(|zrt| (zrt, AtomicUsize::new(0)))
        .collect();
}
/// Guard whose destructor tears down the global `ZRUNTIME_POOL` and `ZRUNTIME_INDEX`.
///
/// Dropping the guard runs the destructors of both statics' contents; for the pool this
/// means `ZRuntimePool::drop`, which shuts down every initialized runtime.
///
/// NOTE(review): the statics themselves are left holding the already-dropped bits, so
/// this looks sound only if the guard is dropped exactly once and neither static is
/// accessed afterwards — confirm that callers uphold this.
pub struct ZRuntimePoolGuard;
impl Drop for ZRuntimePoolGuard {
    fn drop(&mut self) {
        // `deref()` forces the lazy statics to be initialized if they were not already;
        // `read()` then makes a bitwise copy of each value, and dropping that copy runs
        // the value's destructor.
        unsafe {
            std::mem::drop((ZRUNTIME_POOL.deref() as *const ZRuntimePool).read());
            std::mem::drop(
                (ZRUNTIME_INDEX.deref() as *const HashMap<ZRuntime, AtomicUsize>).read(),
            );
        }
    }
}
/// Maps every `ZRuntime` variant to a slot holding its Tokio runtime; a slot stays empty
/// until the runtime is first requested through `ZRuntimePool::get`.
pub struct ZRuntimePool(HashMap<ZRuntime, OnceLock<Runtime>>);
impl ZRuntimePool {
fn new() -> Self {
Self(ZRuntime::iter().map(|zrt| (zrt, OnceLock::new())).collect())
}
pub fn get(&self, zrt: &ZRuntime) -> &Handle {
let param: &RuntimeParam = zrt.borrow();
let zrt = match param.handover {
Some(handover) => handover,
None => *zrt,
};
self.0
.get(&zrt)
.unwrap_or_else(|| panic!("The hashmap should contains {zrt} after initialization"))
.get_or_init(|| {
zrt.init()
.unwrap_or_else(|_| panic!("Failed to init {zrt}"))
})
.handle()
}
}
impl Drop for ZRuntimePool {
    /// Shuts down every initialized runtime, giving each up to one second to finish.
    ///
    /// Each shutdown runs on its own thread so the timeouts overlap instead of adding up;
    /// all threads are spawned first and only then joined.
    fn drop(&mut self) {
        let mut workers = Vec::new();
        for (_zrt, mut slot) in self.0.drain() {
            // Slots whose runtime was never started have nothing to shut down.
            if let Some(runtime) = slot.take() {
                workers.push(std::thread::spawn(move || {
                    runtime.shutdown_timeout(Duration::from_secs(1))
                }));
            }
        }
        for worker in workers {
            let _ = worker.join();
        }
    }
}
// Checks that `ZRuntime::block_in_place` refuses to run inside a current-thread Tokio
// runtime. `#[tokio::test]` uses the current-thread scheduler by default, so the call
// below is expected to panic with the message matched by `should_panic`.
#[should_panic(expected = "Zenoh runtime doesn't support")]
#[tokio::test]
async fn block_in_place_fail_test() {
    use crate::ZRuntime;
    ZRuntime::TX.block_in_place(async { println!("Done") });
}