initial commit

This commit is contained in:
ksyasuda 2022-10-19 02:48:12 -07:00
parent c24cba68e9
commit 1869ed4057
10 changed files with 423 additions and 0 deletions

59
.gitignore vendored Normal file
View File

@ -0,0 +1,59 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Vim.
*.swp

59
pydb/.gitignore vendored Normal file
View File

@ -0,0 +1,59 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Vim.
*.swp

19
pydb/LICENSE Normal file
View File

@ -0,0 +1,19 @@
Copyright (c) 2018 The Python Packaging Authority
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

0
pydb/README.md Normal file
View File

7
pydb/config.py Normal file
View File

@ -0,0 +1,7 @@
MYSQL_INFO = {
"host": "192.168.5.77",
"user": "jellyfinuser",
"passwd": "jellyfinuser",
"database": "jellyfinhelper",
"port": 3306,
}

1
pydb/pydb/__init__.py Normal file
View File

@ -0,0 +1 @@
from .pydb import DB_INFO, DB_ROWS, DatabaseManager, db_factory

217
pydb/pydb/pydb.py Normal file
View File

@ -0,0 +1,217 @@
"Python Databse Wrapper class and DB Factory"
import logging
import sys
from typing import Dict, List, NewType, Tuple, Union
# import mariadb
# import cx_Oracle
import mysql.connector
# import snowflake.connector
DB_ROWS = NewType("DB_RET", List[Tuple])
DB_INFO = Union[Dict[str, int], Dict[str, str]]
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
fmt = "[ %(asctime)s ] %(levelname)s|%(name)s|%(message)s"
handler.setFormatter(fmt)
logger.addHandler(handler)
def convert_query_result(query_res):
"""Return List(elements) from query result List(Tuples(element))."""
return [i[0] if len(query_res) == 1 else list(i) for i in query_res]
class DB_WRAPPER:
"""Base Class for DB Connection Wrapper"""
def __init__(self, info: DB_INFO, connector, exception=None, dictionary=False):
"""DB Connection Wrapper Base Class Constructor
Args:
info (DB_INFO [dict]): DB Connection Info
connector (connection function pointer): Function pointer to the connect function
exception (Exception, optional): Exception for the DB error handling. Defaults to base Exception.
"""
try:
self._conn = connector(**info)
if dictionary:
self._dictionary = True
self._cur = self._conn.cursor(dictionary=True)
else:
self._dictionary = False
self._cur = self._conn.cursor()
self._Exception = exception if exception is not None else Exception
except Exception as exception:
logger.critical("Something went wrong connection to DB:", exception)
raise exception
def get_connection(self):
"""Returns the connection object for the DB"""
return self._conn
def get_cursor(self):
"""Returns a reference to the cursor object
Returns:
cursor: the cursor object for the db
"""
return self._cur
def get_exception(self):
"""Returns the exception handler for the class
Returns:
[type]: [description]
"""
return self._Exception
def query(self, stmt):
"""Queries the db with <stmt>. Returns list of tuples if there are results."""
try:
self._cur.execute(stmt)
res = self._cur.fetchall()
return convert_query_result(res) if not self._dictionary else res
except self._Exception as exception:
raise exception
def query_with_commit(self, stmt):
"""Queries the db with <stmt>. Returns list of tuples if there are results."""
try:
self._cur.execute(stmt)
self._cur.commit()
except self._Exception as exception:
raise exception
def execute(self, stmt):
"""Exectes a query, commits, and resturns result."""
try:
res = self._cur.execute(stmt)
self._conn.commit()
return (
convert_query_result(res.fetchall())
if not self._dictionary
else self._cur.fetchall()
)
except self._Exception as e:
raise e
def close(self):
"""Close the db connection and cursor."""
self._cur.close()
self._conn.close()
class MysqlDB(DB_WRAPPER):
"""Mysql Specific Functions"""
def __init__(self, info, dictionary=False):
"""MySQL Connection Wrapper"""
DB_WRAPPER.__init__(
self,
info,
mysql.connector.connect,
mysql.connector.Error,
dictionary=dictionary,
)
self._dictionary = dictionary
self._conn = self.get_connection()
self._cur = self.get_cursor()
def query_with_params(self, stmt, params):
"""Queries db with with <stmt> and <params>."""
try:
self._cur.execute(stmt, params)
return (
convert_query_result(self._cur.fetchall())
if not self._dictionary
else self._cur.fetchall()
)
except self.get_exception() as exception:
raise exception
def query_with_params_and_commit(self, stmt, params):
"""Queries the db with <stmt> and <params> and commits."""
try:
self._cur.execute(stmt, params)
self._conn.commit()
except self.get_exception() as exception:
raise exception
def get_curdate(self):
"""Returns CURDATE() from MySQL."""
return self.query("SELECT CURDATE()")[0]
def get_timestamp(self):
"""Returns CURRENT_TIMESTAMP from MySQL."""
return self.query("SELECT CURRENT_TIMESTAMP()")[0]
def table_exists(self, schema: str, table: str):
stmt = f"""
SELECT COUNT(*) from information_schema.TABLES
WHERE TABLE_SCHEMA = '{schema}' and TABLE_NAME = '{table}'
"""
return self.query(stmt)[0][0] != 0
# class SnowflakeWrapper(DB_WRAPPER):
# """Snowflake Specific Functions"""
# def __init__(self, info: DB_INFO, connector, exception):
# DB_WRAPPER.__init__(self, info, connector, exception)
# class OracleWrapper(DB_WRAPPER):
# """Oracle specific functions."""
# def __init__(self, info: DB_INFO, connector, exception):
# DB_WRAPPER.__init__(self, info, connector, exception)
# def get_incoming_me(self):
# """Returns the ME_INCOMING table from Oracle."""
# return self.query(
# "SELECT post_day_end.pde_commons_pkg.get_incoming_daybreak_me_name FROM dual"
# )[0]
class DatabaseManager:
"""Context Manager for DB Connection"""
def __init__(self, db_info: DB_INFO, db_type: str, dictionary=False):
self._db_type = db_type
self._db_info = db_info
self._dictionary = dictionary
self._db = db_factory(self._db_info, self._db_type, dictionary=self._dictionary)
def __enter__(self):
return self._db
def __exit__(self, exc_type, exc_value, traceback):
self._db.close()
def db_factory(
db_info: Union[Dict[str, int], Dict[str, str]], db_type: str, dictionary=False
):
db_type = db_type.strip().lower()
if db_type == "mysql":
return MysqlDB(db_info, dictionary=dictionary)
# elif db_type == "snowflake":
# return SnowflakeWrapper(
# db_info, snowflake.connector.connect, snowflake.connector.Error
# )
# elif db_type in ("oracle", "prepdb", "bengal", "livdb", "slivdb"):
# return OracleWrapper(
# db_info,
# cx_Oracle.connect,
# cx_Oracle.DatabaseError,
# )
# elif db_type == 'mariadb':
# return _MariaDB(db_info)
else:
logger.error("ERROR %s not valid", db_type)
logger.error("Valid types: [ mysql ]")
sys.exit(1)

