use std::io::{mod, MemReader};
use serialize::Decodable;
use buffered::BufferedReader;
use {
ByteString, CsvResult, Decoded, IntoVector,
Error, ParseError, ParseErrorKind,
};
use self::ParseState::{
StartRecord, EndRecord, StartField,
RecordTermCR, RecordTermLF, RecordTermAny,
InField, InQuotedField, InQuotedFieldEscape, InQuotedFieldQuote,
};
/// Describes which byte(s) end a record.
///
/// Values of this type can be compared directly against a `u8` (see the
/// `PartialEq<u8>` implementation) to test whether a byte is a terminator.
#[deriving(Copy)]
pub enum RecordTerminator {
/// Either `\r` or `\n` terminates a record.
CRLF,
/// Exactly the given byte terminates a record.
Any(u8),
}
/// Lets a terminator be compared with a raw byte: the comparison is true
/// when the byte is one that this terminator recognizes.
impl PartialEq<u8> for RecordTerminator {
    fn eq(&self, other: &u8) -> bool {
        let byte = *other;
        match *self {
            // A custom terminator matches only its own byte.
            RecordTerminator::Any(term) => byte == term,
            // The default terminator accepts either line-ending byte.
            RecordTerminator::CRLF => byte == b'\r' || byte == b'\n',
        }
    }
}
pub struct Reader<R> {
delimiter: u8,
record_terminator: RecordTerminator,
quote: u8,
escape: u8,
double_quote: bool, flexible: bool, buffer: BufferedReader<R>,
fieldbuf: Vec<u8>, state: ParseState, err: Option<Error>,
first_record: Vec<ByteString>,
parsing_first_record: bool,
has_seeked: bool,
#[doc(hidden)]
pub has_headers: bool,
field_count: u64, column: u64, line_record: u64, line_current: u64, byte_offset: u64, }
impl<R: io::Reader> Reader<R> {
    /// Creates a CSV reader over an arbitrary `io::Reader`, wrapping it in a
    /// `BufferedReader`.
    pub fn from_reader(rdr: R) -> Reader<R> {
        let buffered = BufferedReader::new(rdr);
        Reader::from_buffer(buffered)
    }

    /// Builds a reader around an existing buffer using the default settings:
    /// `,` delimiter, CRLF terminator, `"` quote, `\` escape, doubled quotes
    /// enabled, fixed record lengths, and headers enabled.
    fn from_buffer(buf: BufferedReader<R>) -> Reader<R> {
        Reader {
            // Input and parser working state.
            buffer: buf,
            fieldbuf: Vec::with_capacity(1024),
            state: StartRecord,
            err: None,

            // Dialect configuration.
            delimiter: b',',
            record_terminator: RecordTerminator::CRLF,
            quote: b'"',
            escape: b'\\',
            double_quote: true,
            flexible: false,
            has_headers: true,

            // First-record tracking.
            first_record: vec![],
            parsing_first_record: true,
            has_seeked: false,

            // Position tracking; lines and columns start at 1.
            field_count: 0,
            column: 1,
            line_record: 1,
            line_current: 1,
            byte_offset: 0,
        }
    }
}
impl Reader<io::IoResult<io::File>> {
    /// Creates a CSV reader for the file at `path`.
    ///
    /// The result of opening the file is handed to the reader unchecked, so
    /// this function itself never reports an open failure.
    pub fn from_file(path: &Path) -> Reader<io::IoResult<io::File>> {
        let opened = io::File::open(path);
        Reader::from_reader(opened)
    }
}
impl Reader<MemReader> {
    /// Creates a CSV reader over the bytes of an in-memory string.
    pub fn from_string<S: StrAllocating>(s: S) -> Reader<MemReader> {
        let bytes = s.into_string().into_bytes();
        Reader::from_bytes(bytes)
    }

