Merge pull request #30 from rtr46/master

add new json (text+coordinates) output format for bing, glens and oneocr
This commit is contained in:
Aurora
2025-09-19 09:26:25 +02:00
committed by GitHub
3 changed files with 323 additions and 56 deletions

View File

@@ -48,8 +48,10 @@ parser.add_argument('-sw', '--screen_capture_only_active_windows', type=str2bool
help="When reading with screen capture and screen_capture_area is a window name, only target the window while it's active.")
parser.add_argument('-sc', '--screen_capture_combo', type=str, default=argparse.SUPPRESS,
help='When reading with screen capture, combo to wait on for taking a screenshot instead of using the delay. As an example: "<ctrl>+<shift>+s". The list of keys can be found here: https://pynput.readthedocs.io/en/latest/keyboard.html#pynput.keyboard.Key')
parser.add.argument('-l', '--language', type=str, default=argparse.SUPPRESS,
parser.add_argument('-l', '--language', type=str, default=argparse.SUPPRESS,
help='Two letter language code for filtering screencapture OCR results. Ex. "ja" for Japanese, "zh" for Chinese, "ko" for Korean, "ar" for Arabic, "ru" for Russian, "el" for Greek, "he" for Hebrew, "th" for Thai. Any other value will use Latin Extended (for most European languages and English).')
parser.add_argument('-of', '--output_format', type=str, default=argparse.SUPPRESS, choices=['text', 'json'],
help='The output format for OCR results. Can be "text" (default) or "json" (to include coordinates).')
class Config:
has_config = False
@@ -79,7 +81,8 @@ class Config:
'screen_capture_only_active_windows': True,
'screen_capture_combo': '',
'screen_capture_old_macos_api': False,
'language': 'ja'
'language': 'ja',
'output_format': 'text'
}
def __parse(self, value):

View File

@@ -5,10 +5,12 @@ from pathlib import Path
import sys
import platform
import logging
from math import sqrt
from math import sqrt, sin, cos, atan2
import json
import base64
from urllib.parse import urlparse, parse_qs
from dataclasses import dataclass, field, asdict
from typing import List, Optional
import jaconv
import numpy as np
@@ -83,6 +85,50 @@ try:
except:
optimized_png_encode = False
@dataclass
class BoundingBox:
    """
    Represents the normalized coordinates of a detected element.
    All values are floats between 0.0 and 1.0.
    """
    # (center_x, center_y) is the box center, relative to image width/height.
    center_x: float
    center_y: float
    # Full extent of the box, also normalized to image dimensions.
    width: float
    height: float
    rotation_z: Optional[float] = None  # Optional rotation in radians
@dataclass
class Word:
    """Represents a single recognized word and its properties."""
    text: str
    bounding_box: BoundingBox
    # The character(s) that follow the word, e.g., a space; None when the
    # engine reports no trailing separator.
    separator: Optional[str] = None
@dataclass
class Line:
    """Represents a single line of text, composed of words."""
    bounding_box: BoundingBox
    words: List[Word] = field(default_factory=list)
@dataclass
class Paragraph:
    """Represents a block of text, composed of lines."""
    bounding_box: BoundingBox
    lines: List[Line] = field(default_factory=list)
    writing_direction: Optional[str] = None  # Optional: e.g., "LEFT_TO_RIGHT"
@dataclass
class ImageProperties:
    """Stores the original dimensions of the processed image."""
    # Pixel dimensions of the source image, used by consumers to
    # de-normalize the 0.0-1.0 bounding box coordinates.
    width: int
    height: int
@dataclass
class OcrResult:
    """The root object for a complete OCR analysis of an image."""
    image_properties: ImageProperties
    paragraphs: List[Paragraph] = field(default_factory=list)
def empty_post_process(text):
    """Identity post-processor: return *text* unchanged."""
    return text
