1use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio_stream::Stream;
8
/// A single row of data, stored as a map from column name to string value.
///
/// The map is unordered; callers that need a stable column order pass an
/// explicit header list (see `to_csv_line`) or use `to_csv_line_auto`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataRow {
    /// Column name -> cell value. Every value is kept as a `String`.
    pub columns: HashMap<String, String>,
}
14
15impl DataRow {
16 pub fn new() -> Self {
17 Self {
18 columns: HashMap::new(),
19 }
20 }
21
22 pub fn with_column(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
23 self.columns.insert(name.into(), value.into());
24 self
25 }
26
27 pub fn insert(&mut self, name: impl Into<String>, value: impl Into<String>) {
28 self.columns.insert(name.into(), value.into());
29 }
30
31 pub fn get(&self, name: &str) -> Option<&String> {
32 self.columns.get(name)
33 }
34
35 pub fn keys(&self) -> impl Iterator<Item = &String> {
36 self.columns.keys()
37 }
38
39 pub fn values(&self) -> impl Iterator<Item = &String> {
40 self.columns.values()
41 }
42
43 pub fn to_csv_line(&self, headers: &[String]) -> String {
45 headers
46 .iter()
47 .map(|h| self.columns.get(h).map(|v| v.as_str()).unwrap_or(""))
48 .collect::<Vec<_>>()
49 .join(",")
50 }
51
52 pub fn to_csv_line_auto(&self) -> (Vec<String>, String) {
54 let mut headers: Vec<String> = self.columns.keys().cloned().collect();
55 headers.sort(); let line = self.to_csv_line(&headers);
57 (headers, line)
58 }
59}
60
61impl Default for DataRow {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
/// Metadata that accompanies a stream of rows.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataStreamInfo {
    /// Header (column) names for the stream.
    // NOTE(review): presumably the column order used when rendering rows — confirm with callers.
    pub headers: Vec<String>,
    /// Number of rows in the stream, when set; `None` until `with_total_rows` is called.
    pub total_rows: Option<usize>,
    /// Optional free-form description; `None` until `with_description` is called.
    pub description: Option<String>,
}
74
75impl DataStreamInfo {
76 pub fn new(headers: Vec<String>) -> Self {
77 Self {
78 headers,
79 total_rows: None,
80 description: None,
81 }
82 }
83
84 pub fn with_total_rows(mut self, total: usize) -> Self {
85 self.total_rows = Some(total);
86 self
87 }
88
89 pub fn with_description(mut self, description: impl Into<String>) -> Self {
90 self.description = Some(description.into());
91 self
92 }
93}
94
/// A stream of rows bundled with the metadata that describes it.
pub struct DataStreamResult {
    /// Metadata (headers, optional row count, optional description).
    pub info: DataStreamInfo,
    /// Pinned, boxed, `Send` stream whose items are `Ok(DataRow)` or an
    /// `Err(String)` error message.
    pub stream: Pin<Box<dyn Stream<Item = Result<DataRow, String>> + Send>>,
}
100
101impl std::fmt::Debug for DataStreamResult {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct("DataStreamResult")
104 .field("info", &self.info)
105 .field("stream", &"<Stream>")
106 .finish()
107 }
108}
109
110impl DataStreamResult {
111 pub fn new<S>(info: DataStreamInfo, stream: S) -> Self
112 where
113 S: Stream<Item = Result<DataRow, String>> + Send + 'static,
114 {
115 Self {
116 info,
117 stream: Box::pin(stream),
118 }
119 }
120
121 pub fn single_row(row: DataRow) -> Self {
123 let (headers, _) = row.to_csv_line_auto();
124 let info = DataStreamInfo::new(headers).with_total_rows(1);
125 let stream = tokio_stream::iter(vec![Ok(row)]);
126 Self::new(info, stream)
127 }
128
129 pub fn empty(headers: Vec<String>) -> Self {
131 let info = DataStreamInfo::new(headers).with_total_rows(0);
132 let stream = tokio_stream::iter(vec![]);
133 Self::new(info, stream)
134 }
135
136 pub fn error(error: String) -> Self {
138 let info = DataStreamInfo::new(vec!["error".to_string()]).with_total_rows(1);
139 let stream = tokio_stream::iter(vec![Err(error)]);
140 Self::new(info, stream)
141 }
142}
143
/// Conversion of a value into a [`DataRow`].
///
/// Used as the item bound of `stream_from_iter`, so any implementor can be
/// streamed as rows.
pub trait IntoDataRow {
    /// Consumes `self` and returns the equivalent row.
    fn into_data_row(self) -> DataRow;
}
148
/// A `DataRow` converts to itself unchanged.
impl IntoDataRow for DataRow {
    fn into_data_row(self) -> DataRow {
        self
    }
}
154
/// A string-to-string map becomes the row's `columns` directly, without copying.
impl IntoDataRow for HashMap<String, String> {
    fn into_data_row(self) -> DataRow {
        DataRow { columns: self }
    }
}
160
161impl<const N: usize> IntoDataRow for [(&str, &str); N] {
162 fn into_data_row(self) -> DataRow {
163 let mut row = DataRow::new();
164 for (key, value) in self {
165 row.insert(key, value);
166 }
167 row
168 }
169}
170
171pub fn stream_from_iter<I, T>(iter: I) -> impl Stream<Item = Result<DataRow, String>>
173where
174 I: IntoIterator<Item = T>,
175 T: IntoDataRow,
176{
177 tokio_stream::iter(iter.into_iter().map(|item| Ok(item.into_data_row())))
178}
179
/// Returns `stream` unchanged, exposed as an opaque row stream.
///
/// This is an identity function: no items are mapped, buffered, or filtered.
// NOTE(review): presumably kept as a naming counterpart to `stream_from_iter` — confirm intent.
pub fn stream_from_async_iter<S>(stream: S) -> impl Stream<Item = Result<DataRow, String>>
where
    S: Stream<Item = Result<DataRow, String>>,
{
    stream
}
187
#[cfg(test)]
mod tests {
    use super::*;
    use tokio_stream::StreamExt;

