Refactor, fix spacing with some engines like OneOCR

AuroraWright
2025-10-05 23:49:28 +02:00
parent b7e0df6c19
commit 0143a6d97c
2 changed files with 116 additions and 114 deletions

View File

@@ -85,8 +85,6 @@ try:
 except:
     optimized_png_encode = False
-cj_regex = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]')
 @dataclass
 class BoundingBox:
@@ -136,18 +134,6 @@ class OcrResult:
 def empty_post_process(text):
     return text
-def post_process(text):
-    is_cj_text = cj_regex.search(text)
-    if is_cj_text:
-        text = ' '.join([''.join(i.split()) for i in text.splitlines()])
-    else:
-        text = ' '.join([re.sub(r'\s+', ' ', i).strip() for i in text.splitlines()])
-    text = text.replace('…', '...')
-    text = re.sub('[・.]{2,}', lambda x: (x.end() - x.start()) * '.', text)
-    if is_cj_text:
-        text = jaconv.h2z(text, ascii=True, digit=True)
-    return text
 def input_to_pil_image(img):
     is_path = False
     if isinstance(img, Image.Image):
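For reference, the post_process removed here (and re-added below as OutputResult._post_process) collapses whitespace differently for Japanese/Chinese text than for other text. A minimal stdlib-only sketch of just that whitespace step (the ellipsis normalization and the jaconv.h2z full-width conversion are left out):

    import re

    CJ_REGEX = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]')

    def normalize_spacing(text):
        # CJ text: drop all whitespace inside each line, then join lines with single spaces
        if CJ_REGEX.search(text):
            return ' '.join(''.join(line.split()) for line in text.splitlines())
        # Other text: collapse runs of whitespace to single spaces instead of removing them
        return ' '.join(re.sub(r'\s+', ' ', line).strip() for line in text.splitlines())

    print(normalize_spacing('こんにち は\n世界'))     # 'こんにちは 世界'
    print(normalize_spacing('Hello   world\nagain'))  # 'Hello world again'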

View File

@@ -307,6 +307,7 @@ class TextFiltering:
         self.language = config.get_general('language')
         self.segmenter = Segmenter(language=self.language, clean=True)
         self.regex = self.get_regex()
+        self.last_result = ([], engine_index)
         try:
             from transformers import pipeline, AutoTokenizer
@@ -353,7 +354,7 @@ class TextFiltering:
         return re.compile(
             r'[a-zA-Z\u00C0-\u00FF\u0100-\u017F\u0180-\u024F\u0250-\u02AF\u1D00-\u1D7F\u1D80-\u1DBF\u1E00-\u1EFF\u2C60-\u2C7F\uA720-\uA7FF\uAB30-\uAB6F]')
-    def __call__(self, text, last_result):
+    def __call__(self, text):
         orig_text = self.segmenter.segment(text)
         orig_text_filtered = []
         for block in orig_text:
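The regex built just above (apparently the Latin-script case) is presumably what the loop below uses to decide which segmented blocks to keep. A quick illustration of what it does and does not match:

    import re

    latin_regex = re.compile(r'[a-zA-Z\u00C0-\u00FF\u0100-\u017F\u0180-\u024F\u0250-\u02AF\u1D00-\u1D7F\u1D80-\u1DBF\u1E00-\u1EFF\u2C60-\u2C7F\uA720-\uA7FF\uAB30-\uAB6F]')
    print(bool(latin_regex.search('Héllo')))  # True  (Latin / extended Latin letters)
    print(bool(latin_regex.search('12345')))  # False (no letters at all)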
@@ -364,8 +365,8 @@ class TextFiltering:
             else:
                 orig_text_filtered.append(None)
-        if last_result[1] == engine_index:
-            last_text = last_result[0]
+        if self.last_result[1] == engine_index:
+            last_text = self.last_result[0]
         else:
             last_text = []
@@ -389,7 +390,9 @@ class TextFiltering:
                 final_blocks.append(block)
         text = '\n'.join(final_blocks)
-        return text, orig_text_filtered
+        self.last_result = (orig_text_filtered, engine_index)
+        return text
 class ScreenshotThread(threading.Thread):
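The signature change above is the other half of this refactor: TextFiltering now remembers its own previous result (self.last_result) instead of the caller threading last_result in and out of every call. A toy sketch of that stateful pattern, with hypothetical names rather than the real filtering logic:

    class StatefulFilter:
        def __init__(self, engine_index=0):
            self.engine_index = engine_index
            # (previously seen blocks, engine that produced them)
            self.last_result = ([], engine_index)

        def __call__(self, text):
            blocks = text.splitlines()
            # Only compare against the previous result if it came from the same engine
            last_blocks = self.last_result[0] if self.last_result[1] == self.engine_index else []
            new_blocks = [b for b in blocks if b not in last_blocks]
            self.last_result = (blocks, self.engine_index)
            return '\n'.join(new_blocks)

    f = StatefulFilter()
    print(f('line1\nline2'))  # both lines are new
    print(f('line2\nline3'))  # only 'line3' is new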
@@ -717,6 +720,107 @@ class AutopauseTimer:
             pause_handler(True)
+class OutputResult:
+    def __init__(self, init_filtering):
+        self.filtering = TextFiltering() if init_filtering else None
+        self.cj_regex = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]')
+    def _coordinate_format_to_string(self, result_data):
+        full_text_parts = []
+        for p in result_data.paragraphs:
+            for l in p.lines:
+                for w in l.words:
+                    full_text_parts.append(w.text)
+                    if w.separator != None:
+                        full_text_parts.append(w.separator)
+                    else:
+                        full_text_parts.append(' ')
+                full_text_parts.append('\n')
+        return "".join(full_text_parts)
+    def _post_process(self, text):
+        is_cj_text = self.cj_regex.search(text)
+        if is_cj_text:
+            text = ' '.join([''.join(i.split()) for i in text.splitlines()])
+        else:
+            text = ' '.join([re.sub(r'\s+', ' ', i).strip() for i in text.splitlines()])
+        text = text.replace('…', '...')
+        text = re.sub('[・.]{2,}', lambda x: (x.end() - x.start()) * '.', text)
+        if is_cj_text:
+            text = jaconv.h2z(text, ascii=True, digit=True)
+        return text
+    def __call__(self, img_or_path, filter_text, notify):
+        if auto_pause_handler and not filter_text:
+            auto_pause_handler.stop()
+        engine_instance = engine_instances[engine_index]
+        start_time = time.time()
+        res, result_data = engine_instance(img_or_path)
+        end_time = time.time()
+        orig_text = []
+        engine_color = config.get_general('engine_color')
+        if not res:
+            logger.opt(ansi=True).info(f'<{engine_color}>{engine_instance.readable_name}</{engine_color}> reported an error after {end_time - start_time:0.03f}s: {result_data}')
+            return orig_text
+        output_format = config.get_general('output_format')
+        verbosity = config.get_general('verbosity')
+        output_string = ''
+        log_message = ''
+        result_data_text = None
+        # Check if the engine returned a structured OcrResult object
+        if isinstance(result_data, OcrResult):
+            unprocessed_text = self._coordinate_format_to_string(result_data)
+            if output_format == 'json':
+                result_dict = asdict(result_data)
+                output_string = json.dumps(result_dict, ensure_ascii=False)
+                log_message = self._post_process(unprocessed_text)
+            else:
+                result_data_text = unprocessed_text
+        else:
+            result_data_text = result_data
+        if result_data_text:
+            if output_format == 'json':
+                logger.warning(f"Engine '{engine_instance.name}' does not support JSON output. Falling back to text.")
+            if filter_text:
+                text_to_process = self.filtering(result_data_text)
+                output_string = self._post_process(text_to_process)
+            else:
+                output_string = self._post_process(result_data_text)
+            log_message = output_string
+        if verbosity != 0:
+            if verbosity < -1:
+                log_message_terminal = ': ' + log_message
+            elif verbosity == -1:
+                log_message_terminal = ''
+            else:
+                log_message_terminal = ': ' + (log_message if len(log_message) <= verbosity else log_message[:verbosity] + '[...]')
+            logger.opt(ansi=True).info(f'Text recognized in {end_time - start_time:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>{log_message_terminal}')
+        if notify and config.get_general('notifications'):
+            notifier.send(title='owocr', message='Text recognized: ' + log_message, urgency=get_notification_urgency())
+        # Write the final formatted string to the destination
+        write_to = config.get_general('write_to')
+        if write_to == 'websocket':
+            websocket_server_thread.send_text(output_string)
+        elif write_to == 'clipboard':
+            pyperclipfix.copy(output_string)
+        else:
+            with Path(write_to).open('a', encoding='utf-8') as f:
+                f.write(output_string + '\n')
+        if auto_pause_handler and not paused and not filter_text:
+            auto_pause_handler.start()
 def get_notification_urgency():
     if sys.platform == 'win32':
         return Urgency.Low
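The separator handling in _coordinate_format_to_string above is what the commit title refers to: the old code only appended a separator when it was truthy, so engines whose words carry no separator (apparently the case with OneOCR) had their words glued together. A sketch of the difference, using hypothetical Word objects standing in for the real OcrResult structures:

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class Word:
        text: str
        separator: Optional[str] = None

    def join_words_old(words):
        parts = []
        for w in words:
            parts.append(w.text)
            if w.separator:               # skips None (and '') -> words run together
                parts.append(w.separator)
        return ''.join(parts)

    def join_words_new(words):
        parts = []
        for w in words:
            parts.append(w.text)
            if w.separator is not None:
                parts.append(w.separator)
            else:
                parts.append(' ')         # fall back to a space when no separator is given
        return ''.join(parts)

    words = [Word('Hello'), Word('world')]
    print(join_words_old(words))  # 'Helloworld'
    print(join_words_new(words))  # 'Hello world '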
@@ -809,90 +913,6 @@ def on_screenshot_combo():
     screenshot_event.set()
-def process_and_write_results(img_or_path, last_result, filtering, notify):
-    if auto_pause_handler and not filtering:
-        auto_pause_handler.stop()
-    engine_instance = engine_instances[engine_index]
-    start_time = time.time()
-    res, result_data = engine_instance(img_or_path)
-    end_time = time.time()
-    orig_text = []
-    engine_color = config.get_general('engine_color')
-    if not res:
-        logger.opt(ansi=True).info(f'<{engine_color}>{engine_instance.readable_name}</{engine_color}> reported an error after {end_time - start_time:0.03f}s: {result_data}')
-        return orig_text
-    output_format = config.get_general('output_format')
-    verbosity = config.get_general('verbosity')
-    output_string = ''
-    log_message = ''
-    # Check if the engine returned a structured OcrResult object
-    if isinstance(result_data, OcrResult):
-        # Assemble full text for logging/notifications
-        full_text_parts = []
-        for p in result_data.paragraphs:
-            for l in p.lines:
-                for w in l.words:
-                    full_text_parts.append(w.text)
-                    if w.separator:
-                        full_text_parts.append(w.separator)
-                full_text_parts.append('\n')
-        unprocessed_text = "".join(full_text_parts)
-        if output_format == 'json':
-            result_dict = asdict(result_data)
-            output_string = json.dumps(result_dict, ensure_ascii=False)
-            log_message = post_process(unprocessed_text)
-        else: # 'text' format
-            if filtering:
-                text_to_process, orig_text = filtering(unprocessed_text, last_result)
-                output_string = post_process(text_to_process)
-            else:
-                output_string = post_process(unprocessed_text)
-            log_message = output_string
-    else: # Handle engines that return a simple string for result_data
-        if output_format == 'json':
-            logger.warning(f"Engine '{engine_instance.name}' does not support JSON output. Falling back to text.")
-        unprocessed_text = result_data
-        if filtering:
-            text_to_process, orig_text = filtering(unprocessed_text, last_result)
-            output_string = post_process(text_to_process)
-        else:
-            output_string = post_process(unprocessed_text)
-        log_message = output_string
-    if verbosity != 0:
-        if verbosity < -1:
-            log_message_terminal = ': ' + log_message
-        elif verbosity == -1:
-            log_message_terminal = ''
-        else:
-            log_message_terminal = ': ' + (log_message if len(log_message) <= verbosity else log_message[:verbosity] + '[...]')
-        logger.opt(ansi=True).info(f'Text recognized in {end_time - start_time:0.03f}s using <{engine_color}>{engine_instance.readable_name}</{engine_color}>{log_message_terminal}')
-    if notify and config.get_general('notifications'):
-        notifier.send(title='owocr', message='Text recognized: ' + log_message, urgency=get_notification_urgency())
-    # Write the final formatted string to the destination
-    write_to = config.get_general('write_to')
-    if write_to == 'websocket':
-        websocket_server_thread.send_text(output_string)
-    elif write_to == 'clipboard':
-        pyperclipfix.copy(output_string)
-    else:
-        with Path(write_to).open('a', encoding='utf-8') as f:
-            f.write(output_string + '\n')
-    if auto_pause_handler and not paused and not filtering:
-        auto_pause_handler.start()
-    return orig_text
 def run():
     logger.configure(handlers=[{'sink': sys.stderr, 'format': config.get_general('logger_format')}])
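The verbosity handling (identical in the removed function above and in the new OutputResult.__call__) is easy to misread; roughly, as a standalone helper:

    # verbosity semantics for the 'Text recognized' log line:
    #   0    -> the log line is skipped entirely (the caller checks verbosity != 0)
    #   -1   -> log the line but without the recognized text
    #   < -1 -> log the full recognized text
    #   > 0  -> log at most `verbosity` characters, then '[...]'
    def format_log_suffix(log_message, verbosity):
        if verbosity < -1:
            return ': ' + log_message
        if verbosity == -1:
            return ''
        return ': ' + (log_message if len(log_message) <= verbosity else log_message[:verbosity] + '[...]')

    print(format_log_suffix('some recognized text', 10))  # ': some recog[...]'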
@@ -961,7 +981,7 @@ def run():
     directory_watcher_thread = None
     unix_socket_server = None
     key_combo_listener = None
-    filtering = None
+    init_filtering = False
     auto_pause_handler = None
     engine_index = engine_keys.index(default_engine) if default_engine != '' else 0
     engine_color = config.get_general('engine_color')
@@ -988,7 +1008,6 @@ def run():
         screen_capture_delay_secs = config.get_general('screen_capture_delay_secs')
         screen_capture_combo = config.get_general('screen_capture_combo')
         last_screenshot_time = 0
-        last_result = ([], engine_index)
         if screen_capture_combo != '':
             screen_capture_on_combo = True
             key_combos[screen_capture_combo] = on_screenshot_combo
@@ -998,7 +1017,7 @@ def run():
         screenshot_event = threading.Event()
         screenshot_thread = ScreenshotThread(screen_capture_on_combo)
         screenshot_thread.start()
-        filtering = TextFiltering()
+        init_filtering = True
         read_from_readable.append('screen capture')
     if 'websocket' in (read_from, read_from_secondary):
         read_from_readable.append('websocket')
@@ -1027,6 +1046,8 @@ def run():
         directory_watcher_thread.start()
         read_from_readable.append(f'directory {read_from_path}')
+    output_result = OutputResult(init_filtering)
     if len(key_combos) > 0:
         key_combo_listener = keyboard.GlobalHotKeys(key_combos)
         key_combo_listener.start()
@@ -1058,11 +1079,11 @@ def run():
     while not terminated:
         start_time = time.time()
         img = None
-        filter_img = False
+        filter_text = False
         if process_queue:
             try:
-                img, filter_img = image_queue.get(timeout=0.1)
+                img, filter_text = image_queue.get(timeout=0.1)
                 notify = True
             except queue.Empty:
                 pass
@@ -1071,7 +1092,7 @@ def run():
         if (not paused) and screenshot_thread.screencapture_window_active and screenshot_thread.screencapture_window_visible and (time.time() - last_screenshot_time) > screen_capture_delay_secs:
             screenshot_event.set()
             img = periodic_screenshot_queue.get()
-            filter_img = True
+            filter_text = True
             notify = False
             last_screenshot_time = time.time()
@@ -1080,12 +1101,7 @@ def run():
             terminated = True
             break
         elif img:
-            if filter_img:
-                res = process_and_write_results(img, last_result, filtering, notify)
-                if res:
-                    last_result = (res, engine_index)
-            else:
-                process_and_write_results(img, None, None, notify)
+            output_result(img, filter_text, notify)
             if isinstance(img, Path):
                 if delete_images:
                     Path.unlink(img)
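With the old filter_img / last_result branching gone, every image now goes through the same output_result call. Hypothetical stand-ins sketching that dispatch (not the real classes):

    class FakeFiltering:
        def __call__(self, text):
            return text.upper()  # stand-in for the real duplicate-text filtering

    class FakeOutputResult:
        def __init__(self, init_filtering):
            self.filtering = FakeFiltering() if init_filtering else None

        def __call__(self, img_or_path, filter_text, notify):
            text = f'text recognized from {img_or_path}'  # stand-in for the OCR engine call
            if filter_text:
                text = self.filtering(text)
            print(text, '(with notification)' if notify else '')

    output_result = FakeOutputResult(init_filtering=True)
    output_result('clipboard.png', filter_text=False, notify=True)   # clipboard / file path
    output_result('screenshot.png', filter_text=True, notify=False)  # periodic screen capture path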