From 7a307f4cb98a7dc83519401409d5f622783bf633 Mon Sep 17 00:00:00 2001 From: AuroraWright Date: Sat, 3 May 2025 18:50:00 +0200 Subject: [PATCH] Move screencapture code into a class, ensure all screenshots get processed in combo mode --- owocr/run.py | 396 +++++++++++++++++++++++++++------------------------ 1 file changed, 213 insertions(+), 183 deletions(-) diff --git a/owocr/run.py b/owocr/run.py index 4b2ac4a..f2c768d 100644 --- a/owocr/run.py +++ b/owocr/run.py @@ -462,6 +462,189 @@ class TextFiltering: return text, orig_text_filtered +class ScreenshotClass: + def __init__(self, screen_capture_on_combo): + screen_capture_area = config.get_general('screen_capture_area') + if type(screen_capture_area) == tuple: + screen_capture_area = ','.join(map(str, screen_capture_area)) + global screencapture_window_active + global screencapture_window_visible + screencapture_window_active = True + screencapture_window_visible = True + self.macos_window_tracker = None + self.windows_window_tracker = None + if screen_capture_area == '': + self.screencapture_mode = 0 + elif screen_capture_area.startswith('screen_'): + parts = screen_capture_area.split('_') + if len(parts) != 2 or not parts[1].isdigit(): + raise ValueError('Invalid screen_capture_area') + screen_capture_monitor = int(parts[1]) + self.screencapture_mode = 1 + elif len(screen_capture_area.split(',')) == 4: + self.screencapture_mode = 3 + else: + self.screencapture_mode = 2 + + if self.screencapture_mode != 2: + self.sct = mss.mss() + + if self.screencapture_mode == 1: + mon = self.sct.monitors + if len(mon) <= screen_capture_monitor: + raise ValueError('Invalid monitor number in screen_capture_area') + coord_left = mon[screen_capture_monitor]['left'] + coord_top = mon[screen_capture_monitor]['top'] + coord_width = mon[screen_capture_monitor]['width'] + coord_height = mon[screen_capture_monitor]['height'] + elif self.screencapture_mode == 3: + coord_left, coord_top, coord_width, coord_height = [int(c.strip()) for c in screen_capture_area.split(',')] + else: + logger.opt(ansi=True).info('Launching screen coordinate picker') + screen_selection = get_screen_selection() + if not screen_selection: + raise ValueError('Picker window was closed or an error occurred') + screen_capture_monitor = screen_selection['monitor'] + x, y, coord_width, coord_height = screen_selection['coordinates'] + if coord_width > 0 and coord_height > 0: + coord_top = screen_capture_monitor['top'] + y + coord_left = screen_capture_monitor['left'] + x + else: + logger.opt(ansi=True).info('Selection is empty, selecting whole screen') + coord_left = screen_capture_monitor['left'] + coord_top = screen_capture_monitor['top'] + coord_width = screen_capture_monitor['width'] + coord_height = screen_capture_monitor['height'] + + self.sct_params = {'top': coord_top, 'left': coord_left, 'width': coord_width, 'height': coord_height} + logger.opt(ansi=True).info(f'Selected coordinates: {coord_left},{coord_top},{coord_width},{coord_height}') + else: + screen_capture_only_active_windows = (not screen_capture_on_combo) and config.get_general('screen_capture_only_active_windows') + area_invalid_error = '"screen_capture_area" must be empty, "screen_N" where N is a screen number starting from 1, a valid set of coordinates, or a valid window name' + if sys.platform == 'darwin': + if int(platform.mac_ver()[0].split('.')[0]) < 14: + self.old_macos_screenshot_api = True + else: + self.old_macos_screenshot_api = False + global screencapturekit_queue + screencapturekit_queue = queue.Queue() + CGMainDisplayID() + window_list = CGWindowListCopyWindowInfo(kCGWindowListExcludeDesktopElements, kCGNullWindowID) + window_titles = [] + window_ids = [] + window_index = None + for i, window in enumerate(window_list): + window_title = window.get(kCGWindowName, '') + if psutil.Process(window['kCGWindowOwnerPID']).name() not in ('Terminal', 'iTerm2'): + window_titles.append(window_title) + window_ids.append(window['kCGWindowNumber']) + + if screen_capture_area in window_titles: + window_index = window_titles.index(screen_capture_area) + else: + for t in window_titles: + if screen_capture_area in t: + window_index = window_titles.index(t) + break + + if not window_index: + raise ValueError(area_invalid_error) + + self.window_id = window_ids[window_index] + window_title = window_titles[window_index] + + if screen_capture_only_active_windows: + screencapture_window_active = False + self.macos_window_tracker = MacOSWindowTracker(self.window_id) + self.macos_window_tracker.start() + logger.opt(ansi=True).info(f'Selected window: {window_title}') + elif sys.platform == 'win32': + self.window_handle, window_title = get_windows_window_handle(screen_capture_area) + + if not self.window_handle: + raise ValueError(area_invalid_error) + + ctypes.windll.shcore.SetProcessDpiAwareness(1) + + if screen_capture_only_active_windows: + screencapture_window_active = False + self.windows_window_tracker = WindowsWindowTracker(self.window_handle, screen_capture_only_active_windows) + self.windows_window_tracker.start() + logger.opt(ansi=True).info(f'Selected window: {window_title}') + else: + raise ValueError('Window capture is only currently supported on Windows and macOS') + + def __del__(self): + if self.macos_window_tracker: + self.macos_window_tracker.stop = True + self.macos_window_tracker.join() + elif self.windows_window_tracker: + self.windows_window_tracker.stop = True + self.windows_window_tracker.join() + + def __call__(self): + if self.screencapture_mode == 2: + if sys.platform == 'darwin': + with objc.autorelease_pool(): + if self.old_macos_screenshot_api: + cg_image = CGWindowListCreateImageFromArray(CGRectNull, [self.window_id], kCGWindowImageBoundsIgnoreFraming) + else: + capture_macos_window_screenshot(self.window_id) + try: + cg_image = screencapturekit_queue.get(timeout=0.5) + except queue.Empty: + cg_image = None + if not cg_image: + return None + width = CGImageGetWidth(cg_image) + height = CGImageGetHeight(cg_image) + raw_data = CGDataProviderCopyData(CGImageGetDataProvider(cg_image)) + bpr = CGImageGetBytesPerRow(cg_image) + img = Image.frombuffer('RGBA', (width, height), raw_data, 'raw', 'BGRA', bpr, 1) + else: + try: + coord_left, coord_top, right, bottom = win32gui.GetWindowRect(self.window_handle) + coord_width = right - coord_left + coord_height = bottom - coord_top + + hwnd_dc = win32gui.GetWindowDC(self.window_handle) + mfc_dc = win32ui.CreateDCFromHandle(hwnd_dc) + save_dc = mfc_dc.CreateCompatibleDC() + + save_bitmap = win32ui.CreateBitmap() + save_bitmap.CreateCompatibleBitmap(mfc_dc, coord_width, coord_height) + save_dc.SelectObject(save_bitmap) + + result = ctypes.windll.user32.PrintWindow(self.window_handle, save_dc.GetSafeHdc(), 2) + + bmpinfo = save_bitmap.GetInfo() + bmpstr = save_bitmap.GetBitmapBits(True) + except pywintypes.error: + return None + img = Image.frombuffer('RGB', (bmpinfo['bmWidth'], bmpinfo['bmHeight']), bmpstr, 'raw', 'BGRX', 0, 1) + try: + win32gui.DeleteObject(save_bitmap.GetHandle()) + except: + pass + try: + save_dc.DeleteDC() + except: + pass + try: + mfc_dc.DeleteDC() + except: + pass + try: + win32gui.ReleaseDC(self.window_handle, hwnd_dc) + except: + pass + else: + sct_img = self.sct.grab(self.sct_params) + img = Image.frombytes('RGB', sct_img.size, sct_img.bgra, 'raw', 'BGRX') + + return img + + class AutopauseTimer: def __init__(self, timeout): self.stop_event = threading.Event() @@ -566,11 +749,6 @@ def user_input_thread_run(): termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) -def on_screenshot_combo(): - if not paused: - screenshot_event.set() - - def signal_handler(sig, frame): global terminated logger.info('Terminated!') @@ -594,6 +772,12 @@ def on_window_minimized(minimized): screencapture_window_visible = not minimized +def on_screenshot_combo(): + if not paused: + img = take_screenshot() + screenshot_queue.put(img) + + def process_and_write_results(img_or_path, last_result, filtering): if auto_pause_handler: auto_pause_handler.stop() @@ -730,126 +914,18 @@ def run(): clipboard_thread.start() read_from_readable = 'clipboard' elif read_from == 'screencapture': - screen_capture_area = config.get_general('screen_capture_area') screen_capture_delay_secs = config.get_general('screen_capture_delay_secs') screen_capture_combo = config.get_general('screen_capture_combo') + last_result = ([], engine_index) if screen_capture_combo != '': screen_capture_on_combo = True - global screenshot_event - screenshot_event = threading.Event() + global screenshot_queue + screenshot_queue = queue.Queue() key_combos[screen_capture_combo] = on_screenshot_combo else: screen_capture_on_combo = False - if type(screen_capture_area) == tuple: - screen_capture_area = ','.join(map(str, screen_capture_area)) - global screencapture_window_active - global screencapture_window_visible - screencapture_mode = None - screencapture_window_active = True - screencapture_window_visible = True - last_result = ([], engine_index) - if screen_capture_area == '': - screencapture_mode = 0 - elif screen_capture_area.startswith('screen_'): - parts = screen_capture_area.split('_') - if len(parts) != 2 or not parts[1].isdigit(): - raise ValueError('Invalid screen_capture_area') - screen_capture_monitor = int(parts[1]) - screencapture_mode = 1 - elif len(screen_capture_area.split(',')) == 4: - screencapture_mode = 3 - else: - screencapture_mode = 2 - - if screencapture_mode != 2: - sct = mss.mss() - - if screencapture_mode == 1: - mon = sct.monitors - if len(mon) <= screen_capture_monitor: - raise ValueError('Invalid monitor number in screen_capture_area') - coord_left = mon[screen_capture_monitor]['left'] - coord_top = mon[screen_capture_monitor]['top'] - coord_width = mon[screen_capture_monitor]['width'] - coord_height = mon[screen_capture_monitor]['height'] - elif screencapture_mode == 3: - coord_left, coord_top, coord_width, coord_height = [int(c.strip()) for c in screen_capture_area.split(',')] - else: - logger.opt(ansi=True).info('Launching screen coordinate picker') - screen_selection = get_screen_selection() - if not screen_selection: - raise ValueError('Picker window was closed or an error occurred') - screen_capture_monitor = screen_selection['monitor'] - x, y, coord_width, coord_height = screen_selection['coordinates'] - if coord_width > 0 and coord_height > 0: - coord_top = screen_capture_monitor['top'] + y - coord_left = screen_capture_monitor['left'] + x - else: - logger.opt(ansi=True).info('Selection is empty, selecting whole screen') - coord_left = screen_capture_monitor['left'] - coord_top = screen_capture_monitor['top'] - coord_width = screen_capture_monitor['width'] - coord_height = screen_capture_monitor['height'] - - sct_params = {'top': coord_top, 'left': coord_left, 'width': coord_width, 'height': coord_height} - logger.opt(ansi=True).info(f'Selected coordinates: {coord_left},{coord_top},{coord_width},{coord_height}') - else: - screen_capture_only_active_windows = config.get_general('screen_capture_only_active_windows') - area_invalid_error = '"screen_capture_area" must be empty, "screen_N" where N is a screen number starting from 1, a valid set of coordinates, or a valid window name' - if sys.platform == 'darwin': - if int(platform.mac_ver()[0].split('.')[0]) < 14: - old_macos_screenshot_api = True - else: - global screencapturekit_queue - screencapturekit_queue = queue.Queue() - CGMainDisplayID() - old_macos_screenshot_api = False - - window_list = CGWindowListCopyWindowInfo(kCGWindowListExcludeDesktopElements, kCGNullWindowID) - window_titles = [] - window_ids = [] - window_index = None - for i, window in enumerate(window_list): - window_title = window.get(kCGWindowName, '') - if psutil.Process(window['kCGWindowOwnerPID']).name() not in ('Terminal', 'iTerm2'): - window_titles.append(window_title) - window_ids.append(window['kCGWindowNumber']) - - if screen_capture_area in window_titles: - window_index = window_titles.index(screen_capture_area) - else: - for t in window_titles: - if screen_capture_area in t: - window_index = window_titles.index(t) - break - - if not window_index: - raise ValueError(area_invalid_error) - - window_id = window_ids[window_index] - window_title = window_titles[window_index] - - if screen_capture_only_active_windows: - screencapture_window_active = False - macos_window_tracker = MacOSWindowTracker(window_id) - macos_window_tracker.start() - logger.opt(ansi=True).info(f'Selected window: {window_title}') - elif sys.platform == 'win32': - window_handle, window_title = get_windows_window_handle(screen_capture_area) - - if not window_handle: - raise ValueError(area_invalid_error) - - ctypes.windll.shcore.SetProcessDpiAwareness(1) - - if screen_capture_only_active_windows: - screencapture_window_active = False - windows_window_tracker = WindowsWindowTracker(window_handle, screen_capture_only_active_windows) - windows_window_tracker.start() - logger.opt(ansi=True).info(f'Selected window: {window_title}') - else: - raise ValueError('Window capture is only currently supported on Windows and macOS') - + global take_screenshot + take_screenshot = ScreenshotClass(screen_capture_on_combo) filtering = TextFiltering() read_from_readable = 'screen capture' else: @@ -915,68 +991,30 @@ def run(): img = item if isinstance(item, Image.Image) else Image.open(io.BytesIO(item)) process_and_write_results(img, None, None) elif read_from == 'screencapture': + img = None if screen_capture_on_combo: - take_screenshot = screenshot_event.wait(0.5) - if take_screenshot: - screenshot_event.clear() - else: - take_screenshot = screencapture_window_active and not paused - sleep_time = 0.5 - - if take_screenshot and screencapture_window_visible: - if screencapture_mode == 2: - if sys.platform == 'darwin': - with objc.autorelease_pool(): - if old_macos_screenshot_api: - cg_image = CGWindowListCreateImageFromArray(CGRectNull, [window_id], kCGWindowImageBoundsIgnoreFraming) - else: - capture_macos_window_screenshot(window_id) - try: - cg_image = screencapturekit_queue.get(timeout=0.5) - except queue.Empty: - cg_image = None - if not cg_image: - on_window_closed(False) - break - width = CGImageGetWidth(cg_image) - height = CGImageGetHeight(cg_image) - raw_data = CGDataProviderCopyData(CGImageGetDataProvider(cg_image)) - bpr = CGImageGetBytesPerRow(cg_image) - img = Image.frombuffer('RGBA', (width, height), raw_data, 'raw', 'BGRA', bpr, 1) - else: - try: - coord_left, coord_top, right, bottom = win32gui.GetWindowRect(window_handle) - coord_width = right - coord_left - coord_height = bottom - coord_top - - hwnd_dc = win32gui.GetWindowDC(window_handle) - mfc_dc = win32ui.CreateDCFromHandle(hwnd_dc) - save_dc = mfc_dc.CreateCompatibleDC() - - save_bitmap = win32ui.CreateBitmap() - save_bitmap.CreateCompatibleBitmap(mfc_dc, coord_width, coord_height) - save_dc.SelectObject(save_bitmap) - - result = ctypes.windll.user32.PrintWindow(window_handle, save_dc.GetSafeHdc(), 2) - - bmpinfo = save_bitmap.GetInfo() - bmpstr = save_bitmap.GetBitmapBits(True) - except pywintypes.error: - on_window_closed(False) - break - img = Image.frombuffer('RGB', (bmpinfo['bmWidth'], bmpinfo['bmHeight']), bmpstr, 'raw', 'BGRX', 0, 1) - - win32gui.DeleteObject(save_bitmap.GetHandle()) - save_dc.DeleteDC() - mfc_dc.DeleteDC() - win32gui.ReleaseDC(window_handle, hwnd_dc) + try: + img = screenshot_queue.get(timeout=0.5) + except queue.Empty: + pass else: - sct_img = sct.grab(sct_params) - img = Image.frombytes('RGB', sct_img.size, sct_img.bgra, 'raw', 'BGRX') + if not img: + on_window_closed(False) + terminated = True + break + else: + sleep_time = 0.5 + if (not paused) and screencapture_window_active and screencapture_window_visible: + img = take_screenshot() + if not img: + on_window_closed(False) + terminated = True + break + sleep_time = screen_capture_delay_secs + if img: res = process_and_write_results(img, last_result, filtering) if res: last_result = (res, engine_index) - sleep_time = screen_capture_delay_secs else: sleep_time = delay_secs for path in read_from.iterdir(): @@ -1006,14 +1044,6 @@ def run(): if sys.platform == 'win32': win32api.PostThreadMessage(clipboard_thread.thread_id, win32con.WM_QUIT, 0, 0) clipboard_thread.join() - elif read_from == 'screencapture' and screencapture_mode == 2: - if sys.platform == 'darwin': - if screen_capture_only_active_windows: - macos_window_tracker.stop = True - macos_window_tracker.join() - else: - windows_window_tracker.stop = True - windows_window_tracker.join() elif read_from == 'unixsocket': unix_socket_server.shutdown() unix_socket_server_thread.join()