#!/usr/libexec/platform-python
# Copyright (c) 2026, Oracle and/or its affiliates.
# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
#
# This code is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 2 only, as
# published by the Free Software Foundation.
#
# This code is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
# version 2 for more details (a copy is included in the LICENSE file that
# accompanied this code).
#
# You should have received a copy of the GNU General Public License version
# 2 along with this work; if not, see <https://www.gnu.org/licenses/>.
#
# Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
# or visit www.oracle.com if you need additional information or have any
# questions.

"""OLED Last Boot Report generator."""

import datetime
import json
import logging
import os
import re
import shutil
import socket
import subprocess  # nosec B404
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from lastboot_runtime import process_event

# Directory in which generated boot report JSON files are stored.
REPORT_DIR = Path("/var/oled/lastboot-report")
# Maximum number of minutes before the boot time that a vmcore may have been
# written and still be attributed to that boot.
VMCORE_BOOT_WINDOW_MINUTES = 20
# Resolve the 'last' utility from PATH; fall back to /usr/bin/last when it
# cannot be found there.
LAST_BIN = shutil.which("last") or "/usr/bin/last"
# Module-level logger.
LOG = logging.getLogger(__name__)


def ensure_dir(path: Path) -> None:
    """
    Make sure a directory exists, creating any missing parents on the way.

    A directory that is already present is left untouched and is not
    treated as an error.

    Args:
        path (Path): Directory that must exist once the call returns.

    Returns:
        None
    """
    os.makedirs(path, exist_ok=True)


