import ctypes
import ctypes.wintypes
import ipaddress
import json
import logging
import logging.handlers
import os
import socket
import subprocess
import sys
import threading
import time
import urllib.parse
from pathlib import Path
import psutil
import requests
if getattr(sys, "frozen", False):
base_dir = Path(sys.executable).parent
else:
base_dir = Path(__file__).parent
# ---------------------------------------------------------------------------
# Logging - console always; file handler toggleable (2-day rotation)
# ---------------------------------------------------------------------------
LOG_FILE = base_dir / "agent.log"
_file_handler: logging.Handler | None = None
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()],
)
log = logging.getLogger("agent")
def set_file_logging(enabled: bool) -> None:
"""Toggle file logging with 2-day rotation."""
global _file_handler
if enabled and _file_handler is None:
_file_handler = logging.handlers.TimedRotatingFileHandler(
LOG_FILE, when="midnight", backupCount=1, encoding="utf-8",
)
_file_handler.setFormatter(
logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
)
logging.getLogger().addHandler(_file_handler)
elif not enabled and _file_handler is not None:
logging.getLogger().removeHandler(_file_handler)
_file_handler.close()
_file_handler = None
# ---------------------------------------------------------------------------
# Win32 API bindings
# ---------------------------------------------------------------------------
user32 = ctypes.windll.user32 # type: ignore[attr-defined]
GetForegroundWindow = user32.GetForegroundWindow
GetForegroundWindow.restype = ctypes.wintypes.HWND
GetWindowTextW = user32.GetWindowTextW
GetWindowTextW.argtypes = [ctypes.wintypes.HWND, ctypes.wintypes.LPWSTR, ctypes.c_int]
GetWindowTextW.restype = ctypes.c_int
GetWindowTextLengthW = user32.GetWindowTextLengthW
GetWindowTextLengthW.argtypes = [ctypes.wintypes.HWND]
GetWindowTextLengthW.restype = ctypes.c_int
GetWindowThreadProcessId = user32.GetWindowThreadProcessId
GetWindowThreadProcessId.argtypes = [ctypes.wintypes.HWND, ctypes.POINTER(ctypes.wintypes.DWORD)]
GetWindowThreadProcessId.restype = ctypes.wintypes.DWORD
def get_foreground_info() -> tuple[str, str] | None:
"""Return (process_name, window_title) of the current foreground window."""
hwnd = GetForegroundWindow()
if not hwnd:
return None
length = GetWindowTextLengthW(hwnd)
title = ""
if length > 0:
buf = ctypes.create_unicode_buffer(length + 1)
GetWindowTextW(hwnd, buf, length + 1)
title = buf.value.strip()
pid = ctypes.wintypes.DWORD()
GetWindowThreadProcessId(hwnd, ctypes.byref(pid))
try:
proc = psutil.Process(pid.value)
proc_name = proc.name()
except (psutil.NoSuchProcess, psutil.AccessDenied):
proc_name = "unknown"
return proc_name, title
def format_report_desc(app_id: str, window_title: str) -> str:
"""Return the reported desc: title first, then process name without .exe."""
title = (window_title or "").strip()
if title:
return title[:256]
app = (app_id or "").strip() or "unknown"
if app.lower().endswith(".exe"):
app = app[:-4]
return app or "unknown"
# ---------------------------------------------------------------------------
# Config - stored next to the exe for easy cleanup
# ---------------------------------------------------------------------------
CONFIG_PATH = base_dir / "config.json"
_DEFAULT_CFG = {
"server_url": "",
"token": "",
"emoji": "",
"heartbeat_seconds": 60,
"sensitive_words_regex": "",
"enable_log": False,
}
MONITOR_POLL_SECONDS = 5
def load_config() -> dict:
"""Load config.json, return config dict (may be empty on error)."""
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
cfg = json.load(f)
except FileNotFoundError:
return dict(_DEFAULT_CFG)
except (PermissionError, json.JSONDecodeError) as e:
log.error("config.json: %s", e)
return dict(_DEFAULT_CFG)
if not isinstance(cfg, dict):
return dict(_DEFAULT_CFG)
normalized = dict(_DEFAULT_CFG)
for key in ("server_url", "token", "emoji", "sensitive_words_regex"):
value = cfg.get(key, _DEFAULT_CFG[key])
normalized[key] = value.strip() if isinstance(value, str) else _DEFAULT_CFG[key]
enable_log = cfg.get("enable_log", _DEFAULT_CFG["enable_log"])
normalized["enable_log"] = enable_log if isinstance(enable_log, bool) else _DEFAULT_CFG["enable_log"]
val = cfg.get("heartbeat_seconds", _DEFAULT_CFG["heartbeat_seconds"])
if not isinstance(val, (int, float)) or val < 10 or val > 600:
val = _DEFAULT_CFG["heartbeat_seconds"]
normalized["heartbeat_seconds"] = int(val)
return normalized
def save_config(cfg: dict) -> bool:
"""Save config to config.json atomically with restricted permissions."""
import tempfile
try:
data = json.dumps(cfg, indent=2, ensure_ascii=False).encode("utf-8")
fd = tempfile.NamedTemporaryFile(
dir=CONFIG_PATH.parent, prefix=".config_", suffix=".tmp",
delete=False,
)
tmp_path = Path(fd.name)
try:
fd.write(data)
fd.flush()
os.fsync(fd.fileno())
fd.close()
os.chmod(tmp_path, 0o600)
tmp_path.replace(CONFIG_PATH)
except BaseException:
fd.close()
tmp_path.unlink(missing_ok=True)
raise
return True
except Exception as e:
log.error("Config save failed: %s", e)
return False
def validate_config(cfg: dict) -> str | None:
"""Validate config. Return error message or None if valid."""
url = cfg.get("server_url", "").strip()
token = cfg.get("token", "").strip()
sensitive_regex = cfg.get("sensitive_words_regex", "").strip()
if not url:
return "上报接口 URL 不能为空"
if not token or token == "YOUR_TOKEN_HERE":
return "Token 不能为空"
if sensitive_regex:
import re
try:
re.compile(sensitive_regex)
except re.error:
return "标题敏感词排除正则表达式无效"
parsed = urllib.parse.urlparse(url)
scheme = parsed.scheme.lower()
hostname = parsed.hostname
if scheme not in ("http", "https"):
return "上报接口 URL 必须使用 http:// 或 https://"
if not hostname:
return "上报接口 URL 无效"
if scheme == "http":
try:
addrinfos = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
except socket.gaierror:
return f"无法解析域名: {hostname}"
for info in addrinfos:
ip = ipaddress.ip_address(info[4][0])
if ip.is_global:
return "HTTP 仅允许内网地址,公网请使用 HTTPS"
return None
# ---------------------------------------------------------------------------
# Windows autostart
# ---------------------------------------------------------------------------
AUTOSTART_NAME = "LiveDashboardAgent"
AUTOSTART_RUN_KEY = r"Software\Microsoft\Windows\CurrentVersion\Run"
def _get_autostart_command() -> str:
"""Return the command line used for login autostart."""
if getattr(sys, "frozen", False):
return subprocess.list2cmdline([str(Path(sys.executable).resolve())])
return subprocess.list2cmdline([sys.executable, str(Path(__file__).resolve())])
def _has_registry_autostart() -> bool:
"""Return whether the current user has a Run-key startup entry."""
try:
import winreg
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, AUTOSTART_RUN_KEY) as key:
value, _ = winreg.QueryValueEx(key, AUTOSTART_NAME)
except FileNotFoundError:
return False
except OSError as e:
log.warning("Autostart registry query failed: %s", e)
return False
return isinstance(value, str) and bool(value.strip())
def _set_registry_autostart(enabled: bool) -> bool:
"""Enable/disable login autostart through the current-user Run key."""
try:
import winreg
with winreg.CreateKey(winreg.HKEY_CURRENT_USER, AUTOSTART_RUN_KEY) as key:
if enabled:
winreg.SetValueEx(
key, AUTOSTART_NAME, 0, winreg.REG_SZ, _get_autostart_command()
)
else:
try:
winreg.DeleteValue(key, AUTOSTART_NAME)
except FileNotFoundError:
pass
return True
except OSError as e:
log.error("Autostart registry update failed: %s", e)
return False
def _has_legacy_startup_task() -> bool:
"""Return whether the legacy scheduled task based autostart exists."""
try:
result = subprocess.run(
["schtasks", "/query", "/tn", AUTOSTART_NAME],
capture_output=True,
text=True,
check=False,
timeout=5,
)
except (OSError, subprocess.SubprocessError) as e:
log.debug("Autostart task query failed: %s", e)
return False
return result.returncode == 0
def _remove_legacy_startup_task() -> bool:
"""Remove the legacy scheduled task if it exists."""
if not _has_legacy_startup_task():
return True
try:
result = subprocess.run(
["schtasks", "/delete", "/tn", AUTOSTART_NAME, "/f"],
capture_output=True,
text=True,
check=False,
timeout=10,
)
except (OSError, subprocess.SubprocessError) as e:
log.warning("Legacy startup task removal failed: %s", e)
return False
if result.returncode == 0:
return True
output = (result.stderr or result.stdout).strip()
if output:
log.warning("Legacy startup task removal failed: %s", output)
return False
def is_autostart_enabled() -> bool:
"""Return whether the agent is configured to launch at Windows logon."""
return _has_registry_autostart() or _has_legacy_startup_task()
def show_message(title: str, message: str, error: bool = False) -> None:
"""Show a best-effort native message box for user-facing actions."""
try:
flags = 0x10 if error else 0x40
ctypes.windll.user32.MessageBoxW(None, message, title, flags) # type: ignore[attr-defined]
except Exception:
log.info("%s: %s", title, message)
# ---------------------------------------------------------------------------
# Settings Dialog
# ---------------------------------------------------------------------------
def show_settings_dialog(current_config: dict | None = None) -> dict | None:
"""Show tkinter settings dialog. Returns new config or None if cancelled."""
try:
import tkinter as tk
from tkinter import ttk, messagebox
except ImportError:
log.error("tkinter 不可用,请手动编辑 %s", CONFIG_PATH)
return None
cfg = current_config or dict(_DEFAULT_CFG)
result: list[dict | None] = [None]
root = tk.Tk()
root.title("Live Dashboard - 设置")
root.resizable(False, False)
frame = ttk.Frame(root, padding=20)
frame.pack(fill="both", expand=True)
ttk.Label(frame, text="上报接口 URL:").grid(row=0, column=0, sticky="w", pady=6)
url_var = tk.StringVar(value=cfg.get("server_url", ""))
ttk.Entry(frame, textvariable=url_var, width=45).grid(row=0, column=1, pady=6, padx=(8, 0))
ttk.Label(frame, text="Token:").grid(row=1, column=0, sticky="w", pady=6)
token_var = tk.StringVar(value=cfg.get("token", ""))
ttk.Entry(frame, textvariable=token_var, width=45, show="*").grid(row=1, column=1, pady=6, padx=(8, 0))
ttk.Label(frame, text="Emoji:").grid(row=2, column=0, sticky="w", pady=6)
emoji_var = tk.StringVar(value=cfg.get("emoji", ""))
ttk.Entry(frame, textvariable=emoji_var, width=45).grid(row=2, column=1, pady=6, padx=(8, 0))
ttk.Label(frame, text="心跳间隔 (秒):").grid(row=3, column=0, sticky="w", pady=6)
heartbeat_var = tk.IntVar(value=cfg.get("heartbeat_seconds", 60))
ttk.Spinbox(frame, textvariable=heartbeat_var, from_=10, to=600, width=10).grid(row=3, column=1, sticky="w", pady=6, padx=(8, 0))
ttk.Label(frame, text="标题敏感词排除 (正则):").grid(row=4, column=0, sticky="w", pady=6)
sensitive_var = tk.StringVar(value=cfg.get("sensitive_words_regex", ""))
ttk.Entry(frame, textvariable=sensitive_var, width=45).grid(row=4, column=1, pady=6, padx=(8, 0))
log_var = tk.BooleanVar(value=cfg.get("enable_log", False))
ttk.Checkbutton(frame, text="开启日志文件 (保留 2 天)", variable=log_var).grid(
row=5, column=0, columnspan=2, sticky="w", pady=6
)
def on_save():
new_cfg = {
"server_url": url_var.get().strip(),
"token": token_var.get().strip(),
"emoji": emoji_var.get().strip(),
"heartbeat_seconds": heartbeat_var.get(),
"sensitive_words_regex": sensitive_var.get().strip(),
"enable_log": log_var.get(),
}
err = validate_config(new_cfg)
if err:
messagebox.showerror("配置错误", err, parent=root)
return
if save_config(new_cfg):
result[0] = new_cfg
root.destroy()
else:
messagebox.showerror("保存失败", "无法写入 config.json", parent=root)
btn_frame = ttk.Frame(frame)
btn_frame.grid(row=6, column=0, columnspan=2, pady=16)
ttk.Button(btn_frame, text="保存", command=on_save).pack(side="left", padx=12)
ttk.Button(btn_frame, text="取消", command=root.destroy).pack(side="left", padx=12)
# Center on screen
root.update_idletasks()
w, h = root.winfo_reqwidth(), root.winfo_reqheight()
x = (root.winfo_screenwidth() - w) // 2
y = (root.winfo_screenheight() - h) // 2
root.geometry(f"+{x}+{y}")
root.lift()
root.focus_force()
root.mainloop()
return result[0]
# ---------------------------------------------------------------------------
# Reporter
# ---------------------------------------------------------------------------
class Reporter:
"""Handles sending reports to the backend with exponential backoff."""
MAX_BACKOFF = 60
PAUSE_AFTER_FAILURES = 5
PAUSE_DURATION = 300
def __init__(self, server_url: str, token: str, emoji: str, ttl: int):
self.endpoint = server_url
self.token = token
self.emoji = emoji
self.ttl = ttl
self.session = requests.Session()
self.session.headers.update({
"Content-Type": "application/json",
"Accept": "application/json",
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/124.0.0.0 Safari/537.36"
),
})
self._consecutive_failures = 0
self._current_backoff = 0
self._pause_until = 0.0
def send(self, desc: str) -> bool:
if self.pause_remaining > 0:
return False
payload = {
"key": self.token,
"emoji": self.emoji,
"desc": desc[:256],
"ttl": self.ttl,
}
try:
resp = self.session.post(self.endpoint, json=payload, timeout=10)
if resp.status_code in (200, 201, 409):
self._consecutive_failures = 0
self._current_backoff = 0
self._pause_until = 0.0
return True
log.warning("Server %d: %s", resp.status_code, resp.text[:200])
except requests.RequestException as e:
log.warning("Request failed: %s", e)
self._consecutive_failures += 1
if self._current_backoff == 0:
self._current_backoff = 5
else:
self._current_backoff = min(self._current_backoff * 2, self.MAX_BACKOFF)
if self._consecutive_failures >= self.PAUSE_AFTER_FAILURES:
log.warning("Failed %d times, pausing %ds", self._consecutive_failures, self.PAUSE_DURATION)
self._pause_until = time.monotonic() + self.PAUSE_DURATION
self._consecutive_failures = 0
self._current_backoff = 0
return False
@property
def backoff(self) -> float:
return self._current_backoff
@property
def pause_remaining(self) -> float:
remaining = self._pause_until - time.monotonic()
if remaining <= 0:
self._pause_until = 0.0
return 0.0
return remaining
@property
def retry_delay(self) -> float:
return self.pause_remaining or self.backoff
# ---------------------------------------------------------------------------
# System Tray
# ---------------------------------------------------------------------------
shutdown_event = threading.Event()
def _make_tray_icon(color: str = "green") -> "PIL.Image.Image":
"""Generate a colored circle icon for the system tray."""
from PIL import Image, ImageDraw
size = 64
img = Image.new("RGBA", (size, size), (0, 0, 0, 0))
draw = ImageDraw.Draw(img)
colors = {"green": (76, 175, 80), "orange": (255, 152, 0), "gray": (158, 158, 158)}
rgb = colors.get(color, colors["gray"])
draw.ellipse([8, 8, size - 8, size - 8], fill=(*rgb, 255))
return img
class TrayAgent:
"""System tray with Chinese UI, hover tooltip, and integrated settings."""
def __init__(self):
import pystray
self._pystray = pystray
self._lock = threading.Lock()
self._status = "初始化中"
self._current_target = ""
self._icon: pystray.Icon | None = None
self._settings_requested = False
self._icons = {
"green": _make_tray_icon("green"),
"orange": _make_tray_icon("orange"),
"gray": _make_tray_icon("gray"),
}
def _build_menu(self):
p = self._pystray
return p.Menu(
p.MenuItem(lambda _: f"状态: {self._get_status()}", None, enabled=False),
p.MenuItem(lambda _: f"当前: {self._get_current() or '无'}", None, enabled=False),
p.Menu.SEPARATOR,
p.MenuItem("日志文件", self._toggle_log,
checked=lambda _: _file_handler is not None),
p.MenuItem("开机自启", self._toggle_autostart,
checked=lambda _: is_autostart_enabled()),
p.MenuItem("设置", self._open_settings),
p.Menu.SEPARATOR,
p.MenuItem("退出", self._quit),
)
def _get_status(self) -> str:
with self._lock:
return self._status
def _get_current(self) -> str:
with self._lock:
return self._current_target
def update_status(self, status: str, current_target: str | None = None):
with self._lock:
self._status = status
if current_target is not None:
self._current_target = current_target
current_target_value = self._current_target
if self._icon:
color = {"在线": "green"}.get(status, "gray")
self._icon.icon = self._icons[color]
# Hover tooltip shows current app + status.
tip = "Live Dashboard"
if current_target_value:
tip += f"\n当前: {current_target_value}"
tip += f"\n{status}"
self._icon.title = tip[:127]
def _toggle_log(self):
enabled = _file_handler is None
set_file_logging(enabled)
cfg = load_config()
cfg["enable_log"] = enabled
save_config(cfg)
if self._icon:
self._icon.update_menu()
def _toggle_autostart(self):
enabled = is_autostart_enabled()
if enabled:
registry_ok = _set_registry_autostart(False)
legacy_ok = _remove_legacy_startup_task()
if registry_ok and legacy_ok:
log.info("Autostart disabled")
else:
show_message(
"Live Dashboard",
"关闭开机自启时未能清理全部启动项。\n请检查任务计划程序中的 LiveDashboardAgent。",
error=True,
)
else:
if _set_registry_autostart(True):
log.info("Autostart enabled")
else:
show_message(
"Live Dashboard",
"无法开启开机自启,请检查当前账户是否有写入启动项的权限。",
error=True,
)
if self._icon:
self._icon.update_menu()
def _open_settings(self):
self._settings_requested = True
if self._icon:
self._icon.stop()
def _quit(self):
shutdown_event.set()
if self._icon:
self._icon.stop()
logging.shutdown()
os._exit(0)
@property
def settings_requested(self) -> bool:
return self._settings_requested
def run(self):
"""Run the tray icon (blocking; call from main thread)."""
icon_path = base_dir / "icon.ico"
if icon_path.exists():
from PIL import Image
with Image.open(icon_path) as im:
icon_img = im.copy()
else:
icon_img = _make_tray_icon("gray")
self._icon = self._pystray.Icon(
"live-dashboard",
icon_img,
"Live Dashboard",
menu=self._build_menu(),
)
self._icon.run()
# ---------------------------------------------------------------------------
# Monitor loop
# ---------------------------------------------------------------------------
def _monitor_loop(cfg: dict, reporter: Reporter, tray: TrayAgent | None) -> None:
heartbeat_interval = cfg["heartbeat_seconds"]
sensitive_regex_str = cfg.get("sensitive_words_regex", "")
sensitive_regex = None
if sensitive_regex_str:
import re
try:
sensitive_regex = re.compile(sensitive_regex_str)
except re.error:
pass
prev_app: str | None = None
prev_title: str | None = None
last_report_time: float = 0
log.info(
"Monitoring - poll=%ds, heartbeat=%ds",
MONITOR_POLL_SECONDS, heartbeat_interval,
)
while not shutdown_event.is_set():
try:
now = time.time()
info = get_foreground_info()
if info is None:
shutdown_event.wait(MONITOR_POLL_SECONDS)
continue
app_id, title = info
if sensitive_regex and sensitive_regex.search(title):
title = ""
# Keep tray status responsive; current item is updated only after a successful report.
if tray:
tray.update_status("在线")
changed = app_id != prev_app or title != prev_title
heartbeat_due = (now - last_report_time) >= heartbeat_interval
if changed or heartbeat_due:
reported_target = format_report_desc(app_id, title)
success = reporter.send(reported_target)
if success:
prev_app = app_id
prev_title = title
last_report_time = now
if tray:
tray.update_status("在线", reported_target)
if changed:
log.info("Reported: %s", reported_target)
elif reporter.retry_delay > 0:
shutdown_event.wait(reporter.retry_delay)
continue
shutdown_event.wait(MONITOR_POLL_SECONDS)
except Exception as e:
log.error("Error: %s", e, exc_info=True)
shutdown_event.wait(MONITOR_POLL_SECONDS)
log.info("Monitor stopped")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
log.info("Live Dashboard Windows Agent")
while True:
cfg = load_config()
# No valid config 鈫?show settings dialog
if not cfg.get("server_url") or not cfg.get("token") or cfg.get("token") == "YOUR_TOKEN_HERE":
cfg = show_settings_dialog(cfg)
if cfg is None:
return
cfg = load_config()
err = validate_config(cfg)
if err:
log.warning("Invalid config: %s", err)
cfg = show_settings_dialog(cfg)
if cfg is None:
return
cfg = load_config()
continue
# Apply log preference
set_file_logging(cfg.get("enable_log", False))
if cfg.get("enable_log"):
log.info("HTTP: %s", "HTTPS" if cfg["server_url"].startswith("https") else "HTTP (内网)")
reporter = Reporter(
cfg["server_url"],
cfg["token"],
cfg.get("emoji", ""),
cfg["heartbeat_seconds"],
)
tray: TrayAgent | None = None
try:
tray = TrayAgent()
except ImportError:
log.warning("pystray/Pillow not installed, running without tray")
except Exception as e:
log.warning("Tray init failed: %s", e)
if tray:
monitor = threading.Thread(
target=_monitor_loop, args=(cfg, reporter, tray), daemon=True
)
monitor.start()
tray.run() # Blocks until quit or settings
shutdown_event.set()
monitor.join(timeout=5)
if tray.settings_requested:
shutdown_event.clear()
new_cfg = show_settings_dialog(cfg)
if new_cfg is None:
continue # Cancelled, restart with old config
continue # Restart with new config
else:
break # Quit
else:
try:
_monitor_loop(cfg, reporter, None)
except KeyboardInterrupt:
pass
break
log.info("Agent stopped")
if __name__ == "__main__":
main()