From 0143a6d97c88ca52a90712c21f9f482e7f19ab8a Mon Sep 17 00:00:00 2001 From: AuroraWright Date: Sun, 5 Oct 2025 23:49:28 +0200 Subject: [PATCH] Refactor, fix spacing with some engines like OneOCR --- owocr/ocr.py | 14 ---- owocr/run.py | 216 +++++++++++++++++++++++++++------------------------ 2 files changed, 116 insertions(+), 114 deletions(-) diff --git a/owocr/ocr.py b/owocr/ocr.py index f8f5760..e744c4b 100644 --- a/owocr/ocr.py +++ b/owocr/ocr.py @@ -85,8 +85,6 @@ try: except: optimized_png_encode = False -cj_regex = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]') - @dataclass class BoundingBox: @@ -136,18 +134,6 @@ class OcrResult: def empty_post_process(text): return text -def post_process(text): - is_cj_text = cj_regex.search(text) - if is_cj_text: - text = ' '.join([''.join(i.split()) for i in text.splitlines()]) - else: - text = ' '.join([re.sub(r'\s+', ' ', i).strip() for i in text.splitlines()]) - text = text.replace('…', '...') - text = re.sub('[・.]{2,}', lambda x: (x.end() - x.start()) * '.', text) - if is_cj_text: - text = jaconv.h2z(text, ascii=True, digit=True) - return text - def input_to_pil_image(img): is_path = False if isinstance(img, Image.Image): diff --git a/owocr/run.py b/owocr/run.py index f206b00..d4c2d0c 100644 --- a/owocr/run.py +++ b/owocr/run.py @@ -307,6 +307,7 @@ class TextFiltering: self.language = config.get_general('language') self.segmenter = Segmenter(language=self.language, clean=True) self.regex = self.get_regex() + self.last_result = ([], engine_index) try: from transformers import pipeline, AutoTokenizer @@ -353,7 +354,7 @@ class TextFiltering: return re.compile( r'[a-zA-Z\u00C0-\u00FF\u0100-\u017F\u0180-\u024F\u0250-\u02AF\u1D00-\u1D7F\u1D80-\u1DBF\u1E00-\u1EFF\u2C60-\u2C7F\uA720-\uA7FF\uAB30-\uAB6F]') - def __call__(self, text, last_result): + def __call__(self, text): orig_text = self.segmenter.segment(text) orig_text_filtered = [] for block in orig_text: @@ -364,8 +365,8 @@ class TextFiltering: else: orig_text_filtered.append(None) - if last_result[1] == engine_index: - last_text = last_result[0] + if self.last_result[1] == engine_index: + last_text = self.last_result[0] else: last_text = [] @@ -389,7 +390,9 @@ class TextFiltering: final_blocks.append(block) text = '\n'.join(final_blocks) - return text, orig_text_filtered + + self.last_result = (orig_text_filtered, engine_index) + return text class ScreenshotThread(threading.Thread): @@ -717,6 +720,107 @@ class AutopauseTimer: pause_handler(True) +class OutputResult: + def __init__(self, init_filtering): + self.filtering = TextFiltering() if init_filtering else None + self.cj_regex = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]') + + def _coordinate_format_to_string(self, result_data): + full_text_parts = [] + for p in result_data.paragraphs: + for l in p.lines: + for w in l.words: + full_text_parts.append(w.text) + if w.separator != None: + full_text_parts.append(w.separator) + else: + full_text_parts.append(' ') + full_text_parts.append('\n') + return "".join(full_text_parts) + + def _post_process(self, text): + is_cj_text = self.cj_regex.search(text) + if is_cj_text: + text = ' '.join([''.join(i.split()) for i in text.splitlines()]) + else: + text = ' '.join([re.sub(r'\s+', ' ', i).strip() for i in text.splitlines()]) + text = text.replace('…', '...') + text = re.sub('[・.]{2,}', lambda x: (x.end() - x.start()) * '.', text) + if is_cj_text: + text = jaconv.h2z(text, ascii=True, digit=True) + return text + + def __call__(self, img_or_path, filter_text, notify): + if 
auto_pause_handler and not filter_text: + auto_pause_handler.stop() + + engine_instance = engine_instances[engine_index] + start_time = time.time() + res, result_data = engine_instance(img_or_path) + end_time = time.time() + + orig_text = [] + engine_color = config.get_general('engine_color') + if not res: + logger.opt(ansi=True).info(f'<{engine_color}>{engine_instance.readable_name} reported an error after {end_time - start_time:0.03f}s: {result_data}') + return orig_text + + output_format = config.get_general('output_format') + verbosity = config.get_general('verbosity') + output_string = '' + log_message = '' + result_data_text = None + + # Check if the engine returned a structured OcrResult object + if isinstance(result_data, OcrResult): + unprocessed_text = self._coordinate_format_to_string(result_data) + + if output_format == 'json': + result_dict = asdict(result_data) + output_string = json.dumps(result_dict, ensure_ascii=False) + log_message = self._post_process(unprocessed_text) + else: + result_data_text = unprocessed_text + else: + result_data_text = result_data + + if result_data_text: + if output_format == 'json': + logger.warning(f"Engine '{engine_instance.name}' does not support JSON output. Falling back to text.") + if filter_text: + text_to_process = self.filtering(result_data_text) + output_string = self._post_process(text_to_process) + else: + output_string = self._post_process(result_data_text) + log_message = output_string + + if verbosity != 0: + if verbosity < -1: + log_message_terminal = ': ' + log_message + elif verbosity == -1: + log_message_terminal = '' + else: + log_message_terminal = ': ' + (log_message if len(log_message) <= verbosity else log_message[:verbosity] + '[...]') + + logger.opt(ansi=True).info(f'Text recognized in {end_time - start_time:0.03f}s using <{engine_color}>{engine_instance.readable_name}{log_message_terminal}') + + if notify and config.get_general('notifications'): + notifier.send(title='owocr', message='Text recognized: ' + log_message, urgency=get_notification_urgency()) + + # Write the final formatted string to the destination + write_to = config.get_general('write_to') + if write_to == 'websocket': + websocket_server_thread.send_text(output_string) + elif write_to == 'clipboard': + pyperclipfix.copy(output_string) + else: + with Path(write_to).open('a', encoding='utf-8') as f: + f.write(output_string + '\n') + + if auto_pause_handler and not paused and not filter_text: + auto_pause_handler.start() + + def get_notification_urgency(): if sys.platform == 'win32': return Urgency.Low @@ -809,90 +913,6 @@ def on_screenshot_combo(): screenshot_event.set() -def process_and_write_results(img_or_path, last_result, filtering, notify): - if auto_pause_handler and not filtering: - auto_pause_handler.stop() - - engine_instance = engine_instances[engine_index] - start_time = time.time() - res, result_data = engine_instance(img_or_path) - end_time = time.time() - - orig_text = [] - engine_color = config.get_general('engine_color') - if not res: - logger.opt(ansi=True).info(f'<{engine_color}>{engine_instance.readable_name} reported an error after {end_time - start_time:0.03f}s: {result_data}') - return orig_text - - output_format = config.get_general('output_format') - verbosity = config.get_general('verbosity') - output_string = '' - log_message = '' - - # Check if the engine returned a structured OcrResult object - if isinstance(result_data, OcrResult): - # Assemble full text for logging/notifications - full_text_parts = [] - for p in 
result_data.paragraphs: - for l in p.lines: - for w in l.words: - full_text_parts.append(w.text) - if w.separator: - full_text_parts.append(w.separator) - full_text_parts.append('\n') - unprocessed_text = "".join(full_text_parts) - - if output_format == 'json': - result_dict = asdict(result_data) - output_string = json.dumps(result_dict, ensure_ascii=False) - log_message = post_process(unprocessed_text) - else: # 'text' format - if filtering: - text_to_process, orig_text = filtering(unprocessed_text, last_result) - output_string = post_process(text_to_process) - else: - output_string = post_process(unprocessed_text) - log_message = output_string - else: # Handle engines that return a simple string for result_data - if output_format == 'json': - logger.warning(f"Engine '{engine_instance.name}' does not support JSON output. Falling back to text.") - unprocessed_text = result_data - if filtering: - text_to_process, orig_text = filtering(unprocessed_text, last_result) - output_string = post_process(text_to_process) - else: - output_string = post_process(unprocessed_text) - log_message = output_string - - if verbosity != 0: - if verbosity < -1: - log_message_terminal = ': ' + log_message - elif verbosity == -1: - log_message_terminal = '' - else: - log_message_terminal = ': ' + (log_message if len(log_message) <= verbosity else log_message[:verbosity] + '[...]') - - logger.opt(ansi=True).info(f'Text recognized in {end_time - start_time:0.03f}s using <{engine_color}>{engine_instance.readable_name}{log_message_terminal}') - - if notify and config.get_general('notifications'): - notifier.send(title='owocr', message='Text recognized: ' + log_message, urgency=get_notification_urgency()) - - # Write the final formatted string to the destination - write_to = config.get_general('write_to') - if write_to == 'websocket': - websocket_server_thread.send_text(output_string) - elif write_to == 'clipboard': - pyperclipfix.copy(output_string) - else: - with Path(write_to).open('a', encoding='utf-8') as f: - f.write(output_string + '\n') - - if auto_pause_handler and not paused and not filtering: - auto_pause_handler.start() - - return orig_text - - def run(): logger.configure(handlers=[{'sink': sys.stderr, 'format': config.get_general('logger_format')}]) @@ -961,7 +981,7 @@ def run(): directory_watcher_thread = None unix_socket_server = None key_combo_listener = None - filtering = None + init_filtering = False auto_pause_handler = None engine_index = engine_keys.index(default_engine) if default_engine != '' else 0 engine_color = config.get_general('engine_color') @@ -988,7 +1008,6 @@ def run(): screen_capture_delay_secs = config.get_general('screen_capture_delay_secs') screen_capture_combo = config.get_general('screen_capture_combo') last_screenshot_time = 0 - last_result = ([], engine_index) if screen_capture_combo != '': screen_capture_on_combo = True key_combos[screen_capture_combo] = on_screenshot_combo @@ -998,7 +1017,7 @@ def run(): screenshot_event = threading.Event() screenshot_thread = ScreenshotThread(screen_capture_on_combo) screenshot_thread.start() - filtering = TextFiltering() + init_filtering = True read_from_readable.append('screen capture') if 'websocket' in (read_from, read_from_secondary): read_from_readable.append('websocket') @@ -1027,6 +1046,8 @@ def run(): directory_watcher_thread.start() read_from_readable.append(f'directory {read_from_path}') + output_result = OutputResult(init_filtering) + if len(key_combos) > 0: key_combo_listener = keyboard.GlobalHotKeys(key_combos) 
key_combo_listener.start() @@ -1058,11 +1079,11 @@ def run(): while not terminated: start_time = time.time() img = None - filter_img = False + filter_text = False if process_queue: try: - img, filter_img = image_queue.get(timeout=0.1) + img, filter_text = image_queue.get(timeout=0.1) notify = True except queue.Empty: pass @@ -1071,7 +1092,7 @@ def run(): if (not paused) and screenshot_thread.screencapture_window_active and screenshot_thread.screencapture_window_visible and (time.time() - last_screenshot_time) > screen_capture_delay_secs: screenshot_event.set() img = periodic_screenshot_queue.get() - filter_img = True + filter_text = True notify = False last_screenshot_time = time.time() @@ -1080,12 +1101,7 @@ def run(): terminated = True break elif img: - if filter_img: - res = process_and_write_results(img, last_result, filtering, notify) - if res: - last_result = (res, engine_index) - else: - process_and_write_results(img, None, None, notify) + output_result(img, filter_text, notify) if isinstance(img, Path): if delete_images: Path.unlink(img)
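Note for reviewers: the headline spacing fix lives in the new OutputResult._coordinate_format_to_string(). A word whose separator is None (as reported by some engines, e.g. OneOCR) now contributes a plain space, whereas the removed code path only appended a separator when it was truthy, so such words ran together. The snippet below is a minimal, self-contained sketch of just that joining logic under simplified assumptions; the Word/Line/Paragraph/Result dataclasses are hypothetical stand-ins for the real OcrResult structures in owocr/ocr.py, not the project's actual types.

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Word:
    text: str
    separator: Optional[str] = None  # some engines leave this unset

@dataclass
class Line:
    words: List[Word]

@dataclass
class Paragraph:
    lines: List[Line]

@dataclass
class Result:
    paragraphs: List[Paragraph]

def coordinate_format_to_string(result_data):
    # Mirrors the patched joining logic: when an engine reports no separator
    # for a word, fall back to a single space instead of appending nothing.
    full_text_parts = []
    for p in result_data.paragraphs:
        for l in p.lines:
            for w in l.words:
                full_text_parts.append(w.text)
                if w.separator is not None:
                    full_text_parts.append(w.separator)
                else:
                    full_text_parts.append(' ')
            full_text_parts.append('\n')
    return ''.join(full_text_parts)

if __name__ == '__main__':
    result = Result([Paragraph([Line([Word('Hello'), Word('world')])])])
    # The pre-patch logic appended a separator only when truthy and produced
    # 'Helloworld\n'; the patched logic produces 'Hello world \n', and the
    # trailing whitespace is later normalized by _post_process().
    print(repr(coordinate_format_to_string(result)))

The refactor itself mostly moves state out of run(): TextFiltering now remembers its own last_result, and OutputResult owns the optional TextFiltering instance plus the post-processing helpers, so the main loop simply calls output_result(img, filter_text, notify) instead of threading last_result and filtering through every process_and_write_results() call.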