diff --git a/owocr/config.py b/owocr/config.py
index c99a4d6..2d97569 100644
--- a/owocr/config.py
+++ b/owocr/config.py
@@ -48,8 +48,10 @@ parser.add_argument('-sw', '--screen_capture_only_active_windows', type=str2bool
                     help="When reading with screen capture and screen_capture_area is a window name, only target the window while it's active.")
 parser.add_argument('-sc', '--screen_capture_combo', type=str, default=argparse.SUPPRESS,
                     help='When reading with screen capture, combo to wait on for taking a screenshot instead of using the delay. As an example: "<ctrl>+<shift>+s". The list of keys can be found here: https://pynput.readthedocs.io/en/latest/keyboard.html#pynput.keyboard.Key')
-parser.add.argument('-l', '--language', type=str, default=argparse.SUPPRESS,
+parser.add_argument('-l', '--language', type=str, default=argparse.SUPPRESS,
                     help='Two letter language code for filtering screencapture OCR results. Ex. "ja" for Japanese, "zh" for Chinese, "ko" for Korean, "ar" for Arabic, "ru" for Russian, "el" for Greek, "he" for Hebrew, "th" for Thai. Any other value will use Latin Extended (for most European languages and English).')
+parser.add_argument('-of', '--output_format', type=str, default=argparse.SUPPRESS, choices=['text', 'json'],
+                    help='The output format for OCR results. Can be "text" (default) or "json" (to include coordinates).')

 class Config:
     has_config = False
@@ -79,7 +81,8 @@ class Config:
         'screen_capture_only_active_windows': True,
         'screen_capture_combo': '',
         'screen_capture_old_macos_api': False,
-        'language': 'ja'
+        'language': 'ja',
+        'output_format': 'text'
     }

     def __parse(self, value):
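For reference, a minimal sketch of how the new flag behaves in isolation. The real parser uses default=argparse.SUPPRESS so the 'text' default from the config table above wins; the hard-coded default here is only for the demo:

import argparse

parser = argparse.ArgumentParser(prog='owocr')
parser.add_argument('-of', '--output_format', type=str, default='text', choices=['text', 'json'])

print(parser.parse_args(['-of', 'json']).output_format)  # -> json
print(parser.parse_args([]).output_format)               # -> text
# parser.parse_args(['-of', 'xml']) exits with: invalid choice: 'xml'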
diff --git a/owocr/ocr.py b/owocr/ocr.py
index ab0e8fd..71f5c3c 100644
--- a/owocr/ocr.py
+++ b/owocr/ocr.py
@@ -5,10 +5,12 @@ from pathlib import Path
 import sys
 import platform
 import logging
-from math import sqrt
+from math import sqrt, sin, cos, atan2
 import json
 import base64
 from urllib.parse import urlparse, parse_qs
+from dataclasses import dataclass, field, asdict
+from typing import List, Optional

 import jaconv
 import numpy as np
@@ -83,6 +85,50 @@ try:
 except:
     optimized_png_encode = False

+@dataclass
+class BoundingBox:
+    """
+    Represents the normalized coordinates of a detected element.
+    All values are floats between 0.0 and 1.0.
+    """
+    center_x: float
+    center_y: float
+    width: float
+    height: float
+    rotation_z: Optional[float] = None  # Optional rotation in radians
+
+@dataclass
+class Word:
+    """Represents a single recognized word and its properties."""
+    text: str
+    bounding_box: BoundingBox
+    separator: Optional[str] = None  # The character(s) that follow the word, e.g., a space
+
+@dataclass
+class Line:
+    """Represents a single line of text, composed of words."""
+    bounding_box: BoundingBox
+    words: List[Word] = field(default_factory=list)
+
+@dataclass
+class Paragraph:
+    """Represents a block of text, composed of lines."""
+    bounding_box: BoundingBox
+    lines: List[Line] = field(default_factory=list)
+    writing_direction: Optional[str] = None  # Optional: e.g., "LEFT_TO_RIGHT"
+
+@dataclass
+class ImageProperties:
+    """Stores the original dimensions of the processed image."""
+    width: int
+    height: int
+
+@dataclass
+class OcrResult:
+    """The root object for a complete OCR analysis of an image."""
+    image_properties: ImageProperties
+    paragraphs: List[Paragraph] = field(default_factory=list)
+
 def empty_post_process(text):
     return text
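A quick sketch of the schema round-trip, assuming the dataclasses above are in scope. Because asdict() recurses through nested dataclasses and lists, serializing the whole tree is a single call, which is what the JSON branch added to run.py below relies on:

import json
from dataclasses import asdict

bbox = BoundingBox(center_x=0.5, center_y=0.1, width=0.4, height=0.05)
result = OcrResult(
    image_properties=ImageProperties(width=1280, height=720),
    paragraphs=[Paragraph(bounding_box=bbox, lines=[
        Line(bounding_box=bbox, words=[Word(text='こんにちは', bounding_box=bbox)])
    ])]
)

# Prints the nested dict as JSON, with non-ASCII text kept readable.
print(json.dumps(asdict(result), ensure_ascii=False, indent=2))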
@@ -161,6 +207,7 @@ class MangaOcr:
     readable_name = 'Manga OCR'
     key = 'm'
     available = False
+    coordinate_support = False

     def __init__(self, config={'pretrained_model_name_or_path':'kha-white/manga-ocr-base','force_cpu': False}):
         if 'manga_ocr' not in sys.modules:
@@ -191,6 +238,7 @@ class GoogleVision:
     readable_name = 'Google Vision'
     key = 'g'
     available = False
+    coordinate_support = False

     def __init__(self):
         if 'google.cloud' not in sys.modules:
@@ -235,6 +283,7 @@ class GoogleLens:
     readable_name = 'Google Lens'
     key = 'l'
     available = False
+    coordinate_support = True

     def __init__(self):
         if 'betterproto' not in sys.modules:
@@ -243,6 +292,62 @@ class GoogleLens:
         self.available = True
         logger.info('Google Lens ready')

+    def _to_generic_result(self, response, img_width, img_height):
+        paragraphs = []
+        if 'objects_response' in response and 'text' in response['objects_response']:
+            text_data = response['objects_response']['text']
+            if 'text_layout' in text_data:
+                for p in text_data['text_layout'].get('paragraphs', []):
+                    lines = []
+                    for l in p.get('lines', []):
+                        words = []
+                        for w in l.get('words', []):
+                            w_bbox = w.get('geometry', {}).get('bounding_box', {})
+                            word = Word(
+                                text=w.get('plain_text', ''),
+                                separator=w.get('text_separator'),
+                                bounding_box=BoundingBox(
+                                    center_x=w_bbox.get('center_x'),
+                                    center_y=w_bbox.get('center_y'),
+                                    width=w_bbox.get('width'),
+                                    height=w_bbox.get('height'),
+                                    rotation_z=w_bbox.get('rotation_z')
+                                )
+                            )
+                            words.append(word)
+
+                        l_bbox = l.get('geometry', {}).get('bounding_box', {})
+                        line = Line(
+                            bounding_box=BoundingBox(
+                                center_x=l_bbox.get('center_x'),
+                                center_y=l_bbox.get('center_y'),
+                                width=l_bbox.get('width'),
+                                height=l_bbox.get('height'),
+                                rotation_z=l_bbox.get('rotation_z')
+                            ),
+                            words=words
+                        )
+                        lines.append(line)
+
+                    p_bbox = p.get('geometry', {}).get('bounding_box', {})
+                    paragraph = Paragraph(
+                        bounding_box=BoundingBox(
+                            center_x=p_bbox.get('center_x'),
+                            center_y=p_bbox.get('center_y'),
+                            width=p_bbox.get('width'),
+                            height=p_bbox.get('height'),
+                            rotation_z=p_bbox.get('rotation_z')
+                        ),
+                        lines=lines,
+                        writing_direction=p.get('writing_direction')
+                    )
+                    paragraphs.append(paragraph)
+
+        return OcrResult(
+            image_properties=ImageProperties(width=img_width, height=img_height),
+            paragraphs=paragraphs
+        )
+
     def __call__(self, img):
         img, is_path = input_to_pil_image(img)
         if not img:
@@ -272,7 +377,7 @@ class GoogleLens:
         image_data = self._preprocess(img)
         request.objects_request.image_data.payload.image_bytes = image_data[0]
         request.objects_request.image_data.image_metadata.width = image_data[1]
-        request.objects_request.image_data.image_metadata.height = image_data[2]
+        request.objects_request.image_data.image_metadata.height = image_data[2]

         payload = request.SerializeToString()
@@ -302,17 +407,8 @@ class GoogleLens:
         response_proto = LensOverlayServerResponse().FromString(res.content)
         response_dict = response_proto.to_dict(betterproto.Casing.SNAKE)

-        res = ''
-        text = response_dict['objects_response']['text']
-        if 'text_layout' in text:
-            paragraphs = text['text_layout']['paragraphs']
-            for paragraph in paragraphs:
-                for line in paragraph['lines']:
-                    for word in line['words']:
-                        res += word['plain_text'] + word['text_separator']
-                res += '\n'
-
-        x = (True, res)
+        ocr_result = self._to_generic_result(response_dict, img.width, img.height)
+        x = (True, ocr_result)

         if is_path:
             img.close()
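Downstream consumers can traverse any engine's result the same way. A small helper sketch (name hypothetical) that flattens an OcrResult back into the plain text the removed code used to build, mirroring the assembly loop added to run.py later in this diff:

def ocr_result_to_text(result: OcrResult) -> str:
    # Walk paragraphs -> lines -> words, honoring each word's separator.
    parts = []
    for paragraph in result.paragraphs:
        for line in paragraph.lines:
            for word in line.words:
                parts.append(word.text)
                if word.separator:
                    parts.append(word.separator)
    return ''.join(parts)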
@@ -332,6 +428,7 @@ class GoogleLensWeb:
     readable_name = 'Google Lens (web)'
     key = 'k'
     available = False
+    coordinate_support = False

     def __init__(self):
         if 'pyjson5' not in sys.modules:
@@ -427,12 +524,76 @@ class Bing:
     readable_name = 'Bing'
     key = 'b'
     available = False
+    coordinate_support = True

     def __init__(self):
         self.requests_session = requests.Session()
         self.available = True
         logger.info('Bing ready')

+    def _quad_to_center_bbox(self, quad):
+        center_x = (quad['topLeft']['x'] + quad['topRight']['x'] + quad['bottomRight']['x'] + quad['bottomLeft']['x']) / 4
+        center_y = (quad['topLeft']['y'] + quad['topRight']['y'] + quad['bottomRight']['y'] + quad['bottomLeft']['y']) / 4
+
+        width1 = sqrt((quad['topRight']['x'] - quad['topLeft']['x'])**2 + (quad['topRight']['y'] - quad['topLeft']['y'])**2)
+        width2 = sqrt((quad['bottomRight']['x'] - quad['bottomLeft']['x'])**2 + (quad['bottomRight']['y'] - quad['bottomLeft']['y'])**2)
+        avg_width = (width1 + width2) / 2
+
+        height1 = sqrt((quad['bottomLeft']['x'] - quad['topLeft']['x'])**2 + (quad['bottomLeft']['y'] - quad['topLeft']['y'])**2)
+        height2 = sqrt((quad['bottomRight']['x'] - quad['topRight']['x'])**2 + (quad['bottomRight']['y'] - quad['topRight']['y'])**2)
+        avg_height = (height1 + height2) / 2
+
+        return BoundingBox(center_x=center_x, center_y=center_y, width=avg_width, height=avg_height)
+
+    def _to_generic_result(self, response, img_width, img_height):
+        paragraphs = []
+        text_tag = None
+        for tag in response.get('tags', []):
+            if tag.get('displayName') == '##TextRecognition':
+                text_tag = tag
+                break
+
+        if text_tag:
+            text_action = None
+            for action in text_tag.get('actions', []):
+                if action.get('_type') == 'ImageKnowledge/TextRecognitionAction':
+                    text_action = action
+                    break
+
+            if text_action:
+                for p in text_action.get('data', {}).get('regions', []):
+                    lines = []
+                    for l in p.get('lines', []):
+                        words = []
+                        for w in l.get('words', []):
+                            word = Word(
+                                text=w.get('text', ''),
+                                bounding_box=self._quad_to_center_bbox(w['boundingBox']),
+                                separator=" "
+                            )
+                            words.append(word)
+
+                        line = Line(
+                            bounding_box=self._quad_to_center_bbox(l['boundingBox']),
+                            words=words
+                        )
+                        lines.append(line)
+
+                    # Bing doesn't provide paragraph-level separators, so we add a newline
+                    if lines and lines[-1].words:
+                        lines[-1].words[-1].separator = '\n'
+
+                    paragraph = Paragraph(
+                        bounding_box=self._quad_to_center_bbox(p['boundingBox']),
+                        lines=lines
+                    )
+                    paragraphs.append(paragraph)
+
+        return OcrResult(
+            image_properties=ImageProperties(width=img_width, height=img_height),
+            paragraphs=paragraphs
+        )
+
     def __call__(self, img):
         img, is_path = input_to_pil_image(img)
         if not img:
@@ -510,26 +671,9 @@ class Bing:
             return (False, 'Unknown error!')

         data = res.json()
-
-        res = ''
-        text_tag = None
-        for tag in data['tags']:
-            if tag.get('displayName') == '##TextRecognition':
-                text_tag = tag
-                break
-        if text_tag:
-            text_action = None
-            for action in text_tag['actions']:
-                if action.get('_type') == 'ImageKnowledge/TextRecognitionAction':
-                    text_action = action
-                    break
-            if text_action:
-                regions = text_action['data'].get('regions', [])
-                for region in regions:
-                    for line in region.get('lines', []):
-                        res += line['text'] + '\n'

-        x = (True, res)
+        ocr_result = self._to_generic_result(data, img.width, img.height)
+        x = (True, ocr_result)

         if is_path:
             img.close()
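The quad-to-box conversion above averages opposite edge lengths, which degrades gracefully for slightly rotated quads. A standalone check of the math with a hypothetical axis-aligned quad (Bing's quads are assumed to already be normalized to 0-1, matching BoundingBox):

from math import sqrt

quad = {
    'topLeft':     {'x': 0.10, 'y': 0.20},
    'topRight':    {'x': 0.30, 'y': 0.20},
    'bottomRight': {'x': 0.30, 'y': 0.25},
    'bottomLeft':  {'x': 0.10, 'y': 0.25},
}

center_x = sum(p['x'] for p in quad.values()) / 4
center_y = sum(p['y'] for p in quad.values()) / 4
top = sqrt((quad['topRight']['x'] - quad['topLeft']['x'])**2 + (quad['topRight']['y'] - quad['topLeft']['y'])**2)
bottom = sqrt((quad['bottomRight']['x'] - quad['bottomLeft']['x'])**2 + (quad['bottomRight']['y'] - quad['bottomLeft']['y'])**2)
left = sqrt((quad['bottomLeft']['x'] - quad['topLeft']['x'])**2 + (quad['bottomLeft']['y'] - quad['topLeft']['y'])**2)
right = sqrt((quad['bottomRight']['x'] - quad['topRight']['x'])**2 + (quad['bottomRight']['y'] - quad['topRight']['y'])**2)

print(center_x, center_y)                      # 0.2 0.225
print((top + bottom) / 2, (left + right) / 2)  # 0.2 0.05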
@@ -558,6 +702,7 @@ class AppleVision:
     readable_name = 'Apple Vision'
     key = 'a'
     available = False
+    coordinate_support = False

     def __init__(self):
         if sys.platform != 'darwin':
@@ -607,6 +752,7 @@ class AppleLiveText:
     readable_name = 'Apple Live Text'
     key = 'd'
     available = False
+    coordinate_support = False

     def __init__(self):
         if sys.platform != 'darwin':
@@ -687,6 +833,7 @@ class WinRTOCR:
     readable_name = 'WinRT OCR'
     key = 'w'
     available = False
+    coordinate_support = False

     def __init__(self, config={}):
         if sys.platform == 'win32':
@@ -740,6 +887,7 @@ class OneOCR:
     readable_name = 'OneOCR'
     key = 'z'
     available = False
+    coordinate_support = True

     def __init__(self, config={}):
         if sys.platform == 'win32':
@@ -763,6 +911,67 @@ class OneOCR:
             except:
                 logger.warning('Error reading URL from config, OneOCR will not work!')

+    def _pixel_quad_to_center_bbox(self, rect, img_width, img_height):
+        x_coords = [rect['x1'], rect['x2'], rect['x3'], rect['x4']]
+        y_coords = [rect['y1'], rect['y2'], rect['y3'], rect['y4']]
+
+        center_x_px = sum(x_coords) / 4
+        center_y_px = sum(y_coords) / 4
+
+        width_px = (abs(rect['x2'] - rect['x1']) + abs(rect['x3'] - rect['x4'])) / 2
+        height_px = (abs(rect['y4'] - rect['y1']) + abs(rect['y3'] - rect['y2'])) / 2
+
+        return BoundingBox(
+            center_x=center_x_px / img_width,
+            center_y=center_y_px / img_height,
+            width=width_px / img_width,
+            height=height_px / img_height
+        )
+
+    def _to_generic_result(self, response, img_width, img_height):
+        lines = []
+        for l in response.get('lines', []):
+            words = []
+            for i, w in enumerate(l.get('words', [])):
+                separator = " " if i < len(l.get('words', [])) - 1 else None
+                word = Word(
+                    text=w.get('text', ''),
+                    separator=separator,
+                    bounding_box=self._pixel_quad_to_center_bbox(w['bounding_rect'], img_width, img_height)
+                )
+                words.append(word)
+
+            line = Line(
+                bounding_box=self._pixel_quad_to_center_bbox(l['bounding_rect'], img_width, img_height),
+                words=words
+            )
+            lines.append(line)
+
+        # Create a single paragraph to hold all lines
+        if lines:
+            # Approximate paragraph bbox by combining all line bboxes
+            all_line_bboxes = [l.bounding_box for l in lines]
+            min_x = min(b.center_x - b.width / 2 for b in all_line_bboxes)
+            max_x = max(b.center_x + b.width / 2 for b in all_line_bboxes)
+            min_y = min(b.center_y - b.height / 2 for b in all_line_bboxes)
+            max_y = max(b.center_y + b.height / 2 for b in all_line_bboxes)
+
+            p_bbox = BoundingBox(
+                center_x=(min_x + max_x) / 2,
+                center_y=(min_y + max_y) / 2,
+                width=max_x - min_x,
+                height=max_y - min_y
+            )
+            paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
+            paragraphs = [paragraph]
+        else:
+            paragraphs = []
+
+        return OcrResult(
+            image_properties=ImageProperties(width=img_width, height=img_height),
+            paragraphs=paragraphs
+        )
+
     def __call__(self, img):
         img, is_path = input_to_pil_image(img)
         if not img:
@@ -770,7 +979,7 @@ class OneOCR:

         if sys.platform == 'win32':
             try:
-                res = self.model.recognize_pil(img)['text']
+                raw_res = self.model.recognize_pil(img)
             except RuntimeError as e:
                 return (False, e)
         else:
@@ -784,9 +993,10 @@ class OneOCR:
         if res.status_code != 200:
             return (False, 'Unknown error!')

-        res = res.json()['text']
+        raw_res = res.json()

-        x = (True, res)
+        ocr_response = self._to_generic_result(raw_res, img.width, img.height)
+        x = (True, ocr_response)

         if is_path:
             img.close()
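Unlike Bing, OneOCR reports pixel coordinates, so _pixel_quad_to_center_bbox divides by the image size to land in the same normalized space as the other engines. A quick standalone check with a hypothetical 100x50 px rectangle on a 1000x500 px image:

rect = {'x1': 100, 'y1': 100, 'x2': 200, 'y2': 100,
        'x3': 200, 'y3': 150, 'x4': 100, 'y4': 150}
img_width, img_height = 1000, 500

center_x = sum(rect[k] for k in ('x1', 'x2', 'x3', 'x4')) / 4 / img_width    # 0.15
center_y = sum(rect[k] for k in ('y1', 'y2', 'y3', 'y4')) / 4 / img_height   # 0.25
width = (abs(rect['x2'] - rect['x1']) + abs(rect['x3'] - rect['x4'])) / 2 / img_width    # 0.1
height = (abs(rect['y4'] - rect['y1']) + abs(rect['y3'] - rect['y2'])) / 2 / img_height  # 0.1
print(center_x, center_y, width, height)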
@@ -800,6 +1010,7 @@ class AzureImageAnalysis:
     readable_name = 'Azure Image Analysis'
     key = 'v'
     available = False
+    coordinate_support = False

     def __init__(self, config={}):
         if 'azure.ai.vision.imageanalysis' not in sys.modules:
@@ -853,6 +1064,7 @@ class EasyOCR:
     readable_name = 'EasyOCR'
     key = 'e'
     available = False
+    coordinate_support = False

     def __init__(self, config={'gpu': True}):
         if 'easyocr' not in sys.modules:
@@ -888,6 +1100,7 @@ class RapidOCR:
     readable_name = 'RapidOCR'
     key = 'r'
     available = False
+    coordinate_support = False

     def __init__(self):
         if 'rapidocr_onnxruntime' not in sys.modules:
@@ -936,6 +1149,7 @@ class OCRSpace:
     readable_name = 'OCRSpace'
     key = 'o'
     available = False
+    coordinate_support = False

     def __init__(self, config={}):
         try:
diff --git a/owocr/run.py b/owocr/run.py
index cf003c1..0b65031 100644
--- a/owocr/run.py
+++ b/owocr/run.py
@@ -8,6 +8,9 @@ import io
 import re
 import logging
 import inspect
+import os
+import json
+from dataclasses import asdict

 import numpy as np
 import pyperclipfix
@@ -811,32 +814,70 @@ def process_and_write_results(img_or_path, last_result, filtering, notify):
     engine_instance = engine_instances[engine_index]

     start_time = time.time()
-    res, text = engine_instance(img_or_path)
+    res, result_data = engine_instance(img_or_path)
     end_time = time.time()

     orig_text = []
     engine_color = config.get_general('engine_color')
-    if res:
+    if not res:
+        logger.opt(ansi=True).info(f'<{engine_color}>{engine_instance.readable_name}</{engine_color}> reported an error after {end_time - start_time:0.03f}s: {result_data}')
+        return orig_text
+
+    output_format = config.get_general('output_format')
+    output_string = ''
+    log_message = ''
+
+    # Check if the engine returned a structured OcrResult object
+    if isinstance(result_data, OcrResult):
+        # Assemble full text for logging/notifications
+        full_text_parts = []
+        for p in result_data.paragraphs:
+            for l in p.lines:
+                for w in l.words:
+                    full_text_parts.append(w.text)
+                    if w.separator:
+                        full_text_parts.append(w.separator)
+        unprocessed_text = "".join(full_text_parts)
+
+        if output_format == 'json':
+            result_dict = asdict(result_data)
+            output_string = json.dumps(result_dict, ensure_ascii=False)
+            log_message = post_process(unprocessed_text)
+        else:  # 'text' format
+            if filtering:
+                text_to_process, orig_text = filtering(unprocessed_text, last_result)
+                output_string = post_process(text_to_process)
+            else:
+                output_string = post_process(unprocessed_text)
+            log_message = output_string
+    else:  # Handle engines that return a simple string for result_data
+        if output_format == 'json':
+            logger.warning(f"Engine '{engine_instance.name}' does not support JSON output. Falling back to text.")
+        unprocessed_text = result_data
         if filtering:
-            text, orig_text = filtering(text, last_result)
-        text = post_process(text)
-        logger.opt(ansi=True).info(f'Text recognized in {end_time - start_time:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>: {text}')
-        if notify and config.get_general('notifications'):
-            notifier.send(title='owocr', message='Text recognized: ' + text, urgency=get_notification_urgency())
-
-        write_to = config.get_general('write_to')
-        if write_to == 'websocket':
-            websocket_server_thread.send_text(text)
-        elif write_to == 'clipboard':
-            pyperclipfix.copy(text)
+            text_to_process, orig_text = filtering(unprocessed_text, last_result)
+            output_string = post_process(text_to_process)
         else:
-            with Path(write_to).open('a', encoding='utf-8') as f:
-                f.write(text + '\n')
+            output_string = post_process(unprocessed_text)
+        log_message = output_string

-        if auto_pause_handler and not paused and not filtering:
-            auto_pause_handler.start()
+    logger.opt(ansi=True).info(f'Text recognized in {end_time - start_time:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>: {log_message}')
+
+    if notify and config.get_general('notifications'):
+        notifier.send(title='owocr', message='Text recognized: ' + log_message, urgency=get_notification_urgency())
+
+    # Write the final formatted string to the destination
+    write_to = config.get_general('write_to')
+    if write_to == 'websocket':
+        websocket_server_thread.send_text(output_string)
+    elif write_to == 'clipboard':
+        pyperclipfix.copy(output_string)
     else:
-        logger.opt(ansi=True).info(f'<{engine_color}>{engine_instance.readable_name}</{engine_color}> reported an error after {end_time - start_time:0.03f}s: {text}')
+        with Path(write_to).open('a', encoding='utf-8') as f:
+            f.write(output_string + '\n')
+
+    if auto_pause_handler and not paused and not filtering:
+        auto_pause_handler.start()

     return orig_text
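With output_format set to 'json', each recognition is appended to the write_to file as one JSON object per line. A consumer-side sketch (hypothetical file name; the key names follow the dataclasses in owocr/ocr.py) that converts normalized boxes back to pixel corners:

import json

with open('ocr_output.txt', encoding='utf-8') as f:
    for raw_line in f:
        result = json.loads(raw_line)
        w = result['image_properties']['width']
        h = result['image_properties']['height']
        for paragraph in result['paragraphs']:
            for text_line in paragraph['lines']:
                for word in text_line['words']:
                    bb = word['bounding_box']
                    # Convert normalized center/size back to a pixel top-left corner.
                    x0 = (bb['center_x'] - bb['width'] / 2) * w
                    y0 = (bb['center_y'] - bb['height'] / 2) * h
                    print(word['text'], round(x0), round(y0))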
@@ -862,7 +903,7 @@ def run():
     for config_engine in config.get_general('engines').split(','):
         config_engines.append(config_engine.strip().lower())

-    for _,engine_class in sorted(inspect.getmembers(sys.modules[__name__], lambda x: hasattr(x, '__module__') and x.__module__ and __package__ + '.ocr' in x.__module__ and inspect.isclass(x))):
+    for _,engine_class in sorted(inspect.getmembers(sys.modules[__name__], lambda x: hasattr(x, '__module__') and x.__module__ and __package__ + '.ocr' in x.__module__ and inspect.isclass(x) and hasattr(x, 'name'))):
         if len(config_engines) == 0 or engine_class.name in config_engines:
             if config.get_engine(engine_class.name) == None:
                 engine_instance = engine_class()
@@ -897,6 +938,7 @@ def run():
     paused = config.get_general('pause_at_startup')
     auto_pause = config.get_general('auto_pause')
     language = config.get_general('language')
+    output_format = config.get_general('output_format')
     clipboard_thread = None
     websocket_server_thread = None
     screenshot_thread = None
@@ -987,6 +1029,14 @@ def run():
         auto_pause_handler = AutopauseTimer(auto_pause)
     user_input_thread = threading.Thread(target=user_input_thread_run, daemon=True)
     user_input_thread.start()
+
+    # If JSON output is selected, check that the chosen engine supports coordinates
+    if output_format == 'json' and not engine_instances[engine_index].coordinate_support:
+        supported_engines = (engine.name for engine in engine_instances if engine.coordinate_support)
+        logger.error(f"The selected engine '{engine_instances[engine_index].name}' does not support coordinate output.")
+        logger.error(f"Please choose one of: {', '.join(supported_engines)}")
+        sys.exit(1)
+
     logger.opt(ansi=True).info(f"Reading from {' and '.join(read_from_readable)}, writing to {write_to_readable} using <{engine_color}>{engine_instances[engine_index].readable_name}</{engine_color}>{' (paused)' if paused else ''}")

     while not terminated:
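End to end, a client can also pick the JSON results off the websocket sink. A sketch assuming the third-party websockets package; the port is a placeholder for whatever websocket_port is configured:

import asyncio
import json
import websockets

async def main():
    # Each message is one serialized OcrResult, as sent by send_text() above.
    async with websockets.connect('ws://localhost:7331') as ws:
        async for message in ws:
            result = json.loads(message)
            words = sum(len(l['words']) for p in result['paragraphs'] for l in p['lines'])
            print(f"{words} words across {len(result['paragraphs'])} paragraphs")

asyncio.run(main())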