Files
owocr/owocr/ocr.py
2025-11-23 12:34:02 +01:00

2143 lines
75 KiB
Python

import re
import os
import io
from pathlib import Path
import sys
import platform
import logging
from math import sqrt, sin, cos, atan2
import json
import base64
import urllib
from urllib.parse import urlparse, parse_qs
from dataclasses import dataclass, field, asdict
from typing import List, Optional
import jaconv
import numpy as np
from PIL import Image
from loguru import logger
import requests
try:
from manga_ocr import MangaOcr as MOCR
from comic_text_detector.inference import TextDetector
from scipy.signal.windows import gaussian
import torch
import cv2
except ImportError:
pass
try:
import Vision
import objc
from AppKit import NSData, NSImage, NSBundle
from CoreFoundation import CFRunLoopRunInMode, kCFRunLoopDefaultMode, CFRunLoopStop, CFRunLoopGetCurrent
except ImportError:
pass
try:
from google.cloud import vision
from google.oauth2 import service_account
from google.api_core.exceptions import ServiceUnavailable
except ImportError:
pass
try:
from azure.ai.vision.imageanalysis import ImageAnalysisClient
from azure.ai.vision.imageanalysis.models import VisualFeatures
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import ServiceRequestError
except ImportError:
pass
try:
import easyocr
except ImportError:
pass
try:
from rapidocr import RapidOCR as ROCR
from rapidocr import EngineType, LangDet, LangRec, ModelType, OCRVersion
except ImportError:
pass
try:
from meikiocr import MeikiOCR as MKOCR
except ImportError:
pass
try:
import winocr
except ImportError:
pass
try:
import oneocr
except ImportError:
pass
try:
import betterproto
from .lens_betterproto import *
import random
except ImportError:
pass
try:
import fpng_py
optimized_png_encode = True
except:
optimized_png_encode = False
manga_ocr_model = None
@dataclass
class BoundingBox:
"""
Represents the normalized coordinates of a detected element.
All values are floats between 0.0 and 1.0.
"""
center_x: float
center_y: float
width: float
height: float
rotation_z: Optional[float] = None # Optional rotation in radians
@property
def left(self) -> float:
return self.center_x - self.width / 2
@property
def right(self) -> float:
return self.center_x + self.width / 2
@property
def top(self) -> float:
return self.center_y - self.height / 2
@property
def bottom(self) -> float:
return self.center_y + self.height / 2
@dataclass
class Word:
"""Represents a single recognized word and its properties."""
text: str
bounding_box: BoundingBox
separator: Optional[str] = None # The character(s) that follow the word, e.g., a space
@dataclass
class Line:
"""Represents a single line of text, composed of words."""
bounding_box: BoundingBox
words: List[Word] = field(default_factory=list)
text: Optional[str] = None
@dataclass
class Paragraph:
"""Represents a block of text, composed of lines."""
bounding_box: BoundingBox
lines: List[Line] = field(default_factory=list)
writing_direction: Optional[str] = None # Optional: e.g., "LEFT_TO_RIGHT"
@dataclass
class ImageProperties:
"""Stores the original dimensions of the processed image."""
width: int
height: int
@dataclass
class EngineCapabilities:
"""
Represents the features natively supported by the OCR engine.
"""
words: bool
word_bounding_boxes: bool
lines: bool
line_bounding_boxes: bool
paragraphs: bool
paragraph_bounding_boxes: bool
@dataclass
class OcrResult:
"""The root object for a complete OCR analysis of an image."""
image_properties: ImageProperties
engine_capabilities: EngineCapabilities
paragraphs: List[Paragraph] = field(default_factory=list)
def initialize_manga_ocr(pretrained_model_name_or_path, force_cpu):
def empty_post_process(text):
text = re.sub(r'\s+', '', text)
return text
global manga_ocr_model
if not manga_ocr_model:
logger.disable('manga_ocr')
logging.getLogger('transformers').setLevel(logging.ERROR) # silence transformers >=4.46 warnings
from manga_ocr import ocr
ocr.post_process = empty_post_process
logger.info(f'Loading Manga OCR model')
manga_ocr_model = MOCR(pretrained_model_name_or_path, force_cpu)
def input_to_pil_image(img):
is_path = False
if isinstance(img, Image.Image):
pil_image = img
elif isinstance(img, (bytes, bytearray)):
pil_image = Image.open(io.BytesIO(img))
elif isinstance(img, Path):
is_path = True
try:
pil_image = Image.open(img)
pil_image.load()
except (UnidentifiedImageError, OSError) as e:
return None
else:
raise ValueError(f'img must be a path, PIL.Image or bytes object, instead got: {img}')
return pil_image, is_path
def pil_image_to_bytes(img, img_format='png', png_compression=6, jpeg_quality=80, optimize=False):
if img_format == 'png' and optimized_png_encode and not optimize:
raw_data = img.convert('RGBA').tobytes()
image_bytes = fpng_py.fpng_encode_image_to_memory(raw_data, img.width, img.height)
else:
image_bytes = io.BytesIO()
if img_format == 'jpeg':
img = img.convert('RGB')
img.save(image_bytes, format=img_format, compress_level=png_compression, quality=jpeg_quality, optimize=optimize, subsampling=0)
image_bytes = image_bytes.getvalue()
return image_bytes
def pil_image_to_numpy_array(img):
return np.array(img.convert('RGBA'))
def limit_image_size(img, max_size):
img_bytes = pil_image_to_bytes(img)
if len(img_bytes) <= max_size:
return img_bytes, 'png', img.size
scaling_factor = 0.60 if any(x > 2000 for x in img.size) else 0.75
new_w = int(img.width * scaling_factor)
new_h = int(img.height * scaling_factor)
resized_img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
resized_img_bytes = pil_image_to_bytes(resized_img)
if len(resized_img_bytes) <= max_size:
return resized_img_bytes, 'png', resized_img.size
for _ in range(2):
jpeg_quality = 80
while jpeg_quality >= 60:
img_bytes = pil_image_to_bytes(img, 'jpeg', jpeg_quality=jpeg_quality, optimize=True)
if len(img_bytes) <= max_size:
return img_bytes, 'jpeg', img.size
jpeg_quality -= 5
img = resized_img
return False, '', (None, None)
def quad_to_bounding_box(x1, y1, x2, y2, x3, y3, x4, y4, img_width=None, img_height=None):
center_x = (x1 + x2 + x3 + x4) / 4
center_y = (y1 + y2 + y3 + y4) / 4
# Calculate widths using Euclidean distance
width1 = sqrt((x2 - x1)**2 + (y2 - y1)**2)
width2 = sqrt((x3 - x4)**2 + (y3 - y4)**2)
avg_width = (width1 + width2) / 2
# Calculate heights using Euclidean distance
height1 = sqrt((x4 - x1)**2 + (y4 - y1)**2)
height2 = sqrt((x3 - x2)**2 + (y3 - y2)**2)
avg_height = (height1 + height2) / 2
# Calculate rotation angle from the first edge
dx = x2 - x1
dy = y2 - y1
angle = atan2(dy, dx)
if img_width and img_height:
center_x = center_x / img_width
center_y = center_y / img_height
avg_width = avg_width / img_width
avg_height = avg_height / img_height
return BoundingBox(
center_x=center_x,
center_y=center_y,
width=avg_width,
height=avg_height,
rotation_z=angle
)
def rectangle_to_bounding_box(x1, y1, x2, y2, img_width=None, img_height=None):
width = x2 - x1
height = y2 - y1
center_x = (x1 + x2) / 2
center_y = (y1 + y2) / 2
if img_width and img_height:
width = width / img_width
height = height / img_height
center_x = center_x / img_width
center_y = center_y / img_height
return BoundingBox(
center_x=center_x,
center_y=center_y,
width=width,
height=height
)
def merge_bounding_boxes(ocr_element_list, rotated=False):
def _get_all_corners(ocr_element_list):
corners = []
for element in ocr_element_list:
bbox = element.bounding_box
angle = bbox.rotation_z or 0.0
hw, hh = bbox.width / 2.0, bbox.height / 2.0
cx, cy = bbox.center_x, bbox.center_y
# Local corner offsets
local = np.array([[-hw, -hh], [hw, -hh], [hw, hh], [-hw, hh]])
if abs(angle) < 1e-12:
corners.append(local + [cx, cy])
else:
# Rotation matrix
cos_a, sin_a = np.cos(angle), np.sin(angle)
rot = np.array([[cos_a, -sin_a], [sin_a, cos_a]])
corners.append(local @ rot.T + [cx, cy])
return np.vstack(corners) if corners else np.empty((0, 2))
def _convex_hull(points):
if len(points) <= 3:
return points
pts = np.unique(points, axis=0)
pts = pts[np.lexsort((pts[:, 1], pts[:, 0]))]
if len(pts) <= 1:
return pts
def cross(o, a, b):
return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])
lower, upper = [], []
for p in pts:
while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0:
lower.pop()
lower.append(p)
for p in pts[::-1]:
while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0:
upper.pop()
upper.append(p)
return np.array(lower[:-1] + upper[:-1])
all_corners = _get_all_corners(ocr_element_list)
# Axis-aligned case
if not rotated:
min_pt, max_pt = all_corners.min(axis=0), all_corners.max(axis=0)
center = (min_pt + max_pt) / 2
size = max_pt - min_pt
return BoundingBox(
center_x=center[0],
center_y=center[1],
width=size[0],
height=size[1]
)
hull = _convex_hull(all_corners)
m = len(hull)
# Trivial cases
if m == 1:
return BoundingBox(
center_x=hull[0, 0],
center_y=hull[0, 1],
width=0.0,
height=0.0,
rotation_z=0.0
)
if m == 2:
diff = hull[1] - hull[0]
length = np.linalg.norm(diff)
center = hull.mean(axis=0)
return BoundingBox(
center_x=center[0],
center_y=center[1],
width=length,
height=0.0,
rotation_z=np.arctan2(diff[1], diff[0])
)
# Test each edge orientation
edges = np.roll(hull, -1, axis=0) - hull
edge_lengths = np.linalg.norm(edges, axis=1)
valid = edge_lengths > 1e-12
if not valid.any():
# Fallback to axis-aligned
min_pt, max_pt = all_corners.min(axis=0), all_corners.max(axis=0)
center = (min_pt + max_pt) / 2
size = max_pt - min_pt
return BoundingBox(
center_x=center[0],
center_y=center[1],
width=size[0],
height=size[1]
)
angles = np.arctan2(edges[valid, 1], edges[valid, 0])
best_area, best_idx = np.inf, -1
for idx, angle in enumerate(angles):
# Rotation matrix (rotate by -angle)
cos_a, sin_a = np.cos(angle), np.sin(angle)
rot = np.array([[cos_a, sin_a], [-sin_a, cos_a]])
rotated = hull @ rot.T
min_pt, max_pt = rotated.min(axis=0), rotated.max(axis=0)
area = np.prod(max_pt - min_pt)
if area < best_area:
best_area, best_idx = area, idx
best_bounds = (min_pt, max_pt, angle)
min_pt, max_pt, angle = best_bounds
width, height = max_pt - min_pt
center_rot = (min_pt + max_pt) / 2
# Rotate center back to global coordinates
cos_a, sin_a = np.cos(angle), np.sin(angle)
rot_back = np.array([[cos_a, -sin_a], [sin_a, cos_a]])
center = rot_back @ center_rot
# Normalize angle to [-π, π]
angle = np.mod(angle + np.pi, 2 * np.pi) - np.pi
return BoundingBox(
center_x=center[0],
center_y=center[1],
width=width,
height=height,
rotation_z=angle
)
class MangaOcrSegmented:
name = 'mangaocrs'
readable_name = 'Manga OCR (segmented)'
key = 'n'
config_entry = 'mangaocr'
available = False
local = True
manual_language = False
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=False,
word_bounding_boxes=False,
lines=True,
line_bounding_boxes=True,
paragraphs=True,
paragraph_bounding_boxes=True
)
def __init__(self, config={}):
if 'manga_ocr' not in sys.modules:
logger.warning('manga-ocr not available, Manga OCR (segmented) will not work!')
elif 'scipy' not in sys.modules:
logger.warning('scipy not available, Manga OCR (segmented) will not work!')
else:
comic_text_detector_path = Path.home() / ".cache" / "manga-ocr"
comic_text_detector_file = comic_text_detector_path / "comictextdetector.pt"
if not comic_text_detector_file.exists():
comic_text_detector_path.mkdir(parents=True, exist_ok=True)
logger.info('Downloading comic text detector model ' + str(comic_text_detector_file))
try:
urllib.request.urlretrieve('https://github.com/zyddnys/manga-image-translator/releases/download/beta-0.3/comictextdetector.pt', str(comic_text_detector_file))
except:
logger.warning('Download failed. Manga OCR (segmented) will not work!')
return
pretrained_model_name_or_path = config.get('pretrained_model_name_or_path', 'kha-white/manga-ocr-base')
force_cpu = config.get('force_cpu', False)
initialize_manga_ocr(pretrained_model_name_or_path, force_cpu)
if not force_cpu and torch.cuda.is_available():
device = 'cuda'
elif not force_cpu and torch.backends.mps.is_available():
device = 'mps'
else:
device = 'cpu'
logger.info(f'Loading comic text detector model, using device {device}')
self.text_detector_model = TextDetector(model_path=comic_text_detector_file, input_size=1024, device=device, act='leaky')
self.available = True
logger.info('Manga OCR (segmented) ready')
def _convert_line_bbox(self, rect, img_width, img_height):
(x1, y1), (x2, y2), (x3, y3), (x4, y4) = [(float(x), float(y)) for x, y in rect]
return quad_to_bounding_box(x1, y1, x2, y2, x3, y3, x4, y4, img_width, img_height)
def _convert_box_bbox(self, rect, img_width, img_height):
x1, y1, x2, y2 = map(float, rect)
return rectangle_to_bounding_box(x1, y1, x2, y2, img_width, img_height)
# from https://github.com/kha-white/mokuro/blob/master/mokuro/manga_page_ocr.py
def _split_into_chunks(self, img, mask_refined, blk, line_idx, textheight, max_ratio, anchor_window):
line_crop = blk.get_transformed_region(img, line_idx, textheight)
h, w, *_ = line_crop.shape
ratio = w / h
if ratio <= max_ratio:
return [line_crop], []
else:
k = gaussian(textheight * 2, textheight / 8)
line_mask = blk.get_transformed_region(mask_refined, line_idx, textheight)
num_chunks = int(np.ceil(ratio / max_ratio))
anchors = np.linspace(0, w, num_chunks + 1)[1:-1]
line_density = line_mask.sum(axis=0)
line_density = np.convolve(line_density, k, 'same')
line_density /= line_density.max()
anchor_window *= textheight
cut_points = []
for anchor in anchors:
anchor = int(anchor)
n0 = np.clip(anchor - anchor_window // 2, 0, w)
n1 = np.clip(anchor + anchor_window // 2, 0, w)
p = line_density[n0:n1].argmin()
p += n0
cut_points.append(p)
return np.split(line_crop, cut_points, axis=1), cut_points
# derived from https://github.com/kha-white/mokuro/blob/master/mokuro/manga_page_ocr.py
def _to_generic_result(self, mask_refined, blk_list, img_np, img_height, img_width):
paragraphs = []
for blk_idx, blk in enumerate(blk_list):
lines = []
for line_idx, line in enumerate(blk.lines_array()):
if blk.vertical:
max_ratio = 16
else:
max_ratio = 8
line_crops, cut_points = self._split_into_chunks(
img_np,
mask_refined,
blk,
line_idx,
textheight=64,
max_ratio=max_ratio,
anchor_window=2,
)
l_text = ''
for line_crop in line_crops:
if blk.vertical:
line_crop = cv2.rotate(line_crop, cv2.ROTATE_90_CLOCKWISE)
l_text += manga_ocr_model(Image.fromarray(line_crop))
l_bbox = self._convert_line_bbox(line.tolist(), img_width, img_height)
word = Word(
text=l_text,
bounding_box=l_bbox
)
words = [word]
line = Line(
text=l_text,
bounding_box=l_bbox,
words=words
)
lines.append(line)
p_bbox = self._convert_box_bbox(list(blk.xyxy), img_width, img_height)
writing_direction = 'TOP_TO_BOTTOM' if blk.vertical else "LEFT_TO_RIGHT"
paragraph = Paragraph(bounding_box=p_bbox, lines=lines, writing_direction=writing_direction)
paragraphs.append(paragraph)
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
img_np = pil_image_to_numpy_array(img)
img_width, img_height = img.size
_, mask_refined, blk_list = self.text_detector_model(img_np, refine_mode=1, keep_undetected_mask=True)
ocr_result = self._to_generic_result(mask_refined, blk_list, img_np, img_height, img_width)
x = (True, ocr_result)
if is_path:
img.close()
return x
class MangaOcr:
name = 'mangaocr'
readable_name = 'Manga OCR'
key = 'm'
config_entry = 'mangaocr'
available = False
local = True
manual_language = False
coordinate_support = False
threading_support = True
capabilities = EngineCapabilities(
words=False,
word_bounding_boxes=False,
lines=True,
line_bounding_boxes=False,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, config={}):
if 'manga_ocr' not in sys.modules:
logger.warning('manga-ocr not available, Manga OCR will not work!')
else:
pretrained_model_name_or_path = config.get('pretrained_model_name_or_path', 'kha-white/manga-ocr-base')
force_cpu = config.get('force_cpu', False)
initialize_manga_ocr(pretrained_model_name_or_path, force_cpu)
self.available = True
logger.info('Manga OCR ready')
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
x = (True, [manga_ocr_model(img)])
if is_path:
img.close()
return x
class GoogleVision:
name = 'gvision'
readable_name = 'Google Vision'
key = 'g'
config_entry = None
available = False
local = False
manual_language = False
coordinate_support = True
threading_support = True
capabilities = {
'words': True,
'word_bounding_boxes': True,
'lines': True,
'line_bounding_boxes': False,
'paragraphs': True,
'paragraph_bounding_boxes': True
}
def __init__(self):
if 'google.cloud' not in sys.modules:
logger.warning('google-cloud-vision not available, Google Vision will not work!')
else:
logger.info(f'Parsing Google credentials')
google_credentials_file = os.path.join(os.path.expanduser('~'),'.config','google_vision.json')
try:
google_credentials = service_account.Credentials.from_service_account_file(google_credentials_file)
self.client = vision.ImageAnnotatorClient(credentials=google_credentials)
self.available = True
logger.info('Google Vision ready')
except:
logger.warning('Error parsing Google credentials, Google Vision will not work!')
def _break_type_to_char(self, break_type):
if break_type == vision.TextAnnotation.DetectedBreak.BreakType.SPACE:
return ' '
elif break_type == vision.TextAnnotation.DetectedBreak.BreakType.SURE_SPACE:
return ' '
elif break_type == vision.TextAnnotation.DetectedBreak.BreakType.EOL_SURE_SPACE:
return '\n'
elif break_type == vision.TextAnnotation.DetectedBreak.BreakType.HYPHEN:
return '-'
elif break_type == vision.TextAnnotation.DetectedBreak.BreakType.LINE_BREAK:
return '\n'
return ''
def _convert_bbox(self, quad, img_width, img_height):
vertices = quad.vertices
return quad_to_bounding_box(
vertices[0].x, vertices[0].y,
vertices[1].x, vertices[1].y,
vertices[2].x, vertices[2].y,
vertices[3].x, vertices[3].y,
img_width, img_height
)
def _create_word_from_google_word(self, google_word, img_width, img_height):
w_bbox = self._convert_bbox(google_word.bounding_box, img_width, img_height)
w_separator = ''
w_text_parts = []
for i, symbol in enumerate(google_word.symbols):
separator = None
if hasattr(symbol, 'property') and hasattr(symbol.property, 'detected_break'):
detected_break = symbol.property.detected_break
detected_separator = self._break_type_to_char(detected_break.type_)
if i == len(google_word.symbols) - 1:
w_separator = detected_separator
else:
separator = detected_separator
symbol_text = symbol.text
w_text_parts.append(symbol_text)
if separator:
w_text_parts.append(separator)
word_text = ''.join(w_text_parts)
return Word(
text=word_text,
bounding_box=w_bbox,
separator=w_separator
)
def _create_lines_from_google_paragraph(self, google_paragraph, p_bbox, img_width, img_height):
lines = []
words = []
for google_word in google_paragraph.words:
word = self._create_word_from_google_word(google_word, img_width, img_height)
words.append(word)
if word.separator == '\n':
line = Line(bounding_box=BoundingBox(0,0,0,0), words=words)
lines.append(line)
words = []
if len(lines) == 1:
lines[0].bounding_box = p_bbox
else:
for line in lines:
l_bbox = merge_bounding_boxes(line.words, True)
line.bounding_box = l_bbox
return lines
def _to_generic_result(self, full_text_annotation, img_width, img_height):
paragraphs = []
if full_text_annotation:
for page in full_text_annotation.pages:
if page.width == img_width and page.height == img_height:
for block in page.blocks:
for google_paragraph in block.paragraphs:
p_bbox = self._convert_bbox(google_paragraph.bounding_box, img_width, img_height)
lines = self._create_lines_from_google_paragraph(google_paragraph, p_bbox, img_width, img_height)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs.append(paragraph)
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
image_bytes = self._preprocess(img)
image = vision.Image(content=image_bytes)
try:
response = self.client.document_text_detection(image=image)
except ServiceUnavailable:
return (False, 'Connection error!')
except Exception as e:
return (False, 'Unknown error!')
ocr_result = self._to_generic_result(response.full_text_annotation, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess(self, img):
return pil_image_to_bytes(img)
class GoogleLens:
name = 'glens'
readable_name = 'Google Lens'
key = 'l'
config_entry = None
available = False
local = False
manual_language = False
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=True,
word_bounding_boxes=True,
lines=True,
line_bounding_boxes=True,
paragraphs=True,
paragraph_bounding_boxes=True
)
def __init__(self):
if 'betterproto' not in sys.modules:
logger.warning('betterproto not available, Google Lens will not work!')
else:
self.available = True
logger.info('Google Lens ready')
def _to_generic_result(self, response, img_width, img_height):
paragraphs = []
if 'objects_response' in response and 'text' in response['objects_response']:
text_data = response['objects_response']['text']
if 'text_layout' in text_data:
for p in text_data['text_layout'].get('paragraphs', []):
lines = []
for l in p.get('lines', []):
words = []
for w in l.get('words', []):
w_bbox = w.get('geometry', {}).get('bounding_box', {})
word = Word(
text=w.get('plain_text', ''),
separator=w.get('text_separator'),
bounding_box=BoundingBox(
center_x=w_bbox.get('center_x'),
center_y=w_bbox.get('center_y'),
width=w_bbox.get('width'),
height=w_bbox.get('height'),
rotation_z=w_bbox.get('rotation_z')
)
)
words.append(word)
l_bbox = l.get('geometry', {}).get('bounding_box', {})
line = Line(
bounding_box=BoundingBox(
center_x=l_bbox.get('center_x'),
center_y=l_bbox.get('center_y'),
width=l_bbox.get('width'),
height=l_bbox.get('height'),
rotation_z=l_bbox.get('rotation_z')
),
words=words
)
lines.append(line)
p_bbox = p.get('geometry', {}).get('bounding_box', {})
paragraph = Paragraph(
bounding_box=BoundingBox(
center_x=p_bbox.get('center_x'),
center_y=p_bbox.get('center_y'),
width=p_bbox.get('width'),
height=p_bbox.get('height'),
rotation_z=p_bbox.get('rotation_z')
),
lines=lines,
writing_direction=p.get('writing_direction')
)
paragraphs.append(paragraph)
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
request = LensOverlayServerRequest()
request.objects_request.request_context.request_id.uuid = random.randint(0, 2**64 - 1)
request.objects_request.request_context.request_id.sequence_id = 0
request.objects_request.request_context.request_id.image_sequence_id = 0
request.objects_request.request_context.request_id.analytics_id = random.randbytes(16)
request.objects_request.request_context.request_id.routing_info = LensOverlayRoutingInfo()
request.objects_request.request_context.client_context.platform = Platform.WEB
request.objects_request.request_context.client_context.surface = Surface.CHROMIUM
request.objects_request.request_context.client_context.locale_context.language = 'ja'
request.objects_request.request_context.client_context.locale_context.region = 'Asia/Tokyo'
request.objects_request.request_context.client_context.locale_context.time_zone = '' # not set by chromium
request.objects_request.request_context.client_context.app_id = '' # not set by chromium
filter = AppliedFilter()
filter.filter_type = LensOverlayFilterType.AUTO_FILTER
request.objects_request.request_context.client_context.client_filters.filter.append(filter)
img_bytes, img_width, img_height = self._preprocess(img)
request.objects_request.image_data.payload.image_bytes = img_bytes
request.objects_request.image_data.image_metadata.width = img_width
request.objects_request.image_data.image_metadata.height = img_height
payload = request.SerializeToString()
headers = {
'Host': 'lensfrontend-pa.googleapis.com',
'Connection': 'keep-alive',
'Content-Type': 'application/x-protobuf',
'X-Goog-Api-Key': 'AIzaSyDr2UxVnv_U85AbhhY8XSHSIavUW0DC-sY',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-Mode': 'no-cors',
'Sec-Fetch-Dest': 'empty',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
'Accept-Encoding': 'gzip, deflate, br, zstd',
'Accept-Language': 'ja-JP;q=0.6,ja;q=0.5'
}
try:
res = requests.post('https://lensfrontend-pa.googleapis.com/v1/crupload', data=payload, headers=headers, timeout=20)
except requests.exceptions.Timeout:
return (False, 'Request timeout!')
except requests.exceptions.ConnectionError:
return (False, 'Connection error!')
if res.status_code != 200:
return (False, 'Unknown error!')
response_proto = LensOverlayServerResponse().FromString(res.content)
response_dict = response_proto.to_dict(betterproto.Casing.SNAKE)
ocr_result = self._to_generic_result(response_dict, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess(self, img):
if img.width * img.height > 3000000:
aspect_ratio = img.width / img.height
new_w = int(sqrt(3000000 * aspect_ratio))
new_h = int(new_w / aspect_ratio)
img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
return pil_image_to_bytes(img), img.width, img.height
class Bing:
name = 'bing'
readable_name = 'Bing'
key = 'b'
config_entry = None
available = False
local = False
manual_language = False
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=True,
word_bounding_boxes=True,
lines=True,
line_bounding_boxes=True,
paragraphs=True,
paragraph_bounding_boxes=True
)
def __init__(self):
self.requests_session = requests.Session()
self.available = True
logger.info('Bing ready')
def _convert_bbox(self, quad):
return quad_to_bounding_box(
quad['topLeft']['x'], quad['topLeft']['y'],
quad['topRight']['x'], quad['topRight']['y'],
quad['bottomRight']['x'], quad['bottomRight']['y'],
quad['bottomLeft']['x'], quad['bottomLeft']['y']
)
def _to_generic_result(self, response, img_width, img_height, og_img_width, og_img_height):
paragraphs = []
text_tag = None
for tag in response.get('tags', []):
if tag.get('displayName') == '##TextRecognition':
text_tag = tag
break
if text_tag:
text_action = None
for action in text_tag.get('actions', []):
if action.get('_type') == 'ImageKnowledge/TextRecognitionAction':
text_action = action
break
if text_action:
for p in text_action.get('data', {}).get('regions', []):
lines = []
for l in p.get('lines', []):
words = []
for w in l.get('words', []):
word = Word(
text=w.get('text', ''),
bounding_box=self._convert_bbox(w['boundingBox'])
)
words.append(word)
line = Line(
text=l.get('text', ''),
bounding_box=self._convert_bbox(l['boundingBox']),
words=words
)
lines.append(line)
paragraph = Paragraph(
bounding_box=self._convert_bbox(p['boundingBox']),
lines=lines
)
paragraphs.append(paragraph)
return OcrResult(
image_properties=ImageProperties(width=og_img_width, height=og_img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
img_bytes, img_size = self._preprocess(img)
if not img_bytes:
return (False, 'Image is too big!')
upload_url = 'https://www.bing.com/images/search?view=detailv2&iss=sbiupload'
upload_headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'accept-language': 'ja-JP;q=0.6,ja;q=0.5',
'cache-control': 'max-age=0',
'origin': 'https://www.bing.com',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:136.0) Gecko/20100101 Firefox/136.0',
}
files = {
'imgurl': (None, ''),
'cbir': (None, 'sbi'),
'imageBin': (None, img_bytes)
}
for _ in range(2):
api_host = urlparse(upload_url).netloc
try:
res = self.requests_session.post(upload_url, headers=upload_headers, files=files, timeout=20, allow_redirects=False)
except requests.exceptions.Timeout:
return (False, 'Request timeout!')
except requests.exceptions.ConnectionError:
return (False, 'Connection error!')
if res.status_code != 302:
return (False, 'Unknown error!')
redirect_url = res.headers.get('Location')
if not redirect_url:
return (False, 'Error getting redirect URL!')
if not redirect_url.startswith('https://'):
break
upload_url = redirect_url
parsed_url = urlparse(redirect_url)
query_params = parse_qs(parsed_url.query)
image_insights_token = query_params.get('insightsToken')
if not image_insights_token:
return (False, 'Error getting token!')
image_insights_token = image_insights_token[0]
api_url = f'https://{api_host}/images/api/custom/knowledge'
api_headers = {
'accept': '*/*',
'accept-language': 'ja-JP;q=0.6,ja;q=0.5',
'origin': 'https://www.bing.com',
'referer': f'https://www.bing.com/images/search?view=detailV2&insightstoken={image_insights_token}',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:136.0) Gecko/20100101 Firefox/136.0',
}
api_data_json = {
'imageInfo': {'imageInsightsToken': image_insights_token, 'source': 'Url'},
'knowledgeRequest': {'invokedSkills': ['OCR'], 'index': 1}
}
files = {
'knowledgeRequest': (None, json.dumps(api_data_json), 'application/json')
}
try:
res = self.requests_session.post(api_url, headers=api_headers, files=files, timeout=20)
except requests.exceptions.Timeout:
return (False, 'Request timeout!')
except requests.exceptions.ConnectionError:
return (False, 'Connection error!')
if res.status_code != 200:
return (False, 'Unknown error!')
data = res.json()
img_width, img_height = img_size
ocr_result = self._to_generic_result(data, img_width, img_height, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess(self, img):
max_pixel_size = 4000
max_byte_size = 767772
res = None
if any(x > max_pixel_size for x in img.size):
resize_factor = min(max_pixel_size / img.width, max_pixel_size / img.height)
new_w = int(img.width * resize_factor)
new_h = int(img.height * resize_factor)
img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
img_bytes, _, img_size = limit_image_size(img, max_byte_size)
if img_bytes:
res = base64.b64encode(img_bytes).decode('utf-8')
return res, img_size
class AppleVision:
name = 'avision'
readable_name = 'Apple Vision'
key = 'a'
config_entry = 'avision'
available = False
local = True
manual_language = True
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=False,
word_bounding_boxes=False,
lines=True,
line_bounding_boxes=True,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, language='ja', config={}):
if sys.platform != 'darwin':
logger.warning('Apple Vision is not supported on non-macOS platforms!')
elif int(platform.mac_ver()[0].split('.')[0]) < 13:
logger.warning('Apple Vision is not supported on macOS older than Ventura/13.0!')
else:
self.recognition_level = Vision.VNRequestTextRecognitionLevelFast if config.get('fast_mode', False) else Vision.VNRequestTextRecognitionLevelAccurate
self.language_correction = config.get('language_correction', True)
self.available = True
self.language = [language, 'en']
logger.info('Apple Vision ready')
def _to_generic_result(self, response, img_width, img_height):
lines = []
for l in response:
bbox_raw = l.boundingBox()
bbox = BoundingBox(
width=bbox_raw.size.width,
height=bbox_raw.size.height,
center_x=bbox_raw.origin.x + (bbox_raw.size.width / 2),
center_y=(1 - bbox_raw.origin.y - bbox_raw.size.height / 2)
)
word = Word(
text=l.text(),
bounding_box=bbox
)
words = [word]
line = Line(
text=l.text(),
bounding_box=bbox,
words=words
)
lines.append(line)
if lines:
p_bbox = merge_bounding_boxes(lines)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs = [paragraph]
else:
paragraphs = []
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
with objc.autorelease_pool():
req = Vision.VNRecognizeTextRequest.alloc().init()
req.setRevision_(Vision.VNRecognizeTextRequestRevision3)
req.setRecognitionLevel_(self.recognition_level)
req.setUsesLanguageCorrection_(self.language_correction)
req.setRecognitionLanguages_(self.language)
handler = Vision.VNImageRequestHandler.alloc().initWithData_options_(
self._preprocess(img), None
)
success = handler.performRequests_error_([req], None)
res = []
if success[0]:
ocr_result = self._to_generic_result(req.results(), img.width, img.height)
x = (True, ocr_result)
else:
x = (False, 'Unknown error!')
if is_path:
img.close()
return x
def _preprocess(self, img):
return pil_image_to_bytes(img, 'tiff')
class AppleLiveText:
name = 'alivetext'
readable_name = 'Apple Live Text'
key = 'd'
config_entry = None
available = False
local = True
manual_language = True
coordinate_support = True
threading_support = False
capabilities = EngineCapabilities(
words=True,
word_bounding_boxes=True,
lines=True,
line_bounding_boxes=True,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, language='ja'):
if sys.platform != 'darwin':
logger.warning('Apple Live Text is not supported on non-macOS platforms!')
elif int(platform.mac_ver()[0].split('.')[0]) < 13:
logger.warning('Apple Live Text is not supported on macOS older than Ventura/13.0!')
else:
app_info = NSBundle.mainBundle().infoDictionary()
app_info['LSBackgroundOnly'] = '1'
self.VKCImageAnalyzer = objc.lookUpClass('VKCImageAnalyzer')
self.VKCImageAnalyzerRequest = objc.lookUpClass('VKCImageAnalyzerRequest')
objc.registerMetaDataForSelector(
b'VKCImageAnalyzer',
b'processRequest:progressHandler:completionHandler:',
{
'arguments': {
3: {
'callable': {
'retval': {'type': b'v'},
'arguments': {
0: {'type': b'^v'},
1: {'type': b'd'},
}
}
},
4: {
'callable': {
'retval': {'type': b'v'},
'arguments': {
0: {'type': b'^v'},
1: {'type': b'@'},
2: {'type': b'@'},
}
}
}
}
}
)
self.language = [language, 'en']
self.available = True
logger.info('Apple Live Text ready')
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
self.result = None
with objc.autorelease_pool():
analyzer = self.VKCImageAnalyzer.alloc().init()
req = self.VKCImageAnalyzerRequest.alloc().initWithImage_requestType_(self._preprocess(img), 1) #VKAnalysisTypeText
req.setLocales_(self.language)
analyzer.processRequest_progressHandler_completionHandler_(req, lambda progress: None, self._process)
CFRunLoopRunInMode(kCFRunLoopDefaultMode, 10.0, False)
if self.result == None:
return (False, 'Unknown error!')
ocr_result = OcrResult(
image_properties=ImageProperties(width=img.width, height=img.height),
paragraphs=self.result,
engine_capabilities=self.capabilities
)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _process(self, analysis, error):
lines = []
response_lines = analysis.allLines()
if response_lines:
for l in response_lines:
words = []
for i, w in enumerate(l.children()):
w_bbox = w.quad().boundingBox()
word = Word(
text=w.string(),
bounding_box=BoundingBox(
width=w_bbox.size.width,
height=w_bbox.size.height,
center_x=w_bbox.origin.x + (w_bbox.size.width / 2),
center_y=w_bbox.origin.y + (w_bbox.size.height / 2)
)
)
words.append(word)
l_bbox = l.quad().boundingBox()
line = Line(
text=l.string(),
bounding_box=BoundingBox(
width=l_bbox.size.width,
height=l_bbox.size.height,
center_x=l_bbox.origin.x + (l_bbox.size.width / 2),
center_y=l_bbox.origin.y + (l_bbox.size.height / 2)
),
words=words
)
lines.append(line)
if lines:
p_bbox = merge_bounding_boxes(lines)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs = [paragraph]
else:
paragraphs = []
self.result = paragraphs
CFRunLoopStop(CFRunLoopGetCurrent())
def _preprocess(self, img):
image_bytes = pil_image_to_bytes(img, 'tiff')
ns_data = NSData.dataWithBytes_length_(image_bytes, len(image_bytes))
ns_image = NSImage.alloc().initWithData_(ns_data)
return ns_image
class WinRTOCR:
name = 'winrtocr'
readable_name = 'WinRT OCR'
key = 'w'
config_entry = 'winrtocr'
available = False
local = True
manual_language = True
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=True,
word_bounding_boxes=True,
lines=True,
line_bounding_boxes=False,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, config={}, language='ja'):
if sys.platform == 'win32':
if int(platform.release()) < 10:
logger.warning('WinRT OCR is not supported on Windows older than 10!')
elif 'winocr' not in sys.modules:
logger.warning('winocr not available, WinRT OCR will not work!')
else:
self.language = language
self.available = True
logger.info('WinRT OCR ready')
else:
try:
self.url = config['url']
self.language = language
self.available = True
logger.info('WinRT OCR ready')
except:
logger.warning('Error reading URL from config, WinRT OCR will not work!')
def _normalize_bbox(self, rect, img_width, img_height):
x_norm = rect['x'] / img_width
y_norm = rect['y'] / img_height
width_norm = rect['width'] / img_width
height_norm = rect['height'] / img_height
# Calculate center coordinates
center_x = x_norm + (width_norm / 2)
center_y = y_norm + (height_norm / 2)
return BoundingBox(
center_x=center_x,
center_y=center_y,
width=width_norm,
height=height_norm
)
def _to_generic_result(self, response, img_width, img_height):
lines = []
for l in response.get('lines', []):
words = []
for i, w in enumerate(l.get('words', [])):
word = Word(
text=w.get('text', ''),
bounding_box=self._normalize_bbox(w['bounding_rect'], img_width, img_height)
)
words.append(word)
l_bbox = merge_bounding_boxes(words)
line = Line(
text=l.get('text', ''),
bounding_box=l_bbox,
words=words
)
lines.append(line)
if lines:
p_bbox = merge_bounding_boxes(lines)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs = [paragraph]
else:
paragraphs = []
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
if sys.platform == 'win32':
res = winocr.recognize_pil_sync(img, lang=self.language)
else:
params = {'lang': self.language}
try:
res = requests.post(self.url, params=params, data=self._preprocess(img), timeout=3)
except requests.exceptions.Timeout:
return (False, 'Request timeout!')
except requests.exceptions.ConnectionError:
return (False, 'Connection error!')
if res.status_code != 200:
return (False, 'Unknown error!')
res = res.json()
ocr_result = self._to_generic_result(res, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess(self, img):
return pil_image_to_bytes(img, png_compression=1)
class OneOCR:
name = 'oneocr'
readable_name = 'OneOCR'
key = 'z'
config_entry = 'oneocr'
available = False
local = True
manual_language = False
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=True,
word_bounding_boxes=True,
lines=True,
line_bounding_boxes=True,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, config={}):
if sys.platform == 'win32':
if int(platform.release()) < 10:
logger.warning('OneOCR is not supported on Windows older than 10!')
elif 'oneocr' not in sys.modules:
logger.warning('oneocr not available, OneOCR will not work!')
else:
try:
self.model = oneocr.OcrEngine()
except RuntimeError as e:
logger.warning(e + ', OneOCR will not work!')
else:
self.available = True
logger.info('OneOCR ready')
else:
try:
self.url = config['url']
self.available = True
logger.info('OneOCR ready')
except:
logger.warning('Error reading URL from config, OneOCR will not work!')
def _convert_bbox(self, rect, img_width, img_height):
return quad_to_bounding_box(
rect['x1'], rect['y1'],
rect['x2'], rect['y2'],
rect['x3'], rect['y3'],
rect['x4'], rect['y4'],
img_width, img_height
)
def _to_generic_result(self, response, img_width, img_height, og_img_width, og_img_height):
lines = []
for l in response.get('lines', []):
words = []
for i, w in enumerate(l.get('words', [])):
word = Word(
text=w.get('text', ''),
bounding_box=self._convert_bbox(w['bounding_rect'], img_width, img_height)
)
words.append(word)
line = Line(
text=l.get('text', ''),
bounding_box=self._convert_bbox(l['bounding_rect'], img_width, img_height),
words=words
)
lines.append(line)
if lines:
p_bbox = merge_bounding_boxes(lines)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs = [paragraph]
else:
paragraphs = []
return OcrResult(
image_properties=ImageProperties(width=og_img_width, height=og_img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
if sys.platform == 'win32':
img_processed = self._preprocess_windows(img)
img_width, img_height = img_processed.size
try:
raw_res = self.model.recognize_pil(img_processed)
except RuntimeError as e:
return (False, e)
else:
img_processed, img_width, img_height = self._preprocess_notwindows(img)
try:
res = requests.post(self.url, data=img_processed, timeout=3)
except requests.exceptions.Timeout:
return (False, 'Request timeout!')
except requests.exceptions.ConnectionError:
return (False, 'Connection error!')
if res.status_code != 200:
return (False, 'Unknown error!')
raw_res = res.json()
if 'error' in raw_res:
return (False, raw_res['error'])
ocr_result = self._to_generic_result(raw_res, img_width, img_height, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess_windows(self, img):
min_pixel_size = 50
max_pixel_size = 10000
if any(x < min_pixel_size for x in img.size):
resize_factor = max(min_pixel_size / img.width, min_pixel_size / img.height)
new_w = int(img.width * resize_factor)
new_h = int(img.height * resize_factor)
img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
if any(x > max_pixel_size for x in img.size):
resize_factor = min(max_pixel_size / img.width, max_pixel_size / img.height)
new_w = int(img.width * resize_factor)
new_h = int(img.height * resize_factor)
img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
return img
def _preprocess_notwindows(self, img):
img = self._preprocess_windows(img)
return pil_image_to_bytes(img, png_compression=1), img.width, img.height
class AzureImageAnalysis:
name = 'azure'
readable_name = 'Azure Image Analysis'
key = 'v'
config_entry = 'azure'
available = False
local = False
manual_language = False
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=True,
word_bounding_boxes=True,
lines=True,
line_bounding_boxes=True,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, config={}):
if 'azure.ai.vision.imageanalysis' not in sys.modules:
logger.warning('azure-ai-vision-imageanalysis not available, Azure Image Analysis will not work!')
else:
logger.info(f'Parsing Azure credentials')
try:
self.client = ImageAnalysisClient(config['endpoint'], AzureKeyCredential(config['api_key']))
self.available = True
logger.info('Azure Image Analysis ready')
except:
logger.warning('Error parsing Azure credentials, Azure Image Analysis will not work!')
def _convert_bbox(self, rect, img_width, img_height):
return quad_to_bounding_box(
rect[0]['x'], rect[0]['y'],
rect[1]['x'], rect[1]['y'],
rect[2]['x'], rect[2]['y'],
rect[3]['x'], rect[3]['y'],
img_width, img_height
)
def _to_generic_result(self, read_result, img_width, img_height):
paragraphs = []
if read_result.read:
for block in read_result.read.blocks:
lines = []
for azure_line in block.lines:
l_bbox = self._convert_bbox(azure_line.bounding_polygon, img_width, img_height)
words = []
for azure_word in azure_line.words:
w_bbox = self._convert_bbox(azure_word.bounding_polygon, img_width, img_height)
word = Word(
text=azure_word.text,
bounding_box=w_bbox
)
words.append(word)
line = Line(
bounding_box=l_bbox,
words=words,
text=azure_line.text
)
lines.append(line)
p_bbox = merge_bounding_boxes(lines)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs.append(paragraph)
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
try:
read_result = self.client.analyze(image_data=self._preprocess(img), visual_features=[VisualFeatures.READ])
except ServiceRequestError:
return (False, 'Connection error!')
except:
return (False, 'Unknown error!')
ocr_result = self._to_generic_result(read_result, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess(self, img):
min_pixel_size = 50
max_pixel_size = 10000
if any(x < min_pixel_size for x in img.size):
resize_factor = max(min_pixel_size / img.width, min_pixel_size / img.height)
new_w = int(img.width * resize_factor)
new_h = int(img.height * resize_factor)
img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
if any(x > max_pixel_size for x in img.size):
resize_factor = min(max_pixel_size / img.width, max_pixel_size / img.height)
new_w = int(img.width * resize_factor)
new_h = int(img.height * resize_factor)
img = img.resize((new_w, new_h), Image.Resampling.LANCZOS)
return pil_image_to_bytes(img)
class EasyOCR:
name = 'easyocr'
readable_name = 'EasyOCR'
key = 'e'
config_entry = 'easyocr'
available = False
local = True
manual_language = True
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=False,
word_bounding_boxes=False,
lines=True,
line_bounding_boxes=True,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, config={}, language='ja'):
if 'easyocr' not in sys.modules:
logger.warning('easyocr not available, EasyOCR will not work!')
else:
logger.info('Loading EasyOCR model')
gpu = config.get('gpu', True)
logging.getLogger('easyocr.easyocr').setLevel(logging.ERROR)
self.model = easyocr.Reader([language,'en'], gpu=gpu)
self.available = True
logger.info('EasyOCR ready')
def _convert_bbox(self, rect, img_width, img_height):
(x1, y1), (x2, y2), (x3, y3), (x4, y4) = [(float(x), float(y)) for x, y in rect]
return quad_to_bounding_box(x1, y1, x2, y2, x3, y3, x4, y4, img_width, img_height)
def _to_generic_result(self, response, img_width, img_height):
lines = []
for detection in response:
quad_coords = detection[0]
text = detection[1]
bbox = self._convert_bbox(quad_coords, img_width, img_height)
word = Word(text=text, bounding_box=bbox)
line = Line(bounding_box=bbox, words=[word], text=text)
lines.append(line)
if lines:
p_bbox = merge_bounding_boxes(lines)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs = [paragraph]
else:
paragraphs = []
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
read_results = self.model.readtext(self._preprocess(img))
ocr_result = self._to_generic_result(read_results, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess(self, img):
return pil_image_to_numpy_array(img)
class RapidOCR:
name = 'rapidocr'
readable_name = 'RapidOCR'
key = 'r'
config_entry = 'rapidocr'
available = False
local = True
manual_language = True
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=False,
word_bounding_boxes=False,
lines=True,
line_bounding_boxes=True,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, config={}, language='ja'):
if 'rapidocr' not in sys.modules:
logger.warning('rapidocr not available, RapidOCR will not work!')
else:
logger.info('Loading RapidOCR model')
high_accuracy_detection = config.get('high_accuracy_detection', False)
high_accuracy_recognition = config.get('high_accuracy_recognition', True)
lang_rec = self.language_to_model_language(language)
self.model = ROCR(params={
'Det.engine_type': EngineType.ONNXRUNTIME,
'Det.lang_type': LangDet.CH,
'Det.model_type': ModelType.SERVER if high_accuracy_detection else ModelType.MOBILE,
'Det.ocr_version': OCRVersion.PPOCRV5,
'Rec.engine_type': EngineType.ONNXRUNTIME,
'Rec.lang_type': lang_rec,
'Rec.model_type': ModelType.SERVER if high_accuracy_recognition else ModelType.MOBILE,
'Rec.ocr_version': OCRVersion.PPOCRV5,
'Global.log_level': 'error'
})
self.available = True
logger.info('RapidOCR ready')
def language_to_model_language(self, language):
if language == 'ja':
return LangRec.CH
if language == 'zh':
return LangRec.CH
elif language == 'ko':
return LangRec.KOREAN
elif language == 'ru':
return LangRec.ESLAV
elif language == 'el':
return LangRec.EL
elif language == 'th':
return LangRec.TH
else:
return LangRec.LATIN
def _convert_bbox(self, rect, img_width, img_height):
(x1, y1), (x2, y2), (x3, y3), (x4, y4) = [(float(x), float(y)) for x, y in rect]
return quad_to_bounding_box(x1, y1, x2, y2, x3, y3, x4, y4, img_width, img_height)
def _to_generic_result(self, response, img_width, img_height):
lines = []
for i in range(len(response.boxes)):
box = response.boxes[i]
text = response.txts[i]
bbox = self._convert_bbox(box, img_width, img_height)
word = Word(text=text, bounding_box=bbox)
line = Line(bounding_box=bbox, words=[word], text=text)
lines.append(line)
if lines:
p_bbox = merge_bounding_boxes(lines)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs = [paragraph]
else:
paragraphs = []
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
read_results = self.model(self._preprocess(img))
ocr_result = self._to_generic_result(read_results, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess(self, img):
return pil_image_to_numpy_array(img)
class MeikiOCR:
name = 'meikiocr'
readable_name = 'meikiocr'
key = 'k'
config_entry = None
available = False
local = True
manual_language = False
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=True,
word_bounding_boxes=True,
lines=True,
line_bounding_boxes=False,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self):
if 'meikiocr' not in sys.modules:
logger.warning('meikiocr not available, meikiocr will not work!')
else:
logger.info('Loading meikiocr model')
self.model = MKOCR()
self.available = True
logger.info('meikiocr ready')
def _to_normalized_bbox(self, pixel_bbox, img_width: int, img_height: int):
x1, y1, x2, y2 = pixel_bbox
return rectangle_to_bounding_box(x1, y1, x2, y2, img_width, img_height)
def _to_generic_result(self, response, img_width, img_height):
paragraphs = []
# each dictionary in the response corresponds to a detected line of text.
# treat each line as a separate Paragraph containing a single Line.
for line_result in response:
line_text = line_result.get('text', '')
char_results = line_result.get('chars', [])
if not line_text or not char_results:
continue
char_in_line = []
for char_info in char_results:
normalized_bbox = self._to_normalized_bbox(
char_info['bbox'], img_width, img_height
)
word = Word(
text=char_info['char'],
bounding_box=normalized_bbox
)
char_in_line.append(word)
if not char_in_line:
continue
line_bbox = merge_bounding_boxes(char_in_line)
line = Line(
bounding_box=line_bbox,
words=char_in_line,
text=line_text
)
# each line becomes a paragraph.
paragraph = Paragraph(
bounding_box=line_bbox,
lines=[line],
writing_direction="LEFT_TO_RIGHT" # meikiocr only supports horizontal text
)
paragraphs.append(paragraph)
return OcrResult(
image_properties=ImageProperties(width=img_width, height=img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
image_np = np.array(img.convert('RGB'))[:, :, ::-1]
read_results = self.model.run_ocr(image_np)
ocr_result = self._to_generic_result(read_results, img.width, img.height)
x = (True, ocr_result)
if is_path:
img.close()
return x
class OCRSpace:
name = 'ocrspace'
readable_name = 'OCRSpace'
key = 'o'
config_entry = 'ocrspace'
available = False
local = False
manual_language = True
coordinate_support = True
threading_support = True
capabilities = EngineCapabilities(
words=True,
word_bounding_boxes=True,
lines=True,
line_bounding_boxes=False,
paragraphs=False,
paragraph_bounding_boxes=False
)
def __init__(self, config={}, language='ja'):
try:
self.api_key = config['api_key']
self.max_byte_size = config.get('file_size_limit', 1000000)
self.engine_version = config.get('engine_version', 2)
self.language = self.language_to_model_language(language)
self.available = True
logger.info('OCRSpace ready')
except:
logger.warning('Error reading API key from config, OCRSpace will not work!')
def language_to_model_language(self, language):
if language == 'ja':
return 'jpn'
if language == 'zh':
return 'chs'
elif language == 'ko':
return 'kor'
elif language == 'ar':
return 'ara'
elif language == 'ru':
return 'rus'
elif language == 'el':
return 'gre'
elif language == 'th':
return 'tha'
else:
return 'auto'
def _convert_bbox(self, word_data, img_width, img_height):
left = word_data['Left'] / img_width
top = word_data['Top'] / img_height
width = word_data['Width'] / img_width
height = word_data['Height'] / img_height
center_x = left + width / 2
center_y = top + height / 2
return BoundingBox(
center_x=center_x,
center_y=center_y,
width=width,
height=height
)
def _to_generic_result(self, api_result, img_width, img_height, og_img_width, og_img_height):
parsed_result = api_result['ParsedResults'][0]
text_overlay = parsed_result.get('TextOverlay', {})
lines_data = text_overlay.get('Lines', [])
lines = []
for line_data in lines_data:
words = []
for word_data in line_data.get('Words', []):
w_bbox = self._convert_bbox(word_data, img_width, img_height)
words.append(Word(text=word_data['WordText'], bounding_box=w_bbox))
l_bbox = merge_bounding_boxes(words)
lines.append(Line(bounding_box=l_bbox, words=words))
if lines:
p_bbox = merge_bounding_boxes(lines)
paragraph = Paragraph(bounding_box=p_bbox, lines=lines)
paragraphs = [paragraph]
else:
paragraphs = []
return OcrResult(
image_properties=ImageProperties(width=og_img_width, height=og_img_height),
paragraphs=paragraphs,
engine_capabilities=self.capabilities
)
def __call__(self, img):
img, is_path = input_to_pil_image(img)
if not img:
return (False, 'Invalid image provided')
og_img_width, og_img_height = img.size
img_bytes, img_extension, img_size = self._preprocess(img)
if not img_bytes:
return (False, 'Image is too big!')
data = {
'apikey': self.api_key,
'language': self.language,
'OCREngine': str(self.engine_version),
'isOverlayRequired': 'True'
}
files = {'file': ('image.' + img_extension, img_bytes, 'image/' + img_extension)}
try:
res = requests.post('https://api.ocr.space/parse/image', data=data, files=files, timeout=20)
except requests.exceptions.Timeout:
return (False, 'Request timeout!')
except requests.exceptions.ConnectionError:
return (False, 'Connection error!')
if res.status_code != 200:
return (False, 'Unknown error!')
res = res.json()
if isinstance(res, str):
return (False, 'Unknown error!')
if res['IsErroredOnProcessing']:
return (False, res['ErrorMessage'])
img_width, img_height = img_size
ocr_result = self._to_generic_result(res, img_width, img_height, og_img_width, og_img_height)
x = (True, ocr_result)
if is_path:
img.close()
return x
def _preprocess(self, img):
return limit_image_size(img, self.max_byte_size)