Switch to betterproto for Lens, add Bing (thanks @bropines)
This commit is contained in:
176
owocr/ocr.py
176
owocr/ocr.py
@@ -6,6 +6,8 @@ import sys
|
||||
import platform
|
||||
import logging
|
||||
from math import sqrt
|
||||
import base64
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
import jaconv
|
||||
import numpy as np
|
||||
@@ -63,11 +65,8 @@ except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from google.protobuf.json_format import MessageToDict
|
||||
from .py_lens.lens_overlay_server_pb2 import LensOverlayServerRequest, LensOverlayServerResponse
|
||||
from .py_lens.lens_overlay_platform_pb2 import PLATFORM_WEB
|
||||
from .py_lens.lens_overlay_surface_pb2 import SURFACE_CHROMIUM
|
||||
from .py_lens.lens_overlay_filters_pb2 import AUTO_FILTER
|
||||
import betterproto
|
||||
from .lens_betterproto import *
|
||||
import random
|
||||
except ImportError:
|
||||
pass
|
||||
@@ -189,10 +188,8 @@ class GoogleLens:
|
||||
available = False
|
||||
|
||||
def __init__(self):
|
||||
if 'google.protobuf' not in sys.modules:
|
||||
logger.warning('protobuf not available, Google Lens will not work!')
|
||||
elif 'requests' not in sys.modules:
|
||||
logger.warning('requests not available, Google Lens will not work!')
|
||||
if 'betterproto' not in sys.modules:
|
||||
logger.warning('betterproto not available, Google Lens will not work!')
|
||||
else:
|
||||
self.available = True
|
||||
logger.info('Google Lens ready')
|
||||
@@ -211,10 +208,10 @@ class GoogleLens:
|
||||
request.objects_request.request_context.request_id.sequence_id = 0
|
||||
request.objects_request.request_context.request_id.image_sequence_id = 0
|
||||
request.objects_request.request_context.request_id.analytics_id = random.randbytes(16)
|
||||
request.objects_request.request_context.request_id.routing_info.Clear()
|
||||
request.objects_request.request_context.request_id.routing_info = LensOverlayRoutingInfo()
|
||||
|
||||
request.objects_request.request_context.client_context.platform = PLATFORM_WEB
|
||||
request.objects_request.request_context.client_context.surface = SURFACE_CHROMIUM
|
||||
request.objects_request.request_context.client_context.platform = Platform.WEB
|
||||
request.objects_request.request_context.client_context.surface = Surface.CHROMIUM
|
||||
|
||||
request.objects_request.request_context.client_context.locale_context.language = 'ja'
|
||||
request.objects_request.request_context.client_context.locale_context.region = 'Asia/Tokyo'
|
||||
@@ -222,8 +219,9 @@ class GoogleLens:
|
||||
|
||||
request.objects_request.request_context.client_context.app_id = '' # not set by chromium
|
||||
|
||||
filter = request.objects_request.request_context.client_context.client_filters.filter.add()
|
||||
filter.filter_type = AUTO_FILTER
|
||||
filter = AppliedFilter()
|
||||
filter.filter_type = LensOverlayFilterType.AUTO_FILTER
|
||||
request.objects_request.request_context.client_context.client_filters.filter.append(filter)
|
||||
|
||||
image_data = self._preprocess(img)
|
||||
request.objects_request.image_data.payload.image_bytes = image_data[0]
|
||||
@@ -255,9 +253,8 @@ class GoogleLens:
|
||||
if res.status_code != 200:
|
||||
return (False, 'Unknown error!')
|
||||
|
||||
response_proto = LensOverlayServerResponse()
|
||||
response_proto.ParseFromString(res.content)
|
||||
response_dict = MessageToDict(response_proto, preserving_proto_field_name=True)
|
||||
response_proto = LensOverlayServerResponse().FromString(res.content)
|
||||
response_dict = response_proto.to_dict(betterproto.Casing.SNAKE)
|
||||
|
||||
res = ''
|
||||
text = response_dict['objects_response']['text']['text_layout']['paragraphs']
|
||||
@@ -288,8 +285,6 @@ class GoogleLensWeb:
|
||||
def __init__(self):
|
||||
if 'pyjson5' not in sys.modules:
|
||||
logger.warning('pyjson5 not available, Google Lens (web) will not work!')
|
||||
elif 'requests' not in sys.modules:
|
||||
logger.warning('requests not available, Google Lens (web) will not work!')
|
||||
else:
|
||||
self.regex = re.compile(r'(\w+)=([^&]+)')
|
||||
self.requests_session = requests.Session()
|
||||
@@ -372,6 +367,119 @@ class GoogleLensWeb:
|
||||
|
||||
return pil_image_to_bytes(img)
|
||||
|
||||
class Bing:
|
||||
name = 'bing'
|
||||
readable_name = 'Bing'
|
||||
key = 'b'
|
||||
available = False
|
||||
|
||||
def __init__(self):
|
||||
self.requests_session = requests.Session()
|
||||
self.available = True
|
||||
logger.info('Bing ready')
|
||||
|
||||
def __call__(self, img_or_path):
|
||||
if isinstance(img_or_path, str) or isinstance(img_or_path, Path):
|
||||
img = Image.open(img_or_path)
|
||||
elif isinstance(img_or_path, Image.Image):
|
||||
img = img_or_path
|
||||
else:
|
||||
raise ValueError(f'img_or_path must be a path or PIL.Image, instead got: {img_or_path}')
|
||||
|
||||
upload_url = 'https://www.bing.com/images/search?view=detailv2&iss=sbiupload'
|
||||
upload_headers = {
|
||||
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
|
||||
'accept-language': 'ja-JP;q=0.6,ja;q=0.5',
|
||||
'cache-control': 'max-age=0',
|
||||
'origin': 'https://www.bing.com',
|
||||
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:136.0) Gecko/20100101 Firefox/136.0',
|
||||
}
|
||||
files = {
|
||||
'imgurl': (None, ''),
|
||||
'cbir': (None, 'sbi'),
|
||||
'imageBin': (None, self._preprocess(img))
|
||||
}
|
||||
|
||||
try:
|
||||
res = self.requests_session.post(upload_url, headers=upload_headers, files=files, timeout=20, allow_redirects=False)
|
||||
except requests.exceptions.Timeout:
|
||||
return (False, 'Request timeout!')
|
||||
except requests.exceptions.ConnectionError:
|
||||
return (False, 'Connection error!')
|
||||
|
||||
if res.status_code != 302:
|
||||
return (False, 'Unknown error!')
|
||||
|
||||
redirect_url = res.headers.get('Location')
|
||||
if not redirect_url:
|
||||
return (False, 'Error getting redirect URL!')
|
||||
|
||||
parsed_url = urlparse(redirect_url)
|
||||
query_params = parse_qs(parsed_url.query)
|
||||
|
||||
image_insights_token = query_params.get('insightsToken')
|
||||
if not image_insights_token:
|
||||
return (False, 'Error getting token!')
|
||||
image_insights_token = image_insights_token[0]
|
||||
|
||||
api_url = 'https://www.bing.com/images/api/custom/knowledge'
|
||||
api_headers = {
|
||||
'accept': '*/*',
|
||||
'accept-language': 'ja-JP;q=0.6,ja;q=0.5',
|
||||
'origin': 'https://www.bing.com',
|
||||
'referer': f'https://www.bing.com/images/search?view=detailV2&insightstoken={image_insights_token}',
|
||||
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:136.0) Gecko/20100101 Firefox/136.0',
|
||||
}
|
||||
api_data_json = {
|
||||
'imageInfo': {'imageInsightsToken': image_insights_token, 'source': 'Url'},
|
||||
'knowledgeRequest': {'invokedSkills': ['OCR'], 'index': 1}
|
||||
}
|
||||
files = {
|
||||
'knowledgeRequest': (None, json.dumps(api_data_json), 'application/json')
|
||||
}
|
||||
|
||||
try:
|
||||
res = self.requests_session.post(api_url, headers=api_headers, files=files, timeout=20)
|
||||
except requests.exceptions.Timeout:
|
||||
return (False, 'Request timeout!')
|
||||
except requests.exceptions.ConnectionError:
|
||||
return (False, 'Connection error!')
|
||||
|
||||
if res.status_code != 200:
|
||||
return (False, 'Unknown error!')
|
||||
|
||||
data = res.json()
|
||||
|
||||
text_tag = None
|
||||
for tag in data['tags']:
|
||||
if tag.get('displayName') == '##TextRecognition':
|
||||
text_tag = tag
|
||||
break
|
||||
if not text_tag:
|
||||
return (False, 'No ##TextRecognition tag in response!')
|
||||
|
||||
text_action = None
|
||||
for action in text_tag['actions']:
|
||||
if action.get('_type') == 'ImageKnowledge/TextRecognitionAction':
|
||||
text_action = action
|
||||
break
|
||||
if not text_action:
|
||||
return (False, 'No TextRecognitionAction action in response!')
|
||||
|
||||
regions = text_action['data'].get('regions', [])
|
||||
|
||||
res = ''
|
||||
for region in regions:
|
||||
for line in region.get('lines', []):
|
||||
res += line['text'] + '\n'
|
||||
|
||||
x = (True, res)
|
||||
return x
|
||||
|
||||
def _preprocess(self, img):
|
||||
img_bytes = pil_image_to_bytes(img)
|
||||
return base64.b64encode(img_bytes).decode('utf-8')
|
||||
|
||||
class AppleVision:
|
||||
name = 'avision'
|
||||
readable_name = 'Apple Vision'
|
||||
@@ -521,15 +629,12 @@ class WinRTOCR:
|
||||
self.available = True
|
||||
logger.info('WinRT OCR ready')
|
||||
else:
|
||||
if 'requests' not in sys.modules:
|
||||
logger.warning('requests not available, WinRT OCR will not work!')
|
||||
else:
|
||||
try:
|
||||
self.url = config['url']
|
||||
self.available = True
|
||||
logger.info('WinRT OCR ready')
|
||||
except:
|
||||
logger.warning('Error reading URL from config, WinRT OCR will not work!')
|
||||
try:
|
||||
self.url = config['url']
|
||||
self.available = True
|
||||
logger.info('WinRT OCR ready')
|
||||
except:
|
||||
logger.warning('Error reading URL from config, WinRT OCR will not work!')
|
||||
|
||||
def __call__(self, img_or_path):
|
||||
if isinstance(img_or_path, str) or isinstance(img_or_path, Path):
|
||||
@@ -704,15 +809,12 @@ class OCRSpace:
|
||||
available = False
|
||||
|
||||
def __init__(self, config={}):
|
||||
if 'requests' not in sys.modules:
|
||||
logger.warning('requests not available, OCRSpace will not work!')
|
||||
else:
|
||||
try:
|
||||
self.api_key = config['api_key']
|
||||
self.available = True
|
||||
logger.info('OCRSpace ready')
|
||||
except:
|
||||
logger.warning('Error reading API key from config, OCRSpace will not work!')
|
||||
try:
|
||||
self.api_key = config['api_key']
|
||||
self.available = True
|
||||
logger.info('OCRSpace ready')
|
||||
except:
|
||||
logger.warning('Error reading API key from config, OCRSpace will not work!')
|
||||
|
||||
def __call__(self, img_or_path):
|
||||
if isinstance(img_or_path, str) or isinstance(img_or_path, Path):
|
||||
|
||||
Reference in New Issue
Block a user