use std::mem;
use {HandleBase, Handle, HandleRef, Signals, Status, Time};
use {sys, into_result};
/// A port object, represented as a thin wrapper around a `Handle`.
///
/// A `Port` is obtained from `Port::create`, or by wrapping an existing
/// handle via `HandleBase::from_handle`.
#[derive(PartialEq, Eq, Debug)]
pub struct Port(Handle);
impl HandleBase for Port {
fn get_ref(&self) -> HandleRef {
self.0.get_ref()
}
fn from_handle(handle: Handle) -> Self {
Port(handle)
}
}
/// A packet that can be queued on a `Port` or read back from one.
///
/// This is a thin wrapper around the raw `sys::mx_port_packet_t`; equality
/// and debug formatting are derived from that raw struct.
#[derive(Debug, PartialEq, Eq)]
pub struct Packet(sys::mx_port_packet_t);
/// The decoded payload of a `Packet`, as produced by `Packet::contents`.
#[derive(Clone, Copy, Debug)]
pub enum PacketContents {
    /// Payload of a packet whose type is `MX_PKT_TYPE_USER`.
    User(UserPacket),
    /// Payload of a packet whose type is `MX_PKT_TYPE_SIGNAL_ONE`.
    SignalOne(SignalPacket),
    /// Payload of a packet whose type is `MX_PKT_TYPE_SIGNAL_REP`.
    SignalRep(SignalPacket),
}
/// The payload of a user packet.
///
/// Wraps `sys::mx_packet_user_t`, which the accessors on this type expose as
/// a `[u8; 32]`.
#[derive(Clone, Copy, Debug)]
pub struct UserPacket(sys::mx_packet_user_t);
/// The payload of a signal packet.
///
/// Wraps the raw `sys::mx_packet_signal_t`, whose `trigger`, `observed` and
/// `count` fields are exposed through the accessors on this type.
#[derive(Clone, Copy, Debug)]
pub struct SignalPacket(sys::mx_packet_signal_t);
impl Packet {
pub fn from_user_packet(key: u64, status: i32, user: UserPacket) -> Packet {
Packet(
sys::mx_port_packet_t {
key: key,
packet_type: sys::mx_packet_type_t::MX_PKT_TYPE_USER,
status: status,
union: user.0,
}
)
}
pub fn key(&self) -> u64 {
self.0.key
}
pub fn status(&self) -> i32 {
self.0.status
}
pub fn contents(&self) -> PacketContents {
if self.0.packet_type == sys::mx_packet_type_t::MX_PKT_TYPE_USER {
PacketContents::User(UserPacket(self.0.union))
} else if self.0.packet_type == sys::mx_packet_type_t::MX_PKT_TYPE_SIGNAL_ONE {
PacketContents::SignalOne(SignalPacket(unsafe { mem::transmute_copy(&self.0.union) }))
} else if self.0.packet_type == sys::mx_packet_type_t::MX_PKT_TYPE_SIGNAL_REP {
PacketContents::SignalRep(SignalPacket(unsafe { mem::transmute_copy(&self.0.union) }))
} else {
panic!("unexpected packet type");
}
}
}
impl UserPacket {
pub fn from_u8_array(val: [u8; 32]) -> UserPacket {
UserPacket(val)
}
pub fn as_u8_array(&self) -> &[u8; 32] {
&self.0
}
pub fn as_mut_u8_array(&mut self) -> &mut [u8; 32] {
&mut self.0
}
}
impl SignalPacket {
pub fn trigger(&self) -> Signals {
self.0.trigger
}
pub fn observed(&self) -> Signals {
self.0.observed
}
pub fn count(&self) -> u64 {
self.0.count
}
}
impl Port {
    /// Creates a new port by calling `sys::mx_port_create` with `opts`.
    ///
    /// The status returned by the call is passed through `into_result`; on
    /// success the freshly written raw handle is wrapped in a `Port`.
    pub fn create(opts: PortOpts) -> Result<Port, Status> {
        let mut raw_handle = 0;
        // SAFETY: the out-pointer is derived from a live local that outlives
        // the call.
        let status = unsafe { sys::mx_port_create(opts as u32, &mut raw_handle) };
        into_result(status, || Self::from_handle(Handle(raw_handle)))
    }

    /// Queues `packet` on this port via `sys::mx_port_queue`.
    pub fn queue(&self, packet: &Packet) -> Result<(), Status> {
        let packet_ptr = &packet.0 as *const sys::mx_port_packet_t as *const u8;
        // SAFETY: `packet_ptr` comes from a shared reference that stays valid
        // for the duration of the call.
        // NOTE(review): the trailing `0` is presumably a size argument —
        // confirm against the `sys` declaration.
        let status = unsafe { sys::mx_port_queue(self.raw_handle(), packet_ptr, 0) };
        into_result(status, || ())
    }

