use buffers::{atomic, ReadableBuffer, WritableBuffer};
use internal::sync::Signal;
use std::io;
use std::sync::atomic::*;
use std::sync::Arc;
const DEFAULT_CAPACITY: usize = 8192;
const FLAG_NONBLOCKING: u8 = 0b0001;
pub fn pipe() -> (PipeReader, PipeWriter) {
PipeBuilder::default().build()
}
pub struct PipeBuilder {
flags: u8,
capacity: usize,
}
impl Default for PipeBuilder {
fn default() -> Self {
Self {
flags: 0,
capacity: DEFAULT_CAPACITY,
}
}
}
impl PipeBuilder {
pub fn nonblocking(&mut self, nonblocking: bool) -> &mut Self {
if nonblocking {
self.flags |= FLAG_NONBLOCKING;
} else {
self.flags &= !FLAG_NONBLOCKING;
}
self
}
pub fn capacity(&mut self, capacity: usize) -> &mut Self {
self.capacity = capacity;
self
}
pub fn build(&self) -> (PipeReader, PipeWriter) {
let buffers = atomic::bounded(self.capacity);
let shared = Arc::<PipeShared>::default();
(
PipeReader {
flags: self.flags,
buffer: buffers.0,
shared: shared.clone(),
},
PipeWriter {
flags: self.flags,
buffer: buffers.1,
shared: shared,
},
)
}
}
pub struct PipeReader {
flags: u8,
buffer: atomic::Reader<u8>,
shared: Arc<PipeShared>,
}
impl PipeReader {
pub fn set_nonblocking(&mut self, nonblocking: bool) {
if nonblocking {
self.flags |= FLAG_NONBLOCKING;
} else {
self.flags &= !FLAG_NONBLOCKING;
}
}
}
impl io::Read for PipeReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
loop {
let len = self.buffer.pull(buf);
if len > 0 {
self.shared.full_signal.notify();
return Ok(len);
}
if self.shared.drop_flag.load(Ordering::SeqCst) {
return Ok(0);
}
if self.flags & FLAG_NONBLOCKING != 0 {
return Err(io::ErrorKind::WouldBlock.into());
}
self.shared.empty_signal.wait();
}
}
}
impl Drop for PipeReader {
fn drop(&mut self) {
self.shared.drop_flag.store(true, Ordering::SeqCst);
self.shared.full_signal.notify();
}
}
pub struct PipeWriter {
flags: u8,
buffer: atomic::Writer<u8>,
shared: Arc<PipeShared>,
}
impl PipeWriter {
pub fn set_nonblocking(&mut self, nonblocking: bool) {
if nonblocking {
self.flags |= FLAG_NONBLOCKING;
} else {
self.flags &= !FLAG_NONBLOCKING;
}
}
}
impl io::Write for PipeWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
if buf.is_empty() {
return Ok(0);
}
loop {
if self.shared.drop_flag.load(Ordering::SeqCst) {
return Err(io::ErrorKind::BrokenPipe.into());
}
let len = self.buffer.push(buf);
if len > 0 {
self.shared.empty_signal.notify();
return Ok(len);
}
if self.flags & FLAG_NONBLOCKING != 0 {
return Err(io::ErrorKind::WouldBlock.into());
}
self.shared.full_signal.wait();
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Drop for PipeWriter {
fn drop(&mut self) {
self.shared.drop_flag.store(true, Ordering::SeqCst);
self.shared.empty_signal.notify();
}
}
#[derive(Default)]
struct PipeShared {
empty_signal: Signal,
full_signal: Signal,
drop_flag: AtomicBool,
}
#[cfg(test)]
mod tests {
use std::io::{self, Read, Write};
use std::thread;
use std::time::Duration;
use super::*;
#[test]
fn read_write() {
let (mut reader, mut writer) = pipe();
assert_eq!(writer.write(b"hello world").unwrap(), 11);
let mut buf = [0; 11];
assert_eq!(reader.read(&mut buf).unwrap(), 11);
assert_eq!(&buf, b"hello world");
}
#[test]
fn read_empty_blocking() {
let (mut reader, mut writer) = PipeBuilder::default()
.capacity(16)
.build();
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
let buf = [1; 1];
writer.write(&buf).unwrap();
});
let mut buf = [0; 1];
assert_eq!(reader.read(&mut buf).unwrap(), 1);
assert_eq!(buf[0], 1);
}
#[test]
fn read_nonblocking() {
let (mut reader, _writer) = pipe();
let mut buf = [0; 4];
reader.set_nonblocking(true);
assert_eq!(reader.read(&mut buf).err().unwrap().kind(), io::ErrorKind::WouldBlock);
}
#[test]
fn write_nonblocking() {
let (_reader, mut writer) = PipeBuilder::default()
.capacity(16)
.build();
let buf = [0; 16];
assert_eq!(writer.write(&buf).unwrap(), buf.len());
writer.set_nonblocking(true);
assert_eq!(writer.write(&buf).err().unwrap().kind(), io::ErrorKind::WouldBlock);
}
#[test]
fn read_from_closed_pipe_returns_zero() {
let (mut reader, _) = pipe();
let mut buf = [0; 16];
assert_eq!(reader.read(&mut buf).unwrap(), 0);
}
#[test]
fn write_to_closed_pipe_returns_broken_pipe() {
let (_, mut writer) = pipe();
assert_eq!(writer.write(b"hi").err().unwrap().kind(), io::ErrorKind::BrokenPipe);
}
}