Automation Patterns

Once you are comfortable generating targets and running scans, combine the pieces into automated workflows tailored to your environment.

Schedule recurring scans

import asyncio
import datetime as dt
import pyrmap

async def run_hourly():
    scanner = pyrmap.Scanner(pyrmap.ProbeConfig.icmp(), pps=1500, timeout=6)
    while True:
        # Timezone-aware timestamp; datetime.utcnow() is deprecated in Python 3.12+.
        timestamp = dt.datetime.now(dt.timezone.utc).isoformat()
        stream = await scanner.scan_from_file("targets.txt", unique=True)
        async for row in stream:
            row["timestamp"] = timestamp
            process(row)  # replace with your own handler
        await asyncio.sleep(3600)  # the delay starts after the scan finishes, so runs drift by scan duration

asyncio.run(run_hourly())

Attach metadata (such as a timestamp or job label) before forwarding rows to downstream systems.
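As a minimal sketch of such a handler, the snippet below enriches each row with a job label and pushes it onto an in-process queue. The names process, JOB_LABEL, and results are illustrative, not part of pyrmap; swap the queue for whatever transport your downstream systems expect.

import queue

JOB_LABEL = "hourly-icmp"  # illustrative label, not a pyrmap concept
results: "queue.Queue[dict]" = queue.Queue()

def process(row: dict) -> None:
    # Enrich the row before it leaves the scan loop so downstream
    # consumers receive a self-describing record.
    row["job"] = JOB_LABEL
    results.put(row)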

Shard across workers

import asyncio
import pyrmap

WORKERS = [
    "worker-a.example.net:50051",
    "worker-b.example.net:50051",
    "worker-c.example.net:50051",
]

async def run_shard(remote, path):
    scanner = pyrmap.Scanner(
        pyrmap.ProbeConfig.tcp(port=443),
        remote=remote,
        pps=2000,
    )
    stream = await scanner.scan_from_file(path, unique=True)
    async for row in stream:
        forward(remote, row)  # replace with your own dispatcher

async def main():
    tasks = [
        run_shard(remote, f"targets-shard-{idx}.txt")
        for idx, remote in enumerate(WORKERS, start=1)
    ]
    await asyncio.gather(*tasks)

asyncio.run(main())

Split large target files ahead of time (for example with split -n) and feed each shard to a dedicated worker.
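If you would rather shard in Python than with split, a round-robin splitter along these lines produces the targets-shard-N.txt files the example above expects. The shard_targets helper is illustrative, not part of pyrmap; it assumes one target per line.

import pathlib

def shard_targets(path: str, shards: int) -> list[pathlib.Path]:
    # Distribute lines round-robin so every shard gets a similar mix of targets.
    outputs = [pathlib.Path(f"targets-shard-{i}.txt") for i in range(1, shards + 1)]
    buffers: list[list[str]] = [[] for _ in outputs]
    with open(path, encoding="utf-8") as handle:
        for idx, line in enumerate(handle):
            buffers[idx % shards].append(line)
    for out, lines in zip(outputs, buffers):
        out.write_text("".join(lines), encoding="utf-8")
    return outputs

shard_targets("targets.txt", shards=3)  # one shard per worker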

Trigger generation + scan pipelines

import asyncio
import json
import pathlib
import pyrmap

SEEDS = pathlib.Path("data/seeds.txt")
MODEL = pathlib.Path("models/nightly.bin")
TARGETS = pathlib.Path("targets-nightly.txt")

async def nightly_pipeline():
    config = json.dumps({"budget": 2_000_000, "clusters": 32})
    _, summary = pyrmap.train_model(
        algorithm="sixgen",
        seeds_path=str(SEEDS),
        output_path=str(MODEL),
        config_json=config,
    )
    print(summary)

    targets = pyrmap.generate_targets(
        model_path=str(MODEL),
        count=500_000,
        unique=True,
        exclude_path="data/exclusions.txt",
    )
    # Trailing newline so line-oriented tools see a complete final record.
    TARGETS.write_text("\n".join(targets) + "\n", encoding="utf-8")

    scanner = pyrmap.Scanner(pyrmap.ProbeConfig.icmp(), pps=2500)
    stream = await scanner.scan_from_file(str(TARGETS), unique=True)
    async for row in stream:
        store(row)  # replace with your own persistence layer

asyncio.run(nightly_pipeline())

This pattern mirrors the CLI but keeps everything inside one script so you can hook it into cron, Airflow, GitHub Actions, or any other scheduler.
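To make the script scheduler-friendly, one option is to wrap the bare asyncio.run call in an entry point that exits non-zero on failure, so cron, Airflow, or GitHub Actions can mark the run as failed. This is a sketch, and the logging setup is an assumption rather than anything pyrmap requires; it reuses the nightly_pipeline coroutine from the example above.

import asyncio
import logging
import sys

def main() -> int:
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(nightly_pipeline())
    except Exception:
        # A non-zero exit code lets the scheduler flag the job as failed.
        logging.exception("nightly pipeline failed")
        return 1
    return 0

if __name__ == "__main__":
    sys.exit(main())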

Summarise each run

  • Maintain counters inside your script (for example, tally replies and alias verdicts) so you can persist lightweight summaries after every job; see the sketch after this list.
  • Record the model path, generator options, and scan parameters alongside each run to make reruns reproducible.
  • Use the same helpers shown above to stitch together nightly jobs, ad-hoc scans, or automated retriage pipelines.
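One lightweight way to implement the first two points: accumulate counters in a small dataclass during the scan loop, then write it out as JSON. RunSummary and its field names are illustrative, and the alias_verdict key is an assumption about your row schema, not a documented pyrmap field.

import dataclasses
import json
import pathlib

@dataclasses.dataclass
class RunSummary:
    model_path: str
    scan_params: dict
    replies: int = 0
    aliases: int = 0

    def observe(self, row: dict) -> None:
        self.replies += 1
        if row.get("alias_verdict") == "aliased":  # assumed row key
            self.aliases += 1

    def persist(self, path: str) -> None:
        pathlib.Path(path).write_text(
            json.dumps(dataclasses.asdict(self), indent=2), encoding="utf-8"
        )

summary = RunSummary(model_path="models/nightly.bin",
                     scan_params={"probe": "icmp", "pps": 2500})
# call summary.observe(row) inside the scan loop, then:
summary.persist("summary.json")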