Push my mod

This commit is contained in:
AuroraWright
2023-09-15 12:03:43 +02:00
parent 1a3ffca7c8
commit 9bfc265192
6 changed files with 504 additions and 209 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

View File

@@ -1,3 +1,6 @@
__version__ = '0.1.11'
from manga_ocr.ocr import MangaOcr
__version__ = '0.1.10'
from manga_ocr.ocr import MangaOcr
from manga_ocr.ocr import GoogleVision
from manga_ocr.ocr import AppleVision
from manga_ocr.ocr import AzureComputerVision

View File

@@ -1,11 +1,11 @@
import fire
from manga_ocr.run import run
def main():
fire.Fire(run)
if __name__ == '__main__':
main()
import fire
from manga_ocr.run import run
def main():
fire.Fire(run)
if __name__ == '__main__':
main()

View File

@@ -1,61 +1,231 @@
import re
from pathlib import Path
import jaconv
import torch
from PIL import Image
from loguru import logger
from transformers import AutoFeatureExtractor, AutoTokenizer, VisionEncoderDecoderModel
class MangaOcr:
def __init__(self, pretrained_model_name_or_path='kha-white/manga-ocr-base', force_cpu=False):
logger.info(f'Loading OCR model from {pretrained_model_name_or_path}')
self.feature_extractor = AutoFeatureExtractor.from_pretrained(pretrained_model_name_or_path)
self.tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path)
self.model = VisionEncoderDecoderModel.from_pretrained(pretrained_model_name_or_path)
if not force_cpu and torch.cuda.is_available():
logger.info('Using CUDA')
self.model.cuda()
elif not force_cpu and torch.backends.mps.is_available():
logger.info('Using MPS')
self.model.to('mps')
else:
logger.info('Using CPU')
example_path = Path(__file__).parent / 'assets/example.jpg'
if not example_path.is_file():
example_path = Path(__file__).parent.parent / 'assets/example.jpg'
self(example_path)
logger.info('OCR ready')
def __call__(self, img_or_path):
if isinstance(img_or_path, str) or isinstance(img_or_path, Path):
img = Image.open(img_or_path)
elif isinstance(img_or_path, Image.Image):
img = img_or_path
else:
raise ValueError(f'img_or_path must be a path or PIL.Image, instead got: {img_or_path}')
img = img.convert('L').convert('RGB')
x = self._preprocess(img)
x = self.model.generate(x[None].to(self.model.device), max_length=300)[0].cpu()
x = self.tokenizer.decode(x, skip_special_tokens=True)
x = post_process(x)
return x
def _preprocess(self, img):
pixel_values = self.feature_extractor(img, return_tensors="pt").pixel_values
return pixel_values.squeeze()
def post_process(text):
text = ''.join(text.split())
text = text.replace('', '...')
text = re.sub('[・.]{2,}', lambda x: (x.end() - x.start()) * '.', text)
text = jaconv.h2z(text, ascii=True, digit=True)
return text
import re
import os
import io
from pathlib import Path
import warnings
import configparser
import time
import sys
import platform
import jaconv
import torch
from PIL import Image
from loguru import logger
from transformers import ViTImageProcessor, AutoTokenizer, VisionEncoderDecoderModel
try:
import Vision
import objc
except ImportError:
pass
try:
from google.cloud import vision
from google.oauth2 import service_account
except ImportError:
pass
try:
from azure.cognitiveservices.vision.computervision import ComputerVisionClient
from azure.cognitiveservices.vision.computervision.models import OperationStatusCodes
from msrest.authentication import CognitiveServicesCredentials
except ImportError:
pass
class MangaOcr:
def __init__(self, pretrained_model_name_or_path='kha-white/manga-ocr-base', force_cpu=False):
logger.info(f'Loading OCR model from {pretrained_model_name_or_path}')
self.processor = ViTImageProcessor.from_pretrained(pretrained_model_name_or_path)
self.tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path)
self.model = VisionEncoderDecoderModel.from_pretrained(pretrained_model_name_or_path)
if not force_cpu and torch.cuda.is_available():
logger.info('Using CUDA')
self.model.cuda()
elif not force_cpu and torch.backends.mps.is_available():
logger.info('Using MPS')
warnings.filterwarnings("ignore", message=".*MPS: no support.*")
self.model.to('mps')
else:
logger.info('Using CPU')
logger.info('Manga OCR ready')
def __call__(self, img_or_path):
if isinstance(img_or_path, str) or isinstance(img_or_path, Path):
img = Image.open(img_or_path)
elif isinstance(img_or_path, Image.Image):
img = img_or_path
else:
raise ValueError(f'img_or_path must be a path or PIL.Image, instead got: {img_or_path}')
img = img.convert('L').convert('RGB')
x = self._preprocess(img)
x = self.model.generate(x[None].to(self.model.device), max_length=300)[0].cpu()
x = self.tokenizer.decode(x, skip_special_tokens=True)
x = post_process(x)
return x
def _preprocess(self, img):
pixel_values = self.processor(img, return_tensors="pt").pixel_values
return pixel_values.squeeze()
class GoogleVision:
def __init__(self):
if 'google.cloud' not in sys.modules:
logger.warning('google-cloud-vision not available, Google Vision will not work!')
self.available = False
else:
logger.info(f'Parsing Google credentials')
google_credentials_file = os.path.join(os.path.expanduser('~'),'.config','google_vision.json')
try:
google_credentials = service_account.Credentials.from_service_account_file(google_credentials_file)
self.client = vision.ImageAnnotatorClient(credentials=google_credentials)
self.available = True
logger.info('Google Vision ready')
except:
logger.warning('Error parsing Google credentials, Google Vision will not work!')
self.available = False
def __call__(self, img_or_path):
if not self.available:
return "Engine not available!"
if isinstance(img_or_path, str) or isinstance(img_or_path, Path):
img = Image.open(img_or_path)
elif isinstance(img_or_path, Image.Image):
img = img_or_path
else:
raise ValueError(f'img_or_path must be a path or PIL.Image, instead got: {img_or_path}')
image_bytes = self._preprocess(img)
image = vision.Image(content=image_bytes)
response = self.client.text_detection(image=image)
texts = response.text_annotations
x = post_process(texts[0].description)
return x
def _preprocess(self, img):
image_bytes = io.BytesIO()
img.save(image_bytes, format=img.format)
return image_bytes.getvalue()
class AppleVision:
def __init__(self):
if sys.platform != "darwin":
logger.warning('Apple Vision is not supported on non-macOS platforms!')
self.available = False
elif int(platform.mac_ver()[0].split('.')[0]) < 13:
logger.warning('Apple Vision is not supported on macOS older than Ventura/13.0!')
self.available = False
else:
if 'objc' not in sys.modules:
logger.warning('pyobjc not available, Apple Vision will not work!')
self.available = False
else:
self.available = True
logger.info('Apple Vision ready')
def __call__(self, img_or_path):
if not self.available:
return "Engine not available!"
if isinstance(img_or_path, str) or isinstance(img_or_path, Path):
img = Image.open(img_or_path)
elif isinstance(img_or_path, Image.Image):
img = img_or_path
else:
raise ValueError(f'img_or_path must be a path or PIL.Image, instead got: {img_or_path}')
with objc.autorelease_pool():
req = Vision.VNRecognizeTextRequest.alloc().init()
req.setRecognitionLevel_(0)
req.setRecognitionLanguages_(['ja','en'])
handler = Vision.VNImageRequestHandler.alloc().initWithData_options_(
self._preprocess(img), None
)
success = handler.performRequests_error_([req], None)
res = ''
if success:
for result in req.results():
res += result.text() + ' '
req.dealloc()
handler.dealloc()
x = post_process(res)
return x
def _preprocess(self, img):
image_bytes = io.BytesIO()
img.save(image_bytes, format=img.format)
return image_bytes.getvalue()
class AzureComputerVision:
def __init__(self):
if 'azure.cognitiveservices.vision.computervision' not in sys.modules:
logger.warning('azure-cognitiveservices-vision-computervision not available, Azure Computer Vision will not work!')
self.available = False
else:
logger.info(f'Parsing Azure credentials')
azure_credentials_file = os.path.join(os.path.expanduser('~'),'.config','azure_computer_vision.ini')
try:
azure_credentials = configparser.ConfigParser()
azure_credentials.read(azure_credentials_file)
self.client = ComputerVisionClient(azure_credentials['config']['endpoint'], CognitiveServicesCredentials(azure_credentials['config']['api_key']))
self.available = True
logger.info('Azure Computer Vision ready')
except:
logger.warning('Error parsing Azure credentials, Azure Computer Vision will not work!')
self.available = False
def __call__(self, img_or_path):
if not self.available:
return "Engine not available!"
if isinstance(img_or_path, str) or isinstance(img_or_path, Path):
img = Image.open(img_or_path)
elif isinstance(img_or_path, Image.Image):
img = img_or_path
else:
raise ValueError(f'img_or_path must be a path or PIL.Image, instead got: {img_or_path}')
image_io = self._preprocess(img)
read_response = self.client.read_in_stream(image_io, raw=True)
read_operation_location = read_response.headers["Operation-Location"]
operation_id = read_operation_location.split("/")[-1]
while True:
read_result = self.client.get_read_result(operation_id)
if read_result.status.lower() not in ['notstarted', 'running']:
break
time.sleep(0.3)
res = ''
if read_result.status == OperationStatusCodes.succeeded:
for text_result in read_result.analyze_result.read_results:
for line in text_result.lines:
res += line.text + ' '
x = post_process(res)
return x
def _preprocess(self, img):
image_io = io.BytesIO()
img.save(image_io, format=img.format)
image_io.seek(0)
return image_io
def post_process(text):
text = ''.join(text.split())
text = text.replace('', '...')
text = re.sub('[・.]{2,}', lambda x: (x.end() - x.start()) * '.', text)
text = jaconv.h2z(text, ascii=True, digit=True)
return text

