# Source code for pyfsr.archetypes.harvest

"""Harvest a draft archetype from a real FortiSOAR solution pack.

A solution pack (whether the git source tree under
``corpus_builder/repos/fortisoar/solution-pack-*`` or the ``.zip`` returned by
:meth:`pyfsr.api.solution_packs.SolutionPackAPI.export_pack`) is a directory of JSON:
``info.json`` (pack metadata), ``modules/<mod>/mmd.json`` (module field/relationship/picklist
schema), ``playbooks/<collection>/*.json`` (playbook step graphs), and ``picklists/*.json``.

The harvester turns that into a draft :class:`~pyfsr.archetypes.record.Archetype` -- an honest
extraction of the module fields, the connector/operation pairs the playbooks use, and a step
skeleton per playbook. It does **not** parameterize (no ``{{param}}`` slots), assign connector
*roles*, or write a ``when_to_use`` -- that curation is step 3. Call ``store.put(draft)`` to
persist a draft for later curation.

Pure stdlib (``zipfile`` / ``json`` / ``pathlib``); the only network I/O is the optional
``export_pack`` call inside :func:`harvest_archetype_from_pack`.

Example::

    from pyfsr.archetypes import harvest_from_dir, ArchetypeStore

    draft = harvest_from_dir("path/to/solution-pack-servicenow-...", name="snow-sir-draft")
    ArchetypeStore().put(draft)
"""

from __future__ import annotations

import json
import tempfile
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from .record import (
    Archetype,
    ConnectorUse,
    ModuleField,
    PlaybookSkeleton,
    StepSkeleton,
)

# File names that live inside a pack's playbook tree but are not playbooks themselves.
# ``_find_playbooks`` skips any JSON file whose name is in this set.
_NON_PLAYBOOK_FILES = {"collection.metadata.json", "globalVariables.json", "tags.json"}


