214 lines
5.3 KiB
Python
214 lines
5.3 KiB
Python
|
#!/usr/bin/env python
|
||
|
|
||
|
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import sys
|
||
|
from typing import List
|
||
|
|
||
|
import requests
|
||
|
from rofi import Rofi
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
logger.addHandler(logging.StreamHandler())
|
||
|
# logger.setLevel(logging.DEBUG)
|
||
|
logger.setLevel(logging.INFO)
|
||
|
|
||
|
|
||
|
def run_url(url):
|
||
|
config = load_config()
|
||
|
if config is None:
|
||
|
fail(url)
|
||
|
|
||
|
config_changed = False
|
||
|
|
||
|
response = None
|
||
|
if can_make_add_request(config):
|
||
|
response, get_token = add_url(config, url)
|
||
|
else:
|
||
|
logger.debug("Not enough parameters to post a URL")
|
||
|
get_token = True
|
||
|
|
||
|
if get_token:
|
||
|
logger.debug("Need to update token")
|
||
|
|
||
|
retry = False
|
||
|
|
||
|
if can_make_refresh_token_request(config) and update_token(
|
||
|
logger, config, refresh=True
|
||
|
):
|
||
|
logger.info("Token refreshed")
|
||
|
config_changed = True
|
||
|
retry = True
|
||
|
get_token = False
|
||
|
else:
|
||
|
logger.debug("Error while refreshing token")
|
||
|
|
||
|
if get_token and can_make_get_token_request(config) and update_token(config):
|
||
|
logger.info("Got new token")
|
||
|
config_changed = True
|
||
|
retry = True
|
||
|
|
||
|
if retry:
|
||
|
response, _ = add_url(config, url)
|
||
|
|
||
|
if config_changed:
|
||
|
store_config(config)
|
||
|
|
||
|
if not response:
|
||
|
fail(url)
|
||
|
|
||
|
|
||
|
def load_config():
|
||
|
try:
|
||
|
with open(get_config_path()) as f:
|
||
|
return json.load(f)
|
||
|
except Exception:
|
||
|
logger.error("Failed to load configuration file", exc_info=True)
|
||
|
return None
|
||
|
|
||
|
|
||
|
def store_config(config):
|
||
|
try:
|
||
|
with open(get_config_path(), "w", encoding="UTF-8") as f:
|
||
|
json.dump(config, f, indent=4)
|
||
|
except Exception:
|
||
|
logger.error("Failed to store configuration file", exc_info=True)
|
||
|
|
||
|
|
||
|
def fail(url):
|
||
|
logger.warning("Exiting")
|
||
|
|
||
|
if url:
|
||
|
logger.info("Saving url to file")
|
||
|
try:
|
||
|
with open(get_emergency_file_path(), "a", encoding="UTF-8") as f:
|
||
|
f.write(url + os.linesep)
|
||
|
except Exception:
|
||
|
logger.error("Error while saving a URL to file")
|
||
|
|
||
|
exit(1)
|
||
|
|
||
|
|
||
|
def get_config_path():
|
||
|
cfg_dir = (
|
||
|
os.getenv("XDG_CONFIG_HOME") or os.path.expanduser("~/.config")
|
||
|
) + "/wallabag-add"
|
||
|
return os.path.join(cfg_dir, "wallabag-adder.json")
|
||
|
|
||
|
|
||
|
def get_emergency_file_path():
|
||
|
cfg_dir = (
|
||
|
os.getenv("XDG_CONFIG_HOME") or os.path.expanduser("~/.config")
|
||
|
) + "/wallabag-add"
|
||
|
return os.path.join(cfg_dir, "wallabag-adder-failed-urls.txt")
|
||
|
|
||
|
|
||
|
def can_make_add_request(config):
|
||
|
if not config.get("access_token"):
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
|
||
|
def can_make_refresh_token_request(config):
|
||
|
if check_value_by_key(config, "client_id"):
|
||
|
return False
|
||
|
if check_value_by_key(config, "client_secret"):
|
||
|
return False
|
||
|
if check_value_by_key(config, "refresh_token"):
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
|
||
|
def can_make_get_token_request(config):
|
||
|
if check_value_by_key(config, "client_id"):
|
||
|
return False
|
||
|
if check_value_by_key(config, "client_secret"):
|
||
|
return False
|
||
|
if check_value_by_key(config, "username"):
|
||
|
return False
|
||
|
if check_value_by_key(config, "password"):
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
|
||
|
def check_value_by_key(config, key):
|
||
|
if not config.get(key):
|
||
|
logger.info("No {} in config".format(key))
|
||
|
return True
|
||
|
|
||
|
|
||
|
def update_token(config, refresh=False):
|
||
|
data = {
|
||
|
"client_id": config["client_id"],
|
||
|
"client_secret": config["client_secret"],
|
||
|
}
|
||
|
if refresh:
|
||
|
data["grant_type"] = "refresh_token"
|
||
|
data["refresh_token"] = config["refresh_token"]
|
||
|
else:
|
||
|
data["grant_type"] = "password"
|
||
|
data["username"] = (config["username"],)
|
||
|
data["password"] = config["password"]
|
||
|
|
||
|
try:
|
||
|
r = requests.post("{}/oauth/v2/token".format(config["url"]), data=data)
|
||
|
except Exception:
|
||
|
logger.error("Error while getting token", exc_info=True)
|
||
|
return False
|
||
|
|
||
|
if not r.ok:
|
||
|
logger.error("Error while getting token")
|
||
|
logger.error("Response: {}".format(r.text))
|
||
|
return False
|
||
|
|
||
|
r = r.json()
|
||
|
|
||
|
config["access_token"] = r["access_token"]
|
||
|
config["refresh_token"] = r["refresh_token"]
|
||
|
|
||
|
return True
|
||
|
|
||
|
|
||
|
def add_url(config, url):
|
||
|
headers = {"Authorization": "Bearer {}".format(config["access_token"])}
|
||
|
data = {"url": url}
|
||
|
|
||
|
try:
|
||
|
r = requests.post(
|
||
|
"{}/api/entries.json".format(config["url"]), headers=headers, data=data
|
||
|
)
|
||
|
except Exception:
|
||
|
logger.error("Error adding url", exc_info=True)
|
||
|
return None, False
|
||
|
|
||
|
logger.debug("Response: {}, {}".format(r.status_code, r.text))
|
||
|
if not r.ok:
|
||
|
# TODO: check auth error handling
|
||
|
return None, r.status_code == 401
|
||
|
|
||
|
return r.json(), False
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
sysargs = sys.argv
|
||
|
urls = sysargs[1:]
|
||
|
if not urls:
|
||
|
rofi = Rofi(
|
||
|
config_file="~/.config/rofi/aniwrapper-dracula.rasi",
|
||
|
theme_str="window {width: 45%;} listview {lines: 0;}",
|
||
|
)
|
||
|
urls = rofi.text_entry(
|
||
|
"Wallbag: Enter URL to add", "Specify multiple URLs separated by a comma"
|
||
|
)
|
||
|
if not urls:
|
||
|
sys.exit(0)
|
||
|
urls = urls.split(",")
|
||
|
|
||
|
print("URLS:", urls)
|
||
|
for turl in urls:
|
||
|
run_url(turl)
|