rmap/
sink.rs

1//! Output formatting and streaming for command results
2
3use crate::data::{DataRow, DataStreamResult};
4use crate::loaders::csv::{CsvWriter, write_csv_stream};
5use comfy_table::{Attribute, Cell, ContentArrangement, Table, modifiers, presets};
6use std::path::PathBuf;
7use tokio_stream::StreamExt;
8
9/// Print streaming data results to console or file
10pub async fn print_datastream_result(
11    result: DataStreamResult,
12    output_file: &str,
13) -> Result<(), String> {
14    if output_file == "-" {
15        // Print to console as table
16        print_to_console(result).await
17    } else {
18        // Write to file
19        let path = PathBuf::from(output_file);
20        if is_csv_file(&path) {
21            write_csv_stream(result, &path).await
22        } else {
23            write_plain_text(result, &path).await
24        }
25    }
26}
27
28/// Print streaming results to console as a formatted table
29async fn print_to_console(mut result: DataStreamResult) -> Result<(), String> {
30    let headers = result.info.headers.clone();
31
32    // Create table with headers
33    let mut table = Table::new();
34    table
35        .load_preset(presets::UTF8_FULL_CONDENSED)
36        .apply_modifier(modifiers::UTF8_ROUND_CORNERS)
37        .set_content_arrangement(ContentArrangement::Dynamic)
38        .set_header(&headers);
39
40    // Add rows from stream
41    let mut row_count = 0;
42    while let Some(result_item) = result.stream.next().await {
43        match result_item {
44            Ok(row) => {
45                let row_values: Vec<String> = headers
46                    .iter()
47                    .map(|h| row.get(h).cloned().unwrap_or_default())
48                    .collect();
49
50                table.add_row(row_values);
51                row_count += 1;
52
53                // Print partial results for large datasets (every 1000 rows)
54                if row_count % 1000 == 0 {
55                    println!("Processed {} rows...", row_count);
56                }
57            }
58            Err(e) => return Err(format!("Stream error: {}", e)),
59        }
60    }
61
62    // Print the final table
63    println!("{}", table);
64
65    if let Some(description) = result.info.description {
66        println!("\n{}", description);
67    }
68
69    println!("Total rows: {}", row_count);
70
71    Ok(())
72}
73
74/// Write streaming results to a plain text file
75async fn write_plain_text(mut result: DataStreamResult, path: &PathBuf) -> Result<(), String> {
76    use tokio::fs::File;
77    use tokio::io::{AsyncWriteExt, BufWriter};
78
79    let file = File::create(path)
80        .await
81        .map_err(|e| format!("Failed to create output file: {}", e))?;
82
83    let mut writer = BufWriter::new(file);
84    let headers = result.info.headers.clone();
85
86    // Write headers
87    let header_line = headers.join("\t");
88    writer
89        .write_all(format!("{}\n", header_line).as_bytes())
90        .await
91        .map_err(|e| format!("Failed to write headers: {}", e))?;
92
93    // Write data rows
94    while let Some(result_item) = result.stream.next().await {
95        match result_item {
96            Ok(row) => {
97                let row_values: Vec<String> = headers
98                    .iter()
99                    .map(|h| row.get(h).cloned().unwrap_or_default())
100                    .collect();
101
102                let row_line = row_values.join("\t");
103                writer
104                    .write_all(format!("{}\n", row_line).as_bytes())
105                    .await
106                    .map_err(|e| format!("Failed to write row: {}", e))?;
107            }
108            Err(e) => return Err(format!("Stream error: {}", e)),
109        }
110    }
111
112    writer
113        .flush()
114        .await
115        .map_err(|e| format!("Failed to flush output: {}", e))?;
116
117    Ok(())
118}
119
120/// Check if file path indicates CSV format
121fn is_csv_file(path: &PathBuf) -> bool {
122    path.extension()
123        .and_then(|ext| ext.to_str())
124        .map(|ext| ext.to_lowercase() == "csv")
125        .unwrap_or(false)
126}
127
128/// Print streaming results with progress indication
129pub async fn print_datastream_with_progress(
130    mut result: DataStreamResult,
131    output_file: &str,
132) -> Result<(), String> {
133    use indicatif::{ProgressBar, ProgressStyle};
134
135    let progress_bar = if let Some(total) = result.info.total_rows {
136        let pb = ProgressBar::new(total as u64);
137        pb.set_style(
138            ProgressStyle::default_bar()
139                .template(
140                    "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
141                )
142                .unwrap()
143                .progress_chars("#>-"),
144        );
145        Some(pb)
146    } else {
147        let pb = ProgressBar::new_spinner();
148        pb.set_style(
149            ProgressStyle::default_spinner()
150                .template("{spinner:.green} [{elapsed_precise}] {pos} rows processed")
151                .unwrap(),
152        );
153        Some(pb)
154    };
155
156    let headers = result.info.headers.clone();
157
158    if output_file == "-" {
159        // Print to console with progress
160        let mut table = Table::new();
161        table
162            .load_preset(presets::UTF8_FULL_CONDENSED)
163            .apply_modifier(modifiers::UTF8_ROUND_CORNERS)
164            .set_content_arrangement(ContentArrangement::Dynamic)
165            .set_header(&headers);
166
167        let mut row_count = 0;
168        while let Some(result_item) = result.stream.next().await {
169            match result_item {
170                Ok(row) => {
171                    let row_values: Vec<String> = headers
172                        .iter()
173                        .map(|h| row.get(h).cloned().unwrap_or_default())
174                        .collect();
175
176                    table.add_row(row_values);
177                    row_count += 1;
178
179                    if let Some(ref pb) = progress_bar {
180                        pb.set_position(row_count);
181                    }
182                }
183                Err(e) => return Err(format!("Stream error: {}", e)),
184            }
185        }
186
187        if let Some(pb) = progress_bar {
188            pb.finish_with_message("Processing complete");
189        }
190
191        println!("{}", table);
192
193        if let Some(description) = result.info.description {
194            println!("\n{}", description);
195        }
196
197        println!("Total rows: {}", row_count);
198    } else {
199        // Write to file with progress
200        let path = PathBuf::from(output_file);
201
202        if is_csv_file(&path) {
203            let mut writer = CsvWriter::from_file(&path).await?;
204            writer.write_headers(&headers).await?;
205
206            let mut row_count = 0;
207            while let Some(result_item) = result.stream.next().await {
208                match result_item {
209                    Ok(row) => {
210                        writer.write_row(&row, &headers).await?;
211                        row_count += 1;
212
213                        if let Some(ref pb) = progress_bar {
214                            pb.set_position(row_count);
215                        }
216                    }
217                    Err(e) => return Err(format!("Stream error: {}", e)),
218                }
219            }
220
221            writer.finish().await?;
222        } else {
223            write_plain_text(DataStreamResult::new(result.info, result.stream), &path).await?;
224        }
225
226        if let Some(pb) = progress_bar {
227            pb.finish_with_message("File written successfully");
228        }
229    }
230
231    Ok(())
232}
233
234/// Stream data directly to output without buffering (for very large datasets)
235pub async fn stream_to_output(
236    mut result: DataStreamResult,
237    output_file: &str,
238) -> Result<(), String> {
239    let headers = result.info.headers.clone();
240
241    if output_file == "-" {
242        // Stream to console without table formatting (for large datasets)
243        let header_line = headers.join(",");
244        println!("{}", header_line);
245
246        while let Some(result_item) = result.stream.next().await {
247            match result_item {
248                Ok(row) => {
249                    let row_values: Vec<String> = headers
250                        .iter()
251                        .map(|h| row.get(h).cloned().unwrap_or_default())
252                        .collect();
253
254                    let row_line = row_values.join(",");
255                    println!("{}", row_line);
256                }
257                Err(e) => return Err(format!("Stream error: {}", e)),
258            }
259        }
260    } else {
261        // Stream directly to file
262        let path = PathBuf::from(output_file);
263
264        if is_csv_file(&path) {
265            write_csv_stream(result, &path).await?;
266        } else {
267            write_plain_text(result, &path).await?;
268        }
269    }
270
271    Ok(())
272}