## Automation Patterns
Once you are comfortable generating targets and running scans, combine the pieces into automated workflows tailored to your environment.
### Schedule recurring scans
```python
import asyncio
import datetime as dt

import pyrmap


async def run_hourly():
    scanner = pyrmap.Scanner(pyrmap.ProbeConfig.icmp(), pps=1500, timeout=6)
    while True:
        # Timezone-aware UTC timestamp shared by every row in this sweep
        # (datetime.utcnow() is deprecated as of Python 3.12).
        timestamp = dt.datetime.now(dt.timezone.utc).isoformat()
        stream = await scanner.scan_from_file("targets.txt", unique=True)
        async for row in stream:
            row["timestamp"] = timestamp
            process(row)  # replace with your own handler
        await asyncio.sleep(3600)


asyncio.run(run_hourly())
```

Attach metadata (such as a timestamp or job label) before forwarding rows to downstream systems.
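For instance, `process` might stamp each row with a job label and append it to an NDJSON file for a downstream collector. The handler below is a minimal sketch; the `JOB_LABEL` constant and the output path are illustrative, not part of pyrmap:

```python
import json

JOB_LABEL = "hourly-icmp"  # hypothetical label identifying this job


def process(row):
    # Tag the row so downstream systems can attribute it to this job,
    # then append it as one NDJSON line.
    row["job"] = JOB_LABEL
    with open("results.ndjson", "a", encoding="utf-8") as fh:
        fh.write(json.dumps(row) + "\n")
```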
### Shard across workers
```python
import asyncio

import pyrmap

WORKERS = [
    "worker-a.example.net:50051",
    "worker-b.example.net:50051",
    "worker-c.example.net:50051",
]


async def run_shard(remote, path):
    scanner = pyrmap.Scanner(
        pyrmap.ProbeConfig.tcp(port=443),
        remote=remote,
        pps=2000,
    )
    stream = await scanner.scan_from_file(path, unique=True)
    async for row in stream:
        forward(remote, row)  # replace with your own dispatcher


async def main():
    tasks = [
        run_shard(remote, f"targets-shard-{idx}.txt")
        for idx, remote in enumerate(WORKERS, start=1)
    ]
    await asyncio.gather(*tasks)


asyncio.run(main())
```

Split large target files ahead of time (for example with `split -n`) and feed each shard to a dedicated worker.
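If you would rather keep the splitting step in Python than shell out, a round-robin splitter such as the sketch below writes the `targets-shard-<n>.txt` files the script above reads. The three-way shard count mirrors the worker list and is otherwise arbitrary:

```python
import itertools


def shard_targets(path="targets.txt", shards=3):
    """Split one target list into round-robin shards named targets-shard-<n>.txt."""
    outputs = [
        open(f"targets-shard-{idx}.txt", "w", encoding="utf-8")
        for idx in range(1, shards + 1)
    ]
    try:
        with open(path, encoding="utf-8") as fh:
            # Cycle through the shard files so targets are spread evenly.
            for line, out in zip(fh, itertools.cycle(outputs)):
                out.write(line)
    finally:
        for out in outputs:
            out.close()


shard_targets(shards=3)
```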
### Trigger generation + scan pipelines
```python
import asyncio
import json
import pathlib

import pyrmap

SEEDS = pathlib.Path("data/seeds.txt")
MODEL = pathlib.Path("models/nightly.bin")
TARGETS = pathlib.Path("targets-nightly.txt")


async def nightly_pipeline():
    config = json.dumps({"budget": 2_000_000, "clusters": 32})
    _, summary = pyrmap.train_model(
        algorithm="sixgen",
        seeds_path=str(SEEDS),
        output_path=str(MODEL),
        config_json=config,
    )
    print(summary)
    targets = pyrmap.generate_targets(
        model_path=str(MODEL),
        count=500_000,
        unique=True,
        exclude_path="data/exclusions.txt",
    )
    TARGETS.write_text("\n".join(targets), encoding="utf-8")
    scanner = pyrmap.Scanner(pyrmap.ProbeConfig.icmp(), pps=2500)
    stream = await scanner.scan_from_file(str(TARGETS), unique=True)
    async for row in stream:
        store(row)  # replace with your own persistence layer


asyncio.run(nightly_pipeline())
```

This pattern mirrors the CLI but keeps everything inside one script, so you can hook it into cron, Airflow, GitHub Actions, or any other scheduler.
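Schedulers typically key off the process exit code, so when wiring this into cron or CI it can help to replace the bare `asyncio.run(...)` call with a wrapper that logs the traceback and exits non-zero on failure. A minimal sketch, assuming `nightly_pipeline` from the script above:

```python
import asyncio
import logging
import sys

logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    try:
        asyncio.run(nightly_pipeline())
    except Exception:
        # A non-zero exit lets cron, Airflow, or GitHub Actions mark the run as failed.
        logging.exception("nightly pipeline failed")
        sys.exit(1)
```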
### Summarise each run
- Maintain counters inside your script (for example, tally replies and alias verdicts) so you can persist lightweight summaries after every job; see the sketch after this list.
- Record the model path, generator options, and scan parameters alongside each run to make reruns reproducible.
- Use the same helpers shown above to stitch together nightly jobs, ad-hoc scans, or automated retriage pipelines.
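Putting the first two bullets together, a run summary can be as simple as a counter dict plus the parameters that produced the run, flushed to JSON when the job finishes. The field names and row keys below are illustrative assumptions, not a pyrmap schema:

```python
import collections
import json

summary = {
    "model_path": "models/nightly.bin",  # record inputs so the run is reproducible
    "probe": "icmp",
    "pps": 2500,
    "counters": collections.Counter(),
}


def store(row):
    # Tally replies and alias verdicts as rows stream in; the exact
    # row keys depend on your probe and are assumptions here.
    summary["counters"]["replies"] += 1
    if row.get("alias"):
        summary["counters"]["aliased"] += 1


def flush_summary(path="run-summary.json"):
    # Persist a lightweight summary alongside the run's parameters.
    with open(path, "w", encoding="utf-8") as fh:
        json.dump({**summary, "counters": dict(summary["counters"])}, fh, indent=2)
```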