    /// Creates a CSV reader over an in-memory byte vector.
    pub fn from_bytes<V: IntoVector<u8>>(bytes: V) -> Reader<MemReader> {
        let mem = MemReader::new(bytes.into_vec());
        Reader::from_reader(mem)
    }
}
impl<R: io::Reader> Reader<R> {
    /// Returns an iterator that decodes each record into a value of type `D`.
    pub fn decode<'a, D: Decodable<Decoded, Error>>
                 (&'a mut self) -> DecodedRecords<'a, R, D> {
        let records = self.byte_records();
        DecodedRecords { p: records }
    }

    /// Returns an iterator that yields each record as a vector of `String`s.
    pub fn records<'a>(&'a mut self) -> StringRecords<'a, R> {
        let records = self.byte_records();
        StringRecords { p: records }
    }

    /// Returns the first record with every field checked to be valid UTF-8.
    ///
    /// Fails if the first record cannot be read or if a field is not UTF-8.
    pub fn headers(&mut self) -> CsvResult<Vec<String>> {
        let raw = try!(self.byte_headers());
        byte_record_to_utf8(raw)
    }
}
// Builder-style configuration: each method consumes the reader, changes one
// setting and hands the reader back so calls can be chained.
impl<R: io::Reader> Reader<R> {
    /// Sets the byte that separates fields.
    pub fn delimiter(self, delimiter: u8) -> Reader<R> {
        let mut rdr = self;
        rdr.delimiter = delimiter;
        rdr
    }

    /// Sets whether the first record is treated as a header row (and so is
    /// not yielded by record iterators).
    pub fn has_headers(self, yes: bool) -> Reader<R> {
        let mut rdr = self;
        rdr.has_headers = yes;
        rdr
    }

    /// Sets whether records may have differing numbers of fields.
    pub fn flexible(self, yes: bool) -> Reader<R> {
        let mut rdr = self;
        rdr.flexible = yes;
        rdr
    }

    /// Sets which byte(s) terminate a record.
    pub fn record_terminator(self, term: RecordTerminator) -> Reader<R> {
        let mut rdr = self;
        rdr.record_terminator = term;
        rdr
    }

    /// Sets the byte that opens and closes quoted fields.
    pub fn quote(self, quote: u8) -> Reader<R> {
        let mut rdr = self;
        rdr.quote = quote;
        rdr
    }

    /// Sets the escape byte used inside quoted fields when doubled quotes are
    /// disabled.
    pub fn escape(self, escape: u8) -> Reader<R> {
        let mut rdr = self;
        rdr.escape = escape;
        rdr
    }

