I've a dream (EDT will write to SQL)

Not a video, but here are the items you need to build your own:

hyperplan_schema.json (220 Bytes)

Python script:


"""
Hyperplan XML ⇄ CSV — Generic, colleague-friendly
--------------------------------------------------
Goals:
• Works out-of-the-box by just pressing Run in an IDE (autorun).
• Detects headers in three ways (priority order):
    1) Embedded <columns><col name="..."/></columns> in the .hp
    2) Sidecar schema JSON: <basename>.schema.json or hyperplan_schema.json
       Example (note: the "excel_friendly" key is shown for completeness; this script does not read it):
         {
           "headers": ["Subject","DataProvider","Operator","Country","Tier","NextRun"],
           "include_notes": true,
           "include_image": true,
           "include_links": true,
           "csv_encoding": "utf-8-sig",
           "excel_friendly": true
         }
    3) Fallback default headers: ["Subject","DataProvider","Operator","Country","Tier","NextRun"]
• Export adds optional Notes/Image/Links columns (configurable).
• Also supports CSV → XML rewrite using the same schema.
• Autorun behavior:
    - If only .hp is present → export CSV
    - If CSV exists (cards_export.csv) and .hp exists → ask to (R)ewrite XML or (E)xport again.
"""

import xml.etree.ElementTree as ET
import pandas as pd
from urllib.parse import quote, unquote
from pathlib import Path
from datetime import datetime
import csv, os, sys, json

# Fallback column names, used by detect_headers() when the .hp file has no
# embedded <columns> block and no sidecar schema JSON can be loaded.
DEF_HEADERS = ["Subject","DataProvider","Operator","Country","Tier","NextRun"]

def get_latest_hp_file(path="."):
    """Return the path of the most recently modified 'hyperplan_*.hp' entry in *path*.

    Entries are matched by name only (prefix 'hyperplan_', suffix '.hp') and
    ranked by modification time; the newest one is returned joined to *path*.

    Raises:
        FileNotFoundError: if no entry in *path* matches the naming pattern.
    """
    candidates = []
    for entry in os.listdir(path):
        if entry.startswith("hyperplan_") and entry.endswith(".hp"):
            candidates.append(entry)

    if len(candidates) == 0:
        raise FileNotFoundError("No 'hyperplan_*.hp' file found in the current directory.")

    # Oldest first, so the newest file ends up in the last slot.
    by_mtime = sorted(
        candidates,
        key=lambda entry: os.path.getmtime(os.path.join(path, entry)),
    )
    newest = by_mtime[-1]
    return os.path.join(path, newest)

def load_sidecar_schema(hp_path: str):
    """Look for '<basename>.schema.json' then 'hyperplan_schema.json'. Return dict or None.

    Both candidates live next to the .hp file. The first candidate that exists,
    parses as JSON, and is a dict with a list under "headers" is returned
    as-is. Unreadable or malformed candidates are skipped (best effort), so
    the caller can fall back to other header sources.
    """
    hp = Path(hp_path)
    candidates = (hp.with_suffix(".schema.json"), hp.parent / "hyperplan_schema.json")
    for candidate in candidates:
        if not candidate.exists():
            continue
        try:
            # 'utf-8-sig' transparently strips a UTF-8 BOM (written by many
            # Windows editors); with plain 'utf-8' json.load() rejects it.
            with open(candidate, "r", encoding="utf-8-sig") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # OSError: unreadable file; ValueError covers JSONDecodeError and
            # UnicodeDecodeError. Try the next candidate instead of failing.
            continue
        if isinstance(data, dict) and isinstance(data.get("headers"), list):
            return data
    return None

def read_embedded_columns(root):
    """Parse <columns><col name='...'/></columns>. Return list[str] or None.

    Each <col> contributes its 'name' attribute, or its stripped text when the
    attribute is missing or empty. Names are percent-decoded. None is returned
    when there is no <columns> child or no usable column name.
    """
    container = root.find("columns")
    if container is None:
        return None

    names = []
    for node in container.findall("col"):
        label = node.attrib.get("name")
        if not label:
            # Fall back to the element text, e.g. <col>City</col>.
            label = (node.text or "").strip()
        if label:
            names.append(unquote(label))

    return names if names else None

