1use crate::data::{DataRow, DataStreamInfo, DataStreamResult, stream_from_async_iter};
4use futures::stream;
5use std::path::PathBuf;
6use tokio::fs::File as AsyncFile;
7use tokio::io::{
8 AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader, BufWriter as AsyncBufWriter,
9};
10use tokio_stream::{Stream, StreamExt};
11
12pub struct CsvReader {
14 reader: AsyncBufReader<AsyncFile>,
15 headers: Vec<String>,
16 line_number: usize,
17}
18
19impl CsvReader {
20 pub async fn from_file(file_path: &PathBuf) -> Result<Self, String> {
22 let file = AsyncFile::open(file_path)
23 .await
24 .map_err(|e| format!("Failed to open CSV file: {}", e))?;
25
26 let mut reader = AsyncBufReader::new(file);
27 let mut first_line = String::new();
28
29 reader
31 .read_line(&mut first_line)
32 .await
33 .map_err(|e| format!("Failed to read CSV headers: {}", e))?;
34
35 let headers = parse_csv_line(&first_line.trim());
36
37 Ok(Self {
38 reader,
39 headers,
40 line_number: 1,
41 })
42 }
43
44 pub fn headers(&self) -> &[String] {
46 &self.headers
47 }
48
49 pub fn into_stream(self) -> impl Stream<Item = Result<DataRow, String>> {
51 let headers = self.headers.clone();
52 stream::unfold(
53 (self.reader, self.line_number),
54 move |(mut reader, mut line_num)| {
55 let headers = headers.clone();
56 async move {
57 let mut line = String::new();
58 match reader.read_line(&mut line).await {
59 Ok(0) => None, Ok(_) => {
61 line_num += 1;
62 let line = line.trim();
63 if line.is_empty() {
64 Some((Ok(DataRow::new()), (reader, line_num)))
66 } else {
67 let values = parse_csv_line(line);
68 let mut row = DataRow::new();
69
70 for (i, header) in headers.iter().enumerate() {
72 let value = values.get(i).unwrap_or(&String::new()).clone();
73 row.insert(header.clone(), value);
74 }
75
76 Some((Ok(row), (reader, line_num)))
77 }
78 }
79 Err(e) => Some((
80 Err(format!("Error reading line {}: {}", line_num, e)),
81 (reader, line_num),
82 )),
83 }
84 }
85 },
86 )
87 .filter(|result| {
88 match result {
90 Ok(row) => !row.columns.is_empty(),
91 Err(_) => true,
92 }
93 })
94 }
95}
96
97pub struct CsvWriter {
99 writer: AsyncBufWriter<AsyncFile>,
100 headers_written: bool,
101}
102
103impl CsvWriter {
104 pub async fn from_file(file_path: &PathBuf) -> Result<Self, String> {
106 let file = AsyncFile::create(file_path)
107 .await
108 .map_err(|e| format!("Failed to create CSV file: {}", e))?;
109
110 let writer = AsyncBufWriter::new(file);
111
112 Ok(Self {
113 writer,
114 headers_written: false,
115 })
116 }
117
118 pub async fn write_headers(&mut self, headers: &[String]) -> Result<(), String> {
120 if !self.headers_written {
121 let header_line = headers.join(",");
122 self.writer
123 .write_all(format!("{}\n", header_line).as_bytes())
124 .await
125 .map_err(|e| format!("Failed to write CSV headers: {}", e))?;
126 self.headers_written = true;
127 }
128 Ok(())
129 }
130
131 pub async fn write_row(&mut self, row: &DataRow, headers: &[String]) -> Result<(), String> {
133 let csv_line = row.to_csv_line(headers);
134 self.writer
135 .write_all(format!("{}\n", csv_line).as_bytes())
136 .await
137 .map_err(|e| format!("Failed to write CSV row: {}", e))
138 }
139
140 pub async fn write_stream<S>(&mut self, mut stream: S, headers: &[String]) -> Result<(), String>
142 where
143 S: Stream<Item = Result<DataRow, String>> + Unpin,
144 {
145 self.write_headers(headers).await?;
147
148 while let Some(result) = stream.next().await {
150 match result {
151 Ok(row) => self.write_row(&row, headers).await?,
152 Err(e) => return Err(format!("Stream error: {}", e)),
153 }
154 }
155
156 Ok(())
157 }
158
159 pub async fn finish(mut self) -> Result<(), String> {
161 self.writer
162 .flush()
163 .await
164 .map_err(|e| format!("Failed to flush CSV output: {}", e))
165 }
166}
167
/// Splits one CSV record (a single line, without its line terminator) into fields.
///
/// Unquoted fields are separated by commas and trimmed of surrounding whitespace.
/// A field whose first non-whitespace character is `"` is treated as quoted in
/// the RFC 4180 sense: commas inside the quotes are literal, `""` stands for one
/// quote character, and the quoted content is kept verbatim (not trimmed).
/// A `"` that appears later inside an unquoted field is kept as a literal
/// character, and an unterminated quote runs to the end of the line.
///
/// The result always contains at least one field; an empty line yields `[""]`.
/// Because only one line is seen at a time, quoted fields that span several
/// lines cannot be reassembled here.
fn parse_csv_line(line: &str) -> Vec<String> {
    let mut fields = Vec::new();
    let mut current = String::new();
    // `Some(n)`: the field is quoted and its first `n` bytes are verbatim content.
    let mut quoted_len: Option<usize> = None;
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();

    while let Some(ch) = chars.next() {
        if in_quotes {
            match ch {
                // A doubled quote inside a quoted field is an escaped quote.
                '"' if chars.peek() == Some(&'"') => {
                    chars.next();
                    current.push('"');
                }
                '"' => {
                    in_quotes = false;
                    quoted_len = Some(current.len());
                }
                _ => current.push(ch),
            }
            continue;
        }

        match ch {
            ',' => {
                fields.push(finish_csv_field(&current, quoted_len));
                current.clear();
                quoted_len = None;
            }
            // Only a quote that opens the field (ignoring leading whitespace) starts quoting.
            '"' if quoted_len.is_none() && current.trim().is_empty() => {
                current.clear();
                in_quotes = true;
                quoted_len = Some(0);
            }
            _ => current.push(ch),
        }
    }

    if in_quotes {
        // Unterminated quote: treat everything collected so far as verbatim content.
        quoted_len = Some(current.len());
    }
    fields.push(finish_csv_field(&current, quoted_len));
    fields
}