View File

@@ -1,134 +1,252 @@
import sys
import time
from pathlib import Path
import fire
import numpy as np
import pyperclip
from PIL import Image
from PIL import UnidentifiedImageError
from loguru import logger
from manga_ocr import MangaOcr
def are_images_identical(img1, img2):
if None in (img1, img2):
return img1 == img2
img1 = np.array(img1)
img2 = np.array(img2)
return (img1.shape == img2.shape) and (img1 == img2).all()
def process_and_write_results(mocr, img_or_path, write_to):
t0 = time.time()
text = mocr(img_or_path)
t1 = time.time()
logger.info(f'Text recognized in {t1 - t0:0.03f} s: {text}')
if write_to == 'clipboard':
pyperclip.copy(text)
else:
write_to = Path(write_to)
if write_to.suffix != '.txt':
raise ValueError('write_to must be either "clipboard" or a path to a text file')
with write_to.open('a', encoding="utf-8") as f:
f.write(text + '\n')
def get_path_key(path):
return path, path.lstat().st_mtime
def run(read_from='clipboard',
write_to='clipboard',
pretrained_model_name_or_path='kha-white/manga-ocr-base',
force_cpu=False,
delay_secs=0.1,
verbose=False
):
"""
Run OCR in the background, waiting for new images to appear either in system clipboard, or a directory.
Recognized texts can be either saved to system clipboard, or appended to a text file.
:param read_from: Specifies where to read input images from. Can be either "clipboard", or a path to a directory.
:param write_to: Specifies where to save recognized texts to. Can be either "clipboard", or a path to a text file.
:param pretrained_model_name_or_path: Path to a trained model, either local or from Transformers' model hub.
:param force_cpu: If True, OCR will use CPU even if GPU is available.
:param verbose: If True, unhides all warnings.
:param delay_secs: How often to check for new images, in seconds.
"""
mocr = MangaOcr(pretrained_model_name_or_path, force_cpu)
if sys.platform not in ('darwin', 'win32') and write_to == 'clipboard':
# Check if the system is using Wayland
import os
if os.environ.get('WAYLAND_DISPLAY'):
# Check if the wl-clipboard package is installed
if os.system("which wl-copy > /dev/null") == 0:
pyperclip.set_clipboard("wl-clipboard")
else:
msg = 'Your session uses wayland and does not have wl-clipboard installed. ' \
'Install wl-clipboard for write in clipboard to work.'
raise NotImplementedError(msg)
if read_from == 'clipboard':
from PIL import ImageGrab
logger.info('Reading from clipboard')
img = None
while True:
old_img = img
try:
img = ImageGrab.grabclipboard()
except OSError as error:
if not verbose and "cannot identify image file" in str(error):
# Pillow error when clipboard hasn't changed since last grab (Linux)
pass
elif not verbose and "target image/png not available" in str(error):
# Pillow error when clipboard contains text (Linux, X11)
pass
else:
logger.warning('Error while reading from clipboard ({})'.format(error))
else:
if isinstance(img, Image.Image) and not are_images_identical(img, old_img):
process_and_write_results(mocr, img, write_to)
time.sleep(delay_secs)
else:
read_from = Path(read_from)
if not read_from.is_dir():
raise ValueError('read_from must be either "clipboard" or a path to a directory')
logger.info(f'Reading from directory {read_from}')
old_paths = set()
for path in read_from.iterdir():
old_paths.add(get_path_key(path))
while True:
for path in read_from.iterdir():
path_key = get_path_key(path)
if path_key not in old_paths:
old_paths.add(path_key)
try:
img = Image.open(path)
img.load()
except (UnidentifiedImageError, OSError) as e:
logger.warning(f'Error while reading file {path}: {e}')
else:
process_and_write_results(mocr, img, write_to)
time.sleep(delay_secs)
if __name__ == '__main__':
fire.Fire(run)
import sys
import time
import threading
import os
from pathlib import Path
import fire
import numpy as np
import pyperclip
from PIL import Image
from PIL import UnidentifiedImageError
from loguru import logger
from pynput import keyboard
from manga_ocr import MangaOcr
from manga_ocr import GoogleVision
from manga_ocr import AppleVision
from manga_ocr import AzureComputerVision
engines = ['avision', 'gvision', 'azure', 'mangaocr']
def get_engine_name(engine):
engine_names = ['Apple Vision', 'Google Vision', 'Azure Computer Vision', 'Manga OCR']
return engine_names[engines.index(engine)]
def are_images_identical(img1, img2):
if None in (img1, img2):
return img1 == img2
img1 = np.array(img1)
img2 = np.array(img2)
return (img1.shape == img2.shape) and (img1 == img2).all()
def process_and_write_results(mocr, avision, gvision, azure, img_or_path, write_to, engine):
t0 = time.time()
if engine == 'gvision':
text = gvision(img_or_path)
elif engine == 'avision':
text = avision(img_or_path)
elif engine == 'azure':
text = azure(img_or_path)
else:
text = mocr(img_or_path)
t1 = time.time()
logger.opt(ansi=True).info(f"Text recognized in {t1 - t0:0.03f}s using <cyan>{get_engine_name(engine)}</cyan>: {text}")
if write_to == 'clipboard':
pyperclip.copy(text)
else:
write_to = Path(write_to)
if write_to.suffix != '.txt':
raise ValueError('write_to must be either "clipboard" or a path to a text file')
with write_to.open('a', encoding="utf-8") as f:
f.write(text + '\n')
def get_path_key(path):
return path, path.lstat().st_mtime
def run(read_from='clipboard',
write_to='clipboard',
pretrained_model_name_or_path='kha-white/manga-ocr-base',
force_cpu=False,
delay_secs=0.5,
engine='mangaocr',
verbose=False
):
"""
Run OCR in the background, waiting for new images to appear either in system clipboard, or a directory.
Recognized texts can be either saved to system clipboard, or appended to a text file.
:param read_from: Specifies where to read input images from. Can be either "clipboard", or a path to a directory.
:param write_to: Specifies where to save recognized texts to. Can be either "clipboard", or a path to a text file.
:param pretrained_model_name_or_path: Path to a trained model, either local or from Transformers' model hub.
:param force_cpu: If True, OCR will use CPU even if GPU is available.
:param delay_secs: How often to check for new images, in seconds.
:param engine: OCR engine to use. Available: "mangaocr", "gvision", "avision", "azure".
:param verbose: If True, unhides all warnings.
"""
fmt = "<green>{time:HH:mm:ss.SSS}</green> | <level>{message}</level>"
config = {
"handlers": [
{"sink": sys.stderr, "format": fmt},
],
}
logger.configure(**config)
mocr = MangaOcr(pretrained_model_name_or_path, force_cpu)
gvision = GoogleVision()
azure = AzureComputerVision()
avision = AppleVision()
if engine not in engines:
msg = 'Unknown OCR engine!'
raise NotImplementedError(msg)
if sys.platform not in ('darwin', 'win32') and write_to == 'clipboard':
# Check if the system is using Wayland
import os
if os.environ.get('WAYLAND_DISPLAY'):
# Check if the wl-clipboard package is installed
if os.system("which wl-copy > /dev/null") == 0:
pyperclip.set_clipboard("wl-clipboard")
else:
msg = 'Your session uses wayland and does not have wl-clipboard installed. ' \
'Install wl-clipboard for write in clipboard to work.'
raise NotImplementedError(msg)
if read_from == 'clipboard':
from PIL import ImageGrab
logger.info('Reading from clipboard')
paused = False
global just_unpaused
just_unpaused = True
img = None
def on_key_press(key):
global tmp_paused
if key == keyboard.Key.cmd_r or key == keyboard.Key.ctrl_r:
tmp_paused = True
def on_key_release(key):
global tmp_paused
global just_unpaused
if key == keyboard.Key.cmd_r or key == keyboard.Key.ctrl_r:
tmp_paused = False
just_unpaused = True
global tmp_paused
tmp_paused = False
tmp_paused_listener = keyboard.Listener(
on_press=on_key_press,
on_release=on_key_release)
tmp_paused_listener.start()
else:
read_from = Path(read_from)
if not read_from.is_dir():
raise ValueError('read_from must be either "clipboard" or a path to a directory')
logger.info(f'Reading from directory {read_from}')
old_paths = set()
for path in read_from.iterdir():
old_paths.add(get_path_key(path))
def getchar_thread():
global user_input
import os
if os.name == 'nt': # how it works on windows
import msvcrt
while True:
user_input = msvcrt.getch()
if user_input.lower() in 'tq':
break
else:
import tty, termios, sys
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(sys.stdin.fileno())
while True:
user_input = sys.stdin.read(1)
if user_input.lower() in 'tq':
break
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
global user_input
user_input = ''
user_input_thread = threading.Thread(target=getchar_thread, daemon=True)
user_input_thread.start()
while True:
if user_input != '':
if user_input.lower() in 'tq':
if read_from == 'clipboard':
tmp_paused_listener.stop()
user_input_thread.join()
logger.info('Terminated!')
break
if read_from == 'clipboard' and user_input.lower() == 'p':
if paused:
logger.info('Unpaused!')
just_unpaused = True
else:
logger.info('Paused!')
paused = not paused
elif user_input.lower() == 's':
if engine == engines[-1]:
engine = engines[0]
else:
engine = engines[engines.index(engine) + 1]
logger.opt(ansi=True).info(f"Switched to <cyan>{get_engine_name(engine)}</cyan>!")
elif user_input.lower() in 'agvm':
new_engine = engines['agvm'.find(user_input.lower())]
if engine != new_engine:
engine = new_engine
logger.opt(ansi=True).info(f"Switched to <cyan>{get_engine_name(engine)}</cyan>!")
user_input = ''
if read_from == 'clipboard':
if not paused and not tmp_paused:
old_img = img
try:
img = ImageGrab.grabclipboard()
except OSError as error:
if not verbose and "cannot identify image file" in str(error):
# Pillow error when clipboard hasn't changed since last grab (Linux)
pass
elif not verbose and "target image/png not available" in str(error):
# Pillow error when clipboard contains text (Linux, X11)
pass
else:
logger.warning('Error while reading from clipboard ({})'.format(error))
else:
if not just_unpaused and isinstance(img, Image.Image) and not are_images_identical(img, old_img):
process_and_write_results(mocr, avision, gvision, azure, img, write_to, engine)
if just_unpaused:
just_unpaused = False
else:
for path in read_from.iterdir():
path_key = get_path_key(path)
if path_key not in old_paths:
old_paths.add(path_key)
try:
img = Image.open(path)
img.load()
except (UnidentifiedImageError, OSError) as e:
logger.warning(f'Error while reading file {path}: {e}')
else:
process_and_write_results(mocr, avision, gvision, azure, img, write_to, engine)
time.sleep(delay_secs)
if __name__ == '__main__':
fire.Fire(run)

View File

@@ -8,3 +8,7 @@ pyperclip
torch>=1.0
transformers>=4.25.0
unidic_lite
google-cloud-vision
azure-cognitiveservices-vision-computervision
pyobjc
pynput