Add screenshot backend
All checks were successful
Build and Upload Python Package / build (push) Successful in 2m22s

This commit is contained in:
2025-12-11 12:24:39 -08:00
parent ac6feb6af2
commit 5ae51ec08a
4 changed files with 319 additions and 11 deletions

View File

@@ -0,0 +1,28 @@
# Builds the Python package and publishes it to the Gitea PyPI registry
# on every push to the master branch.
name: Build and Upload Python Package

on:
  push:
    branches:
      - master

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: "3.x"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install setuptools wheel twine build

      - name: Build the package
        run: python -m build

      - name: Upload to Gitea PyPI Registry
        # Credentials are handed to twine through its environment variables
        # instead of being interpolated into the command line: a secret that
        # contains shell metacharacters can no longer break or alter the
        # command, and the values do not appear in the process arguments.
        env:
          TWINE_REPOSITORY_URL: https://gitea.suda.codes/api/packages/sudacode/pypi
          TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
          TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
        # --non-interactive makes a missing secret fail the job instead of
        # waiting on a prompt that can never be answered in CI.
        run: twine upload --non-interactive dist/*

View File

@@ -17,7 +17,6 @@ import urllib.request
import numpy as np
import pyperclipfix
import mss
import psutil
import asyncio
import websockets
@@ -32,6 +31,10 @@ from desktop_notifier import DesktopNotifierSync, Urgency
from .ocr import *
from .config import config
from .screen_coordinate_picker import get_screen_selection, terminate_selector_if_running
from .screenshot_backend import (
ScreenshotBackendError,
get_screenshot_backend,
)
try:
import win32gui
@@ -1384,6 +1387,7 @@ class ScreenshotThread(threading.Thread):
self.window_visible = True
self.window_closed = False
self.window_size = None
self.screenshot_backend = None
if screen_capture_area == '':
self.screencapture_mode = 0
@@ -1402,10 +1406,13 @@ class ScreenshotThread(threading.Thread):
self.launch_coordinate_picker(True, False)
if self.screencapture_mode != 2:
self.sct = mss.mss()
try:
self.screenshot_backend = get_screenshot_backend()
except ScreenshotBackendError as exc:
exit_with_error(str(exc))
if self.screencapture_mode == 1:
mon = self.sct.monitors
mon = self.screenshot_backend.monitors
if len(mon) <= screen_capture_monitor:
exit_with_error('Invalid monitor number in screen_capture_area')
coord_left = mon[screen_capture_monitor]['left']
@@ -1420,6 +1427,8 @@ class ScreenshotThread(threading.Thread):
if self.screencapture_mode != 0:
self.sct_params = {'top': coord_top, 'left': coord_left, 'width': coord_width, 'height': coord_height}
logger.info(f'Selected coordinates: {coord_left},{coord_top},{coord_width},{coord_height}')
elif not hasattr(self, 'sct_params'):
exit_with_error('No capture region selected; picker was cancelled')
else:
self.screen_capture_only_active_windows = config.get_general('screen_capture_only_active_windows')
self.window_area_coordinates = None
@@ -1686,7 +1695,17 @@ class ScreenshotThread(threading.Thread):
else:
img = img.crop(self.window_area_coordinates)
else:
sct_img = self.sct.grab(self.sct_params)
if not self.screenshot_backend:
logger.error('Screenshot backend is not initialized')
return False
if not hasattr(self, 'sct_params'):
logger.error('No capture region selected; run the coordinate picker again')
return False
try:
sct_img = self.screenshot_backend.grab(self.sct_params)
except ScreenshotBackendError as exc:
logger.error(exc)
return False
img = Image.frombytes('RGB', sct_img.size, sct_img.bgra, 'raw', 'BGRX')
return img
@@ -1734,7 +1753,7 @@ class ScreenshotThread(threading.Thread):
logger.info('Launching screen coordinate picker')
screen_selection = get_screen_selection(None, self.coordinate_selector_combo_enabled)
if not screen_selection:
if on_init:
if init or must_return:
exit_with_error('Picker window was closed or an error occurred')
else:
logger.warning('Picker window was closed or an error occurred, leaving settings unchanged')
@@ -1773,8 +1792,11 @@ class ScreenshotThread(threading.Thread):
logger.info('Selection is empty, selecting whole window')
def run(self):
if self.screencapture_mode != 2:
self.sct = mss.mss()
if self.screencapture_mode != 2 and self.screenshot_backend is None:
try:
self.screenshot_backend = get_screenshot_backend()
except ScreenshotBackendError as exc:
exit_with_error(str(exc))
while not terminated.is_set():
if coordinate_selector_event.is_set():
self.launch_coordinate_picker(False, False)

View File

@@ -1,9 +1,11 @@
import multiprocessing
import queue
import mss
import sys
from loguru import logger
from PIL import Image
import sys
from .screenshot_backend import ScreenshotBackendError, get_screenshot_backend
try:
from AppKit import NSApplication, NSApplicationActivationPolicyAccessory
except ImportError:
@@ -19,8 +21,14 @@ except:
class ScreenSelector:
def __init__(self, result_queue, command_queue):
self.sct = mss.mss()
self.monitors = self.sct.monitors[1:]
try:
self.sct = get_screenshot_backend()
except ScreenshotBackendError as exc:
logger.error(exc)
sys.exit(1)
monitors = self.sct.monitors
self.monitors = monitors[1:] if len(monitors) > 1 else monitors
self.root = None
self.result_queue = result_queue
self.command_queue = command_queue

250
owocr/screenshot_backend.py Normal file
View File

@@ -0,0 +1,250 @@
import io
import json
import os
import shutil
import subprocess
from types import SimpleNamespace
import mss
from loguru import logger
from PIL import Image
class ScreenshotBackendError(Exception):
    """Signal that a screenshot backend failed to start up or to capture.

    The backends in this module wrap their underlying failures in this type
    so that callers only need to catch a single exception class.
    """
def is_wayland_session():
    """Return True when the environment indicates a Wayland session.

    A session counts as Wayland when ``XDG_SESSION_TYPE`` equals ``wayland``
    (compared case-insensitively) or when ``WAYLAND_DISPLAY`` is set to a
    non-empty value.
    """
    env = os.environ
    if env.get("XDG_SESSION_TYPE", "").lower() == "wayland":
        return True
    return bool(env.get("WAYLAND_DISPLAY"))
class _MssScreenshotBackend:
    """Screenshot backend that delegates to an ``mss`` capture handle.

    Offers the same surface as the other backend in this module
    (``monitors``, ``grab(region)``, ``close()``) and re-raises failures of
    the underlying handle as ``ScreenshotBackendError``.
    """

    def __init__(self):
        """Create the ``mss`` handle.

        Raises:
            ScreenshotBackendError: if ``mss.mss()`` raises for any reason.
        """
        try:
            self.sct = mss.mss()
        except Exception as exc:
            raise ScreenshotBackendError(f"Could not initialize mss: {exc}") from exc

    @property
    def monitors(self):
        """Return the monitor list exactly as reported by the ``mss`` handle."""
        return self.sct.monitors

    def grab(self, region):
        """Capture ``region`` and return whatever ``mss`` returns for it.

        Args:
            region: passed through unchanged to ``self.sct.grab``; callers in
                this project pass a dict with ``left``/``top``/``width``/
                ``height`` keys.

        Raises:
            ScreenshotBackendError: if the underlying grab raises.
        """
        try:
            return self.sct.grab(region)
        except Exception as exc:
            raise ScreenshotBackendError(f"Could not grab screenshot: {exc}") from exc

    def close(self):
        """Release the ``mss`` handle; never raises.

        Closing stays best-effort, but a failure is logged at debug level
        rather than being discarded without a trace.
        """
        try:
            self.sct.close()
        except Exception as exc:
            logger.debug(f"Ignoring error while closing mss: {exc}")
class _WaylandGrimBackend:
    """Screenshot backend for Wayland sessions that shells out to ``grim``.

    Offers the same surface as the mss-based backend in this module:
    a ``monitors`` list whose first entry is the bounding box of all
    monitors, ``grab(region)`` and ``close()``.
    """

    def __init__(self):
        """Check that ``grim`` is on PATH and cache the monitor layout.

        Raises:
            ScreenshotBackendError: if ``grim`` is missing or no monitor
                geometry could be determined.
        """
        if not shutil.which("grim"):
            raise ScreenshotBackendError(
                "Wayland session detected but `grim` was not found. "
                "Install `grim` to enable screen capture on Wayland."
            )
        self.monitors = self._load_monitors()

    def _load_monitors(self):
        """Return ``[aggregate, monitor_1, ...]`` from the first working loader.

        Loaders are tried in order (Hyprland, Sway, full-screen capture).
        A loader whose tool is missing (``FileNotFoundError``) or that fails
        with ``ScreenshotBackendError`` is skipped.

        Raises:
            ScreenshotBackendError: if every loader was skipped.
        """
        loaders = (
            self._load_hyprland_monitors,
            self._load_sway_monitors,
            self._load_fullscreen_geometry,
        )
        monitors = None
        for loader in loaders:
            try:
                monitors = loader()
            except FileNotFoundError:
                continue
            except ScreenshotBackendError as exc:
                logger.debug(str(exc))
                continue
            if monitors:
                break
        else:
            raise ScreenshotBackendError(
                "Could not determine monitor geometry on Wayland. "
                "Install `hyprctl` (Hyprland) or `swaymsg` (Sway/wlroots) for monitor info."
            )

        # Entry 0 is the bounding box enclosing every monitor.
        min_left = min(m["left"] for m in monitors)
        min_top = min(m["top"] for m in monitors)
        max_right = max(m["left"] + m["width"] for m in monitors)
        max_bottom = max(m["top"] + m["height"] for m in monitors)
        aggregate = {
            "left": min_left,
            "top": min_top,
            "width": max_right - min_left,
            "height": max_bottom - min_top,
            "name": "ALL",
        }
        return [aggregate] + monitors

    @staticmethod
    def _run_json(command, failure_label, tool):
        """Run ``command`` and return its stdout parsed as JSON.

        ``FileNotFoundError`` from a missing executable is left to propagate
        so the loader chain can skip to the next loader.

        Raises:
            ScreenshotBackendError: on a non-zero exit status or invalid JSON.
        """
        try:
            proc = subprocess.run(
                command,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as exc:
            stderr = exc.stderr.strip() if exc.stderr else str(exc)
            raise ScreenshotBackendError(f"{failure_label} failed: {stderr}") from exc
        try:
            return json.loads(proc.stdout)
        except json.JSONDecodeError as exc:
            raise ScreenshotBackendError(
                f"Failed to parse {tool} output: {exc}"
            ) from exc

    @staticmethod
    def _decode_image(data, what):
        """Decode the image bytes produced by ``grim`` into a loaded PIL image.

        Raises:
            ScreenshotBackendError: if the bytes cannot be decoded, so callers
                that only catch this type are not surprised by PIL errors.
        """
        try:
            image = Image.open(io.BytesIO(data))
            image.load()
        except (OSError, ValueError) as exc:
            raise ScreenshotBackendError(
                f"Could not decode grim output for {what}: {exc}"
            ) from exc
        return image

    @staticmethod
    def _hyprland_entry(mon):
        """Convert one ``hyprctl`` monitor object into a monitor dict."""
        width = int(mon.get("width", 0))
        height = int(mon.get("height", 0))
        # NOTE(review): hyprctl appears to report the mode's pixel size while
        # x/y are layout coordinates, and grim's -g option works in layout
        # coordinates. Apply scale and rotation so regions line up on HiDPI
        # or rotated outputs; with scale 1 / transform 0 nothing changes.
        scale = float(mon.get("scale") or 1)
        if scale > 0 and scale != 1:
            width = int(round(width / scale))
            height = int(round(height / scale))
        # Odd wl_output transforms are the 90/270 degree rotations.
        if int(mon.get("transform") or 0) % 2 == 1:
            width, height = height, width
        return {
            "left": int(mon.get("x", 0)),
            "top": int(mon.get("y", 0)),
            "width": width,
            "height": height,
            "name": mon.get("name", "unknown"),
        }

    def _load_hyprland_monitors(self):
        """Query monitor geometry through ``hyprctl -j monitors``.

        Raises:
            FileNotFoundError: if ``hyprctl`` is not on PATH.
            ScreenshotBackendError: if the command fails, its output is not
                usable, or it lists no monitors.
        """
        if not shutil.which("hyprctl"):
            raise FileNotFoundError
        data = self._run_json(
            ["hyprctl", "-j", "monitors"], "hyprctl monitors", "hyprctl"
        )
        try:
            monitors = [self._hyprland_entry(mon) for mon in data]
        except (AttributeError, TypeError, ValueError) as exc:
            # Valid JSON of an unexpected shape: let the next loader try.
            raise ScreenshotBackendError(
                f"Unexpected hyprctl monitor data: {exc}"
            ) from exc
        if not monitors:
            raise ScreenshotBackendError("hyprctl returned no monitors.")
        return monitors

    def _load_sway_monitors(self):
        """Query active outputs through ``swaymsg -t get_outputs -r``.

        Outputs whose ``active`` field is false are skipped.

        Raises:
            FileNotFoundError: if ``swaymsg`` is not on PATH.
            ScreenshotBackendError: if the command fails, its output is not
                usable, or no active output is listed.
        """
        if not shutil.which("swaymsg"):
            raise FileNotFoundError
        outputs = self._run_json(
            ["swaymsg", "-t", "get_outputs", "-r"], "swaymsg get_outputs", "swaymsg"
        )
        monitors = []
        try:
            for out in outputs:
                if not out.get("active", True):
                    continue
                rect = out.get("rect") or {}
                monitors.append(
                    {
                        "left": int(rect.get("x", 0)),
                        "top": int(rect.get("y", 0)),
                        "width": int(rect.get("width", 0)),
                        "height": int(rect.get("height", 0)),
                        "name": out.get("name", "unknown"),
                    }
                )
        except (AttributeError, TypeError, ValueError) as exc:
            # Valid JSON of an unexpected shape: let the next loader try.
            raise ScreenshotBackendError(
                f"Unexpected swaymsg output data: {exc}"
            ) from exc
        if not monitors:
            raise ScreenshotBackendError("swaymsg returned no active outputs.")
        return monitors

    def _load_fullscreen_geometry(self):
        """Derive a single pseudo-monitor from the size of a full capture.

        Last-resort loader: runs ``grim -`` and uses the resulting image size
        as one monitor anchored at (0, 0).

        Raises:
            ScreenshotBackendError: if grim fails, returns nothing, or its
                output cannot be decoded.
        """
        try:
            proc = subprocess.run(
                ["grim", "-"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
        except subprocess.CalledProcessError as exc:
            stderr = exc.stderr.decode().strip() if exc.stderr else str(exc)
            raise ScreenshotBackendError(
                f"Failed to capture fullscreen with grim: {stderr}"
            ) from exc
        if not proc.stdout:
            raise ScreenshotBackendError(
                "grim returned no data for fullscreen capture."
            )
        image = self._decode_image(proc.stdout, "fullscreen capture")
        # NOTE(review): this is the image's pixel size; on scaled outputs it
        # may not equal the layout size that grim -g expects. Verify.
        width, height = image.size
        return [
            {
                "left": 0,
                "top": 0,
                "width": width,
                "height": height,
                "name": "fullscreen",
            }
        ]

    def grab(self, region):
        """Capture ``region`` with ``grim`` and return an mss-like result.

        Args:
            region: mapping with ``left``, ``top``, ``width`` and ``height``.

        Returns:
            SimpleNamespace with ``size`` (width, height), ``rgb`` (packed
            RGB bytes) and ``bgra`` (packed BGRA bytes).

        Raises:
            ScreenshotBackendError: if grim cannot be run, fails, returns no
                data, or returns data that cannot be decoded.
        """
        geometry = (
            f"{int(region['left'])},{int(region['top'])} "
            f"{int(region['width'])}x{int(region['height'])}"
        )
        try:
            proc = subprocess.run(
                ["grim", "-g", geometry, "-"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
            )
        except subprocess.CalledProcessError as exc:
            stderr = exc.stderr.decode().strip() if exc.stderr else str(exc)
            raise ScreenshotBackendError(
                f"Failed to grab screenshot with grim: {stderr}"
            ) from exc
        except OSError as exc:
            # e.g. grim was uninstalled after the backend was created.
            raise ScreenshotBackendError(
                f"Failed to grab screenshot with grim: {exc}"
            ) from exc
        if not proc.stdout:
            raise ScreenshotBackendError("grim returned no data for screenshot.")

        image = self._decode_image(proc.stdout, "screenshot").convert("RGBA")
        # Use the BGRA raw packer directly rather than requesting BGRX from
        # an RGBA image, which needed a per-pixel Python fallback that
        # produced these same bytes far more slowly.
        return SimpleNamespace(
            size=image.size,
            rgb=image.convert("RGB").tobytes(),
            bgra=image.tobytes("raw", "BGRA"),
        )

    def close(self):
        """No-op: grim has no persistent resources to dispose."""
        pass
def get_screenshot_backend():
    """Build the screenshot backend that matches the current session.

    Returns a grim-based backend when ``is_wayland_session()`` is true (and
    logs that choice), otherwise an mss-based backend. Errors raised by the
    chosen backend's constructor propagate to the caller.
    """
    if not is_wayland_session():
        return _MssScreenshotBackend()
    logger.info("Wayland session detected, using grim for screenshots")
    return _WaylandGrimBackend()