Add an option to wait for frame stabilization (helps with slow text); allow screenshots on combo at the same time as periodic ones; lots of refactoring

This commit is contained in:
AuroraWright
2025-10-09 09:00:16 +02:00
parent 878f164533
commit be8afa6d45
3 changed files with 326 additions and 286 deletions

View File

@@ -304,6 +304,13 @@ class RequestHandler(socketserver.BaseRequestHandler):
class TextFiltering:
def __init__(self):
self.language = config.get_general('language')
self.frame_stabilization = config.get_general('screen_capture_frame_stabilization')
self.last_frame_data = None
self.stable_frame_data = None
self.last_frame_text = None
self.stable_frame_text = None
self.processed_stable_frame = False
self.cj_regex = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]')
self.regex = self.get_regex()
self.kana_variants = {
'': ['', ''], '': ['', ''],
@@ -330,7 +337,7 @@ class TextFiltering:
def get_regex(self):
if self.language == 'ja':
return re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]')
return self.cj_regex
elif self.language == 'zh':
return re.compile(r'[\u4E00-\u9FFF]')
elif self.language == 'ko':
@@ -354,16 +361,270 @@ class TextFiltering:
converted_text = ''.join(self.kana_variants.get(char, [char])[-1] for char in text)
return converted_text
def _get_line_text(self, line):
if line.text is not None:
return line.text
text_parts = []
for w in line.words:
text_parts.append(w.text)
if w.separator is not None:
text_parts.append(w.separator)
else:
text_parts.append(' ')
return ''.join(text_parts)
def _normalize_line_for_comparison(self, line_text):
if not line_text:
return ''
filtered_text = ''.join(self.regex.findall(line_text))
if self.language == 'ja':
filtered_text = self.convert_small_kana_to_big(filtered_text)
return filtered_text
def _compare_text(self, current_text, prev_text, threshold=82):
if current_text in prev_text:
return True
if len(prev_text) > len(current_text):
return fuzz.partial_ratio(current_text, prev_text) >= threshold
return fuzz.ratio(current_text, prev_text) >= threshold
# Return the OCR lines that changed since the last processed frame.
# With frame stabilization enabled, a frame is only diffed against the last
# stable frame once it has stayed identical across two consecutive captures;
# otherwise it is diffed directly against the previous capture.
def _find_changed_lines(self, current_result):
# First frame ever, or the capture dimensions changed: reset the caches and
# report every line as changed.
if (self.last_frame_data is None or self.stable_frame_data is None or
(self.stable_frame_data and (current_result.image_properties.width != self.stable_frame_data.image_properties.width or
current_result.image_properties.height != self.stable_frame_data.image_properties.height))):
self.stable_frame_data = copy.deepcopy(current_result)
self.last_frame_data = copy.deepcopy(current_result)
changed_lines = []
for p in current_result.paragraphs:
changed_lines.extend(p.lines)
return changed_lines
# Stabilization off: diff straight against the previous frame.
if not self.frame_stabilization:
changed_lines = self._find_changed_lines_impl(current_result, self.last_frame_data)
self.last_frame_data = copy.deepcopy(current_result)
return changed_lines
# A frame is "stable" when it produces no diff against the previous capture.
frames_match = self._find_changed_lines_impl(current_result, self.last_frame_data) == []
logger.debug(f"Frames match: '{frames_match}'")
if frames_match:
# Stable frame already emitted once; nothing new to report.
if self.processed_stable_frame:
return []
changed_lines = self._find_changed_lines_impl(current_result, self.stable_frame_data)
self.processed_stable_frame = True
self.stable_frame_data = copy.deepcopy(current_result)
return changed_lines
else:
# Frame is still changing: remember it and wait for it to settle.
self.last_frame_data = copy.deepcopy(current_result)
self.processed_stable_frame = False
return []
# Core diff: return lines of current_result whose normalized text is not
# already contained in (or similar to) the concatenated previous text.
def _find_changed_lines_impl(self, current_result, previous_result):
changed_lines = []
current_lines = []
previous_lines = []
for p in current_result.paragraphs:
current_lines.extend(p.lines)
if len(current_lines) == 0:
return []
for p in previous_result.paragraphs:
previous_lines.extend(p.lines)
# Build both the per-line list (for short-line exact matches) and the
# concatenation (for fuzzy matching across line breaks).
all_previous_text_spliced = []
for prev_line in previous_lines:
prev_text = self._get_line_text(prev_line)
prev_text = self._normalize_line_for_comparison(prev_text)
all_previous_text_spliced.append(prev_text)
all_previous_text = ''.join(all_previous_text_spliced)
logger.debug(f"Previous text: '{all_previous_text_spliced}'")
first = True
for current_line in current_lines:
current_text = self._get_line_text(current_line)
current_text = self._normalize_line_for_comparison(current_text)
if not current_text:
continue
# For the first line, check if it contains the end of previous text
if first and all_previous_text:
overlap = self._find_overlap(all_previous_text, current_text)
if overlap and len(current_text) > len(overlap):
logger.debug(f"Found overlap: '{overlap}'")
changed_lines.append(current_line)
first = False
continue
# Very short lines are too noisy for fuzzy matching; require an exact
# per-line match instead.
if len(current_text) < 3:
text_similar = current_text in all_previous_text_spliced
else:
text_similar = self._compare_text(current_text, all_previous_text)
logger.debug(f"Current line: '{current_text}' Similar: '{text_similar}'")
if not text_similar:
changed_lines.append(current_line)
# Only a meaningful (>= 3 chars) line consumes the "first line" slot.
if len(current_text) >= 3:
first = False
return changed_lines
# Text-only variant of _find_changed_lines for engines that return plain
# line lists instead of coordinate results. When stabilization is off (or a
# two-pass run already stabilized), diff against the last frame directly.
def _find_changed_lines_text(self, current_result, two_pass_processing_active=False):
if not self.frame_stabilization or two_pass_processing_active:
if self.last_frame_text:
changed_lines = self._find_changed_lines_text_impl(current_result, self.last_frame_text, True)
self.last_frame_text = current_result
return changed_lines
else:
self.last_frame_text = current_result
return current_result
# First frame: everything counts as changed.
if self.last_frame_text is None or self.stable_frame_text is None:
self.stable_frame_text = current_result
self.last_frame_text = current_result
return current_result
# Same stabilization protocol as the coordinate-based path.
frames_match = self._find_changed_lines_text_impl(current_result, self.last_frame_text, False) == []
logger.debug(f"Frames match: '{frames_match}'")
if frames_match:
if self.processed_stable_frame:
return []
changed_lines = self._find_changed_lines_text_impl(current_result, self.stable_frame_text, True)
self.processed_stable_frame = True
self.stable_frame_text = current_result
return changed_lines
else:
self.last_frame_text = current_result
self.processed_stable_frame = False
return []
# Text-only core diff. `filtering` controls whether the overlap with the end
# of the previous text is cut from the first changed line (only wanted when
# the result will be emitted, not when merely testing frame equality).
def _find_changed_lines_text_impl(self, current_result, previous_stable_text, filtering):
if len(current_result) == 0:
return []
changed_lines = []
all_previous_text_spliced = []
for prev_line in previous_stable_text:
prev_text = self._normalize_line_for_comparison(prev_line)
all_previous_text_spliced.append(prev_text)
all_previous_text = ''.join(all_previous_text_spliced)
logger.debug(f"Previous text: '{all_previous_text_spliced}'")
first = True
for current_line in current_result:
current_text = self._normalize_line_for_comparison(current_line)
if not current_text:
continue
# For the first line, check if it contains the end of previous text
if filtering and first and all_previous_text:
overlap = self._find_overlap(all_previous_text, current_text)
if overlap and len(current_text) > len(overlap):
logger.debug(f"Found overlap: '{overlap}'")
# Drop the already-seen prefix so only new text is emitted.
current_line = self._cut_at_overlap(current_line, overlap)
logger.debug(f"After cutting: '{current_line}'")
changed_lines.append(current_line)
first = False
continue
# Short lines need an exact per-line match; longer ones are fuzzy-matched.
if len(current_text) < 3:
text_similar = current_text in all_previous_text_spliced
else:
text_similar = self._compare_text(current_text, all_previous_text)
logger.debug(f"Current line: '{current_text}' Similar: '{text_similar}'")
if not text_similar:
changed_lines.append(current_line)
if len(current_text) >= 3:
first = False
return changed_lines
def _find_overlap(self, previous_text, current_text):
min_overlap_length = 3
max_overlap_length = min(len(previous_text), len(current_text))
for overlap_length in range(max_overlap_length, min_overlap_length - 1, -1):
previous_end = previous_text[-overlap_length:]
current_start = current_text[:overlap_length]
if previous_end == current_start:
return previous_end
return None
def _cut_at_overlap(self, current_line, overlap):
    """Remove from current_line everything up to and including the overlap.

    Builds a regex from the overlap characters, mapping kana with small/big
    variants to a character class so either form matches, and allowing
    arbitrary characters before and between them (the raw line may contain
    punctuation/whitespace stripped during normalization).
    """
    pattern_parts = []
    for char in overlap:
        variants = self.kana_variants.get(char)
        if variants is not None:
            # Accept any size variant of this kana.
            pattern_parts.append(f'[{"".join(variants)}]')
        else:
            pattern_parts.append(re.escape(char))
    full_pattern = r'^.*?' + r'.*?'.join(pattern_parts)
    logger.debug(f"Cut regex: '{full_pattern}'")
    match = re.search(full_pattern, current_line)
    if match is None:
        return current_line
    return current_line[match.end():]
