rmap/commands/
analyze.rs

1use clap::Parser;
2use clap::Subcommand;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::net::Ipv6Addr;
6use std::path::PathBuf;
7
8use crate::commands::Command;
9use crate::commands::common::AddressPredicate;
10use crate::data::{DataRow, DataStreamInfo, DataStreamResult, stream_from_iter};
11use crate::sink::print_datastream_result;
12
// Analysis subcommands selectable through `AnalyzeCommandArgs::analysis`.
//
// The `///` comments below are consumed by clap's derive macros as CLI help text, so they are
// runtime-visible strings; explanatory notes for maintainers are written as `//` comments.
#[derive(Subcommand, Serialize, Deserialize, Debug, Clone)]
pub enum AnalyzeCommand {
    /// Address space dispersion metrics
    Dispersion,
    /// Information entropy analysis
    Entropy {
        // The parser only bounds each value individually; the `start_bit < end_bit` relation is
        // checked later in `run` before the analysis is dispatched.
        /// Start bit position (0-127) for entropy calculation
        #[arg(short = 's', long, value_parser = clap::value_parser!(u8).range(0..=127), default_value_t = 0)]
        start_bit: u8,

        /// End bit position (1-128) for entropy calculation
        #[arg(short = 'e', long, value_parser = clap::value_parser!(u8).range(1..=128), default_value_t = 128)]
        end_bit: u8,
    },
    /// Subnet distribution analysis
    Subnets {
        // Upper bound on the number of result rows; the subnet list is truncated to this length.
        /// Maximum number of subnets to show (default: 10)
        #[arg(short = 'n', long, value_parser = clap::value_parser!(usize), default_value_t = 10)]
        max_subnets: usize,

        // Restricted to 1..=128 on the command line only; values built through
        // `AnalyzeCommandArgs::new` are not range-checked by this attribute.
        /// CIDR prefix length (default: 64)
        #[arg(short = 'l', long, value_parser = clap::value_parser!(u8).range(1..=128), default_value_t = 64)]
        prefix_length: u8,
    },
    /// Count addresses matching each predicate
    Counts,
}
40
// Command-line arguments of the `analyze` command: an input file, optional address filters, and
// the analysis subcommand to execute.
//
// The `///` comments on the fields are picked up by clap's derive as help text, so maintainer
// notes are kept in `//` comments to avoid changing the CLI output.
#[derive(Parser, Serialize, Deserialize)]
pub struct AnalyzeCommandArgs {
    // Read in full by `load_and_filter_addresses`.
    /// Path to file containing data to analyze
    #[arg(value_name = "FILE")]
    pub file: PathBuf,
    // When set, input lines are treated as comma-separated records instead of bare addresses.
    /// Column name to select from input data
    #[arg(short = 'f', long, value_name = "FIELD")]
    pub field: Option<String>,
    // An address is kept if it matches ANY include predicate (empty list keeps everything).
    /// Include addresses matching these predicates (can be specified multiple times)
    #[arg(long, value_enum)]
    pub include: Vec<AddressPredicate>,
    // An address is dropped if it matches ANY exclude predicate.
    /// Exclude addresses matching these predicates (can be specified multiple times)
    #[arg(long, value_enum)]
    pub exclude: Vec<AddressPredicate>,
    // De-duplication runs after the include/exclude predicates have been applied.
    /// Remove duplicate addresses from input dataset before analysis
    #[arg(short = 'u', long)]
    pub unique: bool,
    /// Analysis subcommand to run
    #[command(subcommand)]
    pub analysis: AnalyzeCommand,
}
62
63impl Command for AnalyzeCommandArgs {
64    async fn run(&self) -> Result<(), String> {
65        // Load and filter addresses from file
66        let addresses = self.load_and_filter_addresses().await?;
67
68        let result = match &self.analysis {
69            AnalyzeCommand::Dispersion => self.analyze_dispersion(&addresses)?,
70            AnalyzeCommand::Entropy { start_bit, end_bit } => {
71                if start_bit >= end_bit {
72                    return Err("start_bit must be less than end_bit".to_string());
73                }
74                self.analyze_entropy(&addresses, *start_bit, *end_bit)?
75            }
76            AnalyzeCommand::Subnets {
77                max_subnets,
78                prefix_length,
79            } => self.analyze_subnets(&addresses, *max_subnets, *prefix_length)?,
80            AnalyzeCommand::Counts => self.analyze_counts(&addresses)?,
81        };
82
83        // Print the result instead of returning it
84        print_datastream_result(result, "-").await?;
85        Ok(())
86    }
87}
88
89impl AnalyzeCommandArgs {
90    pub fn new(
91        file: PathBuf,
92        field: Option<String>,
93        include: Vec<AddressPredicate>,
94        exclude: Vec<AddressPredicate>,
95        unique: bool,
96        analysis: AnalyzeCommand,
97    ) -> Self {
98        Self {
99            file,
100            field,
101            include,
102            exclude,
103            unique,
104            analysis,
105        }
106    }
107
108    async fn load_and_filter_addresses(&self) -> Result<Vec<Ipv6Addr>, String> {
109        // Read file and parse addresses
110        let content = tokio::fs::read_to_string(&self.file)
111            .await
112            .map_err(|e| format!("Failed to read file: {}", e))?;
113
114        let mut addresses = Vec::new();
115        for line in content.lines() {
116            let line = line.trim();
117            if line.is_empty() {
118                continue;
119            }
120
121            // If a specific field is requested, try to parse as CSV and extract that field
122            let addr_str = if let Some(field) = &self.field {
123                // Simple CSV parsing - could be improved
124                let parts: Vec<&str> = line.split(',').collect();
125                if parts.len() > 0 {
126                    parts[0] // For simplicity, just take first column
127                } else {
128                    line
129                }
130            } else {
131                line
132            };
133
134            if let Ok(addr) = addr_str.parse::<Ipv6Addr>() {
135                addresses.push(addr);
136            }
137        }
138
139        // Apply predicate filters
140        let filtered_addresses = self.apply_predicates(addresses)?;
141
142        // Apply unique filter if requested
143        let final_addresses = if self.unique {
144            let mut unique_set = std::collections::HashSet::new();
145            filtered_addresses
146                .into_iter()
147                .filter(|addr| unique_set.insert(*addr))
148                .collect()
149        } else {
150            filtered_addresses
151        };
152
153        Ok(final_addresses)
154    }
155
156    fn apply_predicates(&self, addresses: Vec<Ipv6Addr>) -> Result<Vec<Ipv6Addr>, String> {
157        let all_predicates = analyze::analysis::predicates::get_all_predicates();
158
159        Ok(addresses
160            .into_iter()
161            .filter(|addr| {
162                // Check include predicates
163                if !self.include.is_empty() {
164                    let include_match = self.include.iter().any(|predicate| {
165                        let filter_name = predicate.to_filter_name();
166                        if let Some((_, predicate_fn)) =
167                            all_predicates.iter().find(|(name, _)| name == &filter_name)
168                        {
169                            predicate_fn(*addr)
170                        } else {
171                            false
172                        }
173                    });
174                    if !include_match {
175                        return false;
176                    }
177                }
178
179                // Check exclude predicates
180                for predicate in &self.exclude {
181                    let filter_name = predicate.to_filter_name();
182                    if let Some((_, predicate_fn)) =
183                        all_predicates.iter().find(|(name, _)| name == &filter_name)
184                    {
185                        if predicate_fn(*addr) {
186                            return false;
187                        }
188                    }
189                }
190
191                true
192            })
193            .collect())
194    }
195
196    fn analyze_dispersion(&self, addresses: &[Ipv6Addr]) -> Result<DataStreamResult, String> {
197        // Placeholder implementation - in real implementation, this would compute dispersion metrics
198        let row = DataRow::new()
199            .with_column("metric", "dispersion")
200            .with_column("value", "0.5")
201            .with_column("addresses_analyzed", addresses.len().to_string());
202
203        Ok(DataStreamResult::single_row(row))
204    }
205
206    fn analyze_entropy(
207        &self,
208        addresses: &[Ipv6Addr],
209        start_bit: u8,
210        end_bit: u8,
211    ) -> Result<DataStreamResult, String> {
212        // Placeholder implementation - in real implementation, this would compute entropy
213        let row = DataRow::new()
214            .with_column("metric", "entropy")
215            .with_column("start_bit", start_bit.to_string())
216            .with_column("end_bit", end_bit.to_string())
217            .with_column("entropy_value", "2.5")
218            .with_column("addresses_analyzed", addresses.len().to_string());
219
220        Ok(DataStreamResult::single_row(row))
221    }
222
223    fn analyze_subnets(
224        &self,
225        addresses: &[Ipv6Addr],
226        max_subnets: usize,
227        prefix_length: u8,
228    ) -> Result<DataStreamResult, String> {
229        // Simple subnet analysis - group by prefix
230        let mut subnet_counts: HashMap<String, usize> = HashMap::new();
231
232        for addr in addresses {
233            let bytes = addr.octets();
234            let prefix_bytes = prefix_length as usize / 8;
235            let mut prefix = Vec::new();
236
237            for i in 0..std::cmp::min(prefix_bytes, 16) {
238                prefix.push(bytes[i]);
239            }
240
241            let subnet_key = prefix
242                .iter()
243                .map(|b| format!("{:02x}", b))
244                .collect::<Vec<_>>()
245                .join(":");
246
247            *subnet_counts.entry(subnet_key).or_insert(0) += 1;
248        }
249
250        // Convert to sorted results
251        let mut subnet_list: Vec<_> = subnet_counts.into_iter().collect();
252        subnet_list.sort_by(|a, b| b.1.cmp(&a.1));
253        subnet_list.truncate(max_subnets);
254
255        let data_rows: Vec<DataRow> = subnet_list
256            .into_iter()
257            .map(|(subnet, count)| {
258                DataRow::new()
259                    .with_column("subnet", subnet)
260                    .with_column("count", count.to_string())
261                    .with_column("prefix_length", prefix_length.to_string())
262            })
263            .collect();
264
265        let headers = vec![
266            "subnet".to_string(),
267            "count".to_string(),
268            "prefix_length".to_string(),
269        ];
270        let info = DataStreamInfo::new(headers)
271            .with_total_rows(data_rows.len())
272            .with_description(format!(
273                "Top {} subnets with /{} prefix",
274                data_rows.len(),
275                prefix_length
276            ));
277
278        let stream = stream_from_iter(data_rows);
279        Ok(DataStreamResult::new(info, stream))
280    }
281
282    fn analyze_counts(&self, addresses: &[Ipv6Addr]) -> Result<DataStreamResult, String> {
283        // Analyze predicate counts
284        let all_predicates = analyze::analysis::predicates::get_all_predicates();
285
286        let mut data_rows = Vec::new();
287
288        for (predicate_name, predicate_fn) in all_predicates {
289            let count = addresses.iter().filter(|addr| predicate_fn(**addr)).count();
290
291            let row = DataRow::new()
292                .with_column("predicate", predicate_name)
293                .with_column("count", count.to_string())
294                .with_column(
295                    "percentage",
296                    format!("{:.2}%", (count as f64 / addresses.len() as f64) * 100.0),
297                );
298
299            data_rows.push(row);
300        }
301
302        let headers = vec![
303            "predicate".to_string(),
304            "count".to_string(),
305            "percentage".to_string(),
306        ];
307        let info = DataStreamInfo::new(headers)
308            .with_total_rows(data_rows.len())
309            .with_description(format!(
310                "Predicate analysis for {} addresses",
311                addresses.len()
312            ));
313
314        let stream = stream_from_iter(data_rows);
315        Ok(DataStreamResult::new(info, stream))
316    }
317}