def _now_iso() -> str:
    """UTC now as an ISO-8601 string with a ``Z`` suffix (sortable, unambiguous)."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z")


def _uuid_tail(iri: Any) -> str | None:
    """Return the trailing segment of an ``/api/3/.../<uuid>`` IRI, or ``None``."""
    if not iri or not isinstance(iri, str):
        return None
    return iri.rstrip("/").split("/")[-1] or None


def _read_json(path: Path) -> Any:
    with path.open(encoding="utf-8") as fh:
        return json.load(fh)


def _find_info(root: Path) -> Path | None:
    direct = root / "info.json"
    if direct.exists():
        return direct
    hits = sorted(root.rglob("info.json"))
    return hits[0] if hits else None


def _find_mmds(root: Path) -> list[Path]:
    return sorted(root.rglob("mmd.json"))


def _find_playbooks(root: Path) -> list[Path]:
    out: list[Path] = []
    for p in sorted(root.rglob("*.json")):
        if "playbooks" not in p.parts:
            continue
        if p.name in _NON_PLAYBOOK_FILES:
            continue
        out.append(p)
    return out


def _picklist_name(attr: dict[str, Any]) -> str | None:
    """Resolve a picklist field's list name from its ``dataSource.query.filters``.

    A picklist attribute carries ``dataSource.model == "picklists"`` and a filter on
    ``listName__name`` whose ``value`` is the list name (e.g. ``"IncidentStatus"``).
    """
    ds = attr.get("dataSource") or {}
    for f in (ds.get("query") or {}).get("filters") or []:
        if f.get("field") == "listName__name" and f.get("value"):
            return str(f["value"])
    return None


def _field_from_attr(module: str, attr: dict[str, Any]) -> ModuleField:
    """Build a :class:`ModuleField` from one attribute entry of an ``mmd.json``.

    Args:
        module: name of the module the attribute belongs to.
        attr: the attribute dict as read from the ``attributes`` list.

    Returns:
        A ``ModuleField`` where ``picklist`` is set only for attributes of type
        ``"picklists"``, and ``relationship`` is set only for other attributes
        whose ``dataSource.model`` is present and is not ``"picklists"``.
    """
    field_type = attr.get("type", "")
    source_model = (attr.get("dataSource") or {}).get("model")

    if field_type == "picklists":
        picklist_name = _picklist_name(attr)
        related_model = None
    else:
        picklist_name = None
        # A data source pointing at another model marks a relationship field.
        points_elsewhere = bool(source_model) and source_model != "picklists"
        related_model = source_model if points_elsewhere else None

    validation = attr.get("validation") or {}
    descriptions = attr.get("descriptions") or {}
    return ModuleField(
        module=module,
        name=attr.get("name", ""),
        type=field_type,
        form_type=attr.get("formType"),
        required=bool(validation.get("required", False)),
        display_name=descriptions.get("singular"),
        picklist=picklist_name,
        relationship=related_model,
    )


def _extract_module_fields(root: Path) -> list[ModuleField]:
    """Collect a :class:`ModuleField` for every attribute of every ``mmd.json`` under ``root``.

    Files whose top-level JSON value is not an object are skipped, as are
    attribute entries that are not dicts. The module name is taken from the
    document's ``module`` key, then its ``type`` key, then the name of the
    directory containing the ``mmd.json``.
    """
    collected: list[ModuleField] = []
    for mmd_file in _find_mmds(root):
        document = _read_json(mmd_file)
        if isinstance(document, dict):
            module_name = str(
                document.get("module") or document.get("type") or mmd_file.parent.name
            )
            collected.extend(
                _field_from_attr(module_name, entry)
                for entry in document.get("attributes") or []
                if isinstance(entry, dict)
            )
    return collected


def _skeleton_from_playbook(pb: dict[str, Any]) -> tuple[PlaybookSkeleton, list[ConnectorUse]]:
    """Reduce one playbook document to a step skeleton plus its connector usages.

    Args:
        pb: the playbook JSON object.

    Returns:
        A ``(skeleton, uses)`` pair. ``skeleton`` carries the playbook's name,
        description and one :class:`StepSkeleton` per dict entry in ``steps``.
        ``uses`` has one :class:`ConnectorUse` for each step whose ``arguments``
        hold both a truthy ``connector`` and a truthy ``operation``. Duplicates
        are not removed here.
    """
    skeleton_steps: list[StepSkeleton] = []
    connector_uses: list[ConnectorUse] = []

    for raw_step in pb.get("steps") or []:
        if not isinstance(raw_step, dict):
            continue
        arguments = raw_step.get("arguments") or {}
        connector, operation = arguments.get("connector"), arguments.get("operation")
        step_name = raw_step.get("name", "")

        skeleton_steps.append(
            StepSkeleton(
                name=step_name,
                step_type=_uuid_tail(raw_step.get("stepType")),
                connector=connector,
                operation=operation,
            )
        )
        # Only steps naming both halves of the pair count as a connector use.
        if connector and operation:
            connector_uses.append(
                ConnectorUse(
                    connector=str(connector),
                    operation=str(operation),
                    step_name=step_name,
                )
            )

    skeleton = PlaybookSkeleton(
        name=pb.get("name", ""),
        description=pb.get("description"),
        steps=skeleton_steps,
    )
    return skeleton, connector_uses


def _dedupe_manifest(uses: list[ConnectorUse]) -> list[ConnectorUse]:
    """Dedup by ``(connector, operation)``, keeping the first step seen for each pair."""
    seen: set[tuple[str, str]] = set()
    out: list[ConnectorUse] = []
    for u in uses:
        key = (u.connector, u.operation)
        if key in seen:
            continue
        seen.add(key)
        out.append(u)
    return out


def harvest_from_dir(pack_dir: str | Path, *, name: str | None = None) -> Archetype:
    """Harvest a draft archetype from an unpacked solution-pack directory.

    Works on both the git source tree and an unpacked export ``.zip`` (discovery
    is recursive, so a top-level prefix directory in the export is tolerated).
    Parses ``info.json``, every ``mmd.json``, and the playbook JSON files into a
    draft :class:`~pyfsr.archetypes.record.Archetype`. A missing ``info.json``,
    or one whose top-level value is not an object, is treated as empty metadata;
    playbook files whose top-level value is not an object are skipped.

    Args:
        pack_dir: path to the unpacked pack.
        name: the archetype name (key). Defaults to the pack's ``info.json``
            ``name``, falling back to the directory name.

    Returns:
        A draft :class:`~pyfsr.archetypes.record.Archetype` (``when_to_use``
        empty, ``parameters`` empty). ``description`` is the pack description as
        a string, or ``""`` when the pack has none.
    """
    root = Path(pack_dir)

    info_path = _find_info(root)
    info = _read_json(info_path) if info_path else {}
    if not isinstance(info, dict):
        info = {}

    module_schema = _extract_module_fields(root)

    skeletons: list[PlaybookSkeleton] = []
    manifest: list[ConnectorUse] = []
    for pb_path in _find_playbooks(root):
        pb = _read_json(pb_path)
        if not isinstance(pb, dict):
            continue
        skeleton, uses = _skeleton_from_playbook(pb)
        skeletons.append(skeleton)
        manifest.extend(uses)

    pack_name = info.get("name") or root.name

    # An explicit JSON ``null`` description must become "", not the text "None".
    raw_description = info.get("description")
    description = "" if raw_description is None else str(raw_description)

    return Archetype(
        name=name or pack_name,
        when_to_use="",
        description=description,
        module_schema=module_schema,
        connector_manifest=_dedupe_manifest(manifest),
        playbook_skeletons=skeletons,
        parameters=[],
        source={
            "pack_name": pack_name,
            "pack_version": info.get("version"),
            "pack_label": info.get("label"),
            "harvested_at": _now_iso(),
        },
    )
def harvest_from_zip(zip_path: str | Path, *, name: str | None = None) -> Archetype:
    """Harvest a draft archetype from a solution-pack export ``.zip``.

    Extracts the archive into a temporary directory and delegates to
    :func:`harvest_from_dir`. Use this with the path returned by
    :meth:`pyfsr.api.solution_packs.SolutionPackAPI.export_pack`.

    The archive is unpacked into a subdirectory named after the zip file's stem.
    :func:`harvest_from_dir` is given that subdirectory, so its directory-name
    fallback for the pack name sees the archive's stem instead of a randomly
    generated temporary-directory name.

    Args:
        zip_path: path to the exported ``.zip``.
        name: the archetype name (key), passed through to
            :func:`harvest_from_dir`.

    Returns:
        The draft :class:`~pyfsr.archetypes.record.Archetype` produced by
        :func:`harvest_from_dir`.
    """
    with tempfile.TemporaryDirectory() as tmp:
        extract_root = Path(tmp) / Path(zip_path).stem
        with zipfile.ZipFile(zip_path) as zf:
            # Create the directory explicitly so an archive with no members
            # still leaves an existing (empty) directory to harvest from.
            extract_root.mkdir(exist_ok=True)
            zf.extractall(extract_root)
        # Harvest while the temporary directory still exists; it is removed on exit.
        return harvest_from_dir(extract_root, name=name)
def harvest_archetype_from_pack(client: Any, pack_identifier: str, archetype_name: str) -> Archetype:
    """Harvest a draft archetype from a solution pack on a live appliance.

    Calls ``client.solution_packs.export_pack(pack_identifier)`` (see
    :meth:`pyfsr.api.solution_packs.SolutionPackAPI.export_pack`) and feeds the
    ``.zip`` path it returns to :func:`harvest_from_zip`.

    Args:
        client: object exposing ``solution_packs.export_pack``.
        pack_identifier: identifier of the pack to export, passed to
            ``export_pack`` unchanged.
        archetype_name: name given to the resulting draft.

    Returns:
        The draft named ``archetype_name`` -- curate it, then
        ``ArchetypeStore().put(draft)`` to persist.
    """
    exported_zip = client.solution_packs.export_pack(pack_identifier)
    draft = harvest_from_zip(exported_zip, name=archetype_name)
    return draft