update pydb
All checks were successful
Build Changed / build_and_upload_changed_packages (push) Successful in 25s

This commit is contained in:
ksyasuda 2023-08-09 20:05:29 -07:00
parent 155c65aada
commit 3e539bfdea
No known key found for this signature in database
11 changed files with 917 additions and 88 deletions

70
pydb/.gitignore vendored
View File

@ -1,60 +1,10 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
config.py
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Vim.
*.swp
.env
.DS_Store
.idea
*.log
tmp/
env/*
dist/*
__pycache__*
*.egg-info
.vscode/*

View File

@ -1,19 +0,0 @@
Copyright (c) 2018 The Python Packaging Authority
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -4,20 +4,31 @@ build-backend = "hatchling.build"
[project]
name = "pydb"
version = "0.0.3"
authors = [
{ name="Kyle Yasuda", email="suda@sudacode.com" },
dependencies = [
"mysql-connector-python",
"cx_oracle",
"snowflake-connector-python",
"pandas",
"python-logger",
"email-sender-simple",
"sqlparse",
"python-dotenv"
]
description = "A python database wrapper"
version = "0.5.17"
authors = [
{ name="Kyle Yasuda", email="kyasuda@westlakefinancial.com" },
]
description = "Database helper"
readme = "README.md"
requires-python = ">=3.7"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = ["mysql-connector-python"]
# [project.urls]
# "Homepage" = "https://gitea.suda.codes/sudacode/pydb"
# "Bug Tracker" = "https://gitea.suda.codes/sudacode/pydb/issues"
# [tool.setuptools.packages.find]
# where = ["pydb"]
[project.urls]
"Homepage" = "https://gitlab.westlakefinancial.com/data_engineering/python_package_registry/-/tree/main/pydb"
"Bug Tracker" = "https://gitlab.westlakefinancial.com/data_engineering/python_package_registry/-/issues"

36
pydb/requirements.txt Normal file
View File

@ -0,0 +1,36 @@
asn1crypto==1.5.1
certifi==2022.12.7
cffi==1.15.1
charset-normalizer==2.1.1
colored-output==0.0.1
cryptography==40.0.1
cx-Oracle==8.3.0
email-sender-simple==0.2.8
exceptiongroup==1.1.1
filelock==3.11.0
idna==3.4
iniconfig==2.0.0
mysql-connector-python==8.0.32
numpy==1.24.2
oscrypto==1.3.0
packaging==23.0
pandas==2.0.0
pluggy==1.0.0
protobuf==3.20.3
pycparser==2.21
pycryptodomex==3.17
PyJWT==2.6.0
pyOpenSSL==23.1.1
pytest==7.3.1
python-dateutil==2.8.2
python-dotenv==1.0.0
python-logger==0.1.13
pytz==2023.3
requests==2.28.2
six==1.16.0
snowflake-connector-python==3.0.2
sqlparse==0.4.4
tomli==2.0.1
typing_extensions==4.5.0
tzdata==2023.3
urllib3==1.26.15

View File

@ -0,0 +1,3 @@
from pydb.factory import db_factory
from pydb.managers import DatabaseManager
from pydb.utils import pretty_print_sql, query_to_csv, results_to_csv

519
pydb/src/pydb/db.py Executable file
View File

@ -0,0 +1,519 @@
"Python Database Helper"
# import mariadb
import json
from collections.abc import Iterable
from copy import deepcopy
from email.errors import MessageError
from smtplib import SMTPException
from typing import Dict, Tuple, Union
import cx_Oracle
import mysql.connector
import snowflake.connector
from email_sender_simple import send_email as email
from pandas import DataFrame
import pydb.utils as utils
# DB_ROWS = Union[Iterable[], tuple[int, float, str]]
DB_ROWS = Union[Iterable, Tuple[int, float, str]]
DB_INFO = Union[Dict[str, int], Dict[str, str]]
DB_ARGS = Union[int, float, str, Tuple[int, float, str], Iterable]
QUERY_RESULTS = Union[DB_ROWS, None]
class DB_WRAPPER:
"""Base Class for DB Connection Wrapper"""
def __init__(self, info: DB_INFO, connector, exception=None):
"""DB Connection Wrapper Base Class Constructor
----------
Parameters
----------
info : DB_INFO
Dictionary containing connection information for the corresponding
database connector.
Example:
```python
db_info = {
"host": "hostname",
"user": "username",
"password": "password"
}
```
connector : database connector functor
Database connector function
exception : Exception, optional
Exception to pass to databsae connector, by default None
"""
try:
self._conn = connector(**info)
self._cur = self._conn.cursor()
self._Exception = exception if exception is not None else Exception
except Exception as exception:
print("Something went wrong connection to DB:", exception)
raise exception
def get_connection(self):
"""
Returns the connection object for the DB
Returns
-------
connection
Connection object for the DB
"""
return self._conn
def get_cursor(self):
"""
Returns a reference to the cursor object
Returns
-------
cursor
Cursor object for the DB
"""
return self._cur
def get_exception(self):
"""
Returns the exception handler for the class
Returns
-------
Exception
Exception handler for the class
"""
return self._Exception
def execute(
self,
stmt: str,
args=None,
dictionary=False,
dataframe=False,
dataframe_headers=None,
logger=None,
to_csv=None,
) -> QUERY_RESULTS:
"""
Queries the db with <stmt>. Returns the results.
Commits the transaction if successful.
----------
Parameters
----------
stmt : str
Query to execute
args : int | float | str | List[int, float, str], optional
Arguments to pass to query, by default None
dictionary : bool, optional
Convert query result to dictionary, by default False
dataframe : bool, optional
Convert query result to dataframe, by default False
dataframe_headers : list, optional
List of column headers for dataframe (can only be used when
dataframe=True), by default None
logger : Logger, optional
Logger object, by default None
to_csv : str, optional
Path to save query results as csv, by default None
Returns
-------
QUERY_RESULTS
Query result
"""
if dictionary and dataframe:
raise Exception("Cannot specify both dictionary and dataframe")
if dictionary and to_csv:
raise Exception("Cannot specify both dictionary and to_csv")
try:
if args is not None:
# remove any surrounding quotes from the args
if not isinstance(args, (list, tuple)):
args = [args]
utils.lg(logger, f"Args: {args}", "DEBUG")
utils.pretty_print_sql(stmt, logger)
self._cur.execute(stmt, args)
else:
utils.lg(logger, "No args specified", "DEBUG")
self._cur.execute(stmt)
self._conn.commit()
utils.lg(logger, f"Finished executing query: {stmt}")
if to_csv is not None and to_csv != "":
res: DataFrame = utils.convert_query_result(
self._cur, is_dataframe=True, logger=logger
)
return utils.results_to_csv(res, to_csv, logger=logger)
if "SELECT" in stmt:
return utils.convert_query_result(
self._cur, dictionary, dataframe, dataframe_headers, logger
)
return None
except self._Exception as exception:
utils.lg(
logger,
f"Something went wrong: {exception}",
"ERROR",
)
raise exception
def query(
self,
stmt: str,
args=None,
dictionary=False,
dataframe=False,
dataframe_headers=None,
logger=None,
to_csv=None,
):
"""
Queries the db with <stmt>. Returns the results
Does not commit the transaction upon completion
----------
Parameters
----------
stmt : str
Query to execute
args : int | float | str | List[int, float, str], optional
Arguments to pass to query, by default None
dictionary : bool, optional
Convert query result to dictionary, by default False
dataframe : bool, optional
Convert query result to dataframe, by default False
dataframe_headers : list, optional
List of column headers for dataframe (can only be used when
dataframe=True), by default None
logger : Logger, optional
Logger object, by default None
to_csv : str, optional
Path to save query results as csv, by default None
Returns
-------
QUERY_RESULTS (int | float | str | List[int, float, str])
Query result
"""
if dictionary and dataframe:
raise Exception("Cannot specify both dictionary and dataframe")
try:
if args is not None:
# remove any surrounding quotes from the args
if not isinstance(args, (list, tuple)):
args = [args]
utils.lg(logger, f"Args: {args}", "DEBUG")
utils.pretty_print_sql(stmt, logger)
self._cur.execute(stmt, args)
else:
utils.lg(logger, "No args specified", "DEBUG")
self._cur.execute(stmt)
utils.lg(logger, f"Finished executing query: {stmt}")
if to_csv is not None and to_csv != "":
res: DataFrame = utils.convert_query_result(
self._cur, is_dataframe=True, logger=logger
)
return utils.results_to_csv(res, to_csv, logger=logger)
if "SELECT" in stmt:
return utils.convert_query_result(
self._cur, dictionary, dataframe, dataframe_headers, logger
)
return None
except self._Exception as exception:
utils.lg(logger, f"Something went wrong: {exception}", "ERROR")
raise exception
def close(self):
"""Close the db connection and cursor."""
self._cur.close()
self._conn.close()
def execute_procedure(
self,
schema_name: str,
procedure_name: str,
package_name=None,
args=None,
logger=None,
send_email=False,
email_info=None,
error_email_to=None,
):
"""
Executes a procedure with the given arguments or with none if not
provided
----------
Parameters
----------
schema_name: str
schema name
procedure_name: str
procedure name
package_name: str, optional
package name, by default None
args: str | List[str], optional
arguments to pass to the procedure, by default None
logger: Logger, optional
logger object, by default None
send_email: bool, optional
whether or not to send an email, by default False
email_info: dict, optional
dictionary containing email information
email_info = {
smtp_info: {
host: smtp host,
port: smtp port,
},
email_to: email to send to,
email_from: email to send from,
error_email_to: email to send error to,
subject: email subject,
message_body: email message body
attatchments: path to attachment(s)
}
"""
stmt = None
if package_name is None and args is None:
stmt = f"CALL {schema_name}.{procedure_name}()"
elif package_name is None and args is not None:
if isinstance(args, (list, tuple)):
stmt = f"CALL {schema_name}.{procedure_name}({','.join(args)})"
else:
stmt = f"CALL {schema_name}.{procedure_name}({args})"
elif package_name is not None and args is None:
stmt = f"CALL {schema_name}.{package_name}.{procedure_name}()"
elif package_name is not None and args is not None:
if isinstance(args, (list, tuple)):
if len(args) > 1:
stmt = f"CALL {schema_name}.{package_name}.{procedure_name}({','.join(utils.surround_with_quotes(args))})"
else:
stmt = f"CALL {schema_name}.{package_name}.{procedure_name}({utils.surround_with_quotes(args[0])})"
else:
stmt = f"CALL {schema_name}.{package_name}.{procedure_name}({args})"
if stmt is None:
raise Exception("No procedure name or args provided")
try:
utils.lg(logger, f"Executing procedure: {stmt}")
self._cur.execute(stmt)
self._conn.commit()
except self._Exception as e:
utils.lg(logger, f"Something went wrong executing the procedure: {e}")
if email_info is not None:
temp_email_info = deepcopy(email_info)
temp_email_info[
"message_body"
] = f"Something went wrong executing the procedure: {e}"
temp_email_info[
"subject"
] = f"Error executing procedure: {procedure_name}"
if (
"error_email_to" in temp_email_info
and temp_email_info["error_email_to"] is not None
):
logger.info(
f"Sending error email to: {temp_email_info['error_email_to']}"
)
temp_email_info["email_to"] = temp_email_info["error_email_to"]
try:
email(
**temp_email_info,
logger=logger,
)
except (MessageError, SMTPException, TimeoutError) as e:
if logger is not None:
logger.error(f"Error sending email: {e}")
raise e
raise e
utils.lg(logger, f"{stmt} executed successfully")
if send_email and email_info is None:
utils.lg(logger, "No email info provided")
elif send_email:
try:
if logger is not None:
logger.debug(f"Email info: {json.dumps(email_info, indent=4)}")
email(**email_info, logger=logger)
except (MessageError, SMTPException, TimeoutError) as e:
raise e
class MysqlDB(DB_WRAPPER):
"""Mysql Specific Functions"""
def __init__(self, info):
"""MySQL Connection Wrapper"""
try:
DB_WRAPPER.__init__(
self, info, mysql.connector.connect, mysql.connector.Error
)
self._conn = self.get_connection()
self._cur = self.get_cursor()
except mysql.connector.Error as exception:
raise exception
def get_curdate(self):
"""Returns CURDATE() from MySQL."""
return self.query("SELECT CURDATE()")
def get_timestamp(self):
"""Returns CURRENT_TIMESTAMP from MySQL."""
return self.query("SELECT CURRENT_TIMESTAMP()")
def table_exists(self, schema: str, table: str):
stmt = f"""
SELECT COUNT(*) from information_schema.TABLES
WHERE TABLE_SCHEMA = '{schema}' and TABLE_NAME = '{table}'
"""
return self.query(stmt) != 0
def plsql(
self,
in_plsql: str,
in_name: None,
logger=None,
send_email=False,
email_info=None,
error_email_to=None,
):
raise NotImplementedError("MySQL does not support PLSQL")
class SnowflakeWrapper(DB_WRAPPER):
"""Snowflake Specific Functions"""
def __init__(self, info: DB_INFO):
DB_WRAPPER.__init__(
self,
info,
snowflake.connector.connect,
snowflake.connector.errors.ProgrammingError,
)
def plsql(
self,
in_plsql: str,
in_name: None,
logger=None,
send_email=False,
email_info=None,
error_email_to=None,
):
raise NotImplementedError("Snowflake does not support PLSQL")
class OracleWrapper(DB_WRAPPER):
"""Oracle specific functions."""
def __init__(self, info: DB_INFO):
try:
DB_WRAPPER.__init__(self, info, cx_Oracle.connect, cx_Oracle.Error)
self._db_info = info
except cx_Oracle.Error as e:
raise e
def get_incoming_me(self):
"""Returns the ME_INCOMING table from Oracle."""
return self.query(
"SELECT POST_DAY_END.PDE_COMMONS_PKG.GET_INCOMING_DAYBREAK_ME_NAME FROM DUAL"
)
def get_daybreak_me_target_name(self):
"""Returns the ME_TARGET table from Oracle."""
return self.query(
"SELECT POST_DAY_END.PDE_COMMONS_PKG.GET_DAYBREAK_ME_TARGET_NAME FROM DUAL"
)
def plsql(
self,
in_plsql: str,
in_name=None,
logger=None,
send_email=False,
email_info=None,
error_email_to=None,
):
"""
Executes PL/SQL block and optionally sends email.
----------
Parameters
----------
in_plsql : str
PL/SQL block
in_name : str, optional
Name of PL/SQL block
logger : logging.Logger, optional
Logger object
send_email : bool, optional
Whether or not to send email
email_info : dict, optional
Dictionary containing email information
email_info = {
smtp_info: {
host: smtp host,
port: smtp port,
},
email_to: email to send to,
email_from: email to send from,
error_email_to: email to send error to (defaults to email_to),
subject: email subject,
message_body: email message body
attatchments: path to attachment(s)
}
"""
try:
if not in_plsql:
raise ValueError("No PL/SQL")
utils.lg(logger, "Executing PL/SQL block")
utils.lg(logger, f"{in_plsql}", "DEBUG")
self._cur.execute(in_plsql)
self._conn.commit()
except self._Exception as exception:
if email_info is not None:
temp_email_info = deepcopy(email_info)
temp_email_info[
"message_body"
] = f"PL/SQL failed to execute. {exception}"
temp_email_info["subject"] = (
f"{in_name} failed to execute"
if in_name is not None
else "PLSQL failed to execute"
)
if (
"error_email_to" in temp_email_info
and temp_email_info["error_email_to"] is not None
):
temp_email_info["email_to"] = temp_email_info["error_email_to"]
utils.lg(
logger, f"Sending error email to {temp_email_info['email_to']}"
)
try:
email(
**temp_email_info,
logger=logger,
)
except (MessageError, SMTPException, TimeoutError) as email_exception:
utils.lg(
logger,
f"Email failed to send: {email_exception}",
"ERROR",
)
raise email_exception
raise exception
if send_email and email_info is None:
utils.lg(logger, "No email info provided")
elif send_email:
try:
utils.lg(logger, f"Email info: {email_info}", "DEBUG")
if logger is not None:
utils.lg(
logger,
f"Email info: {json.dumps(email_info, indent=4)}",
"DEBUG",
)
email(**email_info, logger=logger)
except (MessageError, SMTPException, TimeoutError) as email_exception:
utils.lg(logger, f"Email failed to send: {email_exception}", "ERROR")
raise email_exception
utils.lg(logger, "plsql executed successfully")

View File

@ -0,0 +1 @@
from pydb.factory.db_factory import db_factory

View File

@ -0,0 +1,31 @@
"""DB Factory"""
from typing import Dict, Union
import pydb
def db_factory(db_info: Union[Dict[str, int], Dict[str, str]], db_type: str):
"""
Returns a database object based on the database type.
Parameters
----------
db_info: dict
Dictionary containing database connection information
db_type: str
Database type (mysql, oracle, snowflake)
Returns
-------
Database object
"""
db_type = db_type.strip().lower()
if db_type == "mysql":
return pydb.db.MysqlDB(db_info)
if db_type == "snowflake":
return pydb.db.SnowflakeWrapper(db_info)
if db_type in ("oracle", "prepdb", "bengal", "livdb", "slivdb"):
return pydb.db.OracleWrapper(db_info)
print("ERROR", db_type, "not valid")
print("Valid types: [ mysql | oracle | snowflake ]")
raise ValueError("Invalid database type")

View File

@ -0,0 +1 @@
from pydb.managers.database_manager import DatabaseManager

View File

@ -0,0 +1,21 @@
"""Database contest manager"""
from pydb.factory.db_factory import db_factory
class DatabaseManager(object):
"""Context Manager for Database Connection
Args:
db_info (DB_INFO): Database connection info
db_type (str): Database type
"""
def __init__(self, db_info, db_type):
self.db = db_factory(db_info, db_type)
def __enter__(self):
return self.db
def __exit__(self, type, value, traceback):
self.db.close()

275
pydb/src/pydb/utils.py Normal file
View File

@ -0,0 +1,275 @@
"""Utility functions for pydb."""
from csv import QUOTE_MINIMAL, QUOTE_NONNUMERIC
from logging import Logger as DefaultLogger
from typing import List, Union
from pandas import DataFrame
from python_logger import Logger
from sqlparse import format
from pydb.db import QUERY_RESULTS
from pydb.managers.database_manager import DatabaseManager
def lg(logger: Union[Logger, DefaultLogger, None], msg: str, level="info"):
level = level.strip().lower()
if logger:
if level == "info":
logger.info(msg)
elif level == "debug":
logger.debug(msg)
elif level == "warning":
logger.warning(msg)
elif level == "error":
logger.error(msg)
elif level == "critical":
logger.critical(msg)
elif level > "info":
print(msg)
def pretty_print_sql(sql: str, logger: Union[Logger, DefaultLogger, None] = None):
"""
Pretty print sql query or PL/SQL block.
----------
Parameters
----------
sql : str
SQL query or PL/SQL block
logger : Union[Logger, DefaultLogger, None], optional
Logger object, by default None
Returns
-------
str
Pretty printed SQL query or PL/SQL block
"""
try:
pretty_query = format(
sql,
reindent=True,
reindent_aligned=False,
keyword_case="upper",
identifier_case="upper",
indent_width=4,
wrap_after=120,
truncate_strings=120,
use_space_around_operators=True,
# output_format="python",
)
lg(logger, pretty_query, "debug")
return pretty_query
except Exception as e:
lg(logger, "Failed to pretty print SQL", "ERROR")
raise e
def results_to_csv(
query_results: QUERY_RESULTS,
out_path: str,
column_headers=None,
logger=None,
quoting=QUOTE_MINIMAL,
quotechar='"',
delimiter=",",
lineterminator="\n",
):
"""
Execute a query and save the result to a csv file.
----------
Parameters
----------
query_results : QUERY_RESULTS
Query results
out_path : str
Output file path
column_headers : list, optional
Explicitly set column headers, by default None
logger : Union[Logger, DefaultLogger, None], optional
Logger object, by default None
quoting : int, optional
CSV quoting option, by default QUOTE_MINIMAL
quotechar : str, optional
CSV quotechar option, by default '\"'
delimiter : str, optional
CSV delimiter option, by default ","
lineterminator : str, optional
CSV lineterminator option, by default "\\n"
"""
if not isinstance(query_results, DataFrame):
lg(logger, "Converting query results to dataframe", "debug")
try:
query_results = DataFrame(query_results)
lg(logger, "Converted query results to dataframe", "debug")
except Exception as e:
raise e
try:
lg(logger, f"Saving query results to {out_path}", "info")
query_results.to_csv(
out_path,
index=False,
header=column_headers,
quoting=quoting,
quotechar=quotechar,
sep=delimiter,
lineterminator=lineterminator,
)
lg(logger, f"Wrote query results to {out_path}", "info")
return out_path
except Exception as e:
raise e
def query_to_csv(
db_info,
db_type,
column_headers,
out_path,
query,
args=None,
logger=None,
quoting=QUOTE_MINIMAL,
quotechar='"',
delimiter=",",
lineterminator="\n",
):
"""
Execute a query and save the result to a csv file.
----------
Parameters
----------
db_info : str
Dictionary of database connection information
Example:
```python
db_info = {
"host": "hostname",
"user": "username",
"password": "password"
}
```
db_type : str
Database type (one of "mysql", "oracle", "snowflake")
column_headers : list
List of column headers
out_path : str
Output file path
query : str
SQL query
args : list, optional
List of arguments for query, by default None
logger : Union[Logger, DefaultLogger, None], optional
Logger object, by default None
quoting : int, optional
CSV quoting option, by default QUOTE_MINIMAL
quotechar : str, optional
CSV quotechar option, by default '\"'
delimiter : str, optional
CSV delimiter option, by default ","
lineterminator : str, optional
CSV lineterminator option, by default "\\n"
"""
with DatabaseManager(db_info, db_type) as db:
try:
df: DataFrame = db.execute(
query,
args=args,
dataframe=True,
dataframe_headers=column_headers,
logger=logger,
)
except Exception as e:
lg(logger, f"Failed to execute query: {query}", "error")
raise e
try:
# if not isinstance(header, (list, tuple)):
# header = [header]
# logger.debug("DF:", df)
# df = df.set_axis(header, axis=1, copy=False)
# logger.debug("DF:", df)
df.to_csv(
out_path,
index=False,
header=column_headers,
quoting=quoting,
quotechar=quotechar,
sep=delimiter,
lineterminator=lineterminator,
)
except Exception as e:
raise e
return out_path
def convert_query_result(
cur, is_dictionary=False, is_dataframe=False, dataframe_headers=None, logger=None
) -> QUERY_RESULTS:
"""
Return the element or List(rows) if multiple results
Convert from List[List[rows]] -> List[rows] or List[result] -> result
Parameters
----------
cur : cursor
Cursor object from database connection
is_dictionary : bool, optional
Convert query result to dictionary, by default False
is_dataframe : bool, optional
Convert query result to dataframe, by default False
dataframe_headers : list, optional
List of column headers for dataframe, by default None
logger : Logger, optional
Logger object, by default None
Returns
-------
QUERY_RESULTS
Query result
"""
if is_dictionary and is_dataframe:
raise Exception("Cannot be both dictionary and dataframe")
try:
res = cur.fetchall()
except Exception as e:
lg(logger, "Failed to fetch results", "ERROR")
raise e
if is_dictionary and cur.description is not None:
return dict_factory(res, cur.description)
if is_dataframe:
if len(res) > 0 and dataframe_headers is not None:
out = DataFrame(res, columns=dataframe_headers)
elif len(res) > 0:
out = DataFrame(res)
else:
out = None
return out
if res is None or (isinstance(res, list) and len(res) == 0):
return None
if len(res) == 1:
if len(res[0]) == 1:
return res[0][0]
return res[0]
return [i[0] if len(i) == 1 else list(i) for i in res]
def surround_with_quotes(s) -> Union[List[str], str]:
"""Surround each element in list with quotes."""
if isinstance(s, (list, tuple)):
return [f"'{i}'" for i in s]
return f"'{s}'"
def dict_factory(rows, description):
"""Converts query result to a list of lists or list of dictionaries."""
if rows is None:
return None
try:
desc = description
if desc is None:
return None
tdict = [
dict(zip([col[0].strip().upper() for col in desc], row)) for row in rows
]
return tdict if len(tdict) > 0 else None
except Exception as e:
raise e