    /// Waits for a packet on this port via `sys::mx_port_wait`, using the
    /// given `deadline`.
    ///
    /// The tests in this file expect `Err(Status::ErrTimedOut)` when no packet
    /// arrives before the deadline.
    pub fn wait(&self, deadline: Time) -> Result<Packet, Status> {
        let mut raw_packet: sys::mx_port_packet_t = Default::default();
        let packet_ptr = &mut raw_packet as *mut sys::mx_port_packet_t as *mut u8;
        // SAFETY: `packet_ptr` points at a live, default-initialised local
        // packet that outlives the call.
        // NOTE(review): the trailing `0` is presumably a size argument —
        // confirm against the `sys` declaration.
        let status = unsafe { sys::mx_port_wait(self.raw_handle(), deadline, packet_ptr, 0) };
        into_result(status, || Packet(raw_packet))
    }

    /// Calls `sys::mx_port_cancel` for `source` and `key` on this port.
    ///
    /// In this file's tests, a `wait_async` registration that has been
    /// cancelled this way no longer produces packets.
    pub fn cancel<H>(&self, source: &H, key: u64) -> Result<(), Status> where H: HandleBase {
        let port_handle = self.raw_handle();
        let source_handle = source.raw_handle();
        // SAFETY: only plain handle values and an integer key are passed.
        let status = unsafe { sys::mx_port_cancel(port_handle, source_handle, key) };
        into_result(status, || ())
    }
}
/// Options for `Port::create`, passed to `sys::mx_port_create` as a `u32`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u32)]
pub enum PortOpts {
    /// The only option currently defined; its numeric value is `0`.
    Default = 0,
}
impl Default for PortOpts {
    /// Returns `PortOpts::Default`.
    fn default() -> PortOpts {
        PortOpts::Default
    }
}
/// Options for `wait_async`; the discriminants are the corresponding
/// `sys::MX_WAIT_ASYNC_*` constants.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u32)]
pub enum WaitAsyncOpts {
    /// `sys::MX_WAIT_ASYNC_ONCE`. In this file's tests a wait registered with
    /// this option yields a `PacketContents::SignalOne` packet.
    Once = sys::MX_WAIT_ASYNC_ONCE,
    /// `sys::MX_WAIT_ASYNC_REPEATING`. In this file's tests a wait registered
    /// with this option yields `PacketContents::SignalRep` packets.
    Repeating = sys::MX_WAIT_ASYNC_REPEATING,
}
// Tests for `Port`: queueing user packets, and one-shot / repeating
// asynchronous waits registered against an `Event`.
#[cfg(test)]
mod tests {
use super::*;
use {Duration, Event, EventOpts};
use {MX_SIGNAL_LAST_HANDLE, MX_SIGNAL_NONE, MX_USER_SIGNAL_0, MX_USER_SIGNAL_1};
use deadline_after;
// A user packet queued on a port is read back unchanged; an empty port
// times out.
#[test]
fn port_basic() {
// NOTE(review): presumably nanoseconds (10 ms), going by the variable name.
let ten_ms: Duration = 10_000_000;
let port = Port::create(PortOpts::Default).unwrap();
// Nothing has been queued yet, so waiting must time out.
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
// Queue a user packet and check that the same packet comes back.
let packet = Packet::from_user_packet(
42,
123,
UserPacket::from_u8_array([13; 32]),
);
assert!(port.queue(&packet).is_ok());
let read_packet = port.wait(deadline_after(ten_ms)).unwrap();
assert_eq!(read_packet, packet);
}
// A `WaitAsyncOpts::Once` registration delivers at most one packet, and
// delivers none after it has been cancelled.
#[test]
fn wait_async_once() {
let ten_ms: Duration = 10_000_000;
let key = 42;
let port = Port::create(PortOpts::Default).unwrap();
let event = Event::create(EventOpts::Default).unwrap();
// Register a one-shot wait on either user signal.
assert!(event.wait_async(&port, key, MX_USER_SIGNAL_0 | MX_USER_SIGNAL_1,
WaitAsyncOpts::Once).is_ok());
// No signal has been raised yet, so no packet is expected.
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
// Raise MX_USER_SIGNAL_0; a packet should now be delivered.
// NOTE(review): the arguments look like (clear_mask, set_mask) — confirm
// against `Event::signal`.
assert!(event.signal(MX_SIGNAL_NONE, MX_USER_SIGNAL_0).is_ok());
let read_packet = port.wait(deadline_after(ten_ms)).unwrap();
assert_eq!(read_packet.key(), key);
assert_eq!(read_packet.status(), 0);
match read_packet.contents() {
PacketContents::SignalOne(sig) => {
assert_eq!(sig.trigger(), MX_USER_SIGNAL_0 | MX_USER_SIGNAL_1);
assert_eq!(sig.observed(), MX_USER_SIGNAL_0 | MX_SIGNAL_LAST_HANDLE);
assert_eq!(sig.count(), 1);
}
_ => panic!("wrong packet type"),
}
// The one-shot wait has fired; no second packet is expected.
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
// Registering again without re-signalling is expected to deliver a packet
// straight away (presumably because the signal is still asserted).
assert!(event.wait_async(&port, key, MX_USER_SIGNAL_0, WaitAsyncOpts::Once).is_ok());
let read_packet = port.wait(deadline_after(ten_ms)).unwrap();
assert_eq!(read_packet.key(), key);
assert_eq!(read_packet.status(), 0);
match read_packet.contents() {
PacketContents::SignalOne(sig) => {
assert_eq!(sig.trigger(), MX_USER_SIGNAL_0);
assert_eq!(sig.observed(), MX_USER_SIGNAL_0 | MX_SIGNAL_LAST_HANDLE);
assert_eq!(sig.count(), 1);
}
_ => panic!("wrong packet type"),
}
// Register and then cancel: no packet is expected afterwards.
assert!(event.wait_async(&port, key, MX_USER_SIGNAL_0, WaitAsyncOpts::Once).is_ok());
assert!(port.cancel(&event, key).is_ok());
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
// Presumably clears the signal first, then registers a fresh one-shot wait.
assert!(event.signal(MX_USER_SIGNAL_0, MX_SIGNAL_NONE).is_ok()); assert!(event.wait_async(&port, key, MX_USER_SIGNAL_0, WaitAsyncOpts::Once).is_ok());
// Cancel before raising the signal: the later signal must not produce a packet.
assert!(port.cancel(&event, key).is_ok());
assert!(event.signal(MX_SIGNAL_NONE, MX_USER_SIGNAL_0).is_ok());
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
}
// A `WaitAsyncOpts::Repeating` registration keeps delivering packets until it
// is cancelled or the event is dropped.
#[test]
fn wait_async_repeating() {
let ten_ms: Duration = 10_000_000;
let key = 42;
let port = Port::create(PortOpts::Default).unwrap();
let event = Event::create(EventOpts::Default).unwrap();
// Register a repeating wait on either user signal.
assert!(event.wait_async(&port, key, MX_USER_SIGNAL_0 | MX_USER_SIGNAL_1,
WaitAsyncOpts::Repeating).is_ok());
// No signal has been raised yet, so no packet is expected.
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
// Raise MX_USER_SIGNAL_0; a packet should be delivered.
assert!(event.signal(MX_SIGNAL_NONE, MX_USER_SIGNAL_0).is_ok());
let read_packet = port.wait(deadline_after(ten_ms)).unwrap();
assert_eq!(read_packet.key(), key);
assert_eq!(read_packet.status(), 0);
match read_packet.contents() {
PacketContents::SignalRep(sig) => {
assert_eq!(sig.trigger(), MX_USER_SIGNAL_0 | MX_USER_SIGNAL_1);
assert_eq!(sig.observed(), MX_USER_SIGNAL_0 | MX_SIGNAL_LAST_HANDLE);
assert_eq!(sig.count(), 1);
}
_ => panic!("wrong packet type"),
}
// Without a new signal call, no further packet is expected.
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
// Presumably clears and then re-raises the signal; the repeating wait should
// deliver a second packet.
assert!(event.signal(MX_USER_SIGNAL_0, MX_SIGNAL_NONE).is_ok()); assert!(event.signal(MX_SIGNAL_NONE, MX_USER_SIGNAL_0).is_ok());
let read_packet = port.wait(deadline_after(ten_ms)).unwrap();
assert_eq!(read_packet.key(), key);
assert_eq!(read_packet.status(), 0);
match read_packet.contents() {
PacketContents::SignalRep(sig) => {
assert_eq!(sig.trigger(), MX_USER_SIGNAL_0 | MX_USER_SIGNAL_1);
assert_eq!(sig.observed(), MX_USER_SIGNAL_0 | MX_SIGNAL_LAST_HANDLE);
assert_eq!(sig.count(), 1);
}
_ => panic!("wrong packet type"),
}
// After cancelling, neither the current state nor a clear/raise cycle should
// deliver a packet.
assert!(port.cancel(&event, key).is_ok());
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
assert!(event.signal(MX_USER_SIGNAL_0, MX_SIGNAL_NONE).is_ok()); assert!(event.signal(MX_SIGNAL_NONE, MX_USER_SIGNAL_0).is_ok());
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
// Registering again without re-signalling is expected to deliver a packet
// straight away (presumably because the signal is still asserted).
assert!(event.wait_async(&port, key, MX_USER_SIGNAL_0, WaitAsyncOpts::Repeating).is_ok());
let read_packet = port.wait(deadline_after(ten_ms)).unwrap();
assert_eq!(read_packet.key(), key);
assert_eq!(read_packet.status(), 0);
match read_packet.contents() {
PacketContents::SignalRep(sig) => {
assert_eq!(sig.trigger(), MX_USER_SIGNAL_0);
assert_eq!(sig.observed(), MX_USER_SIGNAL_0 | MX_SIGNAL_LAST_HANDLE);
assert_eq!(sig.count(), 1);
}
_ => panic!("wrong packet type"),
}
// Dropping the event must not leave any packet queued on the port.
drop(event);
assert_eq!(port.wait(deadline_after(ten_ms)), Err(Status::ErrTimedOut));
}
}