From 6ada579b190b52f6f0d7828bc8fa582f21ad0f24 Mon Sep 17 00:00:00 2001 From: AuroraWright Date: Sun, 12 Oct 2025 00:23:47 +0200 Subject: [PATCH] Improve filtering, add line recovery for frame stabilization, add furigana filter --- owocr/config.py | 10 +- owocr/run.py | 405 +++++++++++++++++++++++++++++++++--------------- pyproject.toml | 3 +- 3 files changed, 292 insertions(+), 126 deletions(-) diff --git a/owocr/config.py b/owocr/config.py index 1dd8053..7ab9a60 100644 --- a/owocr/config.py +++ b/owocr/config.py @@ -50,6 +50,10 @@ parser.add_argument('-sw', '--screen_capture_only_active_windows', type=str2bool help="When reading with screen capture and screen_capture_area is a window name, only target the window while it's active.") parser.add_argument('-sf', '--screen_capture_frame_stabilization', type=float, default=argparse.SUPPRESS, help="When reading with screen capture, delay to wait until text is stable before processing it. -1 waits for two OCR results to be the same. 0 to disable.") +parser.add_argument('-sl', '--screen_capture_line_recovery', type=str2bool, nargs='?', const=True, default=argparse.SUPPRESS, + help="When reading with screen capture and frame stabilization is on, try to recover missed lines from unstable frames. Can lead to increased glitches.") +parser.add_argument('-sff', '--screen_capture_furigana_filter', type=str2bool, nargs='?', const=True, default=argparse.SUPPRESS, + help="When reading with screen capture, try to filter furigana lines.") parser.add_argument('-sc', '--screen_capture_combo', type=str, default=argparse.SUPPRESS, help='When reading with screen capture, combo to wait on for taking a screenshot. If periodic screenshots are also enabled, any screenshot taken this way bypasses the filtering. Example value: "++s". The list of keys can be found here: https://pynput.readthedocs.io/en/latest/keyboard.html#pynput.keyboard.Key') parser.add_argument('-l', '--language', type=str, default=argparse.SUPPRESS, @@ -58,6 +62,7 @@ parser.add_argument('-of', '--output_format', type=str, default=argparse.SUPPRES help='The output format for OCR results. Can be "text" (default) or "json" (to include coordinates).') parser.add_argument('-v', '--verbosity', type=int, default=argparse.SUPPRESS, help='Terminal window verbosity. Can be -2 (all recognized text is showed whole, default), -1 (only timestamps are shown), 0 (nothing is shown but errors), or larger than 0 to cut displayed text to that amount of characters.') +parser.add_argument('--uwu', type=str2bool, nargs='?', const=True, default=argparse.SUPPRESS, help=argparse.SUPPRESS) class Config: has_config = False @@ -87,11 +92,14 @@ class Config: 'screen_capture_delay_secs': 0, 'screen_capture_only_active_windows': True, 'screen_capture_frame_stabilization': -1, + 'screen_capture_line_recovery': True, + 'screen_capture_furigana_filter': True, 'screen_capture_combo': '', 'screen_capture_old_macos_api': False, 'language': 'ja', 'output_format': 'text', - 'verbosity': -2 + 'verbosity': -2, + 'uwu': False } def __parse(self, value): diff --git a/owocr/run.py b/owocr/run.py index 76643fe..70b0425 100644 --- a/owocr/run.py +++ b/owocr/run.py @@ -25,7 +25,6 @@ from PIL import Image, UnidentifiedImageError from loguru import logger from pynput import keyboard from desktop_notifier import DesktopNotifierSync, Urgency -from rapidfuzz import fuzz from .ocr import * from .config import config @@ -305,13 +304,19 @@ class TextFiltering: def __init__(self): self.language = config.get_general('language') self.frame_stabilization = config.get_general('screen_capture_frame_stabilization') - self.last_frame_data = None + self.line_recovery = config.get_general('screen_capture_line_recovery') + self.furigana_filter = config.get_general('screen_capture_furigana_filter') + self.recovered_lines_count = 0 + self.last_frame_data = [None, None] + self.last_last_frame_data = [None, None] self.stable_frame_data = None - self.last_frame_text = None + self.last_frame_text = [] + self.last_last_frame_text = [] self.stable_frame_text = None self.processed_stable_frame = False self.frame_stabilization_timestamp = 0 self.cj_regex = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]') + self.kanji_regex = re.compile(r'[\u4E00-\u9FFF]') self.regex = self.get_regex() self.kana_variants = { 'ぁ': ['ぁ', 'あ'], 'あ': ['ぁ', 'あ'], @@ -340,7 +345,7 @@ class TextFiltering: if self.language == 'ja': return self.cj_regex elif self.language == 'zh': - return re.compile(r'[\u4E00-\u9FFF]') + return self.kanji_regex elif self.language == 'ko': return re.compile(r'[\uAC00-\uD7AF]') elif self.language == 'ar': @@ -382,50 +387,73 @@ class TextFiltering: filtered_text = self.convert_small_kana_to_big(filtered_text) return filtered_text - def _compare_text(self, current_text, prev_text, threshold=82): - if current_text in prev_text: - return True - if len(prev_text) > len(current_text): - return fuzz.partial_ratio(current_text, prev_text) >= threshold - return fuzz.ratio(current_text, prev_text) >= threshold + def _find_changed_lines(self, pil_image, current_result): + if (self.last_frame_data != [None, None] and (current_result.image_properties.width != self.last_frame_data[1].image_properties.width or + current_result.image_properties.height != self.last_frame_data[1].image_properties.height)): + self.stable_frame_data = None + self.last_frame_data = [None, None] + self.last_last_frame_data = [None, None] - def _find_changed_lines(self, current_result): - if (self.last_frame_data is None or self.stable_frame_data is None or - (self.stable_frame_data and (current_result.image_properties.width != self.stable_frame_data.image_properties.width or - current_result.image_properties.height != self.stable_frame_data.image_properties.height))): - self.stable_frame_data = copy.deepcopy(current_result) - self.last_frame_data = copy.deepcopy(current_result) + if self.frame_stabilization == 0: + changed_lines = self._find_changed_lines_impl(current_result, self.last_frame_data[1]) + if changed_lines == None: + return 0, None + changed_lines_total = len(changed_lines) + self.last_frame_data = (pil_image, copy.deepcopy(current_result)) + if changed_lines_total and config.get_general('output_format') != 'json': + changed_regions_image = self._create_changed_regions_image(pil_image, changed_lines, None, None) + if not changed_regions_image: + logger.warning('Error occurred while creating the differential image.') + return 0, None + return changed_lines_total, changed_regions_image + else: + return changed_lines_total, None - changed_lines = [] - for p in current_result.paragraphs: - changed_lines.extend(p.lines) - return changed_lines + changed_lines_stabilization = self._find_changed_lines_impl(current_result, self.last_frame_data[1]) + if changed_lines_stabilization == None: + return 0, None - if not self.frame_stabilization: - changed_lines = self._find_changed_lines_impl(current_result, self.last_frame_data) - self.last_frame_data = copy.deepcopy(current_result) - return changed_lines - - frames_match = self._find_changed_lines_impl(current_result, self.last_frame_data) == [] + frames_match = len(changed_lines_stabilization) == 0 logger.debug(f"Frames match: '{frames_match}'") if frames_match: if self.processed_stable_frame: - return [] + return 0, None if time.time() - self.frame_stabilization_timestamp < self.frame_stabilization: - return [] + return 0, None changed_lines = self._find_changed_lines_impl(current_result, self.stable_frame_data) + if self.line_recovery and self.last_last_frame_data: + logger.debug(f'Checking for missed lines') + recovered_lines = self._find_changed_lines_impl(self.last_last_frame_data[1], self.stable_frame_data, current_result) + self.recovered_lines_count = len(recovered_lines) if recovered_lines else 0 + else: + self.recovered_lines_count = 0 + recovered_lines = [] self.processed_stable_frame = True self.stable_frame_data = copy.deepcopy(current_result) - return changed_lines + changed_lines_total = len(changed_lines) + self.recovered_lines_count + if changed_lines_total and config.get_general('output_format') != 'json': + if recovered_lines: + changed_regions_image = self._create_changed_regions_image(pil_image, changed_lines, self.last_last_frame_data[0], recovered_lines) + else: + changed_regions_image = self._create_changed_regions_image(pil_image, changed_lines, None, None) + + if not changed_regions_image: + logger.warning('Error occurred while creating the differential image.') + return 0, None + return changed_lines_total, changed_regions_image + else: + return changed_lines_total, None else: - self.last_frame_data = copy.deepcopy(current_result) + self.last_last_frame_data = self.last_frame_data + self.last_frame_data = (pil_image, copy.deepcopy(current_result)) + self.recovered_lines_count = 0 self.processed_stable_frame = False self.frame_stabilization_timestamp = time.time() - return [] + return 0, None - def _find_changed_lines_impl(self, current_result, previous_result): + def _find_changed_lines_impl(self, current_result, previous_result, next_result=None): changed_lines = [] current_lines = [] previous_lines = [] @@ -433,67 +461,62 @@ class TextFiltering: for p in current_result.paragraphs: current_lines.extend(p.lines) if len(current_lines) == 0: - return [] - - for p in previous_result.paragraphs: - previous_lines.extend(p.lines) + return None all_previous_text_spliced = [] - for prev_line in previous_lines: - prev_text = self._get_line_text(prev_line) - prev_text = self._normalize_line_for_comparison(prev_text) - all_previous_text_spliced.append(prev_text) + + if previous_result: + for p in previous_result.paragraphs: + previous_lines.extend(p.lines) + if next_result != None: + for p in next_result.paragraphs: + previous_lines.extend(p.lines) + + for prev_line in previous_lines: + prev_text = self._get_line_text(prev_line) + prev_text = self._normalize_line_for_comparison(prev_text) + all_previous_text_spliced.append(prev_text) all_previous_text = ''.join(all_previous_text_spliced) logger.debug(f"Previous text: '{all_previous_text_spliced}'") - first = True + processed_valid_line = False for current_line in current_lines: current_text = self._get_line_text(current_line) current_text = self._normalize_line_for_comparison(current_text) if not current_text: continue - # For the first line, check if it contains the end of previous text - if first and all_previous_text: - overlap = self._find_overlap(all_previous_text, current_text) - if overlap and len(current_text) > len(overlap): - logger.debug(f"Found overlap: '{overlap}'") - changed_lines.append(current_line) - first = False - continue + processed_valid_line = True - if len(current_text) < 3: + if next_result == None and len(current_text) < 3: text_similar = current_text in all_previous_text_spliced else: - text_similar = self._compare_text(current_text, all_previous_text) + text_similar = current_text in all_previous_text logger.debug(f"Current line: '{current_text}' Similar: '{text_similar}'") if not text_similar: + if next_result != None: + logger.opt(ansi=True).debug(f"Recovered line: '{current_text}'") changed_lines.append(current_line) - if len(current_text) >= 3: - first = False - return changed_lines + return changed_lines if processed_valid_line else None - def _find_changed_lines_text(self, current_result, two_pass_processing_active=False): - if not self.frame_stabilization or two_pass_processing_active: - if self.last_frame_text: - changed_lines = self._find_changed_lines_text_impl(current_result, self.last_frame_text, True) - self.last_frame_text = current_result - return changed_lines - else: - self.last_frame_text = current_result - return current_result + def _find_changed_lines_text(self, current_result, current_result_ocr, two_pass_processing_active): + frame_stabilization_active = self.frame_stabilization != 0 - if self.last_frame_text is None or self.stable_frame_text is None: - self.stable_frame_text = current_result + if (not frame_stabilization_active) or two_pass_processing_active: + changed_lines = self._find_changed_lines_text_impl(current_result, current_result_ocr, self.last_frame_text, None, True, frame_stabilization_active) self.last_frame_text = current_result - return current_result + return changed_lines - frames_match = self._find_changed_lines_text_impl(current_result, self.last_frame_text, False) == [] + changed_lines_stabilization = self._find_changed_lines_text_impl(current_result, current_result_ocr, self.last_frame_text, None, False, False) + if changed_lines_stabilization == None: + return [] + + frames_match = len(changed_lines_stabilization) == 0 logger.debug(f"Frames match: '{frames_match}'") @@ -502,61 +525,140 @@ class TextFiltering: return [] if time.time() - self.frame_stabilization_timestamp < self.frame_stabilization: return [] - changed_lines = self._find_changed_lines_text_impl(current_result, self.stable_frame_text, True) + if self.line_recovery and self.last_last_frame_text: + logger.debug(f'Checking for missed lines') + recovered_lines = self._find_changed_lines_text_impl(self.last_last_frame_text, None, self.stable_frame_text, current_result, True, False) + self.recovered_lines_count = len(recovered_lines) if recovered_lines else 0 + else: + self.recovered_lines_count = 0 + recovered_lines = [] + recovered_lines.extend(current_result) + changed_lines = self._find_changed_lines_text_impl(recovered_lines, current_result_ocr, self.stable_frame_text, None, True, frame_stabilization_active) self.processed_stable_frame = True self.stable_frame_text = current_result return changed_lines else: + self.last_last_frame_text = self.last_frame_text self.last_frame_text = current_result self.processed_stable_frame = False self.frame_stabilization_timestamp = time.time() return [] - def _find_changed_lines_text_impl(self, current_result, previous_stable_text, filtering): + def _find_changed_lines_text_impl(self, current_result, current_result_ocr, previous_result, next_result, filtering, skip_recovered_lines): if len(current_result) == 0: return [] changed_lines = [] + current_lines = [] + current_lines_ocr = [] all_previous_text_spliced = [] - for prev_line in previous_stable_text: + if self.furigana_filter and self.language == 'ja' and isinstance(current_result_ocr, OcrResult): + for p in current_result_ocr.paragraphs: + current_lines_ocr.extend(p.lines) + + for current_line in current_result: + current_text = self._normalize_line_for_comparison(current_line) + current_lines.append(current_text) + + for prev_line in previous_result: prev_text = self._normalize_line_for_comparison(prev_line) all_previous_text_spliced.append(prev_text) + if next_result != None: + for next_text in next_result: + all_previous_text_spliced.extend(next_text) all_previous_text = ''.join(all_previous_text_spliced) logger.debug(f"Previous text: '{all_previous_text_spliced}'") first = True - for current_line in current_result: - current_text = self._normalize_line_for_comparison(current_line) + processed_valid_line = False + for i, current_text in enumerate(current_lines): if not current_text: continue - # For the first line, check if it contains the end of previous text - if filtering and first and all_previous_text: - overlap = self._find_overlap(all_previous_text, current_text) - if overlap and len(current_text) > len(overlap): - logger.debug(f"Found overlap: '{overlap}'") - current_line = self._cut_at_overlap(current_line, overlap) - logger.debug(f"After cutting: '{current_line}'") - changed_lines.append(current_line) - first = False - continue + processed_valid_line = True + is_furigana = False if len(current_text) < 3: text_similar = current_text in all_previous_text_spliced else: - text_similar = self._compare_text(current_text, all_previous_text) + text_similar = current_text in all_previous_text logger.debug(f"Current line: '{current_text}' Similar: '{text_similar}'") - if not text_similar: - changed_lines.append(current_line) - if len(current_text) >= 3: - first = False + if text_similar: + continue - return changed_lines + if skip_recovered_lines and self.recovered_lines_count > 0: + # Check if any subsequent lines start with current_text + if any(line.startswith(current_text) for line in current_lines[i+1:]): + logger.debug(f"Skipping recovered line: '{current_text}'") + self.recovered_lines_count -= 1 + continue + + if current_lines_ocr: + current_line_bbox = current_lines_ocr[i].bounding_box + # Check if line contains only kana (no kanji) + has_kanji = self.kanji_regex.search(current_text) + + if not has_kanji: + for j in range(len(current_lines_ocr)): + if i == j: + continue + if not current_lines[j]: + continue + + below_line_bbox = current_lines_ocr[j].bounding_box + below_line_text = current_lines[j] + + logger.debug(f"Furigana check against line: '{below_line_text}'") + + # Check if the line is taller + height_threshold = below_line_bbox.height * 0.6 + is_smaller = current_line_bbox.height < height_threshold + logger.debug(f"Furigana check height: '{height_threshold}' '{current_line_bbox.height}'") + if not is_smaller: + continue + + # Check if the line has kanji + below_has_kanji = self.kanji_regex.search(below_line_text) + if not below_has_kanji: + continue + + vertical_threshold = below_line_bbox.height * 0.8 + vertical_distance = below_line_bbox.center_y - current_line_bbox.center_y + horizontal_overlap = self._check_horizontal_overlap(current_line_bbox, below_line_bbox) + + logger.debug(f"Furigana check position: '{vertical_threshold}' '{vertical_distance}' '{horizontal_overlap}'") + + # If vertically close and horizontally aligned, it's likely furigana + if (0 < vertical_distance < vertical_threshold * 2 and horizontal_overlap > 0.3): # At least 30% horizontal overlap + is_furigana = True + logger.debug(f"Skipping furigana line: '{current_text}' above line: '{below_line_text}'") + break + + if is_furigana: + continue + + changed_line = current_result[i] + + if next_result != None: + logger.opt(ansi=True).debug(f"Recovered line: '{changed_line}'") + + if first and len(current_text) > 3: + first = False + # For the first line, check if it contains the end of previous text + if filtering and all_previous_text: + overlap = self._find_overlap(all_previous_text, current_text) + if overlap and len(current_text) > len(overlap): + logger.debug(f"Found overlap: '{overlap}'") + changed_line = self._cut_at_overlap(changed_line, overlap) + logger.debug(f"After cutting: '{changed_line}'") + changed_lines.append(changed_line) + + return changed_lines if processed_valid_line else [] def _find_overlap(self, previous_text, current_text): min_overlap_length = 3 @@ -592,37 +694,97 @@ class TextFiltering: return current_line - def _create_changed_regions_image(self, pil_image, changed_lines, margin=5): - img_width, img_height = pil_image.size + def _check_horizontal_overlap(self, bbox1, bbox2): + """ + Calculate the horizontal overlap ratio between two bounding boxes. + Returns a value between 0.0 (no overlap) and 1.0 (complete overlap). + """ + # Calculate left and right boundaries for both boxes + left1 = bbox1.center_x - bbox1.width / 2 + right1 = bbox1.center_x + bbox1.width / 2 + left2 = bbox2.center_x - bbox2.width / 2 + right2 = bbox2.center_x + bbox2.width / 2 + + # Calculate overlap + overlap_left = max(left1, left2) + overlap_right = min(right1, right2) + + if overlap_right <= overlap_left: + return 0.0 + + overlap_width = overlap_right - overlap_left + smaller_width = min(bbox1.width, bbox2.width) + + return overlap_width / smaller_width if smaller_width > 0 else 0.0 - regions = [] - for line in changed_lines: - bbox = line.bounding_box - x1 = (bbox.center_x - bbox.width/2) * img_width - margin - y1 = (bbox.center_y - bbox.height/2) * img_height - margin - x2 = (bbox.center_x + bbox.width/2) * img_width + margin - y2 = (bbox.center_y + bbox.height/2) * img_height + margin + def _create_changed_regions_image(self, pil_image, changed_lines, pil_image_2, changed_lines_2, margin=5): + def crop_image(image, lines): + img_width, img_height = image.size - x1 = max(0, int(x1)) - y1 = max(0, int(y1)) - x2 = min(img_width, int(x2)) - y2 = min(img_height, int(y2)) + regions = [] + for line in lines: + bbox = line.bounding_box + x1 = (bbox.center_x - bbox.width/2) * img_width - margin + y1 = (bbox.center_y - bbox.height/2) * img_height - margin + x2 = (bbox.center_x + bbox.width/2) * img_width + margin + y2 = (bbox.center_y + bbox.height/2) * img_height + margin - if x2 > x1 and y2 > y1: - regions.append((x1, y1, x2, y2)) + x1 = max(0, int(x1)) + y1 = max(0, int(y1)) + x2 = min(img_width, int(x2)) + y2 = min(img_height, int(y2)) - if not regions: + if x2 > x1 and y2 > y1: + regions.append((x1, y1, x2, y2)) + + if not regions: + return None + + overall_x1 = min(x1 for x1, y1, x2, y2 in regions) + overall_y1 = min(y1 for x1, y1, x2, y2 in regions) + overall_x2 = max(x2 for x1, y1, x2, y2 in regions) + overall_y2 = max(y2 for x1, y1, x2, y2 in regions) + + return image.crop((overall_x1, overall_y1, overall_x2, overall_y2)) + + # Handle the case where changed_lines is empty and previous_result is provided + if (not pil_image) and pil_image_2: + cropped_2 = crop_image(pil_image_2, changed_lines_2) + return cropped_2 + + # Handle the case where both current and previous results are present + elif pil_image and pil_image_2: + # Crop both images + cropped_1 = crop_image(pil_image, changed_lines) + cropped_2 = crop_image(pil_image_2, changed_lines_2) + + if cropped_1 is None and cropped_2 is None: + return None + elif cropped_1 is None: + return cropped_2 + elif cropped_2 is None: + return cropped_1 + + # Stitch vertically with previous_result on top + total_width = max(cropped_1.width, cropped_2.width) + total_height = cropped_1.height + cropped_2.height + + # Create a new image with white background + stitched_image = Image.new('RGB', (total_width, total_height), 'white') + + # Paste previous (top) and current (bottom) images, centered horizontally + prev_x_offset = (total_width - cropped_2.width) // 2 + stitched_image.paste(cropped_2, (prev_x_offset, 0)) + + curr_x_offset = (total_width - cropped_1.width) // 2 + stitched_image.paste(cropped_1, (curr_x_offset, cropped_2.height)) + + return stitched_image + elif pil_image: + return crop_image(pil_image, changed_lines) + else: return None - overall_x1 = min(x1 for x1, y1, x2, y2 in regions) - overall_y1 = min(y1 for x1, y1, x2, y2 in regions) - overall_x2 = max(x2 for x1, y1, x2, y2 in regions) - overall_y2 = max(y2 for x1, y1, x2, y2 in regions) - - result_image = pil_image.crop((overall_x1, overall_y1, overall_x2, overall_y2)) - - return result_image - class ScreenshotThread(threading.Thread): def __init__(self): @@ -993,18 +1155,14 @@ class OutputResult: logger.opt(ansi=True).warning(f'<{engine_color}>{engine_instance_2.readable_name} reported an error after {end_time - start_time:0.03f}s: {result_data_2}') else: two_pass_processing_active = True - changed_lines = self.filtering._find_changed_lines(result_data_2) + changed_lines_count, changed_regions_image = self.filtering._find_changed_lines(img_or_path, result_data_2) - if changed_lines: - logger.opt(ansi=True).info(f"<{engine_color}>{engine_instance_2.readable_name} found {len(changed_lines)} changed line(s) in {end_time - start_time:0.03f}s, re-OCRing with <{engine_color}>{engine_instance.readable_name}") + if changed_lines_count: + logger.opt(ansi=True).info(f"<{engine_color}>{engine_instance_2.readable_name} found {changed_lines_count} changed line(s) in {end_time - start_time:0.03f}s, re-OCRing with <{engine_color}>{engine_instance.readable_name}") if output_format != 'json': - changed_regions_image = self.filtering._create_changed_regions_image(img_or_path, changed_lines) - if changed_regions_image: img_or_path = changed_regions_image - else: - logger.warning('Error occurred while creating the differential image.') else: return @@ -1035,8 +1193,8 @@ class OutputResult: if result_data_text != None: if filter_text: - text_to_process = self.filtering._find_changed_lines_text(result_data_text, two_pass_processing_active) - if text_to_process == []: + text_to_process = self.filtering._find_changed_lines_text(result_data_text, result_data, two_pass_processing_active) + if len(text_to_process) == 0: return output_string = self._post_process(text_to_process, True) else: @@ -1165,7 +1323,8 @@ def on_screenshot_combo(): def run(): - logger.configure(handlers=[{'sink': sys.stderr, 'format': config.get_general('logger_format'), 'level': 'INFO'}]) + logger_level = 'DEBUG' if config.get_general('uwu') else 'INFO' + logger.configure(handlers=[{'sink': sys.stderr, 'format': config.get_general('logger_format'), 'level': logger_level}]) if config.has_config: logger.info('Parsed config file') diff --git a/pyproject.toml b/pyproject.toml index 444d184..b31f69f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "owocr" -version = "1.17" +version = "1.17.1" description = "Japanese OCR" readme = "README.md" requires-python = ">=3.11" @@ -27,7 +27,6 @@ dependencies = [ "mss", "psutil", "requests", - "rapidfuzz", "pywin32;platform_system=='Windows'", "pyobjc;platform_system=='Darwin'" ]