use std::cell::UnsafeCell;
use std::cmp;
use std::fmt;
use std::io::{Read as _, Seek as _, Write as _};
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use crate::fs::{Metadata, Permissions};
use crate::future;
use crate::io::{self, Read, Seek, SeekFrom, Write};
use crate::path::Path;
use crate::prelude::*;
use crate::task::{self, spawn_blocking, Context, Poll, Waker};
use crate::utils::Context as _;
pub struct File {
file: Arc<std::fs::File>,
lock: Lock<State>,
}
impl File {
pub(crate) fn new(file: std::fs::File, is_flushed: bool) -> File {
let file = Arc::new(file);
File {
file: file.clone(),
lock: Lock::new(State {
file,
mode: Mode::Idle,
cache: Vec::new(),
is_flushed,
last_read_err: None,
last_write_err: None,
}),
}
}
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned();
let file = spawn_blocking(move || {
std::fs::File::open(&path).context(|| format!("could not open `{}`", path.display()))
})
.await?;
Ok(File::new(file, true))
}
pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned();
let file = spawn_blocking(move || {
std::fs::File::create(&path)
.context(|| format!("could not create `{}`", path.display()))
})
.await?;
Ok(File::new(file, true))
}
pub async fn sync_all(&self) -> io::Result<()> {
let state = future::poll_fn(|cx| {
let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_flush(cx)
})
.await?;
spawn_blocking(move || state.file.sync_all()).await
}
pub async fn sync_data(&self) -> io::Result<()> {
let state = future::poll_fn(|cx| {
let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_flush(cx)
})
.await?;
spawn_blocking(move || state.file.sync_data()).await
}
pub async fn set_len(&self, size: u64) -> io::Result<()> {
let state = future::poll_fn(|cx| {
let state = futures_core::ready!(self.lock.poll_lock(cx));
let state = futures_core::ready!(state.poll_unread(cx))?;
state.poll_flush(cx)
})
.await?;
spawn_blocking(move || state.file.set_len(size)).await
}
pub async fn metadata(&self) -> io::Result<Metadata> {
let file = self.file.clone();
spawn_blocking(move || file.metadata()).await
}
pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
let file = self.file.clone();
spawn_blocking(move || file.set_permissions(perm)).await
}
}
impl Drop for File {
fn drop(&mut self) {
let _ = task::block_on(self.flush());
}
}
impl fmt::Debug for File {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.file.fmt(f)
}
}
impl Read for File {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut &*self).poll_read(cx, buf)
}
}
impl Read for &File {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_read(cx, buf)
}
}
impl Write for File {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut &*self).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut &*self).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut &*self).poll_close(cx)
}
}
impl Write for &File {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_flush(cx).map(|res| res.map(drop))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_close(cx)
}
}
impl Seek for File {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
Pin::new(&mut &*self).poll_seek(cx, pos)
}
}
impl Seek for &File {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_seek(cx, pos)
}
}
impl From<std::fs::File> for File {
fn from(file: std::fs::File) -> File {
File::new(file, false)
}
}
cfg_unix! {
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
impl AsRawFd for File {
fn as_raw_fd(&self) -> RawFd {
self.file.as_raw_fd()
}
}
impl FromRawFd for File {
unsafe fn from_raw_fd(fd: RawFd) -> File {
std::fs::File::from_raw_fd(fd).into()
}
}
impl IntoRawFd for File {
fn into_raw_fd(self) -> RawFd {
let file = self.file.clone();
drop(self);
Arc::try_unwrap(file)
.expect("cannot acquire ownership of the file handle after drop")
.into_raw_fd()
}
}
}
cfg_windows! {
use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
impl AsRawHandle for File {
fn as_raw_handle(&self) -> RawHandle {
self.file.as_raw_handle()
}
}
impl FromRawHandle for File {
unsafe fn from_raw_handle(handle: RawHandle) -> File {
std::fs::File::from_raw_handle(handle).into()
}
}
impl IntoRawHandle for File {
fn into_raw_handle(self) -> RawHandle {
let file = self.file.clone();
drop(self);
Arc::try_unwrap(file)
.expect("cannot acquire ownership of the file handle after drop")
.into_raw_handle()
}
}
}
struct Lock<T>(Arc<LockState<T>>);
unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}
struct LockState<T> {
locked: AtomicBool,
value: UnsafeCell<T>,
wakers: Mutex<Vec<Waker>>,
}
impl<T> Lock<T> {
fn new(value: T) -> Lock<T> {
Lock(Arc::new(LockState {
locked: AtomicBool::new(false),
value: UnsafeCell::new(value),
wakers: Mutex::new(Vec::new()),
}))
}
fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
if self.0.locked.swap(true, Ordering::Acquire) {
let mut list = self.0.wakers.lock().unwrap();
if self.0.locked.swap(true, Ordering::Acquire) {
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone());
}
return Poll::Pending;
}
}
Poll::Ready(LockGuard(self.0.clone()))
}
}
struct LockGuard<T>(Arc<LockState<T>>);
unsafe impl<T: Send> Send for LockGuard<T> {}
unsafe impl<T: Sync> Sync for LockGuard<T> {}
impl<T> LockGuard<T> {
fn register(&self, cx: &Context<'_>) {
let mut list = self.0.wakers.lock().unwrap();
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone());
}
}
}
impl<T> Drop for LockGuard<T> {
fn drop(&mut self) {
self.0.locked.store(false, Ordering::Release);
for w in self.0.wakers.lock().unwrap().drain(..) {
w.wake();
}
}
}
impl<T> Deref for LockGuard<T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.0.value.get() }
}
}
impl<T> DerefMut for LockGuard<T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.0.value.get() }
}
}
enum Mode {
Idle,
Reading(usize),
Writing,
}
struct State {
file: Arc<std::fs::File>,
mode: Mode,
cache: Vec<u8>,
is_flushed: bool,
last_read_err: Option<io::Error>,
last_write_err: Option<io::Error>,
}
impl LockGuard<State> {
fn poll_seek(mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<io::Result<u64>> {
if pos == SeekFrom::Current(0) {
let internal = (&*self.file).seek(SeekFrom::Current(0))?;
let actual = match self.mode {
Mode::Idle => internal,
Mode::Reading(start) => internal - self.cache.len() as u64 + start as u64,
Mode::Writing => internal + self.cache.len() as u64,
};
return Poll::Ready(Ok(actual));
}
if let Mode::Reading(start) = self.mode {
if let SeekFrom::Current(diff) = pos {
if let Some(new) = (start as i64).checked_add(diff) {
if 0 <= new && new <= self.cache.len() as i64 {
let internal = (&*self.file).seek(SeekFrom::Current(0))?;
self.mode = Mode::Reading(new as usize);
return Poll::Ready(Ok(internal - self.cache.len() as u64 + new as u64));
}
}
}
}
self = futures_core::ready!(self.poll_unread(cx))?;
self = futures_core::ready!(self.poll_flush(cx))?;
Poll::Ready((&*self.file).seek(pos))
}
fn poll_read(mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
if let Some(err) = self.last_read_err.take() {
return Poll::Ready(Err(err));
}
match self.mode {
Mode::Idle => {}
Mode::Reading(start) => {
let available = self.cache.len() - start;
if available > 0 || self.cache.is_empty() {
let n = cmp::min(available, buf.len());
buf[..n].copy_from_slice(&self.cache[start..n]);
self.mode = Mode::Reading(start + n);
return Poll::Ready(Ok(n));
}
}
Mode::Writing => {
self = futures_core::ready!(self.poll_flush(cx))?;
}
}
if self.cache.len() < buf.len() {
let diff = buf.len() - self.cache.len();
self.cache.reserve(diff);
}
unsafe {
self.cache.set_len(buf.len());
}
self.register(cx);
spawn_blocking(move || {
let res = {
let State { file, cache, .. } = &mut *self;
(&**file).read(cache)
};
match res {
Ok(n) => {
unsafe {
self.cache.set_len(n);
}
self.mode = Mode::Reading(0);
}
Err(err) => {
self.cache.clear();
self.mode = Mode::Idle;
self.last_read_err = Some(err);
}
}
});
Poll::Pending
}
fn poll_unread(mut self, _: &mut Context<'_>) -> Poll<io::Result<Self>> {
match self.mode {
Mode::Idle | Mode::Writing => Poll::Ready(Ok(self)),
Mode::Reading(start) => {
let n = self.cache.len() - start;
if n > 0 {
let _ = (&*self.file).seek(SeekFrom::Current(-(n as i64)));
}
self.cache.clear();
self.mode = Mode::Idle;
Poll::Ready(Ok(self))
}
}
}
fn poll_write(mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
if let Some(err) = self.last_write_err.take() {
return Poll::Ready(Err(err));
}
self = futures_core::ready!(self.poll_unread(cx))?;
if self.cache.capacity() < buf.len() {
let diff = buf.len() - self.cache.capacity();
self.cache.reserve(diff);
}
let available = self.cache.capacity() - self.cache.len();
if available > 0 || buf.is_empty() {
let n = cmp::min(available, buf.len());
let start = self.cache.len();
unsafe {
self.cache.set_len(start + n);
}
self.cache[start..start + n].copy_from_slice(&buf[..n]);
self.is_flushed = false;
self.mode = Mode::Writing;
Poll::Ready(Ok(n))
} else {
futures_core::ready!(self.poll_drain(cx))?;
Poll::Pending
}
}
fn poll_drain(mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self>> {
if let Some(err) = self.last_write_err.take() {
return Poll::Ready(Err(err));
}
match self.mode {
Mode::Idle | Mode::Reading(..) => Poll::Ready(Ok(self)),
Mode::Writing => {
self.register(cx);
spawn_blocking(move || {
match (&*self.file).write_all(&self.cache) {
Ok(_) => {
self.cache.clear();
self.mode = Mode::Idle;
}
Err(err) => {
self.last_write_err = Some(err);
}
};
});
Poll::Pending
}
}
}
fn poll_flush(mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self>> {
if self.is_flushed {
return Poll::Ready(Ok(self));
}
self = futures_core::ready!(self.poll_drain(cx))?;
self.register(cx);
spawn_blocking(move || {
match (&*self.file).flush() {
Ok(()) => {
self.is_flushed = true;
}
Err(err) => {
self.last_write_err = Some(err);
}
}
});
Poll::Pending
}
fn poll_close(self, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}