Implement combos to switch/pause from other windows, update version

This commit is contained in:
AuroraWright
2024-02-08 19:42:36 +01:00
parent d415225b42
commit 26a65563f4
4 changed files with 84 additions and 49 deletions

View File

@@ -158,38 +158,51 @@ class TextFiltering:
return text, orig_text
def user_input_thread_run(engine_instances, engine_keys):
def _terminate_handler(user_input):
def pause_handler(is_combo=True):
global paused
global just_unpaused
if paused:
message = 'Unpaused!'
just_unpaused = True
else:
message = 'Paused!'
if is_combo:
notification.title = message
notification.message = ''
notification.send(block=False)
logger.info(message)
paused = not paused
def engine_change_handler(user_input='s', is_combo=True):
global engine_index
old_engine_index = engine_index
if user_input.lower() == 's':
if engine_index == len(engine_keys) - 1:
engine_index = 0
else:
engine_index += 1
elif user_input.lower() != '' and user_input.lower() in engine_keys:
engine_index = engine_keys.index(user_input.lower())
if engine_index != old_engine_index:
new_engine_name = engine_instances[engine_index].readable_name
if is_combo:
notification.title = f'Switched to {new_engine_name}'
notification.message = ''
notification.send(block=False)
engine_color = config.get_general('engine_color')
logger.opt(ansi=True).info(f'Switched to <{engine_color}>{new_engine_name}</{engine_color}>!')
def user_input_thread_run():
def _terminate_handler():
global terminated
logger.info('Terminated!')
terminated = True
def _pause_handler(user_input):
global paused
global just_unpaused
if paused:
logger.info('Unpaused!')
just_unpaused = True
else:
logger.info('Paused!')
paused = not paused
def _engine_change_handler(user_input):
global engine_index
old_engine_index = engine_index
if user_input.lower() == 's':
if engine_index == len(engine_keys) - 1:
engine_index = 0
else:
engine_index += 1
elif user_input.lower() != '' and user_input.lower() in engine_keys:
engine_index = engine_keys.index(user_input.lower())
if engine_index != old_engine_index:
engine_color = config.get_general('engine_color')
logger.opt(ansi=True).info(f'Switched to <{engine_color}>{engine_instances[engine_index].readable_name}</{engine_color}>!')
if sys.platform == 'win32':
import msvcrt
while not terminated:
@@ -197,11 +210,11 @@ def user_input_thread_run(engine_instances, engine_keys):
try:
user_input = user_input_bytes.decode()
if user_input.lower() in 'tq':
_terminate_handler(user_input)
_terminate_handler()
elif user_input.lower() == 'p':
_pause_handler(user_input)
pause_handler(False)
else:
_engine_change_handler(user_input)
engine_change_handler(user_input, False)
except UnicodeDecodeError:
pass
else:
@@ -213,11 +226,11 @@ def user_input_thread_run(engine_instances, engine_keys):
while not terminated:
user_input = sys.stdin.read(1)
if user_input.lower() in 'tq':
_terminate_handler(user_input)
_terminate_handler()
elif user_input.lower() == 'p':
_pause_handler(user_input)
pause_handler(False)
else:
_engine_change_handler(user_input)
engine_change_handler(user_input, False)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
@@ -290,7 +303,8 @@ def are_images_identical(img1, img2):
return (img1.shape == img2.shape) and (img1 == img2).all()
def process_and_write_results(engine_instance, img_or_path, write_to, enable_filtering, last_text, filtering):
def process_and_write_results(img_or_path, write_to, notifications, enable_filtering, last_text, filtering):
engine_instance = engine_instances[engine_index]
t0 = time.time()
res, text = engine_instance(img_or_path)
t1 = time.time()
@@ -302,9 +316,7 @@ def process_and_write_results(engine_instance, img_or_path, write_to, enable_fil
text, orig_text = filtering(text, last_text)
text = post_process(text)
logger.opt(ansi=True).info(f'Text recognized in {t1 - t0:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>: {text}')
if config.get_general('notifications'):
notification = Notify()
notification.application_name = 'owocr'
if notifications:
notification.title = 'Text recognized:'
notification.message = text
notification.send(block=False)
@@ -342,6 +354,8 @@ def run(read_from=None,
ignore_flag=None,
delete_images=None,
notifications=None,
combo_pause=None,
combo_engine_switch=None,
screen_capture_monitor=None,
screen_capture_coords=None,
screen_capture_delay_secs=None,
@@ -362,6 +376,8 @@ def run(read_from=None,
:param ignore_flag: Process flagged clipboard images (images that are copied to the clipboard with the *ocr_ignore* string).
:param delete_images: Delete image files after processing when reading from a directory.
:param notifications: Show an operating system notification with the detected text.
:param combo_pause: Specifies a combo to wait on for pausing the program. As an example: "<ctrl>+<shift>+p". To be used with combo_engine_switch. The list of keys can be found here: https://pynput.readthedocs.io/en/latest/keyboard.html#pynput.keyboard.Key
:param combo_engine_switch: Specifies a combo to wait on for switching the OCR engine. As an example: "<ctrl>+<shift>+a". To be used with combo_pause. The list of keys can be found here: https://pynput.readthedocs.io/en/latest/keyboard.html#pynput.keyboard.Key
:param screen_capture_monitor: Specifies monitor to target when reading with screen capture.
:param screen_capture_coords: Specifies area to target when reading with screen capture. Can be either empty (whole screen), a set of coordinates (x,y,width,height) or a window name (the first matching window title will be used).
:param screen_capture_delay_secs: Specifies the delay (in seconds) between screenshots when reading with screen capture.
@@ -381,6 +397,8 @@ def run(read_from=None,
if config.downloaded_config:
logger.info(f'A default config file has been downloaded to {config.config_path}')
global engine_instances
global engine_keys
engine_instances = []
config_engines = []
engine_keys = []
@@ -413,6 +431,7 @@ def run(read_from=None,
global tmp_paused
global just_unpaused
global first_pressed
global notification
terminated = False
paused = pause_at_startup
just_unpaused = True
@@ -422,8 +441,10 @@ def run(read_from=None,
engine_color = config.get_general('engine_color')
delay_secs = config.get_general('delay_secs')
screen_capture_on_combo = False
notification = Notify()
notification.application_name = 'owocr'
user_input_thread = threading.Thread(target=user_input_thread_run, args=(engine_instances, engine_keys), daemon=True)
user_input_thread = threading.Thread(target=user_input_thread_run, daemon=True)
user_input_thread.start()
if read_from == 'websocket' or write_to == 'websocket':
@@ -528,14 +549,22 @@ def run(read_from=None,
logger.opt(ansi=True).info(f"Reading from directory {read_from} using <{engine_color}>{engine_instances[engine_index].readable_name}</{engine_color}>{' (paused)' if paused else ''}")
key_combos = {}
if screen_capture_on_combo:
tmp_paused_listener = keyboard.GlobalHotKeys({
screen_capture_combo: on_screenshot_combo})
key_combos[screen_capture_combo] = on_screenshot_combo
if any(x != '' for x in [combo_pause, combo_engine_switch]):
if any(x == '' for x in [combo_pause, combo_engine_switch]):
raise ValueError('both combo_pause and combo_engine_switch must be specified')
key_combos[combo_pause] = pause_handler
key_combos[combo_engine_switch] = engine_change_handler
if len(key_combos) > 0:
key_combo_listener = keyboard.GlobalHotKeys(key_combos)
else:
tmp_paused_listener = keyboard.Listener(
key_combo_listener = keyboard.Listener(
on_press=on_key_press,
on_release=on_key_release)
tmp_paused_listener.start()
key_combo_listener.start()
signal.signal(signal.SIGINT, signal_handler)
while not terminated:
@@ -548,7 +577,7 @@ def run(read_from=None,
else:
if not paused and not tmp_paused:
img = Image.open(io.BytesIO(item))
process_and_write_results(engine_instances[engine_index], img, write_to, False, '', None)
process_and_write_results(img, write_to, notifications, False, '', None)
elif read_from == 'clipboard':
process_clipboard = False
if windows_clipboard_polling:
@@ -594,7 +623,7 @@ def run(read_from=None,
process_clipboard = True
if process_clipboard:
process_and_write_results(engine_instances[engine_index], img, write_to, False, '', None)
process_and_write_results(img, write_to, notifications, False, '', None)
just_unpaused = False
@@ -611,7 +640,7 @@ def run(read_from=None,
if take_screenshot and screencapture_window_visible:
sct_img = sct.grab(sct_params)
img = Image.frombytes('RGB', sct_img.size, sct_img.bgra, 'raw', 'BGRX')
res = process_and_write_results(engine_instances[engine_index], img, write_to, True, last_text, filtering)
res = process_and_write_results(img, write_to, notifications, True, last_text, filtering)
if res != '':
last_text = res
delay = screen_capture_delay_secs
@@ -634,7 +663,7 @@ def run(read_from=None,
except (UnidentifiedImageError, OSError) as e:
logger.warning(f'Error while reading file {path}: {e}')
else:
process_and_write_results(engine_instances[engine_index], img, write_to, False, '', None)
process_and_write_results(img, write_to, notifications, False, '', None)
img.close()
if delete_images:
Path.unlink(path)
@@ -649,4 +678,4 @@ def run(read_from=None,
windows_clipboard_thread.join()
elif read_from == 'screencapture' and screencapture_window_mode:
target_window.watchdog.stop()
tmp_paused_listener.stop()
key_combo_listener.stop()