def detect_headers(root, hp_path: str):
    """Return (headers, options) for the Hyperplan document rooted at *root*.

    Sources are tried in priority order: embedded <columns>, then a sidecar
    schema JSON next to *hp_path*, then DEF_HEADERS. Only the sidecar schema
    can override the options; the other two sources use the all-enabled
    defaults with 'utf-8-sig' CSV encoding.
    """
    def _all_enabled():
        # Fresh dict per call so callers never share option state.
        return {"include_notes": True, "include_image": True, "include_links": True, "csv_encoding":"utf-8-sig"}

    # 1) Embedded columns
    from_xml = read_embedded_columns(root)
    if from_xml:
        return from_xml, _all_enabled()

    # 2) Sidecar schema
    schema = load_sidecar_schema(hp_path)
    if not schema:
        # 3) Default
        return DEF_HEADERS, _all_enabled()

    options = {
        "include_notes": bool(schema.get("include_notes", True)),
        "include_image": bool(schema.get("include_image", True)),
        "include_links": bool(schema.get("include_links", True)),
        "csv_encoding": schema.get("csv_encoding", "utf-8-sig")
    }
    return schema.get("headers", DEF_HEADERS), options

def parse_card_value(value: str, expected_fields: int):
    """Split a card's comma-separated 'value' attribute into decoded fields.

    The result always has exactly *expected_fields* entries: short inputs are
    padded with empty strings and surplus fields are dropped, which keeps the
    CSV width stable. Each field is percent-decoded.
    """
    fields = value.split(",")
    missing = expected_fields - len(fields)
    if missing > 0:
        fields.extend([""] * missing)
    return [unquote(field) for field in fields[:expected_fields]]

def decode_notes(text: str):
    """Percent-decode a notes attribute; empty or missing input yields ''."""
    if not text:
        return ""
    return unquote(text)

def decode_links(links_elem):
    """Return the decoded 'value' of every <link> child, one per line.

    *links_elem* may be None (card without a <links> element), in which case
    an empty string is returned. A <link> without a 'value' attribute
    contributes an empty line.
    """
    if links_elem is None:
        return ""
    return "\n".join(
        unquote(node.attrib.get("value", "")) for node in links_elem.findall("link")
    )

def xml_to_csv(xml_file: str, csv_file: str):
    """Export every <card> of the Hyperplan file *xml_file* to *csv_file*.

    The leading columns come from detect_headers(); optional Notes, Image and
    Links columns are appended (in that order) when enabled in the options.
    All cells are quoted and the file is written with the configured encoding.

    Raises:
        ValueError: if the document has no <cards> element.
    """
    root = ET.parse(xml_file).getroot()
    cards_elem = root.find("cards")
    if cards_elem is None:
        raise ValueError("No <cards> element found in the XML file.")

    headers, opts = detect_headers(root, xml_file)
    want_notes = opts["include_notes"]
    want_image = opts["include_image"]
    want_links = opts["include_links"]

    # Build final header list
    extras = []
    if want_notes:
        extras.append("Notes")
    if want_image:
        extras.append("Image")
    if want_links:
        extras.append("Links")
    out_headers = [*headers, *extras]

    width = len(headers)
    rows = []
    for card in cards_elem.findall("card"):
        attrs = card.attrib
        record = parse_card_value(attrs.get("value",""), expected_fields=width)
        if want_notes:
            record.append(decode_notes(attrs.get("notes","")))
        if want_image:
            # The image attribute is exported verbatim (no decoding).
            record.append(attrs.get("image",""))
        if want_links:
            record.append(decode_links(card.find("links")))
        rows.append(record)

    df = pd.DataFrame(rows, columns=out_headers)
    df.to_csv(
        csv_file,
        index=False,
        header=True,
        quoting=csv.QUOTE_ALL,
        encoding=opts.get("csv_encoding","utf-8-sig"),
    )
    print(f"✅ Exported {len(df)} cards to '{csv_file}' with headers: {out_headers}")

def encode_card_value(values):
    """Percent-encode each value (after str()) and join them with commas."""
    encoded = map(quote, map(str, values))
    return ",".join(encoded)

def encode_notes(text: str):
    """Percent-encode notes text; empty or missing input yields ''."""
    if not text:
        return ""
    return quote(text)

def encode_links_block(links_text: str):
    """Render newline-separated link paths as an indented <links> XML block.

    Blank lines are ignored. Each remaining path becomes one <link> element
    whose 'value' is the percent-encoded path and whose 'link_name' is the
    percent-encoded final path component. Returns '' when there are no links.
    """
    entries = []
    for raw in str(links_text).splitlines():
        target = raw.strip()
        if not target:
            continue
        label = Path(target).name
        entries.append(f'    <link value="{quote(target)}" link_name="{quote(label)}"/>\n')

    if not entries:
        return ""
    return "  <links>\n" + "".join(entries) + "  </links>\n"