    /// Columns set through the builder are retrievable; unknown names yield `None`.
    #[test]
    fn test_data_row_creation() {
        let row = DataRow::new()
            .with_column("name", "Alice")
            .with_column("age", "25");

        assert_eq!(row.get("name"), Some(&"Alice".to_string()));
        assert_eq!(row.get("age"), Some(&"25".to_string()));
        assert_eq!(row.get("missing"), None);
    }

    /// Explicit headers control field order; the auto variant sorts the headers.
    #[test]
    fn test_csv_conversion() {
        let row = DataRow::new()
            .with_column("name", "Alice")
            .with_column("age", "25");

        let headers = vec!["name".to_string(), "age".to_string()];
        let csv_line = row.to_csv_line(&headers);
        assert_eq!(csv_line, "Alice,25");

        // `to_csv_line_auto` sorts the column names, so the result is deterministic
        // and can be pinned exactly rather than checked with `contains`.
        let (auto_headers, auto_line) = row.to_csv_line_auto();
        assert_eq!(auto_headers, vec!["age".to_string(), "name".to_string()]);
        assert_eq!(auto_line, "25,Alice");
    }

    /// The result keeps its metadata and the stream yields every row, in order.
    #[tokio::test]
    async fn test_stream_result() {
        let rows = vec![
            DataRow::new().with_column("name", "Alice"),
            DataRow::new().with_column("name", "Bob"),
        ];

        let info = DataStreamInfo::new(vec!["name".to_string()]);
        let stream = stream_from_iter(rows);
        let result = DataStreamResult::new(info, stream);

        assert_eq!(result.info.headers, vec!["name".to_string()]);

        // Drain the stream so the test exercises the rows, not only the metadata.
        let mut stream = result.stream;
        let mut names = Vec::new();
        while let Some(item) = stream.next().await {
            let row = item.expect("stream_from_iter only yields Ok rows");
            names.push(row.get("name").cloned());
        }
        assert_eq!(
            names,
            vec![Some("Alice".to_string()), Some("Bob".to_string())]
        );
    }
}