@@ -161,6 +207,7 @@ class MangaOcr:
readable_name = 'Manga OCR'
key = 'm'
available = False
coordinate_support = False
def __init__(self, config={'pretrained_model_name_or_path':'kha-white/manga-ocr-base','force_cpu': False}):
if 'manga_ocr' not in sys.modules:
@@ -191,6 +238,7 @@ class GoogleVision:
readable_name = 'Google Vision'
key = 'g'
available = False
coordinate_support = False
def __init__(self):
if 'google.cloud' not in sys.modules:
@@ -235,6 +283,7 @@ class GoogleLens:
readable_name = 'Google Lens'
key = 'l'
available = False
coordinate_support = True
def __init__(self):
if 'betterproto' not in sys.modules:
@@ -243,6 +292,62 @@ class GoogleLens:
self.available = True
logger.info('Google Lens ready')
def _to_generic_result(self, response, img_width, img_height):
    """Convert a raw Lens response dict into a generic OcrResult.

    Walks the paragraphs/lines/words hierarchy under
    ``objects_response.text.text_layout`` and mirrors it with the
    generic dataclasses; missing geometry fields become None.
    """
    def make_bbox(element):
        # Geometry is optional at every level; fall back to empty dicts so
        # absent fields surface as None in the BoundingBox.
        geom = element.get('geometry', {}).get('bounding_box', {})
        return BoundingBox(
            center_x=geom.get('center_x'),
            center_y=geom.get('center_y'),
            width=geom.get('width'),
            height=geom.get('height'),
            rotation_z=geom.get('rotation_z')
        )

    paragraphs = []
    if 'objects_response' in response and 'text' in response['objects_response']:
        text_data = response['objects_response']['text']
        if 'text_layout' in text_data:
            for para in text_data['text_layout'].get('paragraphs', []):
                para_lines = []
                for ln in para.get('lines', []):
                    ln_words = [
                        Word(
                            text=wd.get('plain_text', ''),
                            separator=wd.get('text_separator'),
                            bounding_box=make_bbox(wd)
                        )
                        for wd in ln.get('words', [])
                    ]
                    para_lines.append(Line(bounding_box=make_bbox(ln), words=ln_words))
                paragraphs.append(Paragraph(
                    bounding_box=make_bbox(para),
                    lines=para_lines,
                    writing_direction=para.get('writing_direction')
                ))
    return OcrResult(
        image_properties=ImageProperties(width=img_width, height=img_height),
        paragraphs=paragraphs
    )
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
@@ -272,7 +377,7 @@ class GoogleLens:
image_data = self._preprocess(img)
request.objects_request.image_data.payload.image_bytes = image_data[0]
request.objects_request.image_data.image_metadata.width = image_data[1]
request.objects_request.image_data.image_metadata.height = image_data[2]
request.objects_request.image_data.image_metadata.height = image_data[2]
payload = request.SerializeToString()
@@ -302,17 +407,8 @@ class GoogleLens:
response_proto = LensOverlayServerResponse().FromString(res.content)
response_dict = response_proto.to_dict(betterproto.Casing.SNAKE)
res = ''
text = response_dict['objects_response']['text']
if 'text_layout' in text:
paragraphs = text['text_layout']['paragraphs']
for paragraph in paragraphs:
for line in paragraph['lines']:
for word in line['words']:
res += word['plain_text'] + word['text_separator']
res += '\n'
x = (True, res)
ocr_result = self._to_generic_result(response_dict, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
@@ -332,6 +428,7 @@ class GoogleLensWeb:
readable_name = 'Google Lens (web)'
key = 'k'
available = False
coordinate_support = False
def __init__(self):
if 'pyjson5' not in sys.modules:
@@ -427,12 +524,76 @@ class Bing:
readable_name = 'Bing'
key = 'b'
available = False
coordinate_support = True
def __init__(self):
self.requests_session = requests.Session()
self.available = True
logger.info('Bing ready')
def _quad_to_center_bbox(self, quad):
    """Convert Bing's four-corner quad into a center-based BoundingBox.

    Width/height are the averages of the two opposite edge lengths, so
    slightly rotated quads still get a sensible size; rotation itself is
    not reported (rotation_z stays None).
    """
    def edge_length(a, b):
        return sqrt((b['x'] - a['x']) ** 2 + (b['y'] - a['y']) ** 2)

    tl = quad['topLeft']
    tr = quad['topRight']
    br = quad['bottomRight']
    bl = quad['bottomLeft']
    corners = (tl, tr, br, bl)
    return BoundingBox(
        center_x=sum(c['x'] for c in corners) / 4,
        center_y=sum(c['y'] for c in corners) / 4,
        width=(edge_length(tl, tr) + edge_length(bl, br)) / 2,
        height=(edge_length(tl, bl) + edge_length(tr, br)) / 2
    )
def _to_generic_result(self, response, img_width, img_height):
    """Convert Bing's raw visual-search response into a generic OcrResult.

    Text lives under the '##TextRecognition' tag's
    'ImageKnowledge/TextRecognitionAction' action; each region becomes
    one Paragraph.
    """
    paragraphs = []
    text_tag = next(
        (t for t in response.get('tags', []) if t.get('displayName') == '##TextRecognition'),
        None
    )
    text_action = None
    if text_tag:
        text_action = next(
            (a for a in text_tag.get('actions', []) if a.get('_type') == 'ImageKnowledge/TextRecognitionAction'),
            None
        )
    if text_action:
        for region in text_action.get('data', {}).get('regions', []):
            region_lines = []
            for ln in region.get('lines', []):
                line_words = [
                    Word(
                        text=wd.get('text', ''),
                        bounding_box=self._quad_to_center_bbox(wd['boundingBox']),
                        separator=" "
                    )
                    for wd in ln.get('words', [])
                ]
                region_lines.append(Line(
                    bounding_box=self._quad_to_center_bbox(ln['boundingBox']),
                    words=line_words
                ))
            # Bing doesn't provide paragraph-level separators, so we add a newline
            if region_lines and region_lines[-1].words:
                region_lines[-1].words[-1].separator = '\n'
            paragraphs.append(Paragraph(
                bounding_box=self._quad_to_center_bbox(region['boundingBox']),
                lines=region_lines
            ))
    return OcrResult(
        image_properties=ImageProperties(width=img_width, height=img_height),
        paragraphs=paragraphs
    )
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
@@ -510,26 +671,9 @@ class Bing:
return (False, 'Unknown error!')
data = res.json()
res = ''
text_tag = None
for tag in data['tags']:
if tag.get('displayName') == '##TextRecognition':
text_tag = tag
break
if text_tag:
text_action = None
for action in text_tag['actions']:
if action.get('_type') == 'ImageKnowledge/TextRecognitionAction':
text_action = action
break
if text_action:
regions = text_action['data'].get('regions', [])
for region in regions:
for line in region.get('lines', []):
res += line['text'] + '\n'
x = (True, res)
ocr_result = self._to_generic_result(data, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
@@ -558,6 +702,7 @@ class AppleVision:
readable_name = 'Apple Vision'
key = 'a'
available = False
coordinate_support = False
def __init__(self):
if sys.platform != 'darwin':
@@ -607,6 +752,7 @@ class AppleLiveText:
readable_name = 'Apple Live Text'
key = 'd'
available = False
coordinate_support = False
def __init__(self):
if sys.platform != 'darwin':
@@ -687,6 +833,7 @@ class WinRTOCR:
readable_name = 'WinRT OCR'
key = 'w'
available = False
coordinate_support = False
def __init__(self, config={}):
if sys.platform == 'win32':
@@ -740,6 +887,7 @@ class OneOCR:
readable_name = 'OneOCR'
key = 'z'
available = False
coordinate_support = True
def __init__(self, config={}):
if sys.platform == 'win32':
@@ -763,6 +911,67 @@ class OneOCR:
except:
logger.warning('Error reading URL from config, OneOCR will not work!')
def _pixel_quad_to_center_bbox(self, rect, img_width, img_height):
    """Normalize OneOCR's pixel-space quad (x1..x4 / y1..y4) to a BoundingBox.

    NOTE(review): width/height use axis-aligned deltas (|x2-x1| etc.), so
    this under-measures strongly rotated quads — presumably OneOCR quads
    are near axis-aligned; confirm if rotated text matters.
    """
    xs = (rect['x1'], rect['x2'], rect['x3'], rect['x4'])
    ys = (rect['y1'], rect['y2'], rect['y3'], rect['y4'])
    # Average the two horizontal and the two vertical edge spans.
    width_px = (abs(rect['x2'] - rect['x1']) + abs(rect['x3'] - rect['x4'])) / 2
    height_px = (abs(rect['y4'] - rect['y1']) + abs(rect['y3'] - rect['y2'])) / 2
    return BoundingBox(
        center_x=(sum(xs) / 4) / img_width,
        center_y=(sum(ys) / 4) / img_height,
        width=width_px / img_width,
        height=height_px / img_height
    )
def _to_generic_result(self, response, img_width, img_height):
    """Convert OneOCR's raw line/word output into a generic OcrResult.

    OneOCR has no paragraph grouping, so every line is wrapped in a
    single Paragraph whose bbox is the union of all line bboxes.
    """
    all_lines = []
    for raw_line in response.get('lines', []):
        raw_words = raw_line.get('words', [])
        last_idx = len(raw_words) - 1
        line_words = [
            Word(
                text=rw.get('text', ''),
                # Space between words; the final word gets no separator.
                separator=" " if idx < last_idx else None,
                bounding_box=self._pixel_quad_to_center_bbox(rw['bounding_rect'], img_width, img_height)
            )
            for idx, rw in enumerate(raw_words)
        ]
        all_lines.append(Line(
            bounding_box=self._pixel_quad_to_center_bbox(raw_line['bounding_rect'], img_width, img_height),
            words=line_words
        ))
    if all_lines:
        # Approximate the paragraph bbox as the bounding union of the lines.
        boxes = [ln.bounding_box for ln in all_lines]
        left = min(b.center_x - b.width / 2 for b in boxes)
        right = max(b.center_x + b.width / 2 for b in boxes)
        top = min(b.center_y - b.height / 2 for b in boxes)
        bottom = max(b.center_y + b.height / 2 for b in boxes)
        union_bbox = BoundingBox(
            center_x=(left + right) / 2,
            center_y=(top + bottom) / 2,
            width=right - left,
            height=bottom - top
        )
        paragraphs = [Paragraph(bounding_box=union_bbox, lines=all_lines)]
    else:
        paragraphs = []
    return OcrResult(
        image_properties=ImageProperties(width=img_width, height=img_height),
        paragraphs=paragraphs
    )
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
@@ -770,7 +979,7 @@ class OneOCR:
if sys.platform == 'win32':
try:
res = self.model.recognize_pil(img)['text']
raw_res = self.model.recognize_pil(img)
except RuntimeError as e:
return (False, e)
else:
@@ -784,9 +993,10 @@ class OneOCR:
if res.status_code != 200:
return (False, 'Unknown error!')
res = res.json()['text']
raw_res = res.json()
x = (True, res)
ocr_response = self._to_generic_result(raw_res, img.width, img.height)
x = (True, ocr_response)
if is_path:
img.close()
@@ -800,6 +1010,7 @@ class AzureImageAnalysis:
readable_name = 'Azure Image Analysis'
key = 'v'
available = False
coordinate_support = False
def __init__(self, config={}):
if 'azure.ai.vision.imageanalysis' not in sys.modules:
@@ -853,6 +1064,7 @@ class EasyOCR:
readable_name = 'EasyOCR'
key = 'e'
available = False
coordinate_support = False
def __init__(self, config={'gpu': True}):
if 'easyocr' not in sys.modules:
@@ -888,6 +1100,7 @@ class RapidOCR:
readable_name = 'RapidOCR'
key = 'r'
available = False
coordinate_support = False
def __init__(self):
if 'rapidocr_onnxruntime' not in sys.modules:
@@ -936,6 +1149,7 @@ class OCRSpace:
readable_name = 'OCRSpace'
key = 'o'
available = False
coordinate_support = False
def __init__(self, config={}):
try:

View File

@@ -8,6 +8,9 @@ import io
import re
import logging
import inspect
import os
import json
from dataclasses import asdict
import numpy as np
import pyperclipfix
@@ -811,32 +814,70 @@ def process_and_write_results(img_or_path, last_result, filtering, notify):
engine_instance = engine_instances[engine_index]
start_time = time.time()
res, text = engine_instance(img_or_path)
res, result_data = engine_instance(img_or_path)
end_time = time.time()
orig_text = []
engine_color = config.get_general('engine_color')
if res:
if not res:
logger.opt(ansi=True).info(f'<{engine_color}>{engine_instance.readable_name}</{engine_color}> reported an error after {end_time - start_time:0.03f}s: {result_data}')
return orig_text
output_format = config.get_general('output_format')
output_string = ''
log_message = ''
# Check if the engine returned a structured OcrResult object
if isinstance(result_data, OcrResult):
# Assemble full text for logging/notifications
full_text_parts = []
for p in result_data.paragraphs:
for l in p.lines:
for w in l.words:
full_text_parts.append(w.text)
if w.separator:
full_text_parts.append(w.separator)
unprocessed_text = "".join(full_text_parts)
if output_format == 'json':
result_dict = asdict(result_data)
output_string = json.dumps(result_dict, ensure_ascii=False)
log_message = post_process(unprocessed_text)
else: # 'text' format
if filtering:
text_to_process, orig_text = filtering(unprocessed_text, last_result)
output_string = post_process(text_to_process)
else:
output_string = post_process(unprocessed_text)
log_message = output_string
else: # Handle engines that return a simple string for result_data
if output_format == 'json':
logger.warning(f"Engine '{engine_instance.name}' does not support JSON output. Falling back to text.")
unprocessed_text = result_data
if filtering:
text, orig_text = filtering(text, last_result)
text = post_process(text)
logger.opt(ansi=True).info(f'Text recognized in {end_time - start_time:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>: {text}')
if notify and config.get_general('notifications'):
notifier.send(title='owocr', message='Text recognized: ' + text, urgency=get_notification_urgency())
write_to = config.get_general('write_to')
if write_to == 'websocket':
websocket_server_thread.send_text(text)
elif write_to == 'clipboard':
pyperclipfix.copy(text)
text_to_process, orig_text = filtering(unprocessed_text, last_result)
output_string = post_process(text_to_process)
else:
with Path(write_to).open('a', encoding='utf-8') as f:
f.write(text + '\n')
output_string = post_process(unprocessed_text)
log_message = output_string
if auto_pause_handler and not paused and not filtering:
auto_pause_handler.start()
logger.opt(ansi=True).info(f'Text recognized in {end_time - start_time:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>: {log_message}')
if notify and config.get_general('notifications'):
notifier.send(title='owocr', message='Text recognized: ' + log_message, urgency=get_notification_urgency())
# Write the final formatted string to the destination
write_to = config.get_general('write_to')
if write_to == 'websocket':
websocket_server_thread.send_text(output_string)
elif write_to == 'clipboard':
pyperclipfix.copy(output_string)
else:
logger.opt(ansi=True).info(f'<{engine_color}>{engine_instance.readable_name}</{engine_color}> reported an error after {end_time - start_time:0.03f}s: {text}')
with Path(write_to).open('a', encoding='utf-8') as f:
f.write(output_string + '\n')
if auto_pause_handler and not paused and not filtering:
auto_pause_handler.start()
return orig_text
@@ -862,7 +903,7 @@ def run():
for config_engine in config.get_general('engines').split(','):
config_engines.append(config_engine.strip().lower())
for _,engine_class in sorted(inspect.getmembers(sys.modules[__name__], lambda x: hasattr(x, '__module__') and x.__module__ and __package__ + '.ocr' in x.__module__ and inspect.isclass(x))):
for _,engine_class in sorted(inspect.getmembers(sys.modules[__name__], lambda x: hasattr(x, '__module__') and x.__module__ and __package__ + '.ocr' in x.__module__ and inspect.isclass(x) and hasattr(x, 'name'))):
if len(config_engines) == 0 or engine_class.name in config_engines:
if config.get_engine(engine_class.name) == None:
engine_instance = engine_class()
@@ -897,6 +938,7 @@ def run():
paused = config.get_general('pause_at_startup')
auto_pause = config.get_general('auto_pause')
language = config.get_general('language')
output_format = config.get_general('output_format')
clipboard_thread = None
websocket_server_thread = None
screenshot_thread = None
@@ -987,6 +1029,14 @@ def run():
auto_pause_handler = AutopauseTimer(auto_pause)
user_input_thread = threading.Thread(target=user_input_thread_run, daemon=True)
user_input_thread.start()
# if json is selected check if engine is compatible
if output_format == 'json' and not engine_instances[engine_index].coordinate_support:
supported_engines = (engine.name for engine in engine_instances if engine.coordinate_support)
logger.error(f"The selected engine '{engine_instances[engine_index].name}' does not support coordinate output.")
logger.error(f"Please choose one of: {', '.join(supported_engines)}")
sys.exit(1)
logger.opt(ansi=True).info(f"Reading from {' and '.join(read_from_readable)}, writing to {write_to_readable} using <{engine_color}>{engine_instances[engine_index].readable_name}</{engine_color}>{' (paused)' if paused else ''}")
while not terminated: