1use crate::data::{DataRow, DataStreamResult};
4use crate::loaders::csv::{CsvWriter, write_csv_stream};
5use comfy_table::{Attribute, Cell, ContentArrangement, Table, modifiers, presets};
6use std::path::PathBuf;
7use tokio_stream::StreamExt;
8
9pub async fn print_datastream_result(
11 result: DataStreamResult,
12 output_file: &str,
13) -> Result<(), String> {
14 if output_file == "-" {
15 print_to_console(result).await
17 } else {
18 let path = PathBuf::from(output_file);
20 if is_csv_file(&path) {
21 write_csv_stream(result, &path).await
22 } else {
23 write_plain_text(result, &path).await
24 }
25 }
26}
27
28async fn print_to_console(mut result: DataStreamResult) -> Result<(), String> {
30 let headers = result.info.headers.clone();
31
32 let mut table = Table::new();
34 table
35 .load_preset(presets::UTF8_FULL_CONDENSED)
36 .apply_modifier(modifiers::UTF8_ROUND_CORNERS)
37 .set_content_arrangement(ContentArrangement::Dynamic)
38 .set_header(&headers);
39
40 let mut row_count = 0;
42 while let Some(result_item) = result.stream.next().await {
43 match result_item {
44 Ok(row) => {
45 let row_values: Vec<String> = headers
46 .iter()
47 .map(|h| row.get(h).cloned().unwrap_or_default())
48 .collect();
49
50 table.add_row(row_values);
51 row_count += 1;
52
53 if row_count % 1000 == 0 {
55 println!("Processed {} rows...", row_count);
56 }
57 }
58 Err(e) => return Err(format!("Stream error: {}", e)),
59 }
60 }
61
62 println!("{}", table);
64
65 if let Some(description) = result.info.description {
66 println!("\n{}", description);
67 }
68
69 println!("Total rows: {}", row_count);
70
71 Ok(())
72}
73
74async fn write_plain_text(mut result: DataStreamResult, path: &PathBuf) -> Result<(), String> {
76 use tokio::fs::File;
77 use tokio::io::{AsyncWriteExt, BufWriter};
78
79 let file = File::create(path)
80 .await
81 .map_err(|e| format!("Failed to create output file: {}", e))?;
82
83 let mut writer = BufWriter::new(file);
84 let headers = result.info.headers.clone();
85
86 let header_line = headers.join("\t");
88 writer
89 .write_all(format!("{}\n", header_line).as_bytes())
90 .await
91 .map_err(|e| format!("Failed to write headers: {}", e))?;
92
93 while let Some(result_item) = result.stream.next().await {
95 match result_item {
96 Ok(row) => {
97 let row_values: Vec<String> = headers
98 .iter()
99 .map(|h| row.get(h).cloned().unwrap_or_default())
100 .collect();
101
102 let row_line = row_values.join("\t");
103 writer
104 .write_all(format!("{}\n", row_line).as_bytes())
105 .await
106 .map_err(|e| format!("Failed to write row: {}", e))?;
107 }
108 Err(e) => return Err(format!("Stream error: {}", e)),
109 }
110 }
111
112 writer
113 .flush()
114 .await
115 .map_err(|e| format!("Failed to flush output: {}", e))?;
116
117 Ok(())
118}
119
/// Reports whether `path` has a `csv` extension, ignoring ASCII case.
///
/// Paths without an extension (including dot-files such as `.csv`) yield
/// `false`. The comparison is done on the `OsStr` directly, so no lowercase
/// copy of the extension is allocated. Callers holding a `PathBuf` can keep
/// passing `&path_buf`; it coerces to `&Path`.
fn is_csv_file(path: &std::path::Path) -> bool {
    path.extension()
        .map_or(false, |ext| ext.eq_ignore_ascii_case("csv"))
}
127
128pub async fn print_datastream_with_progress(
130 mut result: DataStreamResult,
131 output_file: &str,
132) -> Result<(), String> {
133 use indicatif::{ProgressBar, ProgressStyle};
134
135 let progress_bar = if let Some(total) = result.info.total_rows {
136 let pb = ProgressBar::new(total as u64);
137 pb.set_style(
138 ProgressStyle::default_bar()
139 .template(
140 "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
141 )
142 .unwrap()
143 .progress_chars("#>-"),
144 );
145 Some(pb)
146 } else {
147 let pb = ProgressBar::new_spinner();
148 pb.set_style(
149 ProgressStyle::default_spinner()
150 .template("{spinner:.green} [{elapsed_precise}] {pos} rows processed")
151 .unwrap(),
152 );
153 Some(pb)
154 };
155
156 let headers = result.info.headers.clone();
157
158 if output_file == "-" {
159 let mut table = Table::new();
161 table
162 .load_preset(presets::UTF8_FULL_CONDENSED)
163 .apply_modifier(modifiers::UTF8_ROUND_CORNERS)
164 .set_content_arrangement(ContentArrangement::Dynamic)
165 .set_header(&headers);
166
167 let mut row_count = 0;
168 while let Some(result_item) = result.stream.next().await {
169 match result_item {
170 Ok(row) => {
171 let row_values: Vec<String> = headers
172 .iter()
173 .map(|h| row.get(h).cloned().unwrap_or_default())
174 .collect();
175
176 table.add_row(row_values);
177 row_count += 1;
178
179 if let Some(ref pb) = progress_bar {
180 pb.set_position(row_count);
181 }
182 }
183 Err(e) => return Err(format!("Stream error: {}", e)),
184 }
185 }
186
187 if let Some(pb) = progress_bar {
188 pb.finish_with_message("Processing complete");
189 }
190
191 println!("{}", table);
192
193 if let Some(description) = result.info.description {
194 println!("\n{}", description);
195 }
196
197 println!("Total rows: {}", row_count);
198 } else {
199 let path = PathBuf::from(output_file);
201
202 if is_csv_file(&path) {
203 let mut writer = CsvWriter::from_file(&path).await?;
204 writer.write_headers(&headers).await?;
205
206 let mut row_count = 0;
207 while let Some(result_item) = result.stream.next().await {
208 match result_item {
209 Ok(row) => {
210 writer.write_row(&row, &headers).await?;
211 row_count += 1;
212
213 if let Some(ref pb) = progress_bar {
214 pb.set_position(row_count);
215 }
216 }
217 Err(e) => return Err(format!("Stream error: {}", e)),
218 }
219 }
220
221 writer.finish().await?;
222 } else {
223 write_plain_text(DataStreamResult::new(result.info, result.stream), &path).await?;
224 }
225
226 if let Some(pb) = progress_bar {
227 pb.finish_with_message("File written successfully");
228 }
229 }
230
231 Ok(())
232}
233
234pub async fn stream_to_output(
236 mut result: DataStreamResult,
237 output_file: &str,
238) -> Result<(), String> {
239 let headers = result.info.headers.clone();
240
241 if output_file == "-" {
242 let header_line = headers.join(",");
244 println!("{}", header_line);
245
246 while let Some(result_item) = result.stream.next().await {
247 match result_item {
248 Ok(row) => {
249 let row_values: Vec<String> = headers
250 .iter()
251 .map(|h| row.get(h).cloned().unwrap_or_default())
252 .collect();
253
254 let row_line = row_values.join(",");
255 println!("{}", row_line);
256 }
257 Err(e) => return Err(format!("Stream error: {}", e)),
258 }
259 }
260 } else {
261 let path = PathBuf::from(output_file);
263
264 if is_csv_file(&path) {
265 write_csv_stream(result, &path).await?;
266 } else {
267 write_plain_text(result, &path).await?;
268 }
269 }
270
271 Ok(())
272}