def _create_changed_regions_image(self, pil_image, changed_lines, margin=5):
img_width, img_height = pil_image.size
regions = []
for line in changed_lines:
bbox = line.bounding_box
x1 = (bbox.center_x - bbox.width/2) * img_width - margin
y1 = (bbox.center_y - bbox.height/2) * img_height - margin
x2 = (bbox.center_x + bbox.width/2) * img_width + margin
y2 = (bbox.center_y + bbox.height/2) * img_height + margin
x1 = max(0, int(x1))
y1 = max(0, int(y1))
x2 = min(img_width, int(x2))
y2 = min(img_height, int(y2))
if x2 > x1 and y2 > y1:
regions.append((x1, y1, x2, y2))
if not regions:
return None
overall_x1 = min(x1 for x1, y1, x2, y2 in regions)
overall_y1 = min(y1 for x1, y1, x2, y2 in regions)
overall_x2 = max(x2 for x1, y1, x2, y2 in regions)
overall_y2 = max(y2 for x1, y1, x2, y2 in regions)
result_image = pil_image.crop((overall_x1, overall_y1, overall_x2, overall_y2))
return result_image
class ScreenshotThread(threading.Thread):
def __init__(self, screen_capture_on_combo):
def __init__(self):
super().__init__(daemon=True)
screen_capture_area = config.get_general('screen_capture_area')
self.macos_window_tracker_instance = None
self.windows_window_tracker_instance = None
self.screencapture_window_active = True
self.screencapture_window_visible = True
self.use_periodic_queue = not screen_capture_on_combo
if screen_capture_area == '':
self.screencapture_mode = 0
elif screen_capture_area.startswith('screen_'):
@@ -460,6 +721,7 @@ class ScreenshotThread(threading.Thread):
logger.opt(ansi=True).info(f'Selected window: {window_title}')
else:
raise ValueError('Window capture is only currently supported on Windows and macOS')
self.is_combo_screenshot = False
def get_windows_window_handle(self, window_title):
def callback(hwnd, window_title_part):
@@ -568,10 +830,11 @@ class ScreenshotThread(threading.Thread):
on_window_closed(False)
def write_result(self, result):
if self.use_periodic_queue:
periodic_screenshot_queue.put(result)
else:
if self.is_combo_screenshot:
self.is_combo_screenshot = False
image_queue.put((result, True))
else:
periodic_screenshot_queue.put(result)
def run(self):
if self.screencapture_mode != 2:
@@ -681,260 +944,28 @@ class AutopauseTimer:
class OutputResult:
def __init__(self, init_filtering):
self.filtering = TextFiltering() if init_filtering else None
self.cj_regex = re.compile(r'[\u3041-\u3096\u30A1-\u30FA\u4E00-\u9FFF]')
self.previous_result = None
self.previous_result_text = None
def _coordinate_format_to_string(self, result_data):
full_text_parts = []
for p in result_data.paragraphs:
for l in p.lines:
full_text_parts.append(self._get_line_text(l))
full_text_parts.append('\n')
return ''.join(full_text_parts)
def __init__(self):
self.filtering = TextFiltering()
def _post_process(self, text, strip_spaces):
is_cj_text = self.cj_regex.search(text)
is_cj_text = self.filtering.cj_regex.search(''.join(text))
line_separator = '' if strip_spaces else ' '
if is_cj_text:
text = line_separator.join([''.join(i.split()) for i in text.splitlines()])
text = line_separator.join([''.join(i.split()) for i in text])
else:
text = line_separator.join([re.sub(r'\s+', ' ', i).strip() for i in text.splitlines()])
text = line_separator.join([re.sub(r'\s+', ' ', i).strip() for i in text])
text = text.replace('', '...')
text = re.sub('[・.]{2,}', lambda x: (x.end() - x.start()) * '.', text)
if is_cj_text:
text = jaconv.h2z(text, ascii=True, digit=True)
return text
# Return the text of an OCR line, rebuilding it from the line's words (with
# their separators, defaulting to a space) when the line has no text field.
def _get_line_text(self, line):
if line.text is not None:
return line.text
text_parts = []
for w in line.words:
text_parts.append(w.text)
if w.separator is not None:
text_parts.append(w.separator)
else:
text_parts.append(' ')
return ''.join(text_parts)
# True when current_text is a substring of prev_text or fuzzy-matches it;
# partial_ratio is used when the previous text is longer, since the current
# text may be only a fragment of it.
def _compare_text(self, current_text, prev_text, threshold=82):
if current_text in prev_text:
return True
if len(prev_text) > len(current_text):
return fuzz.partial_ratio(current_text, prev_text) >= threshold
return fuzz.ratio(current_text, prev_text) >= threshold
# Return the lines of current_result not already present (exactly or fuzzily)
# in previous_result. A None previous result or a capture size change makes
# every line count as changed.
def _find_changed_lines(self, current_result, previous_result):
changed_lines = []
# If no previous result, all lines are considered changed
if previous_result is None:
for p in current_result.paragraphs:
changed_lines.extend(p.lines)
return changed_lines
# Check if image sizes are different - if so, treat all lines as changed
if (current_result.image_properties.width != previous_result.image_properties.width or
current_result.image_properties.height != previous_result.image_properties.height):
for p in current_result.paragraphs:
changed_lines.extend(p.lines)
return changed_lines
current_lines = []
previous_lines = []
for p in current_result.paragraphs:
current_lines.extend(p.lines)
for p in previous_result.paragraphs:
previous_lines.extend(p.lines)
# Keep both the per-line list (for short exact matches) and the
# concatenation (for fuzzy matching across line breaks).
all_previous_text_spliced = []
for prev_line in previous_lines:
prev_text = self._get_line_text(prev_line)
prev_text = ''.join(self.filtering.regex.findall(prev_text))
if self.filtering.language == 'ja':
prev_text = self.filtering.convert_small_kana_to_big(prev_text)
all_previous_text_spliced.append(prev_text)
all_previous_text = ''.join(all_previous_text_spliced)
logger.debug(f"Previous text: '{all_previous_text_spliced}'")
first = True
for current_line in current_lines:
current_text = self._get_line_text(current_line)
current_text = ''.join(self.filtering.regex.findall(current_text))
if not current_text:
continue
if self.filtering.language == 'ja':
current_text = self.filtering.convert_small_kana_to_big(current_text)
# For the first line, check if it contains the end of previous text
if first and all_previous_text:
overlap = self._find_overlap(all_previous_text, current_text)
if overlap and len(current_text) > len(overlap):
logger.debug(f"Found overlap: '{overlap}'")
changed_lines.append(current_line)
first = False
continue
# Short lines require an exact per-line match; longer ones are fuzzy.
if len(current_text) < 3:
text_similar = current_text in all_previous_text_spliced
else:
text_similar = self._compare_text(current_text, all_previous_text)
logger.debug(f"Current line: '{current_text}' Similar: '{text_similar}'")
if not text_similar:
changed_lines.append(current_line)
if len(current_text) >= 3:
first = False
return changed_lines
def _find_overlap(self, previous_text, current_text):
"""Find the overlapping portion between the end of previous_text and start of current_text."""
# Try different overlap lengths, starting from the maximum possible
min_overlap_length = 3 # Minimum overlap to consider meaningful
max_overlap_length = min(len(previous_text), len(current_text))
for overlap_length in range(max_overlap_length, min_overlap_length - 1, -1):
previous_end = previous_text[-overlap_length:]
current_start = current_text[:overlap_length]
if previous_end == current_start:
# Longest match wins since we iterate from the maximum length down.
return previous_end
# No suffix/prefix overlap of at least min_overlap_length characters.
return None
# Remove from current_line everything up to and including the overlap text,
# tolerating characters (punctuation/whitespace) between overlap characters.
def _cut_at_overlap(self, current_line, overlap):
pattern_parts = []
for char in overlap:
# Check if character is kana and has small/big variants
if char in self.filtering.kana_variants:
# Use character class that matches both small and big variants
variants = self.filtering.kana_variants[char]
pattern_parts.append(f'[{"".join(variants)}]')
else:
# Escape regex special characters for regular characters
pattern_parts.append(re.escape(char))
# Create pattern: overlap characters with any characters (0 or more) between them
overlap_pattern = r'.*?'.join(pattern_parts)
# Also allow any characters at the beginning
full_pattern = r'^.*?' + overlap_pattern
logger.debug(f"Cut regex: '{full_pattern}'")
# Find the match
match = re.search(full_pattern, current_line)
if match:
# Cut after the matched overlapping portion
cut_position = match.end()
return current_line[cut_position:]
# Overlap not found in the raw line; return it unchanged.
return current_line
# Text-only diff: return the lines of current_result (a newline-joined
# string) not already present in the cached previous lines, keeping a
# rolling cache of the last 10 lines seen.
def _find_changed_lines_text(self, current_result):
# Split both results into lines
current_lines = current_result.split('\n')
# If no previous result, all lines are considered changed
if self.previous_result_text is None:
self.previous_result_text = current_lines[-10:] # Keep only last 10 lines
return current_result
changed_lines = []
all_previous_text_spliced = []
for prev_line in self.previous_result_text:
prev_text = ''.join(self.filtering.regex.findall(prev_line))
if self.filtering.language == 'ja':
prev_text = self.filtering.convert_small_kana_to_big(prev_text)
all_previous_text_spliced.append(prev_text)
all_previous_text = ''.join(all_previous_text_spliced)
logger.debug(f"Previous text: '{all_previous_text_spliced}'")
first = True
# Check each current line against the combined previous text
for current_line in current_lines:
current_text = ''.join(self.filtering.regex.findall(current_line))
if not current_text:
continue
if self.filtering.language == 'ja':
current_text = self.filtering.convert_small_kana_to_big(current_text)
# For the first line, check if it contains the end of previous text
if first and all_previous_text:
overlap = self._find_overlap(all_previous_text, current_text)
if overlap and len(current_text) > len(overlap):
logger.debug(f"Found overlap: '{overlap}'")
# Cut the current_line to remove the overlapping part
current_line = self._cut_at_overlap(current_line, overlap)
logger.debug(f"After cutting: '{current_line}'")
changed_lines.append(current_line)
first = False
continue
# Short lines need an exact per-line match; longer ones are fuzzy-matched.
if len(current_text) < 3:
text_similar = current_text in all_previous_text_spliced
else:
text_similar = self._compare_text(current_text, all_previous_text)
logger.debug(f"Current line: '{current_text}' Similar: '{text_similar}'")
if not text_similar:
changed_lines.append(current_line)
if len(current_text) >= 3:
first = False
# Update cache with current lines, keeping only the last 10
self.previous_result_text.extend(current_lines)
self.previous_result_text = self.previous_result_text[-10:]
return '\n'.join(changed_lines)
# Crop pil_image to a single rectangle covering all changed lines' bounding
# boxes (padded by `margin` pixels and clamped to the image); returns None
# when no line yields a valid region.
def _create_changed_regions_image(self, pil_image, changed_lines, margin=5):
img_width, img_height = pil_image.size
# Convert normalized coordinates to pixel coordinates
regions = []
for line in changed_lines:
bbox = line.bounding_box
# Convert center-based bbox to corner-based
x1 = (bbox.center_x - bbox.width/2) * img_width - margin
y1 = (bbox.center_y - bbox.height/2) * img_height - margin
x2 = (bbox.center_x + bbox.width/2) * img_width + margin
y2 = (bbox.center_y + bbox.height/2) * img_height + margin
# Ensure coordinates are within image bounds
x1 = max(0, int(x1))
y1 = max(0, int(y1))
x2 = min(img_width, int(x2))
y2 = min(img_height, int(y2))
if x2 > x1 and y2 > y1: #Only add valid regions
regions.append((x1, y1, x2, y2))
if not regions:
return None
# Calculate the bounding box that contains all regions
overall_x1 = min(x1 for x1, y1, x2, y2 in regions)
overall_y1 = min(y1 for x1, y1, x2, y2 in regions)
overall_x2 = max(x2 for x1, y1, x2, y2 in regions)
overall_y2 = max(y2 for x1, y1, x2, y2 in regions)
# Crop the single rectangle containing all changed regions
result_image = pil_image.crop((overall_x1, overall_y1, overall_x2, overall_y2))
return result_image
def _extract_lines_from_result(self, result_data):
lines = []
for p in result_data.paragraphs:
for l in p.lines:
lines.append(self.filtering._get_line_text(l))
return lines
def __call__(self, img_or_path, filter_text, notify):
if auto_pause_handler and not filter_text:
@@ -943,6 +974,7 @@ class OutputResult:
output_format = config.get_general('output_format')
engine_color = config.get_general('engine_color')
engine_instance = engine_instances[engine_index]
two_pass_processing_active = False
if filter_text and engine_index_2 != -1 and engine_index_2 != engine_index:
engine_instance_2 = engine_instances[engine_index_2]
@@ -953,15 +985,14 @@ class OutputResult:
if not res2:
logger.opt(ansi=True).warning(f'<{engine_color}>{engine_instance_2.readable_name}</{engine_color}> reported an error after {end_time - start_time:0.03f}s: {result_data_2}')
else:
changed_lines = self._find_changed_lines(result_data_2, self.previous_result)
two_pass_processing_active = True
changed_lines = self.filtering._find_changed_lines(result_data_2)
self.previous_result = copy.deepcopy(result_data_2)
if len(changed_lines) > 0:
if changed_lines:
logger.opt(ansi=True).info(f"<{engine_color}>{engine_instance_2.readable_name}</{engine_color}> found {len(changed_lines)} changed line(s) in {end_time - start_time:0.03f}s, re-OCRing with <{engine_color}>{engine_instance.readable_name}</{engine_color}>")
if output_format != 'json':
changed_regions_image = self._create_changed_regions_image(img_or_path, changed_lines)
changed_regions_image = self.filtering._create_changed_regions_image(img_or_path, changed_lines)
if changed_regions_image:
img_or_path = changed_regions_image
@@ -984,7 +1015,7 @@ class OutputResult:
result_data_text = None
if isinstance(result_data, OcrResult):
unprocessed_text = self._coordinate_format_to_string(result_data)
unprocessed_text = self._extract_lines_from_result(result_data)
if output_format == 'json':
result_dict = asdict(result_data)
@@ -995,15 +1026,17 @@ class OutputResult:
else:
result_data_text = result_data
if result_data_text:
if output_format == 'json':
logger.warning(f"Engine '{engine_instance.name}' does not support JSON output. Falling back to text.")
if result_data_text != None:
if filter_text:
text_to_process = self._find_changed_lines_text(result_data_text)
text_to_process = self.filtering._find_changed_lines_text(result_data_text, two_pass_processing_active)
if text_to_process == []:
return
output_string = self._post_process(text_to_process, True)
else:
output_string = self._post_process(result_data_text, False)
log_message = output_string
if output_format == 'json':
logger.warning(f"Engine '{engine_instance.name}' does not support JSON output. Falling back to text.")
if verbosity != 0:
if verbosity < -1:
@@ -1120,6 +1153,7 @@ def on_window_closed(alive):
# Hotkey handler: mark the next capture as combo-triggered (so write_result
# routes it to the image queue instead of the periodic queue) and wake the
# screenshot thread.
def on_screenshot_combo():
screenshot_thread.is_combo_screenshot = True
screenshot_event.set()
@@ -1195,13 +1229,13 @@ def run():
directory_watcher_thread = None
unix_socket_server = None
key_combo_listener = None
init_filtering = False
auto_pause_handler = None
engine_index = engine_keys.index(default_engine) if default_engine != '' else 0
engine_index_2 = engine_keys.index(engine_secondary) if engine_secondary != '' else -1
engine_color = config.get_general('engine_color')
combo_pause = config.get_general('combo_pause')
combo_engine_switch = config.get_general('combo_engine_switch')
screen_capture_periodic = False
screen_capture_on_combo = False
notifier = DesktopNotifierSync()
image_queue = queue.Queue()
@@ -1226,13 +1260,13 @@ def run():
if screen_capture_combo != '':
screen_capture_on_combo = True
key_combos[screen_capture_combo] = on_screenshot_combo
else:
if screen_capture_delay_secs != -1:
global periodic_screenshot_queue
periodic_screenshot_queue = queue.Queue()
screen_capture_periodic = True
screenshot_event = threading.Event()
screenshot_thread = ScreenshotThread(screen_capture_on_combo)
screenshot_thread = ScreenshotThread()
screenshot_thread.start()
init_filtering = True
read_from_readable.append('screen capture')
if 'websocket' in (read_from, read_from_secondary):
read_from_readable.append('websocket')
@@ -1261,7 +1295,7 @@ def run():
directory_watcher_thread.start()
read_from_readable.append(f'directory {read_from_path}')
output_result = OutputResult(init_filtering)
output_result = OutputResult()
if len(key_combos) > 0:
key_combo_listener = keyboard.GlobalHotKeys(key_combos)
@@ -1275,9 +1309,8 @@ def run():
write_to_readable = f'file {write_to}'
process_queue = (any(i in ('clipboard', 'websocket', 'unixsocket') for i in (read_from, read_from_secondary)) or read_from_path or screen_capture_on_combo)
process_screenshots = 'screencapture' in (read_from, read_from_secondary) and not screen_capture_on_combo
signal.signal(signal.SIGINT, signal_handler)
if (not process_screenshots) and auto_pause != 0:
if (not screen_capture_periodic) and auto_pause != 0:
auto_pause_handler = AutopauseTimer(auto_pause)
user_input_thread = threading.Thread(target=user_input_thread_run, daemon=True)
user_input_thread.start()
@@ -1299,17 +1332,22 @@ def run():
if process_queue:
try:
img, filter_text = image_queue.get(timeout=0.1)
if screen_capture_periodic:
filter_text = False
notify = True
except queue.Empty:
pass
if (not img) and process_screenshots:
if (not img) and screen_capture_periodic:
if (not paused) and screenshot_thread.screencapture_window_active and screenshot_thread.screencapture_window_visible and (time.time() - last_screenshot_time) > screen_capture_delay_secs:
screenshot_event.set()
img = periodic_screenshot_queue.get()
filter_text = True
notify = False
last_screenshot_time = time.time()
try:
img = periodic_screenshot_queue.get(timeout=0.1)
filter_text = True
notify = False
last_screenshot_time = time.time()
except queue.Empty:
pass
if img == 0:
on_window_closed(False)