rmap/
data.rs

1//! Custom data structures and streaming support to replace polars
2
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio_stream::Stream;
8
9/// A single row of data with named columns
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct DataRow {
12    pub columns: HashMap<String, String>,
13}
14
15impl DataRow {
16    pub fn new() -> Self {
17        Self {
18            columns: HashMap::new(),
19        }
20    }
21
22    pub fn with_column(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
23        self.columns.insert(name.into(), value.into());
24        self
25    }
26
27    pub fn insert(&mut self, name: impl Into<String>, value: impl Into<String>) {
28        self.columns.insert(name.into(), value.into());
29    }
30
31    pub fn get(&self, name: &str) -> Option<&String> {
32        self.columns.get(name)
33    }
34
35    pub fn keys(&self) -> impl Iterator<Item = &String> {
36        self.columns.keys()
37    }
38
39    pub fn values(&self) -> impl Iterator<Item = &String> {
40        self.columns.values()
41    }
42
43    /// Convert to CSV line with specified column order
44    pub fn to_csv_line(&self, headers: &[String]) -> String {
45        headers
46            .iter()
47            .map(|h| self.columns.get(h).map(|v| v.as_str()).unwrap_or(""))
48            .collect::<Vec<_>>()
49            .join(",")
50    }
51
52    /// Convert to CSV line with automatic column order
53    pub fn to_csv_line_auto(&self) -> (Vec<String>, String) {
54        let mut headers: Vec<String> = self.columns.keys().cloned().collect();
55        headers.sort(); // Consistent ordering
56        let line = self.to_csv_line(&headers);
57        (headers, line)
58    }
59}
60
61impl Default for DataRow {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67/// Metadata about a data stream
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct DataStreamInfo {
70    pub headers: Vec<String>,
71    pub total_rows: Option<usize>,
72    pub description: Option<String>,
73}
74
75impl DataStreamInfo {
76    pub fn new(headers: Vec<String>) -> Self {
77        Self {
78            headers,
79            total_rows: None,
80            description: None,
81        }
82    }
83
84    pub fn with_total_rows(mut self, total: usize) -> Self {
85        self.total_rows = Some(total);
86        self
87    }
88
89    pub fn with_description(mut self, description: impl Into<String>) -> Self {
90        self.description = Some(description.into());
91        self
92    }
93}
94
95/// Result type for command execution with streaming data
96pub struct DataStreamResult {
97    pub info: DataStreamInfo,
98    pub stream: Pin<Box<dyn Stream<Item = Result<DataRow, String>> + Send>>,
99}
100
101impl std::fmt::Debug for DataStreamResult {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("DataStreamResult")
104            .field("info", &self.info)
105            .field("stream", &"<Stream>")
106            .finish()
107    }
108}
109
110impl DataStreamResult {
111    pub fn new<S>(info: DataStreamInfo, stream: S) -> Self
112    where
113        S: Stream<Item = Result<DataRow, String>> + Send + 'static,
114    {
115        Self {
116            info,
117            stream: Box::pin(stream),
118        }
119    }
120
121    /// Create a simple result with a single row
122    pub fn single_row(row: DataRow) -> Self {
123        let (headers, _) = row.to_csv_line_auto();
124        let info = DataStreamInfo::new(headers).with_total_rows(1);
125        let stream = tokio_stream::iter(vec![Ok(row)]);
126        Self::new(info, stream)
127    }
128
129    /// Create an empty result
130    pub fn empty(headers: Vec<String>) -> Self {
131        let info = DataStreamInfo::new(headers).with_total_rows(0);
132        let stream = tokio_stream::iter(vec![]);
133        Self::new(info, stream)
134    }
135
136    /// Create an error result
137    pub fn error(error: String) -> Self {
138        let info = DataStreamInfo::new(vec!["error".to_string()]).with_total_rows(1);
139        let stream = tokio_stream::iter(vec![Err(error)]);
140        Self::new(info, stream)
141    }
142}
143
144/// Helper trait for converting various data types to DataRow
145pub trait IntoDataRow {
146    fn into_data_row(self) -> DataRow;
147}
148
149impl IntoDataRow for DataRow {
150    fn into_data_row(self) -> DataRow {
151        self
152    }
153}
154
155impl IntoDataRow for HashMap<String, String> {
156    fn into_data_row(self) -> DataRow {
157        DataRow { columns: self }
158    }
159}
160
161impl<const N: usize> IntoDataRow for [(&str, &str); N] {
162    fn into_data_row(self) -> DataRow {
163        let mut row = DataRow::new();
164        for (key, value) in self {
165            row.insert(key, value);
166        }
167        row
168    }
169}
170
171/// Helper for creating streams from iterators
172pub fn stream_from_iter<I, T>(iter: I) -> impl Stream<Item = Result<DataRow, String>>
173where
174    I: IntoIterator<Item = T>,
175    T: IntoDataRow,
176{
177    tokio_stream::iter(iter.into_iter().map(|item| Ok(item.into_data_row())))
178}
179
180/// Helper for creating streams from async iterators
181pub fn stream_from_async_iter<S>(stream: S) -> impl Stream<Item = Result<DataRow, String>>
182where
183    S: Stream<Item = Result<DataRow, String>>,
184{
185    stream
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// Columns added through the builder are readable via `get`; unknown
    /// names yield `None`.
    #[test]
    fn test_data_row_creation() {
        let row = DataRow::new()
            .with_column("name", "Alice")
            .with_column("age", "25");

        assert_eq!(row.get("name"), Some(&"Alice".to_string()));
        assert_eq!(row.get("age"), Some(&"25".to_string()));
        assert_eq!(row.get("missing"), None);
    }

    /// Explicit header order is honoured, missing headers give empty fields,
    /// and the automatic variant uses sorted column names.
    #[test]
    fn test_csv_conversion() {
        let row = DataRow::new()
            .with_column("name", "Alice")
            .with_column("age", "25");

        let headers = vec!["name".to_string(), "age".to_string()];
        let csv_line = row.to_csv_line(&headers);
        assert_eq!(csv_line, "Alice,25");

        let with_missing = vec!["name".to_string(), "missing".to_string()];
        assert_eq!(row.to_csv_line(&with_missing), "Alice,");

        // Pin the exact sorted order rather than mere containment.
        let (auto_headers, auto_line) = row.to_csv_line_auto();
        assert_eq!(auto_headers, vec!["age".to_string(), "name".to_string()]);
        assert_eq!(auto_line, "25,Alice");
    }

    /// The stream handed to `DataStreamResult::new` yields the source rows,
    /// in order, and then ends.
    #[tokio::test]
    async fn test_stream_result() {
        let rows = vec![
            DataRow::new().with_column("name", "Alice"),
            DataRow::new().with_column("name", "Bob"),
        ];

        let info = DataStreamInfo::new(vec!["name".to_string()]);
        let stream = stream_from_iter(rows);
        let mut result = DataStreamResult::new(info, stream);

        assert_eq!(result.info.headers, vec!["name".to_string()]);

        // Actually poll the stream so the test covers its contents too.
        let mut names = Vec::new();
        while let Some(item) = result.stream.next().await {
            let row = item.expect("stream_from_iter only yields Ok items");
            names.push(row.get("name").cloned());
        }
        assert_eq!(
            names,
            vec![Some("Alice".to_string()), Some("Bob".to_string())]
        );
    }
}