def csv_to_xml(csv_file: str, xml_input_file: str, xml_output_file: str):
    """Rebuild the <cards> block of a Hyperplan file from a CSV.

    The text of *xml_input_file* is copied to *xml_output_file* with everything
    between '<cards>' and '</cards>' replaced by cards generated from
    *csv_file*. The columns named by detect_headers() are packed into each
    card's 'value' attribute; optional 'Notes', 'Image' and 'Links' columns
    are used when enabled in the options and present in the CSV.

    The CSV is read as plain strings, without NA inference and with the same
    encoding the export uses, so cell text such as "007", "1" or "NA"
    round-trips unchanged instead of being re-typed by pandas.

    Raises:
        ValueError: if the input file has no literal '<cards>'...'</cards>' block.
    """
    # Local import: only this function needs XML attribute escaping.
    from xml.sax.saxutils import escape

    with open(xml_input_file, encoding="utf-8") as f:
        original = f.read()

    # Extract headers config again to know which columns belong to 'value'
    root = ET.parse(xml_input_file).getroot()
    headers, opts = detect_headers(root, xml_input_file)

    # dtype=str + keep_default_na=False: no numeric/NaN inference on cell text.
    # fillna("") still covers short rows, which pandas pads with NaN.
    df = pd.read_csv(
        csv_file,
        dtype=str,
        keep_default_na=False,
        encoding=opts.get("csv_encoding", "utf-8-sig"),
    ).fillna("")

    start = original.find("<cards>")
    end = original.find("</cards>")
    if start == -1 or end == -1:
        raise ValueError("No <cards> block found.")
    end += len("</cards>")

    use_notes = opts["include_notes"] and "Notes" in df.columns
    use_image = opts["include_image"] and "Image" in df.columns
    use_links = opts["include_links"] and "Links" in df.columns

    cards = []
    for _, row in df.iterrows():
        # Columns missing from the CSV contribute an empty field.
        value_attr = encode_card_value([row.get(h, "") for h in headers])

        # Notes are only emitted when non-empty.
        notes_attr = ""
        if use_notes and row["Notes"] != "":
            notes_attr = f' notes="{encode_notes(row["Notes"])}"'

        # Without an Image column every card gets the "-1" placeholder.
        image_attr = ' image="-1"'
        if use_image:
            # Image is written unencoded, so it must be escaped for XML.
            image = escape(row["Image"], {'"': "&quot;"})
            image_attr = f' image="{image}"'

        links_block = encode_links_block(row["Links"] if use_links else "")

        # Build card
        if links_block:
            cards.append(f'  <card value="{value_attr}"{notes_attr}{image_attr}>\n{links_block}  </card>\n')
        else:
            cards.append(f'  <card value="{value_attr}"{notes_attr}{image_attr} />\n')

    cards_section = "<cards>\n" + "".join(cards) + "</cards>"

    updated = original[:start] + cards_section + original[end:]
    with open(xml_output_file, "w", encoding="utf-8") as f:
        f.write(updated)
    print(f"✅ Rewritten XML with {len(df)} cards → '{xml_output_file}'.")

def get_timestamped_filename(prefix="hyperplan_", ext=".hp"):
    """Return '<prefix><YYYYmmdd_HHMM><ext>' built from the current local time."""
    stamp = f"{datetime.now():%Y%m%d_%H%M}"
    return f"{prefix}{stamp}{ext}"

def autorun():
    """Pick the newest .hp file in the working directory and run the default action.

    Without an existing 'cards_export.csv' the cards are exported straight
    away. Otherwise the user is prompted: 'r' rebuilds a new timestamped .hp
    from the CSV, 'e' exports the CSV again, anything else cancels.
    """
    cwd = Path(".")
    hp = get_latest_hp_file(".")
    csv_path = cwd / "cards_export.csv"

    if not csv_path.exists():
        xml_to_csv(hp, str(csv_path))
        return

    # Ask user what to do
    print(f"Detected existing CSV: {csv_path}")
    answer = input("Press [R] to rebuild XML from CSV, [E] to export CSV again, or [X] to cancel: ")
    choice = answer.strip().lower()

    if choice == "r":
        csv_to_xml(str(csv_path), hp, str(cwd / get_timestamped_filename()))
    elif choice == "e":
        xml_to_csv(hp, str(csv_path))
    else:
        print("Cancelled.")

if __name__ == "__main__":
    # Script entry point: report any failure as a single line and exit non-zero.
    try:
        autorun()
    except Exception as err:
        print(f"❌ Error: {err}")
        raise SystemExit(1)


2 Likes