mirror of
https://github.com/ksyasuda/dotfiles.git
synced 2026-02-28 00:22:41 -08:00
add plugins
This commit is contained in:
@@ -0,0 +1,91 @@
|
||||
from __future__ import annotations
|
||||
|
||||
__version__ = '1.2.2'
|
||||
|
||||
import abc
|
||||
import json
|
||||
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
ExternalRequestFeature,
|
||||
PoTokenContext,
|
||||
PoTokenProvider,
|
||||
PoTokenProviderRejectedRequest,
|
||||
)
|
||||
from yt_dlp.extractor.youtube.pot.utils import WEBPO_CLIENTS
|
||||
from yt_dlp.utils import js_to_json
|
||||
from yt_dlp.utils.traversal import traverse_obj
|
||||
|
||||
|
||||
class BgUtilPTPBase(PoTokenProvider, abc.ABC):
|
||||
PROVIDER_VERSION = __version__
|
||||
BUG_REPORT_LOCATION = 'https://github.com/Brainicism/bgutil-ytdlp-pot-provider/issues'
|
||||
_SUPPORTED_EXTERNAL_REQUEST_FEATURES = (
|
||||
ExternalRequestFeature.PROXY_SCHEME_HTTP,
|
||||
ExternalRequestFeature.PROXY_SCHEME_HTTPS,
|
||||
ExternalRequestFeature.PROXY_SCHEME_SOCKS4,
|
||||
ExternalRequestFeature.PROXY_SCHEME_SOCKS4A,
|
||||
ExternalRequestFeature.PROXY_SCHEME_SOCKS5,
|
||||
ExternalRequestFeature.PROXY_SCHEME_SOCKS5H,
|
||||
ExternalRequestFeature.SOURCE_ADDRESS,
|
||||
ExternalRequestFeature.DISABLE_TLS_VERIFICATION,
|
||||
)
|
||||
_SUPPORTED_CLIENTS = WEBPO_CLIENTS
|
||||
_SUPPORTED_CONTEXTS = (
|
||||
PoTokenContext.GVS,
|
||||
PoTokenContext.PLAYER,
|
||||
PoTokenContext.SUBS,
|
||||
)
|
||||
_GETPOT_TIMEOUT = 20.0
|
||||
_GET_SERVER_VSN_TIMEOUT = 5.0
|
||||
_MIN_NODE_VSN = (18, 0, 0)
|
||||
|
||||
def _info_and_raise(self, msg, raise_from=None):
|
||||
self.logger.info(msg)
|
||||
raise PoTokenProviderRejectedRequest(msg) from raise_from
|
||||
|
||||
def _warn_and_raise(self, msg, once=True, raise_from=None):
|
||||
self.logger.warning(msg, once=once)
|
||||
raise PoTokenProviderRejectedRequest(msg) from raise_from
|
||||
|
||||
def _check_version(self, got_version, *, default='unknown', name):
|
||||
def _major(version):
|
||||
return version.split('.', 1)[0]
|
||||
|
||||
if got_version != self.PROVIDER_VERSION:
|
||||
self.logger.warning(
|
||||
f'The provider plugin and the {name} are on different versions, '
|
||||
f'this may cause compatibility issues. '
|
||||
f'Please ensure they are on the same version. '
|
||||
f'Otherwise, help will NOT be provided for any issues that arise. '
|
||||
f'(plugin: {self.PROVIDER_VERSION}, {name}: {got_version or default})',
|
||||
once=True,
|
||||
)
|
||||
if not got_version or _major(got_version) != _major(self.PROVIDER_VERSION):
|
||||
self._warn_and_raise(
|
||||
f'Plugin and {name} major versions are mismatched. '
|
||||
f'Update both the plugin and the {name} to the same version to proceed.'
|
||||
)
|
||||
|
||||
def _get_attestation(self, webpage: str | None):
|
||||
if not webpage:
|
||||
return None
|
||||
raw_challenge_data = self.ie._search_regex(
|
||||
r"""(?sx)window\.ytAtR\s*=\s*(?P<raw_cd>(?P<q>['"])
|
||||
(?:
|
||||
\\.|
|
||||
(?!(?P=q)).
|
||||
)*
|
||||
(?P=q))\s*;""",
|
||||
webpage,
|
||||
'raw challenge data',
|
||||
default=None,
|
||||
group='raw_cd',
|
||||
)
|
||||
att_txt = traverse_obj(raw_challenge_data, ({js_to_json}, {json.loads}, {json.loads}, 'bgChallenge'))
|
||||
if not att_txt:
|
||||
self.logger.warning('Failed to extract initial attestation from the webpage')
|
||||
return None
|
||||
return att_txt
|
||||
|
||||
|
||||
__all__ = ['__version__']
|
||||
@@ -0,0 +1,168 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import functools
|
||||
import json
|
||||
import time
|
||||
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenProviderError,
|
||||
PoTokenProviderRejectedRequest,
|
||||
PoTokenRequest,
|
||||
PoTokenResponse,
|
||||
register_preference,
|
||||
register_provider,
|
||||
)
|
||||
from yt_dlp.extractor.youtube.pot.utils import get_webpo_content_binding
|
||||
from yt_dlp.networking.common import Request
|
||||
from yt_dlp.networking.exceptions import HTTPError, TransportError
|
||||
|
||||
from yt_dlp_plugins.extractor.getpot_bgutil import BgUtilPTPBase
|
||||
|
||||
|
||||
@register_provider
|
||||
class BgUtilHTTPPTP(BgUtilPTPBase):
|
||||
PROVIDER_NAME = 'bgutil:http'
|
||||
DEFAULT_BASE_URL = 'http://127.0.0.1:4416'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._last_server_check = 0
|
||||
self._server_available = True
|
||||
|
||||
@functools.cached_property
|
||||
def _base_url(self):
|
||||
base_url = self._configuration_arg('base_url', default=[None])[0]
|
||||
|
||||
if base_url:
|
||||
return base_url
|
||||
|
||||
# check deprecated arg
|
||||
deprecated_base_url = self.ie._configuration_arg(
|
||||
ie_key='youtube', key='getpot_bgutil_baseurl', default=[None])[0]
|
||||
if deprecated_base_url:
|
||||
self._warn_and_raise(
|
||||
"'youtube:getpot_bgutil_baseurl' extractor arg is deprecated, use 'youtubepot-bgutilhttp:base_url' instead")
|
||||
|
||||
# default if no arg was passed
|
||||
self.logger.debug(
|
||||
f'No base_url provided, defaulting to {self.DEFAULT_BASE_URL}')
|
||||
return self.DEFAULT_BASE_URL
|
||||
|
||||
def _check_server_availability(self, ctx: PoTokenRequest):
|
||||
if self._last_server_check + 60 > time.time():
|
||||
return self._server_available
|
||||
|
||||
self._server_available = False
|
||||
try:
|
||||
self.logger.trace(
|
||||
f'Checking server availability at {self._base_url}/ping')
|
||||
response = json.load(self._request_webpage(Request(
|
||||
f'{self._base_url}/ping', extensions={'timeout': self._GET_SERVER_VSN_TIMEOUT}, proxies={'all': None}),
|
||||
note=False))
|
||||
except TransportError as e:
|
||||
# the server may be down
|
||||
script_path_provided = self.ie._configuration_arg(
|
||||
ie_key='youtubepot-bgutilscript', key='script_path', default=[None])[0] is not None
|
||||
|
||||
warning_base = f'Error reaching GET {self._base_url}/ping (caused by {e.__class__.__name__}). '
|
||||
if script_path_provided: # server down is expected, log info
|
||||
self._info_and_raise(
|
||||
warning_base + 'This is expected if you are using the script method.')
|
||||
else:
|
||||
self._warn_and_raise(
|
||||
warning_base + f'Please make sure that the server is reachable at {self._base_url}.')
|
||||
|
||||
return
|
||||
except HTTPError as e:
|
||||
# may be an old server, don't raise
|
||||
self.logger.warning(
|
||||
f'HTTP Error reaching GET /ping (caused by {e!r})', once=True)
|
||||
return
|
||||
except json.JSONDecodeError as e:
|
||||
# invalid server
|
||||
self._warn_and_raise(
|
||||
f'Error parsing ping response JSON (caused by {e!r})')
|
||||
return
|
||||
except Exception as e:
|
||||
self._warn_and_raise(
|
||||
f'Unknown error reaching GET /ping (caused by {e!r})', raise_from=e)
|
||||
return
|
||||
else:
|
||||
self._check_version(response.get('version', ''), name='HTTP server')
|
||||
self._server_available = True
|
||||
return True
|
||||
finally:
|
||||
self._last_server_check = time.time()
|
||||
|
||||
def is_available(self):
|
||||
return self._server_available or self._last_server_check + 60 < int(time.time())
|
||||
|
||||
def _real_request_pot(
|
||||
self,
|
||||
request: PoTokenRequest,
|
||||
) -> PoTokenResponse:
|
||||
if not self._check_server_availability(request):
|
||||
raise PoTokenProviderRejectedRequest(
|
||||
f'{self.PROVIDER_NAME} server is not available')
|
||||
|
||||
# used for CI check
|
||||
self.logger.trace('Generating POT via HTTP server')
|
||||
|
||||
disable_innertube = bool(self._configuration_arg('disable_innertube', default=[None])[0])
|
||||
challenge = self._get_attestation(None if disable_innertube else request.video_webpage)
|
||||
# The challenge is falsy when the webpage and the challenge are unavailable
|
||||
# In this case, we need to disable /att/get since it's broken for web_music
|
||||
if not challenge and request.internal_client_name == 'web_music':
|
||||
if not disable_innertube: # if not already set, warn the user
|
||||
self.logger.warning(
|
||||
'BotGuard challenges could not be obtained from the webpage, '
|
||||
'overriding disable_innertube=True because InnerTube challenges '
|
||||
'are currently broken for the web_music client. '
|
||||
'Pass disable_innertube=1 to suppress this warning.')
|
||||
disable_innertube = True
|
||||
|
||||
try:
|
||||
response = self._request_webpage(
|
||||
request=Request(
|
||||
f'{self._base_url}/get_pot', data=json.dumps({
|
||||
'bypass_cache': request.bypass_cache,
|
||||
'challenge': challenge,
|
||||
'content_binding': get_webpo_content_binding(request)[0],
|
||||
'disable_innertube': disable_innertube,
|
||||
'disable_tls_verification': not request.request_verify_tls,
|
||||
'proxy': request.request_proxy,
|
||||
'innertube_context': request.innertube_context,
|
||||
'source_address': request.request_source_address,
|
||||
}).encode(), headers={'Content-Type': 'application/json'},
|
||||
extensions={'timeout': self._GETPOT_TIMEOUT}, proxies={'all': None}),
|
||||
note=f'Generating a {request.context.value} PO Token for '
|
||||
f'{request.internal_client_name} client via bgutil HTTP server',
|
||||
)
|
||||
except Exception as e:
|
||||
raise PoTokenProviderError(
|
||||
f'Error reaching POST /get_pot (caused by {e!r})') from e
|
||||
|
||||
try:
|
||||
response_json = json.load(response)
|
||||
except Exception as e:
|
||||
raise PoTokenProviderError(
|
||||
f'Error parsing response JSON (caused by {e!r}). response = {response.read().decode()}') from e
|
||||
|
||||
if error_msg := response_json.get('error'):
|
||||
raise PoTokenProviderError(error_msg)
|
||||
if 'poToken' not in response_json:
|
||||
raise PoTokenProviderError(
|
||||
f'Server did not respond with a poToken. Received response: {response}')
|
||||
|
||||
po_token = response_json['poToken']
|
||||
self.logger.trace(f'Generated POT: {po_token}')
|
||||
return PoTokenResponse(po_token=po_token)
|
||||
|
||||
|
||||
@register_preference(BgUtilHTTPPTP)
|
||||
def bgutil_HTTP_getpot_preference(provider, request):
|
||||
return 130
|
||||
|
||||
|
||||
__all__ = [BgUtilHTTPPTP.__name__,
|
||||
bgutil_HTTP_getpot_preference.__name__]
|
||||
@@ -0,0 +1,188 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import functools
|
||||
import json
|
||||
import os.path
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
from yt_dlp.extractor.youtube.pot.provider import (
|
||||
PoTokenProviderError,
|
||||
PoTokenRequest,
|
||||
PoTokenResponse,
|
||||
register_preference,
|
||||
register_provider,
|
||||
)
|
||||
from yt_dlp.extractor.youtube.pot.utils import get_webpo_content_binding
|
||||
from yt_dlp.utils import Popen
|
||||
|
||||
from yt_dlp_plugins.extractor.getpot_bgutil import BgUtilPTPBase
|
||||
|
||||
|
||||
@register_provider
|
||||
class BgUtilScriptPTP(BgUtilPTPBase):
|
||||
PROVIDER_NAME = 'bgutil:script'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self._check_script = functools.cache(self._check_script_impl)
|
||||
|
||||
@functools.cached_property
|
||||
def _script_path(self):
|
||||
script_path = self._configuration_arg(
|
||||
'script_path', casesense=True, default=[None])[0]
|
||||
|
||||
if script_path:
|
||||
return os.path.expandvars(script_path)
|
||||
|
||||
# check deprecated arg
|
||||
deprecated_script_path = self.ie._configuration_arg(
|
||||
ie_key='youtube', key='getpot_bgutil_script', default=[None])[0]
|
||||
|
||||
if deprecated_script_path:
|
||||
self._warn_and_raise(
|
||||
"'youtube:getpot_bgutil_script' extractor arg is deprecated, use 'youtubepot-bgutilscript:script_path' instead")
|
||||
|
||||
# default if no arg was passed
|
||||
home = os.path.expanduser('~')
|
||||
default_path = os.path.join(
|
||||
home, 'bgutil-ytdlp-pot-provider', 'server', 'build', 'generate_once.js')
|
||||
self.logger.debug(
|
||||
f'No script path passed, defaulting to {default_path}')
|
||||
return default_path
|
||||
|
||||
def is_available(self):
|
||||
return self._check_script(self._script_path)
|
||||
|
||||
@functools.cached_property
|
||||
def _node_path(self):
|
||||
node_path = shutil.which('node')
|
||||
if node_path is None:
|
||||
self.logger.trace('node is not in PATH')
|
||||
vsn = self._check_node_version(node_path)
|
||||
if vsn:
|
||||
self.logger.trace(f'Node version: {vsn}')
|
||||
return node_path
|
||||
|
||||
def _check_script_impl(self, script_path):
|
||||
if not os.path.isfile(script_path):
|
||||
self.logger.debug(
|
||||
f"Script path doesn't exist: {script_path}")
|
||||
return False
|
||||
if os.path.basename(script_path) != 'generate_once.js':
|
||||
self.logger.warning(
|
||||
'Incorrect script passed to extractor args. Path to generate_once.js required', once=True)
|
||||
return False
|
||||
node_path = self._node_path
|
||||
if not node_path:
|
||||
return False
|
||||
stdout, stderr, returncode = Popen.run(
|
||||
[self._node_path, script_path, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
|
||||
timeout=self._GET_SERVER_VSN_TIMEOUT)
|
||||
if returncode:
|
||||
self.logger.warning(
|
||||
f'Failed to check script version. '
|
||||
f'Script returned {returncode} exit status. '
|
||||
f'Script stdout: {stdout}; Script stderr: {stderr}',
|
||||
once=True)
|
||||
return False
|
||||
else:
|
||||
self._check_version(stdout.strip(), name='script')
|
||||
return True
|
||||
|
||||
def _check_node_version(self, node_path):
|
||||
try:
|
||||
stdout, stderr, returncode = Popen.run(
|
||||
[node_path, '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
|
||||
timeout=self._GET_SERVER_VSN_TIMEOUT)
|
||||
stdout = stdout.strip()
|
||||
mobj = re.match(r'v(\d+)\.(\d+)\.(\d+)', stdout)
|
||||
if returncode or not mobj:
|
||||
raise ValueError
|
||||
node_vsn = tuple(map(int, mobj.groups()))
|
||||
if node_vsn >= self._MIN_NODE_VSN:
|
||||
return node_vsn
|
||||
raise RuntimeError
|
||||
except RuntimeError:
|
||||
min_vsn_str = 'v' + '.'.join(str(v) for v in self._MIN_NODE_VSN)
|
||||
self.logger.warning(
|
||||
f'Node version too low. '
|
||||
f'(got {stdout}, but at least {min_vsn_str} is required)')
|
||||
except (subprocess.TimeoutExpired, ValueError):
|
||||
self.logger.warning(
|
||||
f'Failed to check node version. '
|
||||
f'Node returned {returncode} exit status. '
|
||||
f'Node stdout: {stdout}; Node stderr: {stderr}')
|
||||
|
||||
def _real_request_pot(
|
||||
self,
|
||||
request: PoTokenRequest,
|
||||
) -> PoTokenResponse:
|
||||
# used for CI check
|
||||
self.logger.trace(
|
||||
f'Generating POT via script: {self._script_path}')
|
||||
|
||||
command_args = [self._node_path, self._script_path]
|
||||
if proxy := request.request_proxy:
|
||||
command_args.extend(['-p', proxy])
|
||||
command_args.extend(['-c', get_webpo_content_binding(request)[0]])
|
||||
if request.bypass_cache:
|
||||
command_args.append('--bypass-cache')
|
||||
if request.request_source_address:
|
||||
command_args.extend(
|
||||
['--source-address', request.request_source_address])
|
||||
if request.request_verify_tls is False:
|
||||
command_args.append('--disable-tls-verification')
|
||||
|
||||
self.logger.info(
|
||||
f'Generating a {request.context.value} PO Token for '
|
||||
f'{request.internal_client_name} client via bgutil script',
|
||||
)
|
||||
self.logger.debug(
|
||||
f'Executing command to get POT via script: {" ".join(command_args)}')
|
||||
|
||||
try:
|
||||
stdout, stderr, returncode = Popen.run(
|
||||
command_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
|
||||
timeout=self._GETPOT_TIMEOUT)
|
||||
except subprocess.TimeoutExpired as e:
|
||||
raise PoTokenProviderError(
|
||||
f'_get_pot_via_script failed: Timeout expired when trying to run script (caused by {e!r})')
|
||||
except Exception as e:
|
||||
raise PoTokenProviderError(
|
||||
f'_get_pot_via_script failed: Unable to run script (caused by {e!r})') from e
|
||||
|
||||
msg = ''
|
||||
if stdout_extra := stdout.strip().splitlines()[:-1]:
|
||||
msg = f'stdout:\n{stdout_extra}\n'
|
||||
if stderr_stripped := stderr.strip(): # Empty strings are falsy
|
||||
msg += f'stderr:\n{stderr_stripped}\n'
|
||||
msg = msg.strip()
|
||||
if msg:
|
||||
self.logger.trace(msg)
|
||||
if returncode:
|
||||
raise PoTokenProviderError(
|
||||
f'_get_pot_via_script failed with returncode {returncode}')
|
||||
|
||||
try:
|
||||
json_resp = stdout.splitlines()[-1]
|
||||
self.logger.trace(f'JSON response:\n{json_resp}')
|
||||
# The JSON response is always the last line
|
||||
script_data_resp = json.loads(json_resp)
|
||||
except json.JSONDecodeError as e:
|
||||
raise PoTokenProviderError(
|
||||
f'Error parsing JSON response from _get_pot_via_script (caused by {e!r})') from e
|
||||
if 'poToken' not in script_data_resp:
|
||||
raise PoTokenProviderError(
|
||||
'The script did not respond with a po_token')
|
||||
return PoTokenResponse(po_token=script_data_resp['poToken'])
|
||||
|
||||
|
||||
@register_preference(BgUtilScriptPTP)
|
||||
def bgutil_script_getpot_preference(provider, request):
|
||||
return 1
|
||||
|
||||
|
||||
__all__ = [BgUtilScriptPTP.__name__,
|
||||
bgutil_script_getpot_preference.__name__]
|
||||
Reference in New Issue
Block a user