import re
import os
import io
from pathlib import Path
import time
import sys
import platform
import logging
from math import sqrt

import jaconv
import numpy as np
import json
from PIL import Image
from loguru import logger

# Optional backends. Each import may fail; the matching engine class then
# logs a warning and leaves `available` set to False.
try:
    from manga_ocr import MangaOcr as MOCR
except ImportError:
    pass

try:
    import Vision
    import objc
except ImportError:
    pass

try:
    from google.cloud import vision
    from google.oauth2 import service_account
    from google.api_core.exceptions import ServiceUnavailable
except ImportError:
    pass

try:
    from azure.ai.vision.imageanalysis import ImageAnalysisClient
    from azure.ai.vision.imageanalysis.models import VisualFeatures
    from azure.core.credentials import AzureKeyCredential
    from azure.core.exceptions import ServiceRequestError
except ImportError:
    pass

try:
    import easyocr
except ImportError:
    pass

try:
    from rapidocr_onnxruntime import RapidOCR as ROCR
    import urllib.request
except ImportError:
    pass

try:
    import requests
except ImportError:
    pass

try:
    import winocr
except ImportError:
    pass

try:
    import pyjson5
except ImportError:
    pass

# fpng_py is an optional fast PNG encoder; fall back to Pillow if it cannot be loaded.
try:
    import fpng_py
    optimized_png_encode = True
except Exception:
    optimized_png_encode = False


def empty_post_process(text):
    return text


def post_process(text):
    # Strip whitespace inside each line, then join the lines with single spaces.
    text = ' '.join([''.join(i.split()) for i in text.splitlines()])
    text = text.replace('…', '...')
    # Normalize any run of two or more '・' or '.' to the same number of '.'.
    text = re.sub('[・.]{2,}', lambda x: (x.end() - x.start()) * '.', text)
    # Convert half-width characters (including ASCII and digits) to full-width.
    text = jaconv.h2z(text, ascii=True, digit=True)
    return text


def input_to_pil_image(img_or_path):
    # Shared input check for every engine: accept a path or an already opened image.
    if isinstance(img_or_path, (str, Path)):
        return Image.open(img_or_path)
    if isinstance(img_or_path, Image.Image):
        return img_or_path
    raise ValueError(f'img_or_path must be a path or PIL.Image, instead got: {img_or_path}')


def pil_image_to_bytes(img, img_format='png', png_compression=6):
    if img_format == 'png' and optimized_png_encode:
        raw_data = img.convert('RGBA').tobytes()
        width, height = img.size
        image_bytes = fpng_py.fpng_encode_image_to_memory(raw_data, width, height)
    else:
        image_bytes = io.BytesIO()
        img.save(image_bytes, format=img_format, compress_level=png_compression)
        image_bytes = image_bytes.getvalue()
    return image_bytes


def pil_image_to_numpy_array(img):
    return np.array(img.convert('RGBA'))


# Every engine below exposes the same interface: class attributes `name`,
# `readable_name`, `key` and `available`, and a __call__ that returns a
# (success, text_or_error_message) tuple.

class MangaOcr:
    name = 'mangaocr'
    readable_name = 'Manga OCR'
    key = 'm'
    available = False

    def __init__(self, config={'pretrained_model_name_or_path': 'kha-white/manga-ocr-base', 'force_cpu': False}):
        if 'manga_ocr' not in sys.modules:
            logger.warning('manga-ocr not available, Manga OCR will not work!')
        else:
            logger.disable('manga_ocr')
            # Replace manga_ocr's built-in post-processing with a no-op.
            from manga_ocr import ocr
            ocr.post_process = empty_post_process
            logger.info('Loading Manga OCR model')
            self.model = MOCR(config['pretrained_model_name_or_path'], config['force_cpu'])
            self.available = True
            logger.info('Manga OCR ready')

    def __call__(self, img_or_path):
        img = input_to_pil_image(img_or_path)
        return (True, self.model(img))


class GoogleVision:
    name = 'gvision'
    readable_name = 'Google Vision'
    key = 'g'
    available = False

    def __init__(self):
        if 'google.cloud' not in sys.modules:
            logger.warning('google-cloud-vision not available, Google Vision will not work!')
        else:
            logger.info('Parsing Google credentials')
            google_credentials_file = os.path.join(os.path.expanduser('~'), '.config', 'google_vision.json')
            try:
                google_credentials = service_account.Credentials.from_service_account_file(google_credentials_file)
                self.client = vision.ImageAnnotatorClient(credentials=google_credentials)
                self.available = True
                logger.info('Google Vision ready')
            except Exception:
                logger.warning('Error parsing Google credentials, Google Vision will not work!')

    def __call__(self, img_or_path):
        img = input_to_pil_image(img_or_path)
        image_bytes = self._preprocess(img)
        image = vision.Image(content=image_bytes)
        try:
            response = self.client.text_detection(image=image)
        except ServiceUnavailable:
            return (False, 'Connection error!')
        except Exception:
            return (False, 'Unknown error!')

        # The first annotation, when present, holds the full detected text.
        texts = response.text_annotations
        res = texts[0].description if len(texts) > 0 else ''
        return (True, res)

    def _preprocess(self, img):
        return pil_image_to_bytes(img)


class GoogleLens:
    name = 'glens'
    readable_name = 'Google Lens'
    key = 'l'
    available = False

    def __init__(self):
        if 'pyjson5' not in sys.modules:
            logger.warning('pyjson5 not available, Google Lens will not work!')
        elif 'requests' not in sys.modules:
            logger.warning('requests not available, Google Lens will not work!')
        else:
            self.regex = re.compile(r">AF_initDataCallback\(({key: 'ds:1'.*?)\);")
            self.available = True
            logger.info('Google Lens ready')

    def __call__(self, img_or_path):
        img = input_to_pil_image(img_or_path)

        timestamp = int(time.time() * 1000)
        url = f'https://lens.google.com/v3/upload?stcs={timestamp}'
        files = {'encoded_image': ('owo' + str(timestamp) + '.png', self._preprocess(img), 'image/png')}
        try:
            response = requests.post(url, files=files, timeout=20)
        except requests.exceptions.Timeout:
            return (False, 'Request timeout!')
        except requests.exceptions.ConnectionError:
            return (False, 'Connection error!')

        if response.status_code != 200:
            return (False, 'Unknown error!')

        # The result is embedded in the returned page as a JSON5 object literal.
        match = self.regex.search(response.text)
        if match is None:
            return (False, 'Regex error!')

        lens_object = pyjson5.loads(match.group(1))
        if 'errorHasStatus' in lens_object:
            return (False, 'Unknown Lens error!')

        res = ''
        text = lens_object['data'][3][4][0]
        if len(text) > 0:
            lines = text[0]
            for line in lines:
                res += line + '\n'

        return (True, res)

    def _preprocess(self, img):
        # Downscale images above 3,000,000 pixels, keeping the aspect ratio.
        w, h = img.size
        if w * h > 3000000:
            aspect_ratio = w / h
            new_w = int(sqrt(3000000 * aspect_ratio))
            new_h = int(new_w / aspect_ratio)
            img = img.resize((new_w, new_h), Image.LANCZOS)
        return pil_image_to_bytes(img)


class AppleVision:
    name = 'avision'
    readable_name = 'Apple Vision'
    key = 'a'
    available = False

    def __init__(self):
        if sys.platform != 'darwin':
            logger.warning('Apple Vision is not supported on non-macOS platforms!')
        elif int(platform.mac_ver()[0].split('.')[0]) < 13:
            logger.warning('Apple Vision is not supported on macOS older than Ventura/13.0!')
        else:
            self.available = True
            logger.info('Apple Vision ready')

    def __call__(self, img_or_path):
        img = input_to_pil_image(img_or_path)

        with objc.autorelease_pool():
            req = Vision.VNRecognizeTextRequest.alloc().init()

            req.setRevision_(Vision.VNRecognizeTextRequestRevision3)
            req.setRecognitionLevel_(Vision.VNRequestTextRecognitionLevelAccurate)
            req.setUsesLanguageCorrection_(True)
            req.setRecognitionLanguages_(['ja', 'en'])

            handler = Vision.VNImageRequestHandler.alloc().initWithData_options_(
                self._preprocess(img), None
            )

            success = handler.performRequests_error_([req], None)
            res = ''
            if success[0]:
                for result in req.results():
                    res += result.text() + '\n'
                req.dealloc()
                x = (True, res)
            else:
                x = (False, 'Unknown error!')

            handler.dealloc()

        return x

    def _preprocess(self, img):
        return pil_image_to_bytes(img, 'tiff')


class WinRTOCR:
    name = 'winrtocr'
    readable_name = 'WinRT OCR'
    key = 'w'
    available = False

    def __init__(self, config={}):
        if sys.platform == 'win32':
            # On Windows the OCR runs locally through winocr.
            if int(platform.release()) < 10:
                logger.warning('WinRT OCR is not supported on Windows older than 10!')
            elif 'winocr' not in sys.modules:
                logger.warning('winocr not available, WinRT OCR will not work!')
            else:
                self.available = True
                logger.info('WinRT OCR ready')
        else:
            # On other platforms the image is posted to the URL given in the config.
            if 'requests' not in sys.modules:
                logger.warning('requests not available, WinRT OCR will not work!')
            else:
                try:
                    self.url = config['url']
                    self.available = True
                    logger.info('WinRT OCR ready')
                except Exception:
                    logger.warning('Error reading URL from config, WinRT OCR will not work!')

    def __call__(self, img_or_path):
        img = input_to_pil_image(img_or_path)

        if sys.platform == 'win32':
            res = winocr.recognize_pil_sync(img, lang='ja')['text']
        else:
            params = {'lang': 'ja'}
            try:
                response = requests.post(self.url, params=params, data=self._preprocess(img), timeout=3)
            except requests.exceptions.Timeout:
                return (False, 'Request timeout!')
            except requests.exceptions.ConnectionError:
                return (False, 'Connection error!')

            if response.status_code != 200:
                return (False, 'Unknown error!')

            res = json.loads(response.text)['text']

        return (True, res)

    def _preprocess(self, img):
        return pil_image_to_bytes(img, png_compression=1)


class AzureImageAnalysis:
    name = 'azure'
    readable_name = 'Azure Image Analysis'
    key = 'v'
    available = False

    def __init__(self, config={}):
        if 'azure.ai.vision.imageanalysis' not in sys.modules:
            logger.warning('azure-ai-vision-imageanalysis not available, Azure Image Analysis will not work!')
        else:
            logger.info('Parsing Azure credentials')
            try:
                self.client = ImageAnalysisClient(config['endpoint'], AzureKeyCredential(config['api_key']))
                self.available = True
                logger.info('Azure Image Analysis ready')
            except Exception:
                logger.warning('Error parsing Azure credentials, Azure Image Analysis will not work!')

    def __call__(self, img_or_path):
        img = input_to_pil_image(img_or_path)

        try:
            read_result = self.client.analyze(image_data=self._preprocess(img), visual_features=[VisualFeatures.READ])
        except ServiceRequestError:
            return (False, 'Connection error!')
        except Exception:
            return (False, 'Unknown error!')

        res = ''
        if read_result.read:
            for block in read_result.read.blocks:
                for line in block.lines:
                    res += line.text + '\n'
        else:
            return (False, 'Unknown error!')

        return (True, res)

    def _preprocess(self, img):
        # Upscale images that have a side shorter than 50 pixels.
        if any(x < 50 for x in img.size):
            w, h = img.size
            resize_factor = max(50 / w, 50 / h)
            new_w = int(w * resize_factor)
            new_h = int(h * resize_factor)
            img = img.resize((new_w, new_h), Image.LANCZOS)
        return pil_image_to_bytes(img)


class EasyOCR:
    name = 'easyocr'
    readable_name = 'EasyOCR'
    key = 'e'
    available = False

    def __init__(self):
        if 'easyocr' not in sys.modules:
            logger.warning('easyocr not available, EasyOCR will not work!')
        else:
            logger.info('Loading EasyOCR model')
            self.model = easyocr.Reader(['ja', 'en'])
            self.available = True
            logger.info('EasyOCR ready')

    def __call__(self, img_or_path):
        img = input_to_pil_image(img_or_path)

        res = ''
        read_result = self.model.readtext(self._preprocess(img), detail=0)
        for text in read_result:
            res += text + '\n'

        return (True, res)

    def _preprocess(self, img):
        return pil_image_to_numpy_array(img)


class RapidOCR:
    name = 'rapidocr'
    readable_name = 'RapidOCR'
    key = 'r'
    available = False

    def __init__(self):
        if 'rapidocr_onnxruntime' not in sys.modules:
            logger.warning('rapidocr_onnxruntime not available, RapidOCR will not work!')
        else:
            # The Japanese recognition model is cached in ~/.cache and downloaded on first use.
            rapidocr_model_file = os.path.join(os.path.expanduser('~'), '.cache', 'rapidocr_japan_PP-OCRv4_rec_infer.onnx')
            if not os.path.isfile(rapidocr_model_file):
                logger.info('Downloading RapidOCR model')
                try:
                    cache_folder = os.path.join(os.path.expanduser('~'), '.cache')
                    os.makedirs(cache_folder, exist_ok=True)
                    urllib.request.urlretrieve('https://github.com/AuroraWright/owocr/raw/master/rapidocr_japan_PP-OCRv4_rec_infer.onnx', rapidocr_model_file)
                except Exception:
                    logger.warning('Download failed. RapidOCR will not work!')
                    return

            logger.info('Loading RapidOCR model')
            self.model = ROCR(rec_model_path=rapidocr_model_file)
            # Disable the standard-library root logger.
            logging.getLogger().disabled = True
            self.available = True
            logger.info('RapidOCR ready')

    def __call__(self, img_or_path):
        img = input_to_pil_image(img_or_path)

        res = ''
        read_results, elapsed = self.model(self._preprocess(img))
        if read_results:
            for read_result in read_results:
                # Index 1 of each result is the recognized text.
                res += read_result[1] + '\n'

        return (True, res)

    def _preprocess(self, img):
        return pil_image_to_numpy_array(img)
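

# Illustrative sketch, not part of the original module: the pixel-area cap
# that GoogleLens._preprocess applies before uploading, written as a pure
# function so the arithmetic can be checked without Pillow or network access.
# The name `fit_within_area` and the `max_pixels` parameter are hypothetical;
# the 3,000,000 limit and the formula are taken from that method.
from math import sqrt


def fit_within_area(w, h, max_pixels=3000000):
    # Images already within the limit keep their size.
    if w * h <= max_pixels:
        return w, h
    # Otherwise pick the largest width whose matching height keeps
    # width * height at or below max_pixels for the same aspect ratio.
    aspect_ratio = w / h
    new_w = int(sqrt(max_pixels * aspect_ratio))
    new_h = int(new_w / aspect_ratio)
    return new_w, new_h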