    /// Sets whether a doubled quote inside a quoted field stands for one
    /// literal quote byte.
    pub fn double_quote(self, yes: bool) -> Reader<R> {
        let mut rdr = self;
        rdr.double_quote = yes;
        rdr
    }
}
/// The outcome of asking the reader for its next field.
pub enum NextField<'a> {
/// The bytes of one field, borrowed from the reader.
Data(&'a [u8]),
/// An error occurred while reading or parsing.
Error(Error),
/// The current record has no more fields.
EndOfRecord,
/// There is no more CSV data to read.
EndOfCsv,
}
impl<'a> NextField<'a> {
pub fn into_iter_result(self) -> Option<CsvResult<&'a [u8]>> {
match self {
NextField::EndOfRecord | NextField::EndOfCsv => None,
NextField::Error(err) => Some(Err(err)),
NextField::Data(field) => Some(Ok(field)),
}
}
pub fn is_end(&self) -> bool {
if let NextField::EndOfCsv = *self { true } else { false }
}
}
// Low-level, field-at-a-time reading plus position reporting.
impl<R: io::Reader> Reader<R> {
/// Returns the first record as byte strings.
///
/// If the first record has already been captured, a clone of it is returned
/// and nothing is read. Otherwise fields are pulled with `next_field` until
/// the record (or the data) ends; `next_field` itself records those fields,
/// so later calls take the cached path.
///
/// Returns the error reported by `next_field`, if any. Panics if no field
/// was read and the reader is not done.
pub fn byte_headers(&mut self) -> CsvResult<Vec<ByteString>> {
if !self.first_record.is_empty() {
Ok(self.first_record.clone())
} else {
let mut headers = vec![];
loop {
let field = match self.next_field() {
NextField::EndOfRecord | NextField::EndOfCsv => break,
NextField::Error(err) => return Err(err),
NextField::Data(field) => field,
};
headers.push(ByteString::from_bytes(field));
}
// An empty first record is only expected when the input is exhausted.
assert!(headers.len() > 0 || self.done());
Ok(headers)
}
}
/// Returns an iterator over records as vectors of byte strings.
///
/// If `seek` has been called on this reader, the iterator starts with its
/// first-record handling already marked as done.
pub fn byte_records<'a>(&'a mut self) -> ByteRecords<'a, R> {
let first = self.has_seeked;
ByteRecords { p: self, first: first }
}
/// Returns true once an error has been recorded. End-of-file is recorded as
/// an I/O error, so this is also true after the input is exhausted.
pub fn done(&self) -> bool {
self.err.is_some()
}
/// Advances the parser and reports the next field, the end of the current
/// record, the end of the data, or an error.
///
/// The returned field borrows the reader's internal buffer, which is reset
/// at the start of every call.
pub fn next_field<'a>(&'a mut self) -> NextField<'a> {
// Reset the field buffer, keeping its allocation.
unsafe { self.fieldbuf.set_len(0); }
// The previous call returned the last field of a record: report the record
// boundary now, after validating the record's length.
if self.state == EndRecord {
let first_len = self.first_record.len() as u64;
if !self.flexible && first_len != self.field_count {
let err = self.parse_err(ParseErrorKind::UnequalLengths(
self.first_record.len() as u64, self.field_count));
self.err = Some(err.clone());
return NextField::Error(err);
}
// Reset the per-record state for the record that follows.
self.state = StartRecord;
self.parsing_first_record = false;
self.line_record = self.line_current;
self.field_count = 0;
return NextField::EndOfRecord;
}
// Any recorded error (including end-of-file) means there is nothing left.
if let Some(_) = self.err {
return NextField::EndOfCsv;
}
// The state machine borrows the field buffer and state, and copies the
// dialect settings.
let mut pmachine = ParseMachine {
fieldbuf: &mut self.fieldbuf,
state: &mut self.state,
delimiter: self.delimiter,
record_terminator: self.record_terminator,
quote: self.quote,
escape: self.escape,
double_quote: self.double_quote,
};
// `consumed` counts the bytes of the current buffer chunk accepted so far.
let mut consumed = 0; 'TOPLOOP: loop {
match self.buffer.fill_buf() {
Err(err) => {
// Covers end-of-file as well; the cases are told apart after the loop.
self.err = Some(Error::Io(err));
break 'TOPLOOP;
}
Ok(bs) => {
for &b in bs.iter() {
pmachine.parse_byte(b);
if *pmachine.state == EndRecord {
// The byte that revealed the end of the record is not counted as
// consumed, so it stays in the buffer for the next record.
break 'TOPLOOP;
} else {
consumed += 1;
self.column += 1;
self.byte_offset += 1;
if b == b'\n' {
self.line_current += 1;
self.column = 1;
}
// Being (back) in `StartField` after a byte means the field is complete.
if *pmachine.state == StartField {
break 'TOPLOOP
}
}
}
}
}
// The whole chunk was accepted; release it and fetch the next one.
self.buffer.consume(consumed);
consumed = 0;
}
// Release the bytes accepted before the loop was left.
self.buffer.consume(consumed);
match self.err {
None => {}
Some(Error::Io(io::IoError { kind: io::EndOfFile, .. })) => {
// End-of-file with no record in progress: there is no more data.
if *pmachine.state == StartRecord {
return NextField::EndOfCsv;
}
// Otherwise return the pending field now and close the record on the
// next call.
*pmachine.state = EndRecord;
}
Some(ref err) => {
*pmachine.state = StartRecord;
return NextField::Error(err.clone());
}
}
// Keep a copy of each field of the first record.
if self.parsing_first_record {
let bytes = ByteString::from_bytes((*pmachine.fieldbuf)[]);
self.first_record.push(bytes);
}
self.field_count += 1;
NextField::Data((*pmachine.fieldbuf)[])
}
/// Returns an iterator over fields that yields slices with the lifetime of
/// the reader borrow. The slices point into the internal field buffer, which
/// `next_field` resets on every call.
#[doc(hidden)]
pub unsafe fn byte_fields<'a>(&'a mut self) -> UnsafeByteFields<'a, R> {
UnsafeByteFields { rdr: self }
}
/// Returns the line number on which the current record started.
pub fn line(&self) -> u64 {
self.line_record
}
/// Returns the number of bytes consumed so far (or the position recorded by
/// the last `seek`).
pub fn byte_offset(&self) -> u64 {
self.byte_offset
}
/// Builds a parse error of the given kind, tagged with the current record's
/// line and the current column.
fn parse_err(&self, kind: ParseErrorKind) -> Error {
Error::Parse(ParseError {
line: self.line_record,
column: self.column,
kind: kind,
})
}
}
impl<R: io::Reader + io::Seek> Reader<R> {
pub fn seek(&mut self, pos: i64, style: io::SeekStyle) -> CsvResult<()> {
self.has_seeked = true;
if pos as u64 == self.byte_offset() {
return Ok(())
}
self.buffer.clear();
self.err = None;
self.byte_offset = pos as u64;
try!(self.buffer.get_mut().seek(pos, style));
Ok(())
}
}
/// The byte-at-a-time CSV state machine. It borrows the field buffer and
/// parse state it mutates, and carries copies of the dialect settings.
struct ParseMachine<'a> {
// Buffer that receives the bytes of the field being parsed.
fieldbuf: &'a mut Vec<u8>,
// Current parse state; updated in place as bytes are fed in.
state: &'a mut ParseState,
// Byte that separates fields.
delimiter: u8,
// Which byte(s) end a record.
record_terminator: RecordTerminator,
// Byte that opens and closes a quoted field.
quote: u8,
// Escape byte inside quoted fields (only used when `double_quote` is false).
escape: u8,
// Whether a doubled quote inside a quoted field is one literal quote.
double_quote: bool,
}
/// The states of the CSV parsing state machine.
#[deriving(Eq, PartialEq, Show)]
enum ParseState {
// At the beginning of a record; record terminator bytes are skipped here.
StartRecord,
// The current record is complete.
EndRecord,
// At the beginning of a field.
StartField,
// Inside a run of record terminator bytes; the last one seen was `\r`.
RecordTermCR,
// Inside a run of record terminator bytes; the last one seen was `\n`.
RecordTermLF,
// Inside a run of custom (`RecordTerminator::Any`) terminator bytes.
RecordTermAny,
// Inside an unquoted field.
InField,
// Inside a quoted field.
InQuotedField,
// Just saw the escape byte inside a quoted field.
InQuotedFieldEscape,
// Just saw a quote byte inside a quoted field.
InQuotedFieldQuote,
}
impl<'a> ParseMachine<'a> {
/// Feeds one byte to the state machine, dispatching on the current state.
///
/// Must not be called while the state is `EndRecord`.
#[inline]
fn parse_byte(&mut self, b: u8) {
match *self.state {
StartRecord => self.parse_start_record(b),
EndRecord => unreachable!(),
StartField => self.parse_start_field(b),
RecordTermCR => self.parse_record_term_cr(b),
RecordTermLF => self.parse_record_term_lf(b),
RecordTermAny => self.parse_record_term_any(b),
InField => self.parse_in_field(b),
InQuotedField => self.parse_in_quoted_field(b),
InQuotedFieldEscape => self.parse_in_quoted_field_escape(b),
InQuotedFieldQuote => self.parse_in_quoted_field_quote(b),
}
}
/// At the start of a record: terminator bytes are ignored (which skips empty
/// lines); any other byte begins the first field.
#[inline]
fn parse_start_record(&mut self, b: u8) {
if !self.is_record_term(b) {
*self.state = StartField;
self.parse_start_field(b);
}
}
/// At the start of a field: a terminator ends the record, a quote opens a
/// quoted field, a delimiter leaves the field empty, and anything else is
/// the first byte of an unquoted field.
#[inline]
fn parse_start_field(&mut self, b: u8) {
if self.is_record_term(b) {
*self.state = self.record_term_next_state(b);
} else if b == self.quote {
*self.state = InQuotedField;
} else if b == self.delimiter {
// Empty field: the state stays `StartField` and the buffer stays empty.
} else {
*self.state = InField;
self.fieldbuf.push(b);
}
}
/// After a `\r` terminator byte: further `\r` bytes are absorbed, `\n`
/// switches to the LF state, and any other byte ends the record.
#[inline]
fn parse_record_term_cr(&mut self, b: u8) {
if b == b'\n' {
*self.state = RecordTermLF;
} else if b != b'\r' {
*self.state = EndRecord;
}
}
/// After a `\n` terminator byte: further `\n` bytes are absorbed, `\r`
/// switches to the CR state, and any other byte ends the record.
#[inline]
fn parse_record_term_lf(&mut self, b: u8) {
if b == b'\r' {
*self.state = RecordTermCR;
} else if b != b'\n' {
*self.state = EndRecord;
}
}
/// After a custom terminator byte: repeated terminator bytes are absorbed
/// and any other byte ends the record.
#[inline]
fn parse_record_term_any(&mut self, b: u8) {
match self.record_terminator {
// `RecordTermAny` is only entered for `RecordTerminator::Any`.
RecordTerminator::CRLF => unreachable!(),
RecordTerminator::Any(bb) => {
if b != bb {
*self.state = EndRecord;
}
}
}
}
/// Inside an unquoted field: a terminator ends the record, a delimiter ends
/// the field, and any other byte is appended to the field.
#[inline]
fn parse_in_field(&mut self, b: u8) {
if self.is_record_term(b) {
*self.state = self.record_term_next_state(b);
} else if b == self.delimiter {
*self.state = StartField;
} else {
self.fieldbuf.push(b);
}
}
/// Inside a quoted field: a quote byte may close the field (decided by the
/// next byte), the escape byte is honored only when doubled quotes are
/// disabled, and every other byte is appended to the field.
#[inline]
fn parse_in_quoted_field(&mut self, b: u8) {
if b == self.quote {
*self.state = InQuotedFieldQuote;
} else if !self.double_quote && b == self.escape {
*self.state = InQuotedFieldEscape;
} else {
self.fieldbuf.push(b);
}
}
/// After an escape byte in a quoted field: the byte is taken literally.
#[inline]
fn parse_in_quoted_field_escape(&mut self, b: u8) {
*self.state = InQuotedField;
self.fieldbuf.push(b);
}
/// After a quote byte in a quoted field: a second quote (with doubled quotes
/// enabled) is a literal quote, a delimiter ends the field, a terminator
/// ends the record, and any other byte continues the field as unquoted data.
#[inline]
fn parse_in_quoted_field_quote(&mut self, b: u8) {
if self.double_quote && b == self.quote {
*self.state = InQuotedField;
self.fieldbuf.push(b);
} else if b == self.delimiter {
*self.state = StartField;
} else if self.is_record_term(b) {
*self.state = self.record_term_next_state(b);
} else {
*self.state = InField;
self.fieldbuf.push(b);
}
}
/// Returns true if `b` is a record terminator byte for this dialect.
#[inline]
fn is_record_term(&self, b: u8) -> bool {
self.record_terminator == b
}
/// Returns the state to enter after seeing terminator byte `b`.
///
/// With the CRLF terminator, `b` must be `\r` or `\n`.
#[inline]
fn record_term_next_state(&self, b: u8) -> ParseState {
match self.record_terminator {
RecordTerminator::CRLF => {
if b == b'\r' {
RecordTermCR
} else if b == b'\n' {
RecordTermLF
} else {
unreachable!()
}
}
RecordTerminator::Any(_) => RecordTermAny,
}
}
}
/// An iterator over the fields of the current record that hands out slices
/// carrying the lifetime of the reader borrow.
#[doc(hidden)]
pub struct UnsafeByteFields<'a, R: 'a> {
// The reader whose fields are being iterated.
rdr: &'a mut Reader<R>,
}
#[doc(hidden)]
impl<'a, R: io::Reader> Iterator<CsvResult<&'a [u8]>>
        for UnsafeByteFields<'a, R> {
    /// Yields the next field of the current record, or `None` at the end of
    /// the record or of the data.
    fn next(&mut self) -> Option<CsvResult<&'a [u8]>> {
        let item = self.rdr.next_field().into_iter_result();
        // The transmute only stretches the slice's lifetime to `'a`. The
        // slice points into the reader's field buffer, which is reset by the
        // next `next_field` call, so a yielded slice must not be used after
        // the iterator is advanced.
        unsafe { ::std::mem::transmute(item) }
    }
}
/// An iterator that decodes each record into a value of type `D`.
pub struct DecodedRecords<'a, R: 'a, D> {
// The underlying iterator over raw byte records.
p: ByteRecords<'a, R>,
}
impl<'a, R: io::Reader, D: Decodable<Decoded, Error>> Iterator<CsvResult<D>>
        for DecodedRecords<'a, R, D> {
    /// Reads the next record and decodes it into a `D`. Read errors are
    /// passed through unchanged; decoding is attempted only on success.
    fn next(&mut self) -> Option<CsvResult<D>> {
        match self.p.next() {
            None => None,
            Some(Err(err)) => Some(Err(err)),
            Some(Ok(byte_record)) => {
                let mut decoder = Decoded::new(byte_record);
                Some(Decodable::decode(&mut decoder))
            }
        }
    }
}
/// An iterator that yields each record as a vector of `String`s.
pub struct StringRecords<'a, R: 'a> {
// The underlying iterator over raw byte records.
p: ByteRecords<'a, R>,
}
impl<'a, R: io::Reader> Iterator<CsvResult<Vec<String>>>
        for StringRecords<'a, R> {
    /// Reads the next record and converts its fields to `String`s. Read
    /// errors are passed through unchanged; a field that is not valid UTF-8
    /// turns the record into an error.
    fn next(&mut self) -> Option<CsvResult<Vec<String>>> {
        match self.p.next() {
            None => None,
            Some(Err(err)) => Some(Err(err)),
            Some(Ok(byte_record)) => Some(byte_record_to_utf8(byte_record)),
        }
    }
}
/// An iterator that yields each record as a vector of byte strings.
pub struct ByteRecords<'a, R: 'a> {
// The reader that records are pulled from.
p: &'a mut Reader<R>,
// Whether the first-record (header) handling has already been performed.
first: bool,
}
impl<'a, R: io::Reader> Iterator<CsvResult<Vec<ByteString>>>
        for ByteRecords<'a, R> {
    /// Yields the next record, or `None` when the reader is exhausted.
    ///
    /// On the first call (unless the reader has been seeked) the first record
    /// is read. It is yielded as data when the reader is configured without
    /// headers and skipped otherwise. An error while reading that first
    /// record is yielded to the caller rather than being dropped.
    ///
    /// Panics if the first record is empty although the reader is not done.
    fn next(&mut self) -> Option<CsvResult<Vec<ByteString>>> {
        if !self.first {
            self.first = true;
            let headers = match self.p.byte_headers() {
                // Report the failure even when the header row itself would
                // have been skipped; otherwise the iterator would just end
                // and the caller could not tell an error from empty input.
                Err(err) => return Some(Err(err)),
                Ok(headers) => headers,
            };
            if headers.is_empty() {
                assert!(self.p.done());
                return None;
            }
            if !self.p.has_headers {
                return Some(Ok(headers));
            }
        }
        // Once the reader has recorded an error (end-of-file included) there
        // is nothing more to yield.
        if self.p.done() {
            return None;
        }
        let mut record = Vec::with_capacity(self.p.first_record.len());
        loop {
            match self.p.next_field() {
                NextField::EndOfRecord => break,
                NextField::EndOfCsv => {
                    // The data ended before any field was read (for example
                    // after seeking to the end of the input): there is no
                    // record here, so do not yield an empty one.
                    if record.is_empty() {
                        return None;
                    }
                    break;
                }
                NextField::Error(err) => return Some(Err(err)),
                NextField::Data(field) =>
                    record.push(ByteString::from_bytes(field)),
            }
        }
        Some(Ok(record))
    }
}
fn byte_record_to_utf8(record: Vec<ByteString>) -> CsvResult<Vec<String>> {
for bytes in record.iter() {
if !::std::str::is_utf8(bytes[]) {
return Err(Error::Decode(format!(
"Could not decode the following bytes as UTF-8: {}", bytes)));
}
}
Ok(unsafe { ::std::mem::transmute(record) })
}