def get_lastboot_info() -> Tuple[datetime.datetime, bool]:
    """
    Report when the system last booted and whether it went down cleanly.

    Run 'last -Fxn2 reboot shutdown' and take the first shutdown and the
    first reboot timestamp that can be parsed from its output. When both
    are available, the reboot counts as graceful if the shutdown timestamp
    is not later than the reboot timestamp. When only a reboot timestamp is
    available, the reboot is reported as not graceful. When the command
    fails, prints nothing usable, or no reboot timestamp can be parsed, the
    current time and False are returned.

    Returns:
        Tuple[datetime.datetime, bool]: The boot time and a flag telling
        whether the preceding shutdown was graceful.
    """
    try:
        raw_output = subprocess.check_output(  # nosec B603
            [LAST_BIN, "-Fxn2", "reboot", "shutdown"],
            universal_newlines=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # 'last' is unavailable or wtmp is corrupt/missing/empty.
        return datetime.datetime.now(), False

    records = []
    for raw_line in raw_output.splitlines():
        if raw_line.strip() and not raw_line.startswith("wtmp begins"):
            records.append(raw_line.strip())
    if not records:
        # Nothing recorded, so the best available answer is "now".
        return datetime.datetime.now(), False

    # Example records:
    # reboot   system boot  5.15.0-314.193.5 Wed Jan 21 10:47:55 2026
    #          still running
    # shutdown system down  5.15.0-314.193.5 Wed Jan 21 10:45:30 2026
    #          - Wed Jan 21 10:47:55 2026
    # A shutdown record's own timestamp is the one followed by " -".
    shutdown_pattern = (
        r"\b([A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}"
        r"\s+\d{2}:\d{2}:\d{2}\s+\d{4})\s+-"
    )
    reboot_pattern = (
        r"\b([A-Z][a-z]{2}\s+[A-Z][a-z]{2}\s+\d{1,2}"
        r"\s+\d{2}:\d{2}:\d{2}\s+\d{4})"
    )

    def first_timestamp(
        pattern: str, record: str
    ) -> Optional[datetime.datetime]:
        """Return the first timestamp matched in record, or None."""
        found = re.search(pattern, record)
        if found is None:
            return None
        try:
            return datetime.datetime.strptime(
                found.group(1), "%a %b %d %H:%M:%S %Y"
            )
        except ValueError:
            return None

    shutdown_dt = None
    reboot_dt = None
    for record in records:
        if record.startswith("shutdown") and shutdown_dt is None:
            shutdown_dt = first_timestamp(shutdown_pattern, record)
        elif record.startswith("reboot") and reboot_dt is None:
            reboot_dt = first_timestamp(reboot_pattern, record)
        if shutdown_dt is not None and reboot_dt is not None:
            break

    if reboot_dt is None:
        return datetime.datetime.now(), False
    if shutdown_dt is None:
        return reboot_dt, False
    return reboot_dt, shutdown_dt <= reboot_dt


def get_vmcore_search_dirs(
    config_path: str = "/etc/kdump.conf",
) -> List[Path]:
    """
    Determine the directories that should be searched for vmcore files.

    Read every 'path' directive from the kdump configuration file.
    Absolute paths are used as-is and relative paths are placed under
    /var/crash. /var/crash and /var/oled/crash are always included, so the
    result is never empty.

    The parser is deliberately forgiving: commented-out lines, trailing
    '#' comments, a 'path' keyword without a value, keywords that merely
    begin with "path", and an unreadable or undecodable file are all
    ignored instead of raising.

    Args:
        config_path (str): Location of the kdump configuration file.
            Defaults to /etc/kdump.conf.

    Returns:
        List[Path]: Sorted, de-duplicated list of directories to search
        for vmcore.
    """
    default_dir = Path("/var/crash")
    vmcore_dirs = {default_dir, Path("/var/oled/crash")}
    try:
        with open(config_path, "r", encoding="utf-8") as config_file:
            for raw_line in config_file:
                # Drop anything after '#' so that both commented-out
                # lines and trailing comments are ignored.
                fields = raw_line.split("#", 1)[0].split(None, 1)
                # Require exactly "path <value>"; this skips a bare
                # 'path' and other keywords that only start with "path".
                if len(fields) != 2 or fields[0] != "path":
                    continue
                vmcore_dir = fields[1].strip().strip("\"'")
                if not vmcore_dir:
                    continue
                vmcore_dir_path = Path(vmcore_dir)
                if vmcore_dir_path.is_absolute():
                    vmcore_dirs.add(vmcore_dir_path)
                else:
                    vmcore_dirs.add(default_dir / vmcore_dir_path)
    except (OSError, UnicodeDecodeError):
        # The file may be missing, unreadable or not valid UTF-8; the
        # default directories are still returned.
        pass

    # Sort so that the result does not depend on set iteration order.
    return sorted(vmcore_dirs)


def find_vmcore(
    boot_dt: datetime.datetime,
) -> Tuple[Optional[Path], bool]:
    """
    Locate the most recently modified vmcore in the vmcore search
    directories and tell whether it belongs to the given boot.

    A vmcore is attributed to the boot when its modification time is at or
    before the boot time, and no more than VMCORE_BOOT_WINDOW_MINUTES
    minutes before it.

    Args:
        boot_dt (datetime.datetime): The boot time to compare against.

    Returns:
        Tuple[Optional[Path], bool]: The newest vmcore path (None when no
        vmcore exists) and whether it falls inside the boot window.
    """
    found: List[Tuple[float, Path]] = []
    for directory in get_vmcore_search_dirs():
        if directory.exists():
            found.extend(find_vmcore_candidates(directory))

    if not found:
        return (None, False)

    # The newest vmcore wins; on equal timestamps the first one found is kept.
    newest_mtime, newest_path = max(found, key=lambda entry: entry[0])

    # How long before the boot the vmcore was last modified.
    lead_time = boot_dt - datetime.datetime.fromtimestamp(newest_mtime)
    window = datetime.timedelta(minutes=VMCORE_BOOT_WINDOW_MINUTES)
    within_window = datetime.timedelta(0) <= lead_time <= window

    return (newest_path, within_window)


def find_vmcore_candidates(search_dir: Path) -> List[Tuple[float, Path]]:
    """
    Collect every file named exactly 'vmcore' below a directory.

    Args:
        search_dir (Path): Directory tree to walk.

    Returns:
        List[Tuple[float, Path]]: One (modification time, path) pair per
        vmcore found. The modification time is 0 when the file cannot be
        stat'ed. An empty list is returned when the walk raises OSError.
    """
    found: List[Tuple[float, Path]] = []
    try:
        for dirpath, _dirnames, filenames in os.walk(search_dir):
            # File names are unique within a directory, so a membership
            # test finds the single possible match.
            if "vmcore" not in filenames:
                continue
            core_path = Path(dirpath) / "vmcore"
            try:
                mtime = core_path.stat().st_mtime
            except OSError:
                mtime = 0
            found.append((mtime, core_path))
    except OSError:
        return []
    return found


def format_boot_time_json(boot_dt: datetime.datetime) -> str:
    """
    Render a boot time as 'YYYY-MM-DDTHH:MM:SS' for the JSON report.

    Args:
        boot_dt (datetime.datetime): The boot time to format.

    Returns:
        str: The formatted timestamp, with second resolution and no
        timezone suffix.
    """
    json_time_format = "%Y-%m-%dT%H:%M:%S"
    return boot_dt.strftime(json_time_format)


def get_local_timezone_name() -> str:
    """
    Return the name of the local timezone as reported for the current time.

    Returns:
        str: The timezone name, or an empty string when none is available.
    """
    local_now = datetime.datetime.now().astimezone()
    zone_name = local_now.tzname()
    return zone_name if zone_name else ""


def build_recommended_actions(
    hostname: str, kernel_crash: bool, vmcore_path: Optional[Path]
) -> List[str]:
    """
    List the follow-up steps to recommend after a non-graceful reboot.

    Args:
        hostname (str): Host name used in the PCP archive location.
        kernel_crash (bool): Whether the reboot was classified as a
            kernel crash.
        vmcore_path (Optional[Path]): Path of the vmcore, if any.

    Returns:
        List[str]: The recommended actions, in order. Collecting the
        vmcore comes first, and only when a kernel crash was detected and
        a vmcore path is known.
    """
    vmcore_step: List[str] = []
    if kernel_crash and vmcore_path is not None:
        vmcore_step = ["Collect vmcore"]

    return vmcore_step + [
        "Collect sosreport using: sos report --batch --all-logs",
        (
            "Collect PCP archive for the reboot window from "
            f"/var/oled/pcp/pmlogger/{hostname}/"
        ),
        "Collect console or serial logs if available",
        "File a JIRA ticket for further investigation",
    ]


def build_report(
    hostname: str,
    boot_dt: datetime.datetime,
    graceful_shutdown: bool,
    vmcore_path: Optional[Path],
    vmcore_at_boot: bool,
) -> Dict[str, Any]:
    """
    Assemble the lastboot report as a JSON-serialisable dictionary.

    A reboot is classified as a kernel crash when the shutdown was not
    graceful and a vmcore was found inside the boot window. Recommended
    actions are only included for non-graceful reboots, and the vmcore
    path is only reported when the vmcore is attributed to this boot.

    Args:
        hostname (str): Name of the host the report is about.
        boot_dt (datetime.datetime): The boot time.
        graceful_shutdown (bool): Whether the previous shutdown was
            graceful.
        vmcore_path (Optional[Path]): Path of the vmcore, if any.
        vmcore_at_boot (bool): Whether the vmcore belongs to this boot.

    Returns:
        Dict[str, Any]: The report content.
    """
    unexpected = not graceful_shutdown
    kernel_crash = bool(unexpected and vmcore_at_boot)

    if unexpected:
        recommended_actions = build_recommended_actions(
            hostname, kernel_crash, vmcore_path
        )
    else:
        recommended_actions = []

    # Only expose the vmcore path when it is attributed to this boot.
    reported_vmcore = None
    if vmcore_path and vmcore_at_boot:
        reported_vmcore = str(vmcore_path)

    reboot_section = {
        "type": "unexpected" if unexpected else "graceful",
        "kernel_crash": kernel_crash,
    }
    crash_dump_section = {
        "vmcore_present": vmcore_at_boot,
        "vmcore_path": reported_vmcore,
    }

    return {
        "host": hostname,
        "boot_time": format_boot_time_json(boot_dt),
        "timezone": get_local_timezone_name(),
        "reboot": reboot_section,
        "crash_dump": crash_dump_section,
        "recommended_actions": recommended_actions,
    }


def save_report_json(report_path: Path, report: Dict[str, Any]) -> None:
    """
    Write the report to disk as indented JSON.

    The content is first written to a sibling '<report_path>.tmp' file and
    then renamed over the final path, so readers do not see a partially
    written report. If writing or renaming fails, the temporary file is
    removed again (best effort), an error is printed to stderr and the
    process exits with status 1.

    Args:
        report_path (Path): Final location of the JSON report. Its parent
            directory is created if needed.
        report (Dict[str, Any]): Report content to serialise.

    Returns:
        None

    Raises:
        SystemExit: With code 1 when the report cannot be written.
    """
    ensure_dir(report_path.parent)
    tmp_path = Path(str(report_path) + ".tmp")
    try:
        report_json = json.dumps(report, indent=2) + "\n"
        tmp_path.write_text(report_json, encoding="utf-8")
        tmp_path.replace(report_path)
    except OSError as err:
        # Do not leave a partial temporary file behind in the report
        # directory; it may be absent if the failure happened before the
        # file was created, hence the nested best-effort handling.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        print(f"Error writing lastboot report: {err}", file=sys.stderr)
        sys.exit(1)


def generate_report() -> Tuple[Path, Dict[str, Any]]:
    """
    Produce the lastboot report for this host and store it on disk.

    The report file is placed in REPORT_DIR and named after the boot time.
    A vmcore is only searched for when the previous shutdown was not
    graceful.

    Returns:
        Tuple[Path, Dict[str, Any]]: The path of the saved report and the
        report content.
    """
    hostname = socket.gethostname()
    boot_dt, graceful_shutdown = get_lastboot_info()

    boot_stamp = boot_dt.strftime('%Y-%m-%d_%H:%M:%S')
    report_path = REPORT_DIR / f"boot_report_{boot_stamp}.json"

    # A graceful shutdown cannot have produced a crash dump, so skip the
    # search in that case.
    vmcore_path: Optional[Path] = None
    vmcore_at_boot = False
    if not graceful_shutdown:
        vmcore_path, vmcore_at_boot = find_vmcore(boot_dt)

    report = build_report(
        hostname,
        boot_dt,
        graceful_shutdown,
        vmcore_path,
        vmcore_at_boot,
    )
    save_report_json(report_path, report)
    return report_path, report


def main() -> None:
    """
    Script entry point: generate the report and run abnormal-reboot hooks.

    The report is generated and saved first. For any reboot that is not
    classified as graceful, the report is handed to process_event;
    otherwise an informational message is logged. Any exception is
    reported on stderr and turns into exit status 1.

    Returns:
        None
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s: %(message)s",
    )
    try:
        report_path, report = generate_report()
        was_graceful = report["reboot"]["type"] == "graceful"
        if was_graceful:
            LOG.info(
                "Graceful reboot detected; skipping abnormal reboot hooks"
            )
        else:
            process_event(report, report_path)
        print(f"Last boot report generated: {report_path}")
    except Exception as err:  # pylint: disable=broad-exception-caught
        print(
            f"Error generating last boot report: {err}",
            file=sys.stderr,
        )
        sys.exit(1)


# Run the report generator only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
