pub mod cqe;
pub mod sqe;
mod completion_queue;
mod submission_queue;
mod probe;
pub mod registrar;
use std::fmt;
use std::io;
use std::mem::{self, MaybeUninit};
use std::os::unix::io::RawFd;
use std::ptr::{self, NonNull};
use std::time::Duration;
#[doc(inline)]
pub use sqe::{SQE, SQEs};
#[doc(inline)]
pub use cqe::{CQE, CQEs, CQEsBlocking};
pub use completion_queue::CompletionQueue;
pub use submission_queue::SubmissionQueue;
pub use probe::Probe;
#[doc(inline)]
pub use registrar::{Registrar, Personality};
bitflags::bitflags! {
/// Bit set of ring setup flags.
///
/// `IoUring::new_with_flags` copies the raw bits of this set into
/// `io_uring_params.flags` before initializing the ring.
///
/// NOTE(review): the names and bit positions look like the kernel's
/// `IORING_SETUP_*` constants — confirm against `io_uring_setup(2)` before
/// documenting per-flag semantics here.
pub struct SetupFlags: u32 {
/// Bit 0 (presumably `IORING_SETUP_IOPOLL` — confirm).
const IOPOLL = 1 << 0;
/// Bit 1 (presumably `IORING_SETUP_SQPOLL` — confirm).
const SQPOLL = 1 << 1;
/// Bit 2 (presumably `IORING_SETUP_SQ_AFF` — confirm).
const SQ_AFF = 1 << 2;
/// Bit 3 (presumably `IORING_SETUP_CQSIZE` — confirm).
const CQSIZE = 1 << 3;
/// Bit 4 (presumably `IORING_SETUP_CLAMP` — confirm).
const CLAMP = 1 << 4;
/// Bit 5 (presumably `IORING_SETUP_ATTACH_WQ` — confirm).
const ATTACH_WQ = 1 << 5;
}
}
bitflags::bitflags! {
/// Bit set of ring feature flags.
///
/// `IoUring::new_with_flags` copies the raw bits of this set into
/// `io_uring_params.features` before initializing the ring.
///
/// NOTE(review): the names and bit positions look like the kernel's
/// `IORING_FEAT_*` constants — confirm against `io_uring_setup(2)` before
/// documenting per-feature semantics here.
pub struct SetupFeatures: u32 {
/// Bit 0 (presumably `IORING_FEAT_SINGLE_MMAP` — confirm).
const SINGLE_MMAP = 1 << 0;
/// Bit 1 (presumably `IORING_FEAT_NODROP` — confirm).
const NODROP = 1 << 1;
/// Bit 2 (presumably `IORING_FEAT_SUBMIT_STABLE` — confirm).
const SUBMIT_STABLE = 1 << 2;
/// Bit 3 (presumably `IORING_FEAT_RW_CUR_POS` — confirm).
const RW_CUR_POS = 1 << 3;
/// Bit 4 (presumably `IORING_FEAT_CUR_PERSONALITY` — confirm).
const CUR_PERSONALITY = 1 << 4;
/// Bit 5 (presumably `IORING_FEAT_FAST_POLL` — confirm).
const FAST_POLL = 1 << 5;
/// Bit 6 (presumably `IORING_FEAT_POLL_32BITS` — confirm).
const POLL_32BITS = 1 << 6;
}
}
/// Owning wrapper around a raw `uring_sys::io_uring` instance.
///
/// Values are created by `IoUring::new` / `IoUring::new_with_flags`, and the
/// `Drop` implementation passes the wrapped ring to `io_uring_queue_exit`.
pub struct IoUring {
// Raw ring state handed to the `uring_sys` calls made by this type.
ring: uring_sys::io_uring,
}
impl IoUring {
pub fn new(entries: u32) -> io::Result<IoUring> {
IoUring::new_with_flags(entries, SetupFlags::empty(), SetupFeatures::empty())
}
pub fn new_with_flags(entries: u32, flags: SetupFlags, features: SetupFeatures) -> io::Result<IoUring> {
unsafe {
let mut params: uring_sys::io_uring_params = mem::zeroed();
params.flags = flags.bits();
params.features = features.bits();
let mut ring = MaybeUninit::uninit();
resultify(uring_sys::io_uring_queue_init_params(
entries as _,
ring.as_mut_ptr(),
&mut params,
))?;
Ok(IoUring { ring: ring.assume_init() })
}
}
pub fn sq(&mut self) -> SubmissionQueue<'_> {
SubmissionQueue::new(&*self)
}
pub fn cq(&mut self) -> CompletionQueue<'_> {
CompletionQueue::new(&*self)
}
pub fn registrar(&self) -> Registrar<'_> {
Registrar::new(self)
}
pub fn queues(&mut self) -> (SubmissionQueue<'_>, CompletionQueue<'_>, Registrar<'_>) {
(SubmissionQueue::new(&*self), CompletionQueue::new(&*self), Registrar::new(&*self))
}
pub fn probe(&mut self) -> io::Result<Probe> {
Probe::for_ring(&mut self.ring)
}
pub fn prepare_sqe(&mut self) -> Option<SQE<'_>> {
unsafe {
submission_queue::prepare_sqe(&mut self.ring)
}
}
pub fn prepare_sqes(&mut self, count: u32) -> Option<SQEs<'_>> {
unsafe {
submission_queue::prepare_sqes(&mut self.ring.sq, count)
}
}
pub fn submit_sqes(&mut self) -> io::Result<u32> {
self.sq().submit()
}
pub fn submit_sqes_and_wait(&mut self, wait_for: u32) -> io::Result<u32> {
self.sq().submit_and_wait(wait_for)
}
pub fn submit_sqes_and_wait_with_timeout(&mut self, wait_for: u32, duration: Duration)
-> io::Result<u32>
{
self.sq().submit_and_wait_with_timeout(wait_for, duration)
}
pub fn peek_for_cqe(&mut self) -> Option<CQE> {
unsafe {
let mut cqe = MaybeUninit::uninit();
let count = uring_sys::io_uring_peek_batch_cqe(&mut self.ring, cqe.as_mut_ptr(), 1);
if count > 0 {
Some(CQE::new(NonNull::from(&self.ring), &mut *cqe.assume_init()))
} else {
None
}
}
}
pub fn wait_for_cqe(&mut self) -> io::Result<CQE> {
let ring = NonNull::from(&self.ring);
self.inner_wait_for_cqes(1, ptr::null()).map(|cqe| CQE::new(ring, cqe))
}
pub fn wait_for_cqe_with_timeout(&mut self, duration: Duration)
-> io::Result<CQE>
{
let ts = uring_sys::__kernel_timespec {
tv_sec: duration.as_secs() as _,
tv_nsec: duration.subsec_nanos() as _
};
let ring = NonNull::from(&self.ring);
self.inner_wait_for_cqes(1, &ts).map(|cqe| CQE::new(ring, cqe))
}
pub fn cqes(&mut self) -> CQEs<'_> {
CQEs::new(NonNull::from(&mut self.ring))
}
pub fn cqes_blocking(&mut self, count: u32) -> CQEsBlocking<'_> {
CQEsBlocking::new(NonNull::from(&mut self.ring), count)
}
pub fn wait_for_cqes(&mut self, count: u32) -> io::Result<()> {
self.inner_wait_for_cqes(count as _, ptr::null()).map(|_| ())
}
fn inner_wait_for_cqes(&mut self, count: u32, ts: *const uring_sys::__kernel_timespec)
-> io::Result<&mut uring_sys::io_uring_cqe>
{
unsafe {
let mut cqe = MaybeUninit::uninit();
resultify(uring_sys::io_uring_wait_cqes(
&mut self.ring,
cqe.as_mut_ptr(),
count,
ts,
ptr::null(),
))?;
Ok(&mut *cqe.assume_init())
}
}
pub fn raw(&self) -> &uring_sys::io_uring {
&self.ring
}
pub unsafe fn raw_mut(&mut self) -> &mut uring_sys::io_uring {
&mut self.ring
}
pub fn cq_ready(&mut self) -> u32 {
self.cq().ready()
}
pub fn sq_ready(&mut self) -> u32 {
self.sq().ready()
}
pub fn sq_space_left(&mut self) -> u32 {
self.sq().space_left()
}
pub fn cq_eventfd_enabled(&mut self) -> bool {
self.cq().eventfd_enabled()
}
pub fn cq_eventfd_toggle(&mut self, enabled: bool) -> io::Result<()> {
self.cq().eventfd_toggle(enabled)
}
pub fn raw_fd(&self) -> RawFd {
self.ring.ring_fd
}
}
impl fmt::Debug for IoUring {
    /// Formats the ring as a struct named after the full type name of
    /// `IoUring`, with a single `fd` field holding `ring_fd`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let type_name = std::any::type_name::<Self>();
        let mut out = f.debug_struct(type_name);
        out.field("fd", &self.ring.ring_fd);
        out.finish()
    }
}
impl Drop for IoUring {
    /// Hands the wrapped ring to `io_uring_queue_exit` when the value is
    /// dropped.
    fn drop(&mut self) {
        let ring = &mut self.ring;
        // SAFETY (review): the constructors visible in this file only build an
        // `IoUring` after `io_uring_queue_init_params` succeeded, so the ring
        // is expected to be initialized here — confirm no other constructor
        // exists elsewhere in the crate.
        unsafe {
            uring_sys::io_uring_queue_exit(ring);
        }
    }
}
// These impls assert that an `IoUring` may be moved to, and shared between,
// threads.
// NOTE(review): nothing in this file demonstrates why that is sound for the
// raw `uring_sys::io_uring` field — confirm the justification and record it
// here as a `SAFETY:` comment.
unsafe impl Send for IoUring { }
unsafe impl Sync for IoUring { }
/// Converts the integer status returned by a `uring_sys` call into an
/// [`io::Result`].
///
/// * `x >= 0` is passed through unchanged as `Ok(x as u32)`.
/// * `x < 0` is treated as a negated OS error number and turned into
///   `Err(io::Error::from_raw_os_error(-x))`.
///
/// `i32::MIN` has no positive counterpart, so the negation wraps instead of
/// overflowing; the resulting error then carries `i32::MIN` as its raw code.
fn resultify(x: i32) -> io::Result<u32> {
    if x >= 0 {
        Ok(x as u32)
    } else {
        // `wrapping_neg` rather than unary minus: `-i32::MIN` overflows and
        // panics when overflow checks are enabled (debug and test builds).
        Err(io::Error::from_raw_os_error(x.wrapping_neg()))
    }
}
#[cfg(test)]
mod tests {
    use super::resultify;

    /// Checks that `resultify` maps `0` and `1` to `Ok`, maps `-1` to an
    /// error with raw OS code `1`, and that the argument expression is
    /// evaluated exactly once per call.
    #[test]
    fn test_resultify() {
        // Hands `value` back untouched while counting how often it ran.
        let side_effect = |value, counter: &mut _| -> i32 {
            *counter += 1;
            value
        };

        let mut calls = 0;
        let ret = resultify(side_effect(0, &mut calls));
        assert_eq!(ret.ok(), Some(0));
        assert_eq!(calls, 1);

        calls = 0;
        let ret = resultify(side_effect(1, &mut calls));
        assert_eq!(ret.ok(), Some(1));
        assert_eq!(calls, 1);

        calls = 0;
        let ret = resultify(side_effect(-1, &mut calls));
        assert_eq!(ret.err().and_then(|e| e.raw_os_error()), Some(1));
        assert_eq!(calls, 1);
    }
}