23
pydb/pyproject.toml Normal file
View File

@ -0,0 +1,23 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "pydb"
version = "0.0.1"
authors = [
{ name="Kyle Yasuda", email="suda@sudacode.com" },
]
description = "A python database wrapper"
readme = "README.md"
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = ["mysql-connector-python"]
# [project.urls]
# "Homepage" = "https://gitea.suda.codes/sudacode/pydb"
# "Bug Tracker" = "https://gitea.suda.codes/sudacode/pydb/issues"

7
pydb/tests/config.py Normal file
View File

@ -0,0 +1,7 @@
MYSQL_INFO = {
"host": "192.168.5.77",
"user": "jellyfinuser",
"passwd": "jellyfinuser",
"database": "jellyfinhelper",
"port": 3306,
}

31
pydb/tests/test_mysql.py Normal file
View File

@ -0,0 +1,31 @@
#!/usr/bin/env python3
import logging
import pytest
import config
from pydb import DatabaseManager
fmt = "[%(asctime)s] |%(name)s|%(levelname)s|%(message)s|"
formatter = logging.Formatter(fmt)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
def test_mysql():
with DatabaseManager(config.MYSQL_INFO, "mysql", dictionary=True) as db:
res = db.query("SELECT * FROM dir_map WHERE source_dir = 'aho-girl'")
logger.info("Result: %s", res)
# print("Result: ", res)
assert res is not None and len(res) > 0
def test_mysql_2():
db = DatabaseManager(config.MYSQL_INFO, "mysql").__enter__()
res = db.query("SELECT COUNT(*) FROM dir_map")
logger.info("Result: %s", res)
assert res is not None and res[0] > 0