/// Produces the final field text: trimmed for unquoted fields, verbatim quoted
/// content plus any right-trimmed text after the closing quote for quoted ones.
fn finish_csv_field(raw: &str, quoted_len: Option<usize>) -> String {
    match quoted_len {
        Some(len) => {
            let (verbatim, tail) = raw.split_at(len);
            format!("{}{}", verbatim, tail.trim_end())
        }
        None => raw.trim().to_string(),
    }
}
176
177pub async fn load_csv_stream(file_path: &PathBuf) -> Result<DataStreamResult, String> {
179 let reader = CsvReader::from_file(file_path).await?;
180 let headers = reader.headers().to_vec();
181 let stream = reader.into_stream();
182
183 let info = DataStreamInfo::new(headers)
184 .with_description(format!("CSV data from {}", file_path.display()));
185
186 Ok(DataStreamResult::new(info, stream))
187}
188
189pub async fn write_csv_stream(result: DataStreamResult, file_path: &PathBuf) -> Result<(), String> {
191 let mut writer = CsvWriter::from_file(file_path).await?;
192 let headers = result.info.headers.clone();
193 let stream = result.stream;
194
195 writer.write_stream(stream, &headers).await?;
196 writer.finish().await?;
197
198 Ok(())
199}
200
201pub async fn write_csv_lines<I, S>(
203 lines: I,
204 file_path: &PathBuf,
205 headers: Option<&[&str]>,
206) -> Result<(), String>
207where
208 I: Iterator<Item = Vec<S>>,
209 S: AsRef<str>,
210{
211 let mut writer = CsvWriter::from_file(file_path).await?;
212
213 if let Some(headers) = headers {
215 let header_strings: Vec<String> = headers.iter().map(|h| h.to_string()).collect();
216 writer.write_headers(&header_strings).await?;
217 }
218
219 for line in lines {
221 let mut row = DataRow::new();
222 for (i, value) in line.iter().enumerate() {
223 row.insert(format!("col_{}", i), value.as_ref());
224 }
225
226 let header_vec: Vec<String> = (0..line.len()).map(|i| format!("col_{}", i)).collect();
227 writer.write_row(&row, &header_vec).await?;
228 }
229
230 writer.finish().await?;
231 Ok(())
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Loading a small CSV file yields its headers and one row per data line.
    #[tokio::test]
    async fn test_csv_parsing() {
        use std::io::Write;

        let csv_content = "name,age,city\nAlice,25,NYC\nBob,30,LA\n";
        let mut temp_file = NamedTempFile::new().unwrap();
        temp_file.write_all(csv_content.as_bytes()).unwrap();
        temp_file.flush().unwrap();

        let path = temp_file.path().to_path_buf();
        let result = load_csv_stream(&path).await.unwrap();

        assert_eq!(result.info.headers, vec!["name", "age", "city"]);

        let rows: Vec<_> = result.stream.collect().await;
        assert_eq!(rows.len(), 2);

        let first_row = rows[0].as_ref().unwrap();
        assert_eq!(first_row.get("name"), Some(&"Alice".to_string()));
        assert_eq!(first_row.get("age"), Some(&"25".to_string()));
        assert_eq!(first_row.get("city"), Some(&"NYC".to_string()));

        // The second data line must be parsed as well, not just the first.
        let second_row = rows[1].as_ref().unwrap();
        assert_eq!(second_row.get("name"), Some(&"Bob".to_string()));
        assert_eq!(second_row.get("age"), Some(&"30".to_string()));
        assert_eq!(second_row.get("city"), Some(&"LA".to_string()));
    }

    /// Writing a stream of rows produces a header line followed by one line per row.
    #[tokio::test]
    async fn test_csv_writing() {
        let rows = vec![
            DataRow::new()
                .with_column("name", "Alice")
                .with_column("age", "25"),
            DataRow::new()
                .with_column("name", "Bob")
                .with_column("age", "30"),
        ];

        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();

        let headers = vec!["name".to_string(), "age".to_string()];
        let info = DataStreamInfo::new(headers.clone());
        let stream = crate::data::stream_from_iter(rows);
        let result = DataStreamResult::new(info, stream);

        write_csv_stream(result, &path).await.unwrap();

        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = content.trim().split('\n').collect();

        assert_eq!(lines.len(), 3);
        assert_eq!(lines[0], "name,age");
        assert_eq!(lines[1], "Alice,25");
        assert_eq!(lines[2], "Bob,30");
    }
}