Add Windows native clipboard polling and a note in the readme

This commit is contained in:
AuroraWright
2024-01-21 23:43:33 +01:00
parent 70687e6b01
commit 6461ec80f7
2 changed files with 118 additions and 67 deletions

View File

@@ -30,6 +30,7 @@ However:
- holding ctrl or cmd at any time will pause image processing temporarily - holding ctrl or cmd at any time will pause image processing temporarily
- for systems where text can be copied to the clipboard at the same time as images, if `*ocr_ignore*` is copied with an image, the image will be ignored - for systems where text can be copied to the clipboard at the same time as images, if `*ocr_ignore*` is copied with an image, the image will be ignored
- optionally, notifications can be enabled in the config file to show the text with a native OS notification - optionally, notifications can be enabled in the config file to show the text with a native OS notification
- optionally, idle resource usage on macOS and Windows when reading from the clipboard can be eliminated by making owocr use native OS polling. This requires installing pyobjc on macOS (`pip install pyobjc`) and pywin32 on Windows (`pip install pywin32`)
- a config file (to be created in `user directory/.config/owocr_config.ini`, on Windows `user directory` is the `C:\Users\yourusername` folder) can be used to limit providers (to reduce clutter/memory usage) as well as specifying provider settings such as api keys etc. A sample config file is provided [here](https://raw.githubusercontent.com/AuroraWright/owocr/master/owocr_config.ini) - a config file (to be created in `user directory/.config/owocr_config.ini`, on Windows `user directory` is the `C:\Users\yourusername` folder) can be used to limit providers (to reduce clutter/memory usage) as well as specifying provider settings such as api keys etc. A sample config file is provided [here](https://raw.githubusercontent.com/AuroraWright/owocr/master/owocr_config.ini)
# Acknowledgments # Acknowledgments

View File

@@ -22,6 +22,43 @@ from notifypy import Notify
import inspect import inspect
from owocr import * from owocr import *
try:
import win32gui
import win32api
import win32con
import win32clipboard
import ctypes
except ImportError:
pass
class WindowsClipboardThread(threading.Thread):
def __init__(self):
super().__init__()
self.daemon = True
def process_message(self, hwnd: int, msg: int, wparam: int, lparam: int):
WM_CLIPBOARDUPDATE = 0x031D
if msg == WM_CLIPBOARDUPDATE and not (paused or tmp_paused):
if win32clipboard.IsClipboardFormatAvailable(win32con.CF_BITMAP):
clipboard_event.set()
return 0
def create_window(self):
className = "ClipboardHook"
wc = win32gui.WNDCLASS()
wc.lpfnWndProc = self.process_message
wc.lpszClassName = className
wc.hInstance = win32api.GetModuleHandle(None)
class_atom = win32gui.RegisterClass(wc)
return win32gui.CreateWindow(class_atom, className, 0, 0, 0, 0, 0, 0, 0, wc.hInstance, None)
def run(self):
hwnd = self.create_window()
self.thread_id = win32api.GetCurrentThreadId()
ctypes.windll.user32.AddClipboardFormatListener(hwnd)
win32gui.PumpMessages()
class WebsocketServerThread(threading.Thread): class WebsocketServerThread(threading.Thread):
def __init__(self, port, read): def __init__(self, port, read):
@@ -73,46 +110,6 @@ class WebsocketServerThread(threading.Thread):
self.loop.close() self.loop.close()
def are_images_identical(img1, img2):
if None in (img1, img2):
return img1 == img2
img1 = np.array(img1)
img2 = np.array(img2)
return (img1.shape == img2.shape) and (img1 == img2).all()
def process_and_write_results(engine_instance, engine_color, img_or_path, write_to, notifications):
t0 = time.time()
text = engine_instance(img_or_path)
t1 = time.time()
logger.opt(ansi=True).info(f"Text recognized in {t1 - t0:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>: {text}")
if notifications == True:
notification = Notify()
notification.application_name = 'owocr'
notification.title = 'Text recognized:'
notification.message = text
notification.send(block=False)
if write_to == 'websocket':
websocket_server_thread.send_text(text)
elif write_to == 'clipboard':
pyperclip.copy(text)
else:
write_to = Path(write_to)
if write_to.suffix != '.txt':
raise ValueError('write_to must be either "clipboard" or a path to a text file')
with write_to.open('a', encoding="utf-8") as f:
f.write(text + '\n')
def get_path_key(path):
return path, path.lstat().st_mtime
def getchar_thread(): def getchar_thread():
global user_input global user_input
if sys.platform == "win32": if sys.platform == "win32":
@@ -157,6 +154,46 @@ def on_key_release(key):
first_pressed = None first_pressed = None
def are_images_identical(img1, img2):
if None in (img1, img2):
return img1 == img2
img1 = np.array(img1)
img2 = np.array(img2)
return (img1.shape == img2.shape) and (img1 == img2).all()
def process_and_write_results(engine_instance, engine_color, img_or_path, write_to, notifications):
t0 = time.time()
text = engine_instance(img_or_path)
t1 = time.time()
logger.opt(ansi=True).info(f"Text recognized in {t1 - t0:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>: {text}")
if notifications == True:
notification = Notify()
notification.application_name = 'owocr'
notification.title = 'Text recognized:'
notification.message = text
notification.send(block=False)
if write_to == 'websocket':
websocket_server_thread.send_text(text)
elif write_to == 'clipboard':
pyperclip.copy(text)
else:
write_to = Path(write_to)
if write_to.suffix != '.txt':
raise ValueError('write_to must be either "clipboard" or a path to a text file')
with write_to.open('a', encoding="utf-8") as f:
f.write(text + '\n')
def get_path_key(path):
return path, path.lstat().st_mtime
def run(read_from='clipboard', def run(read_from='clipboard',
write_to='clipboard', write_to='clipboard',
engine='', engine='',
@@ -293,17 +330,23 @@ def run(read_from='clipboard',
logger.opt(ansi=True).info(f"Reading from websocket using <{engine_color}>{engine_instances[engine_index].readable_name}</{engine_color}>{' (paused)' if paused else ''}") logger.opt(ansi=True).info(f"Reading from websocket using <{engine_color}>{engine_instances[engine_index].readable_name}</{engine_color}>{' (paused)' if paused else ''}")
elif read_from == 'clipboard': elif read_from == 'clipboard':
from PIL import ImageGrab from PIL import ImageGrab
mac_clipboard_polling = False
windows_clipboard_polling = False
img = None img = None
logger.opt(ansi=True).info(f"Reading from clipboard using <{engine_color}>{engine_instances[engine_index].readable_name}</{engine_color}>{' (paused)' if paused else ''}") logger.opt(ansi=True).info(f"Reading from clipboard using <{engine_color}>{engine_instances[engine_index].readable_name}</{engine_color}>{' (paused)' if paused else ''}")
if sys.platform == "darwin" and 'objc' in sys.modules: if sys.platform == 'darwin' and 'objc' in sys.modules:
from AppKit import NSPasteboard, NSPasteboardTypePNG, NSPasteboardTypeTIFF from AppKit import NSPasteboard, NSPasteboardTypePNG, NSPasteboardTypeTIFF
pasteboard = NSPasteboard.generalPasteboard() pasteboard = NSPasteboard.generalPasteboard()
count = pasteboard.changeCount() count = pasteboard.changeCount()
mac_clipboard_polling = True mac_clipboard_polling = True
else: elif sys.platform == 'win32' and 'win32gui' in sys.modules:
mac_clipboard_polling = False global clipboard_event
clipboard_event = threading.Event()
windows_clipboard_thread = WindowsClipboardThread()
windows_clipboard_thread.start()
windows_clipboard_polling = True
else: else:
allowed_extensions = (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp") allowed_extensions = (".png", ".jpg", ".jpeg", ".bmp", ".gif", ".webp")
read_from = Path(read_from) read_from = Path(read_from)
@@ -323,6 +366,9 @@ def run(read_from='clipboard',
if read_from == 'websocket' or write_to == 'websocket': if read_from == 'websocket' or write_to == 'websocket':
websocket_server_thread.stop_server() websocket_server_thread.stop_server()
websocket_server_thread.join() websocket_server_thread.join()
if read_from == 'clipboard' and windows_clipboard_polling:
win32api.PostThreadMessage(windows_clipboard_thread.thread_id, win32con.WM_QUIT, 0, 0)
windows_clipboard_thread.join()
user_input_thread.join() user_input_thread.join()
tmp_paused_listener.stop() tmp_paused_listener.stop()
logger.info('Terminated!') logger.info('Terminated!')
@@ -362,13 +408,16 @@ def run(read_from='clipboard',
img = Image.open(io.BytesIO(item)) img = Image.open(io.BytesIO(item))
process_and_write_results(engine_instances[engine_index], engine_color, img, write_to, notifications) process_and_write_results(engine_instances[engine_index], engine_color, img, write_to, notifications)
elif read_from == 'clipboard': elif read_from == 'clipboard':
if not paused and not tmp_paused: if windows_clipboard_polling:
if mac_clipboard_polling: changed = clipboard_event.wait(delay_secs)
if changed:
clipboard_event.clear()
elif mac_clipboard_polling and not (paused or tmp_paused):
old_count = count old_count = count
count = pasteboard.changeCount() count = pasteboard.changeCount()
changed = not just_unpaused and count != old_count and any(x in pasteboard.types() for x in [NSPasteboardTypePNG, NSPasteboardTypeTIFF]) changed = not just_unpaused and count != old_count and any(x in pasteboard.types() for x in [NSPasteboardTypePNG, NSPasteboardTypeTIFF])
else: else:
changed = True changed = not (paused or tmp_paused)
if changed: if changed:
old_img = img old_img = img
@@ -388,10 +437,11 @@ def run(read_from='clipboard',
if not just_unpaused and (ignore_flag or pyperclip.paste() != '*ocr_ignore*') and isinstance(img, Image.Image) and not are_images_identical(img, old_img): if not just_unpaused and (ignore_flag or pyperclip.paste() != '*ocr_ignore*') and isinstance(img, Image.Image) and not are_images_identical(img, old_img):
process_and_write_results(engine_instances[engine_index], engine_color, img, write_to, notifications) process_and_write_results(engine_instances[engine_index], engine_color, img, write_to, notifications)
if not windows_clipboard_polling:
time.sleep(delay_secs)
if just_unpaused: if just_unpaused:
just_unpaused = False just_unpaused = False
time.sleep(delay_secs)
else: else:
for path in read_from.iterdir(): for path in read_from.iterdir():
if str(path).lower().endswith(allowed_extensions): if str(path).lower().endswith(allowed_extensions):