use std::{
collections::HashMap,
net::{IpAddr, SocketAddr},
sync::{Arc, RwLock},
};
use futures::Future;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use zenoh_core::{zread, zwrite};
use zenoh_protocol::core::{EndPoint, Locator};
use zenoh_result::{zerror, ZResult};
use crate::BIND_INTERFACE;
pub struct ListenerUnicastIP {
endpoint: EndPoint,
token: CancellationToken,
handle: JoinHandle<ZResult<()>>,
}
impl ListenerUnicastIP {
fn new(
endpoint: EndPoint,
token: CancellationToken,
handle: JoinHandle<ZResult<()>>,
) -> ListenerUnicastIP {
ListenerUnicastIP {
endpoint,
token,
handle,
}
}
async fn stop(&self) {
self.token.cancel();
}
}
pub struct ListenersUnicastIP {
listeners: Arc<RwLock<HashMap<SocketAddr, ListenerUnicastIP>>>,
pub token: CancellationToken,
}
impl ListenersUnicastIP {
pub fn new() -> ListenersUnicastIP {
ListenersUnicastIP {
listeners: Arc::new(RwLock::new(HashMap::new())),
token: CancellationToken::new(),
}
}
pub async fn add_listener<F>(
&self,
endpoint: EndPoint,
addr: SocketAddr,
future: F,
token: CancellationToken,
) -> ZResult<()>
where
F: Future<Output = ZResult<()>> + Send + 'static,
{
let mut listeners = zwrite!(self.listeners);
let c_listeners = self.listeners.clone();
let c_addr = addr;
let task = async move {
let res = future.await;
zwrite!(c_listeners).remove(&c_addr);
res
};
let handle = zenoh_runtime::ZRuntime::Acceptor.spawn(task);
let listener = ListenerUnicastIP::new(endpoint, token, handle);
listeners.insert(addr, listener);
Ok(())
}
pub async fn del_listener(&self, addr: SocketAddr) -> ZResult<()> {
let listener = zwrite!(self.listeners).remove(&addr).ok_or_else(|| {
zerror!(
"Can not delete the listener because it has not been found: {}",
addr
)
})?;
listener.stop().await;
listener.handle.await?
}
pub fn get_endpoints(&self) -> Vec<EndPoint> {
zread!(self.listeners)
.values()
.map(|l| l.endpoint.clone())
.collect()
}
pub fn get_locators(&self) -> Vec<Locator> {
let mut locators = vec![];
let guard = zread!(self.listeners);
for (key, value) in guard.iter() {
let (kip, kpt) = (key.ip(), key.port());
let config = value.endpoint.config();
let iface = config.get(BIND_INTERFACE);
if kip.is_unspecified() {
let mut addrs = match kip {
IpAddr::V4(_) => zenoh_util::net::get_ipv4_ipaddrs(iface),
IpAddr::V6(_) => zenoh_util::net::get_ipv6_ipaddrs(iface),
};
let iter = addrs.drain(..).map(|x| {
Locator::new(
value.endpoint.protocol(),
SocketAddr::new(x, kpt).to_string(),
value.endpoint.metadata(),
)
.unwrap()
});
locators.extend(iter);
} else {
locators.push(value.endpoint.to_locator());
}
}
locators
}
}
impl Default for ListenersUnicastIP {
fn default() -> Self {
Self::new()
}
}