Improve filtering, add line recovery for frame stabilization, add furigana filter

AuroraWright
2025-10-12 00:23:47 +02:00
parent bdfe2d4d46
commit 6ada579b19
3 changed files with 292 additions and 126 deletions

@@ -50,6 +50,10 @@ parser.add_argument('-sw', '--screen_capture_only_active_windows', type=str2bool
help="When reading with screen capture and screen_capture_area is a window name, only target the window while it's active.")
parser.add_argument('-sf', '--screen_capture_frame_stabilization', type=float, default=argparse.SUPPRESS,
help="When reading with screen capture, delay to wait until text is stable before processing it. -1 waits for two OCR results to be the same. 0 to disable.")
parser.add_argument('-sl', '--screen_capture_line_recovery', type=str2bool, nargs='?', const=True, default=argparse.SUPPRESS,
help="When reading with screen capture and frame stabilization is on, try to recover missed lines from unstable frames. Can lead to increased glitches.")
parser.add_argument('-sff', '--screen_capture_furigana_filter', type=str2bool, nargs='?', const=True, default=argparse.SUPPRESS,
help="When reading with screen capture, try to filter furigana lines.")
parser.add_argument('-sc', '--screen_capture_combo', type=str, default=argparse.SUPPRESS,
help='When reading with screen capture, combo to wait on for taking a screenshot. If periodic screenshots are also enabled, any screenshot taken this way bypasses the filtering. Example value: "<ctrl>+<shift>+s". The list of keys can be found here: https://pynput.readthedocs.io/en/latest/keyboard.html#pynput.keyboard.Key')
parser.add_argument('-l', '--language', type=str, default=argparse.SUPPRESS,
@@ -58,6 +62,7 @@ parser.add_argument('-of', '--output_format', type=str, default=argparse.SUPPRES
help='The output format for OCR results. Can be "text" (default) or "json" (to include coordinates).')
parser.add_argument('-v', '--verbosity', type=int, default=argparse.SUPPRESS,
help='Terminal window verbosity. Can be -2 (all recognized text is shown in full, default), -1 (only timestamps are shown), 0 (nothing is shown except errors), or a value larger than 0 to truncate displayed text to that number of characters.')
parser.add_argument('--uwu', type=str2bool, nargs='?', const=True, default=argparse.SUPPRESS, help=argparse.SUPPRESS)
class Config:
has_config = False
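
The two new switches follow argparse's optional-value boolean pattern (type=str2bool with nargs='?' and const=True, default suppressed). A minimal sketch of how that pattern behaves, using a stand-in str2bool since the project's own converter is not shown in this hunk:

import argparse

def str2bool(v):
    # Stand-in converter: accept "true"/"false"-style strings.
    if isinstance(v, bool):
        return v
    return v.strip().lower() in ('yes', 'true', 't', '1')

parser = argparse.ArgumentParser()
parser.add_argument('-sl', '--screen_capture_line_recovery', type=str2bool,
                    nargs='?', const=True, default=argparse.SUPPRESS)

print(vars(parser.parse_args([])))                # {} -- SUPPRESS: an absent flag adds no key
print(vars(parser.parse_args(['-sl'])))           # {'screen_capture_line_recovery': True}
print(vars(parser.parse_args(['-sl', 'false'])))  # {'screen_capture_line_recovery': False}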
@@ -87,11 +92,14 @@ class Config:
'screen_capture_delay_secs': 0,
'screen_capture_only_active_windows': True,
'screen_capture_frame_stabilization': -1,
'screen_capture_line_recovery': True,
'screen_capture_furigana_filter': True,
'screen_capture_combo': '',
'screen_capture_old_macos_api': False,
'language': 'ja',
'output_format': 'text',
'verbosity': -2
'verbosity': -2,
'uwu': False
}
def __parse(self, value):

@@ -25,7 +25,6 @@ from PIL import Image, UnidentifiedImageError
from loguru import logger
from pynput import keyboard
from desktop_notifier import DesktopNotifierSync, Urgency
from rapidfuzz import fuzz
from .ocr import *
from .config import config
@@ -305,13 +304,19 @@ class TextFiltering:
def __init__(self):
self.language = config.get_general('language')
self.frame_stabilization = config.get_general('screen_capture_frame_stabilization')
self.last_frame_data = None
self.line_recovery = config.get_general('screen_capture_line_recovery')
self.furigana_filter = config.get_general('screen_capture_furigana_filter')
self.recovered_lines_count = 0
self.last_frame_data = [None, None]
self.last_last_frame_data = [None, None]
self.stable_frame_data = None
self.last_frame_text = None
self.last_frame_text = []
self.last_last_frame_text = []
self.stable_frame_text = None
self.processed_stable_frame = False
self.frame_stabilization_timestamp = 0
self.cj_regex = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]')
self.kanji_regex = re.compile(r'[\u4E00-\u9FFF]')
self.regex = self.get_regex()
self.kana_variants = {
'': ['', ''], '': ['', ''],
@@ -340,7 +345,7 @@ class TextFiltering:
if self.language == 'ja':
return self.cj_regex
elif self.language == 'zh':
return re.compile(r'[\u4E00-\u9FFF]')
return self.kanji_regex
elif self.language == 'ko':
return re.compile(r'[\uAC00-\uD7AF]')
elif self.language == 'ar':
@@ -382,50 +387,73 @@ class TextFiltering:
filtered_text = self.convert_small_kana_to_big(filtered_text)
return filtered_text
def _compare_text(self, current_text, prev_text, threshold=82):
if current_text in prev_text:
return True
if len(prev_text) > len(current_text):
return fuzz.partial_ratio(current_text, prev_text) >= threshold
return fuzz.ratio(current_text, prev_text) >= threshold
def _find_changed_lines(self, pil_image, current_result):
if (self.last_frame_data != [None, None] and (current_result.image_properties.width != self.last_frame_data[1].image_properties.width or
current_result.image_properties.height != self.last_frame_data[1].image_properties.height)):
self.stable_frame_data = None
self.last_frame_data = [None, None]
self.last_last_frame_data = [None, None]
def _find_changed_lines(self, current_result):
if (self.last_frame_data is None or self.stable_frame_data is None or
(self.stable_frame_data and (current_result.image_properties.width != self.stable_frame_data.image_properties.width or
current_result.image_properties.height != self.stable_frame_data.image_properties.height))):
self.stable_frame_data = copy.deepcopy(current_result)
self.last_frame_data = copy.deepcopy(current_result)
if self.frame_stabilization == 0:
changed_lines = self._find_changed_lines_impl(current_result, self.last_frame_data[1])
if changed_lines == None:
return 0, None
changed_lines_total = len(changed_lines)
self.last_frame_data = (pil_image, copy.deepcopy(current_result))
if changed_lines_total and config.get_general('output_format') != 'json':
changed_regions_image = self._create_changed_regions_image(pil_image, changed_lines, None, None)
if not changed_regions_image:
logger.warning('Error occurred while creating the differential image.')
return 0, None
return changed_lines_total, changed_regions_image
else:
return changed_lines_total, None
changed_lines = []
for p in current_result.paragraphs:
changed_lines.extend(p.lines)
return changed_lines
changed_lines_stabilization = self._find_changed_lines_impl(current_result, self.last_frame_data[1])
if changed_lines_stabilization == None:
return 0, None
if not self.frame_stabilization:
changed_lines = self._find_changed_lines_impl(current_result, self.last_frame_data)
self.last_frame_data = copy.deepcopy(current_result)
return changed_lines
frames_match = self._find_changed_lines_impl(current_result, self.last_frame_data) == []
frames_match = len(changed_lines_stabilization) == 0
logger.debug(f"Frames match: '{frames_match}'")
if frames_match:
if self.processed_stable_frame:
return []
return 0, None
if time.time() - self.frame_stabilization_timestamp < self.frame_stabilization:
return []
return 0, None
changed_lines = self._find_changed_lines_impl(current_result, self.stable_frame_data)
if self.line_recovery and self.last_last_frame_data:
logger.debug(f'Checking for missed lines')
recovered_lines = self._find_changed_lines_impl(self.last_last_frame_data[1], self.stable_frame_data, current_result)
self.recovered_lines_count = len(recovered_lines) if recovered_lines else 0
else:
self.recovered_lines_count = 0
recovered_lines = []
self.processed_stable_frame = True
self.stable_frame_data = copy.deepcopy(current_result)
return changed_lines
changed_lines_total = len(changed_lines) + self.recovered_lines_count
if changed_lines_total and config.get_general('output_format') != 'json':
if recovered_lines:
changed_regions_image = self._create_changed_regions_image(pil_image, changed_lines, self.last_last_frame_data[0], recovered_lines)
else:
self.last_frame_data = copy.deepcopy(current_result)
changed_regions_image = self._create_changed_regions_image(pil_image, changed_lines, None, None)
if not changed_regions_image:
logger.warning('Error occurred while creating the differential image.')
return 0, None
return changed_lines_total, changed_regions_image
else:
return changed_lines_total, None
else:
self.last_last_frame_data = self.last_frame_data
self.last_frame_data = (pil_image, copy.deepcopy(current_result))
self.recovered_lines_count = 0
self.processed_stable_frame = False
self.frame_stabilization_timestamp = time.time()
return []
return 0, None
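
The branching above amounts to a small stabilization state machine: a frame is only diffed against the last stable frame once its OCR result stops changing, and only once per stable frame. A condensed, illustrative restatement of that decision (names here are descriptive, not the actual attributes):

import time

def ready_to_process(frames_match, already_processed, stabilization_delay, last_change_timestamp):
    # frames_match: the current OCR result equals the previous frame's result.
    if not frames_match:
        return False  # still changing; the caller records a new timestamp and keeps waiting
    if already_processed:
        return False  # this stable frame has already been emitted once
    if time.time() - last_change_timestamp < stabilization_delay:
        return False  # positive delay not yet elapsed (-1 always passes this check)
    return True       # stable and due: diff against the last stable frame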
def _find_changed_lines_impl(self, current_result, previous_result):
def _find_changed_lines_impl(self, current_result, previous_result, next_result=None):
changed_lines = []
current_lines = []
previous_lines = []
@@ -433,12 +461,17 @@ class TextFiltering:
for p in current_result.paragraphs:
current_lines.extend(p.lines)
if len(current_lines) == 0:
return []
for p in previous_result.paragraphs:
previous_lines.extend(p.lines)
return None
all_previous_text_spliced = []
if previous_result:
for p in previous_result.paragraphs:
previous_lines.extend(p.lines)
if next_result != None:
for p in next_result.paragraphs:
previous_lines.extend(p.lines)
for prev_line in previous_lines:
prev_text = self._get_line_text(prev_line)
prev_text = self._normalize_line_for_comparison(prev_text)
@@ -448,52 +481,42 @@ class TextFiltering:
logger.debug(f"Previous text: '{all_previous_text_spliced}'")
first = True
processed_valid_line = False
for current_line in current_lines:
current_text = self._get_line_text(current_line)
current_text = self._normalize_line_for_comparison(current_text)
if not current_text:
continue
# For the first line, check if it contains the end of previous text
if first and all_previous_text:
overlap = self._find_overlap(all_previous_text, current_text)
if overlap and len(current_text) > len(overlap):
logger.debug(f"Found overlap: '{overlap}'")
changed_lines.append(current_line)
first = False
continue
processed_valid_line = True
if len(current_text) < 3:
if next_result == None and len(current_text) < 3:
text_similar = current_text in all_previous_text_spliced
else:
text_similar = self._compare_text(current_text, all_previous_text)
text_similar = current_text in all_previous_text
logger.debug(f"Current line: '{current_text}' Similar: '{text_similar}'")
if not text_similar:
if next_result != None:
logger.opt(ansi=True).debug(f"<red>Recovered line: '{current_text}'</red>")
changed_lines.append(current_line)
if len(current_text) >= 3:
first = False
return changed_lines
return changed_lines if processed_valid_line else None
def _find_changed_lines_text(self, current_result, two_pass_processing_active=False):
if not self.frame_stabilization or two_pass_processing_active:
if self.last_frame_text:
changed_lines = self._find_changed_lines_text_impl(current_result, self.last_frame_text, True)
def _find_changed_lines_text(self, current_result, current_result_ocr, two_pass_processing_active):
frame_stabilization_active = self.frame_stabilization != 0
if (not frame_stabilization_active) or two_pass_processing_active:
changed_lines = self._find_changed_lines_text_impl(current_result, current_result_ocr, self.last_frame_text, None, True, frame_stabilization_active)
self.last_frame_text = current_result
return changed_lines
else:
self.last_frame_text = current_result
return current_result
if self.last_frame_text is None or self.stable_frame_text is None:
self.stable_frame_text = current_result
self.last_frame_text = current_result
return current_result
changed_lines_stabilization = self._find_changed_lines_text_impl(current_result, current_result_ocr, self.last_frame_text, None, False, False)
if changed_lines_stabilization == None:
return []
frames_match = self._find_changed_lines_text_impl(current_result, self.last_frame_text, False) == []
frames_match = len(changed_lines_stabilization) == 0
logger.debug(f"Frames match: '{frames_match}'")
@@ -502,61 +525,140 @@ class TextFiltering:
return []
if time.time() - self.frame_stabilization_timestamp < self.frame_stabilization:
return []
changed_lines = self._find_changed_lines_text_impl(current_result, self.stable_frame_text, True)
if self.line_recovery and self.last_last_frame_text:
logger.debug(f'Checking for missed lines')
recovered_lines = self._find_changed_lines_text_impl(self.last_last_frame_text, None, self.stable_frame_text, current_result, True, False)
self.recovered_lines_count = len(recovered_lines) if recovered_lines else 0
else:
self.recovered_lines_count = 0
recovered_lines = []
recovered_lines.extend(current_result)
changed_lines = self._find_changed_lines_text_impl(recovered_lines, current_result_ocr, self.stable_frame_text, None, True, frame_stabilization_active)
self.processed_stable_frame = True
self.stable_frame_text = current_result
return changed_lines
else:
self.last_last_frame_text = self.last_frame_text
self.last_frame_text = current_result
self.processed_stable_frame = False
self.frame_stabilization_timestamp = time.time()
return []
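
Line recovery in the text path mirrors the image path: when a new stable frame is reached, the last unstable frame is additionally diffed against both the previous stable frame and the new result, so text that only flashed by between stable frames is still emitted. Schematically, with invented frame contents:

# previous stable frame:          ["Line A"]
# unstable frame seen in between: ["Line A", "Line B"]   # "Line B" never stabilized
# new stable frame:               ["Line A", "Line C"]
#
# Diffing stable vs. new stable yields ["Line C"]; additionally diffing the
# unstable frame against both recovers ["Line B"], so both lines are emitted.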
def _find_changed_lines_text_impl(self, current_result, previous_stable_text, filtering):
def _find_changed_lines_text_impl(self, current_result, current_result_ocr, previous_result, next_result, filtering, skip_recovered_lines):
if len(current_result) == 0:
return []
changed_lines = []
current_lines = []
current_lines_ocr = []
all_previous_text_spliced = []
for prev_line in previous_stable_text:
if self.furigana_filter and self.language == 'ja' and isinstance(current_result_ocr, OcrResult):
for p in current_result_ocr.paragraphs:
current_lines_ocr.extend(p.lines)
for current_line in current_result:
current_text = self._normalize_line_for_comparison(current_line)
current_lines.append(current_text)
for prev_line in previous_result:
prev_text = self._normalize_line_for_comparison(prev_line)
all_previous_text_spliced.append(prev_text)
if next_result != None:
for next_text in next_result:
all_previous_text_spliced.extend(next_text)
all_previous_text = ''.join(all_previous_text_spliced)
logger.debug(f"Previous text: '{all_previous_text_spliced}'")
first = True
for current_line in current_result:
current_text = self._normalize_line_for_comparison(current_line)
processed_valid_line = False
for i, current_text in enumerate(current_lines):
if not current_text:
continue
# For the first line, check if it contains the end of previous text
if filtering and first and all_previous_text:
overlap = self._find_overlap(all_previous_text, current_text)
if overlap and len(current_text) > len(overlap):
logger.debug(f"Found overlap: '{overlap}'")
current_line = self._cut_at_overlap(current_line, overlap)
logger.debug(f"After cutting: '{current_line}'")
changed_lines.append(current_line)
first = False
continue
processed_valid_line = True
is_furigana = False
if len(current_text) < 3:
text_similar = current_text in all_previous_text_spliced
else:
text_similar = self._compare_text(current_text, all_previous_text)
text_similar = current_text in all_previous_text
logger.debug(f"Current line: '{current_text}' Similar: '{text_similar}'")
if not text_similar:
changed_lines.append(current_line)
if len(current_text) >= 3:
first = False
if text_similar:
continue
return changed_lines
if skip_recovered_lines and self.recovered_lines_count > 0:
# Check if any subsequent lines start with current_text
if any(line.startswith(current_text) for line in current_lines[i+1:]):
logger.debug(f"Skipping recovered line: '{current_text}'")
self.recovered_lines_count -= 1
continue
if current_lines_ocr:
current_line_bbox = current_lines_ocr[i].bounding_box
# Check if line contains only kana (no kanji)
has_kanji = self.kanji_regex.search(current_text)
if not has_kanji:
for j in range(len(current_lines_ocr)):
if i == j:
continue
if not current_lines[j]:
continue
below_line_bbox = current_lines_ocr[j].bounding_box
below_line_text = current_lines[j]
logger.debug(f"Furigana check against line: '{below_line_text}'")
# Check if the line is taller
height_threshold = below_line_bbox.height * 0.6
is_smaller = current_line_bbox.height < height_threshold
logger.debug(f"Furigana check height: '{height_threshold}' '{current_line_bbox.height}'")
if not is_smaller:
continue
# Check if the line has kanji
below_has_kanji = self.kanji_regex.search(below_line_text)
if not below_has_kanji:
continue
vertical_threshold = below_line_bbox.height * 0.8
vertical_distance = below_line_bbox.center_y - current_line_bbox.center_y
horizontal_overlap = self._check_horizontal_overlap(current_line_bbox, below_line_bbox)
logger.debug(f"Furigana check position: '{vertical_threshold}' '{vertical_distance}' '{horizontal_overlap}'")
# If vertically close and horizontally aligned, it's likely furigana
if (0 < vertical_distance < vertical_threshold * 2 and horizontal_overlap > 0.3): # At least 30% horizontal overlap
is_furigana = True
logger.debug(f"Skipping furigana line: '{current_text}' above line: '{below_line_text}'")
break
if is_furigana:
continue
changed_line = current_result[i]
if next_result != None:
logger.opt(ansi=True).debug(f"<red>Recovered line: '{changed_line}'</red>")
if first and len(current_text) > 3:
first = False
# For the first line, check if it contains the end of previous text
if filtering and all_previous_text:
overlap = self._find_overlap(all_previous_text, current_text)
if overlap and len(current_text) > len(overlap):
logger.debug(f"Found overlap: '{overlap}'")
changed_line = self._cut_at_overlap(changed_line, overlap)
logger.debug(f"After cutting: '{changed_line}'")
changed_lines.append(changed_line)
return changed_lines if processed_valid_line else []
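
The furigana check above boils down to: a kana-only line is dropped when it is markedly shorter than a nearby kanji-bearing line, sits just above it, and overlaps it horizontally. A condensed sketch of that test, with illustrative names and a stand-in overlap helper:

def looks_like_furigana(cand_bbox, cand_text, base_bbox, base_text, kanji_regex, horizontal_overlap):
    if kanji_regex.search(cand_text):
        return False  # candidate contains kanji: not furigana
    if not kanji_regex.search(base_text):
        return False  # the base line must contain kanji
    if cand_bbox.height >= base_bbox.height * 0.6:
        return False  # furigana is rendered noticeably smaller than its base line
    vertical_distance = base_bbox.center_y - cand_bbox.center_y
    if not (0 < vertical_distance < base_bbox.height * 0.8 * 2):
        return False  # must sit just above the base line
    return horizontal_overlap(cand_bbox, base_bbox) > 0.3  # at least 30% horizontal overlap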
def _find_overlap(self, previous_text, current_text):
min_overlap_length = 3
@@ -592,11 +694,35 @@ class TextFiltering:
return current_line
def _create_changed_regions_image(self, pil_image, changed_lines, margin=5):
img_width, img_height = pil_image.size
def _check_horizontal_overlap(self, bbox1, bbox2):
"""
Calculate the horizontal overlap ratio between two bounding boxes.
Returns a value between 0.0 (no overlap) and 1.0 (complete overlap).
"""
# Calculate left and right boundaries for both boxes
left1 = bbox1.center_x - bbox1.width / 2
right1 = bbox1.center_x + bbox1.width / 2
left2 = bbox2.center_x - bbox2.width / 2
right2 = bbox2.center_x + bbox2.width / 2
# Calculate overlap
overlap_left = max(left1, left2)
overlap_right = min(right1, right2)
if overlap_right <= overlap_left:
return 0.0
overlap_width = overlap_right - overlap_left
smaller_width = min(bbox1.width, bbox2.width)
return overlap_width / smaller_width if smaller_width > 0 else 0.0
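
A quick worked example of that ratio, with made-up normalized coordinates:

# bbox1: center_x = 0.30, width = 0.20  -> spans [0.20, 0.40]
# bbox2: center_x = 0.35, width = 0.40  -> spans [0.15, 0.55]
# overlap_left = max(0.20, 0.15) = 0.20; overlap_right = min(0.40, 0.55) = 0.40
# overlap_width = 0.20; smaller_width = min(0.20, 0.40) = 0.20
# ratio = 0.20 / 0.20 = 1.0  (bbox1 lies entirely within bbox2 horizontally)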
def _create_changed_regions_image(self, pil_image, changed_lines, pil_image_2, changed_lines_2, margin=5):
def crop_image(image, lines):
img_width, img_height = image.size
regions = []
for line in changed_lines:
for line in lines:
bbox = line.bounding_box
x1 = (bbox.center_x - bbox.width/2) * img_width - margin
y1 = (bbox.center_y - bbox.height/2) * img_height - margin
@@ -619,9 +745,45 @@ class TextFiltering:
overall_x2 = max(x2 for x1, y1, x2, y2 in regions)
overall_y2 = max(y2 for x1, y1, x2, y2 in regions)
result_image = pil_image.crop((overall_x1, overall_y1, overall_x2, overall_y2))
return image.crop((overall_x1, overall_y1, overall_x2, overall_y2))
return result_image
# Handle the case where changed_lines is empty and previous_result is provided
if (not pil_image) and pil_image_2:
cropped_2 = crop_image(pil_image_2, changed_lines_2)
return cropped_2
# Handle the case where both current and previous results are present
elif pil_image and pil_image_2:
# Crop both images
cropped_1 = crop_image(pil_image, changed_lines)
cropped_2 = crop_image(pil_image_2, changed_lines_2)
if cropped_1 is None and cropped_2 is None:
return None
elif cropped_1 is None:
return cropped_2
elif cropped_2 is None:
return cropped_1
# Stitch vertically with previous_result on top
total_width = max(cropped_1.width, cropped_2.width)
total_height = cropped_1.height + cropped_2.height
# Create a new image with white background
stitched_image = Image.new('RGB', (total_width, total_height), 'white')
# Paste previous (top) and current (bottom) images, centered horizontally
prev_x_offset = (total_width - cropped_2.width) // 2
stitched_image.paste(cropped_2, (prev_x_offset, 0))
curr_x_offset = (total_width - cropped_1.width) // 2
stitched_image.paste(cropped_1, (curr_x_offset, cropped_2.height))
return stitched_image
elif pil_image:
return crop_image(pil_image, changed_lines)
else:
return None
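
The stitching branch pastes the recovered-regions crop above the current-frame crop on a white canvas, each centered horizontally. A minimal standalone illustration of that layout using placeholder images (sizes are arbitrary):

from PIL import Image

top = Image.new('RGB', (120, 40), 'red')       # stands in for the recovered-lines crop
bottom = Image.new('RGB', (200, 60), 'blue')   # stands in for the current-frame crop

total_width = max(top.width, bottom.width)
canvas = Image.new('RGB', (total_width, top.height + bottom.height), 'white')
canvas.paste(top, ((total_width - top.width) // 2, 0))                 # previous crop on top
canvas.paste(bottom, ((total_width - bottom.width) // 2, top.height))  # current crop below
canvas.save('stitched_example.png')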
class ScreenshotThread(threading.Thread):
@@ -993,18 +1155,14 @@ class OutputResult:
logger.opt(ansi=True).warning(f'<{engine_color}>{engine_instance_2.readable_name}</{engine_color}> reported an error after {end_time - start_time:0.03f}s: {result_data_2}')
else:
two_pass_processing_active = True
changed_lines = self.filtering._find_changed_lines(result_data_2)
changed_lines_count, changed_regions_image = self.filtering._find_changed_lines(img_or_path, result_data_2)
if changed_lines:
logger.opt(ansi=True).info(f"<{engine_color}>{engine_instance_2.readable_name}</{engine_color}> found {len(changed_lines)} changed line(s) in {end_time - start_time:0.03f}s, re-OCRing with <{engine_color}>{engine_instance.readable_name}</{engine_color}>")
if changed_lines_count:
logger.opt(ansi=True).info(f"<{engine_color}>{engine_instance_2.readable_name}</{engine_color}> found {changed_lines_count} changed line(s) in {end_time - start_time:0.03f}s, re-OCRing with <{engine_color}>{engine_instance.readable_name}</{engine_color}>")
if output_format != 'json':
changed_regions_image = self.filtering._create_changed_regions_image(img_or_path, changed_lines)
if changed_regions_image:
img_or_path = changed_regions_image
else:
logger.warning('Error occurred while creating the differential image.')
else:
return
@@ -1035,8 +1193,8 @@ class OutputResult:
if result_data_text != None:
if filter_text:
text_to_process = self.filtering._find_changed_lines_text(result_data_text, two_pass_processing_active)
if text_to_process == []:
text_to_process = self.filtering._find_changed_lines_text(result_data_text, result_data, two_pass_processing_active)
if len(text_to_process) == 0:
return
output_string = self._post_process(text_to_process, True)
else:
@@ -1165,7 +1323,8 @@ def on_screenshot_combo():
def run():
logger.configure(handlers=[{'sink': sys.stderr, 'format': config.get_general('logger_format'), 'level': 'INFO'}])
logger_level = 'DEBUG' if config.get_general('uwu') else 'INFO'
logger.configure(handlers=[{'sink': sys.stderr, 'format': config.get_general('logger_format'), 'level': logger_level}])
if config.has_config:
logger.info('Parsed config file')

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "owocr"
version = "1.17"
version = "1.17.1"
description = "Japanese OCR"
readme = "README.md"
requires-python = ">=3.11"
@@ -27,7 +27,6 @@ dependencies = [
"mss",
"psutil",
"requests",
"rapidfuzz",
"pywin32;platform_system=='Windows'",
"pyobjc;platform_system=='Darwin'"
]