rmap/loaders/
csv.rs

1//! Async CSV parser and writer with streaming support
2
3use crate::data::{DataRow, DataStreamInfo, DataStreamResult, stream_from_async_iter};
4use futures::stream;
5use std::path::PathBuf;
6use tokio::fs::File as AsyncFile;
7use tokio::io::{
8    AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader, BufWriter as AsyncBufWriter,
9};
10use tokio_stream::{Stream, StreamExt};
11
12/// Async CSV reader that streams rows
13pub struct CsvReader {
14    reader: AsyncBufReader<AsyncFile>,
15    headers: Vec<String>,
16    line_number: usize,
17}
18
19impl CsvReader {
20    /// Create a new CSV reader from a file path
21    pub async fn from_file(file_path: &PathBuf) -> Result<Self, String> {
22        let file = AsyncFile::open(file_path)
23            .await
24            .map_err(|e| format!("Failed to open CSV file: {}", e))?;
25
26        let mut reader = AsyncBufReader::new(file);
27        let mut first_line = String::new();
28
29        // Read first line for headers
30        reader
31            .read_line(&mut first_line)
32            .await
33            .map_err(|e| format!("Failed to read CSV headers: {}", e))?;
34
35        let headers = parse_csv_line(&first_line.trim());
36
37        Ok(Self {
38            reader,
39            headers,
40            line_number: 1,
41        })
42    }
43
44    /// Get the headers
45    pub fn headers(&self) -> &[String] {
46        &self.headers
47    }
48
49    /// Convert to a stream of DataRows
50    pub fn into_stream(self) -> impl Stream<Item = Result<DataRow, String>> {
51        let headers = self.headers.clone();
52        stream::unfold(
53            (self.reader, self.line_number),
54            move |(mut reader, mut line_num)| {
55                let headers = headers.clone();
56                async move {
57                    let mut line = String::new();
58                    match reader.read_line(&mut line).await {
59                        Ok(0) => None, // EOF
60                        Ok(_) => {
61                            line_num += 1;
62                            let line = line.trim();
63                            if line.is_empty() {
64                                // Skip empty lines, continue iteration
65                                Some((Ok(DataRow::new()), (reader, line_num)))
66                            } else {
67                                let values = parse_csv_line(line);
68                                let mut row = DataRow::new();
69
70                                // Match values to headers
71                                for (i, header) in headers.iter().enumerate() {
72                                    let value = values.get(i).unwrap_or(&String::new()).clone();
73                                    row.insert(header.clone(), value);
74                                }
75
76                                Some((Ok(row), (reader, line_num)))
77                            }
78                        }
79                        Err(e) => Some((
80                            Err(format!("Error reading line {}: {}", line_num, e)),
81                            (reader, line_num),
82                        )),
83                    }
84                }
85            },
86        )
87        .filter(|result| {
88            // Filter out empty rows (from empty lines)
89            match result {
90                Ok(row) => !row.columns.is_empty(),
91                Err(_) => true,
92            }
93        })
94    }
95}
96
97/// Async CSV writer that writes streaming data
98pub struct CsvWriter {
99    writer: AsyncBufWriter<AsyncFile>,
100    headers_written: bool,
101}
102
103impl CsvWriter {
104    /// Create a new CSV writer
105    pub async fn from_file(file_path: &PathBuf) -> Result<Self, String> {
106        let file = AsyncFile::create(file_path)
107            .await
108            .map_err(|e| format!("Failed to create CSV file: {}", e))?;
109
110        let writer = AsyncBufWriter::new(file);
111
112        Ok(Self {
113            writer,
114            headers_written: false,
115        })
116    }
117
118    /// Write headers if not already written
119    pub async fn write_headers(&mut self, headers: &[String]) -> Result<(), String> {
120        if !self.headers_written {
121            let header_line = headers.join(",");
122            self.writer
123                .write_all(format!("{}\n", header_line).as_bytes())
124                .await
125                .map_err(|e| format!("Failed to write CSV headers: {}", e))?;
126            self.headers_written = true;
127        }
128        Ok(())
129    }
130
131    /// Write a single data row
132    pub async fn write_row(&mut self, row: &DataRow, headers: &[String]) -> Result<(), String> {
133        let csv_line = row.to_csv_line(headers);
134        self.writer
135            .write_all(format!("{}\n", csv_line).as_bytes())
136            .await
137            .map_err(|e| format!("Failed to write CSV row: {}", e))
138    }
139
140    /// Write streaming data
141    pub async fn write_stream<S>(&mut self, mut stream: S, headers: &[String]) -> Result<(), String>
142    where
143        S: Stream<Item = Result<DataRow, String>> + Unpin,
144    {
145        // Write headers first
146        self.write_headers(headers).await?;
147
148        // Write all rows from stream
149        while let Some(result) = stream.next().await {
150            match result {
151                Ok(row) => self.write_row(&row, headers).await?,
152                Err(e) => return Err(format!("Stream error: {}", e)),
153            }
154        }
155
156        Ok(())
157    }
158
159    /// Flush and close the writer
160    pub async fn finish(mut self) -> Result<(), String> {
161        self.writer
162            .flush()
163            .await
164            .map_err(|e| format!("Failed to flush CSV output: {}", e))
165    }
166}
167
/// Parse a single CSV line into its fields.
///
/// Fields are separated by commas. Unquoted fields are trimmed of surrounding
/// whitespace. A field whose first non-whitespace character is `"` is treated
/// as quoted: commas inside it do not split the field, `""` stands for a
/// literal quote, and its content is kept verbatim (not trimmed).
///
/// Lines without any quote character, and lines whose quoting is unterminated,
/// are split on every comma exactly as a plain `split(',')` would do.
///
/// The result always contains at least one field; an empty line yields a
/// single empty string. Quoted fields spanning several physical lines are not
/// supported because the input is a single line.
fn parse_csv_line(line: &str) -> Vec<String> {
    // Plain comma split; also the fast path for the common quote-free line.
    let split_plain = |text: &str| -> Vec<String> {
        text.split(',')
            .map(|field| field.trim().to_string())
            .collect()
    };

    if !line.contains('"') {
        return split_plain(line);
    }

    let mut fields = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false; // currently between an opening and closing quote
    let mut quoted = false; // the current field started with a quote
    let mut chars = line.chars().peekable();

    while let Some(ch) = chars.next() {
        if in_quotes {
            match ch {
                // A doubled quote inside a quoted field is an escaped quote.
                '"' if chars.peek() == Some(&'"') => {
                    chars.next();
                    current.push('"');
                }
                '"' => in_quotes = false,
                _ => current.push(ch),
            }
            continue;
        }

        match ch {
            ',' => {
                let field = if quoted {
                    std::mem::take(&mut current)
                } else {
                    current.trim().to_string()
                };
                fields.push(field);
                current.clear();
                quoted = false;
            }
            // Only a quote at the start of a field opens a quoted section;
            // a quote in the middle of bare text stays a literal character.
            '"' if !quoted && current.trim().is_empty() => {
                current.clear();
                in_quotes = true;
                quoted = true;
            }
            // Ignore padding between a closing quote and the next comma.
            _ if quoted && ch.is_whitespace() => {}
            _ => current.push(ch),
        }
    }

    if in_quotes {
        // Unterminated quote: treat the line as unquoted rather than
        // swallowing the rest of it into one field.
        return split_plain(line);
    }

    fields.push(if quoted {
        current
    } else {
        current.trim().to_string()
    });
    fields
}
176
177/// Load CSV file as a DataStreamResult
178pub async fn load_csv_stream(file_path: &PathBuf) -> Result<DataStreamResult, String> {
179    let reader = CsvReader::from_file(file_path).await?;
180    let headers = reader.headers().to_vec();
181    let stream = reader.into_stream();
182
183    let info = DataStreamInfo::new(headers)
184        .with_description(format!("CSV data from {}", file_path.display()));
185
186    Ok(DataStreamResult::new(info, stream))
187}
188
189/// Write DataStreamResult to CSV file
190pub async fn write_csv_stream(result: DataStreamResult, file_path: &PathBuf) -> Result<(), String> {
191    let mut writer = CsvWriter::from_file(file_path).await?;
192    let headers = result.info.headers.clone();
193    let stream = result.stream;
194
195    writer.write_stream(stream, &headers).await?;
196    writer.finish().await?;
197
198    Ok(())
199}
200
201/// Write simple CSV lines asynchronously
202pub async fn write_csv_lines<I, S>(
203    lines: I,
204    file_path: &PathBuf,
205    headers: Option<&[&str]>,
206) -> Result<(), String>
207where
208    I: Iterator<Item = Vec<S>>,
209    S: AsRef<str>,
210{
211    let mut writer = CsvWriter::from_file(file_path).await?;
212
213    // Write headers if provided
214    if let Some(headers) = headers {
215        let header_strings: Vec<String> = headers.iter().map(|h| h.to_string()).collect();
216        writer.write_headers(&header_strings).await?;
217    }
218
219    // Write data lines
220    for line in lines {
221        let mut row = DataRow::new();
222        for (i, value) in line.iter().enumerate() {
223            row.insert(format!("col_{}", i), value.as_ref());
224        }
225
226        let header_vec: Vec<String> = (0..line.len()).map(|i| format!("col_{}", i)).collect();
227        writer.write_row(&row, &header_vec).await?;
228    }
229
230    writer.finish().await?;
231    Ok(())
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;
    use tokio::io::AsyncWriteExt;

    /// Loading a small file exposes its headers and yields one row per data line.
    #[tokio::test]
    async fn test_csv_parsing() {
        use std::io::Write;

        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file
            .write_all(b"name,age,city\nAlice,25,NYC\nBob,30,LA\n")
            .unwrap();
        temp_file.flush().unwrap();

        let path = temp_file.path().to_path_buf();
        let loaded = load_csv_stream(&path).await.unwrap();

        assert_eq!(loaded.info.headers, vec!["name", "age", "city"]);

        let rows: Vec<_> = loaded.stream.collect().await;
        assert_eq!(rows.len(), 2);

        // Spot-check every column of the first data row.
        let alice = rows[0].as_ref().unwrap();
        for (column, expected) in [("name", "Alice"), ("age", "25"), ("city", "NYC")] {
            assert_eq!(alice.get(column), Some(&expected.to_string()));
        }
    }

    /// Writing a two-row stream produces a header line plus two data lines.
    #[tokio::test]
    async fn test_csv_writing() {
        let alice = DataRow::new()
            .with_column("name", "Alice")
            .with_column("age", "25");
        let bob = DataRow::new()
            .with_column("name", "Bob")
            .with_column("age", "30");

        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();

        let headers = vec!["name".to_string(), "age".to_string()];
        let info = DataStreamInfo::new(headers.clone());
        let stream = crate::data::stream_from_iter(vec![alice, bob]);

        write_csv_stream(DataStreamResult::new(info, stream), &path)
            .await
            .unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();

        // header + 2 data rows
        assert_eq!(lines, ["name,age", "Alice,25", "Bob,30"]);
    }
}