shortenit/shortenit/main.py

123 lines
3.5 KiB
Python

#!/usr/bin/env python3
import argparse
import asyncio
import logging
import pathlib
import sys
import time
import typing
from sqlalchemy import create_engine, exc
from sqlalchemy.orm import Session
from shortenit.config import Config
from shortenit.logger import setup_logging
from shortenit.models.base import Base
from shortenit.models.objects import Link, Pointer
from shortenit.models.shortener import Shortener
from shortenit.web import SiteHandler, Web
PROJECT_ROOT = pathlib.Path(__file__).parent.parent
CONFIGURATION = f"{PROJECT_ROOT}/config/config.yaml"
# Setup logging
logger = logging.getLogger(__name__)
def main() -> typing.NoReturn:
"""
Main method
"""
parser = argument_parse()
args = parser.parse_args()
debug = True if args.verbose > 0 else False
verbosity_level = verbosity(args.verbose)
setup_logging(args.logger, verbosity_level)
config = Config(CONFIGURATION).get_config()
db_config = config.get("Database", None)
server_config = config.get("Server", None)
if db_config:
try:
engine = create_engine(db_config["url"], echo=False, future=True)
Base.metadata.create_all(bind=engine)
with Session(bind=engine, autoflush=True, future=True) as session:
handler = SiteHandler(config, session, shorten_it, lengthen_it)
web = Web(handler, debug=debug)
web.host = server_config.get("host", None)
web.port = server_config.get("port", None)
web.start_up()
except:
sys.exit(1)
sys.exit(0)
def shorten_it(config: dict, session: Session, data: str, ttl: int):
shortener_config = config.get("Shortener", None)
shortener = Shortener(session, shortener_config)
identifier = shortener.generate_uuid()
if identifier:
try:
_link = session.query(Link).filter_by(data=data).one()
except exc.NoResultFound:
logger.debug("Link '%s' was not found in the database.", data)
_link = Link(data=data, pointers=[])
_pointer = Pointer(data=identifier, link_id=_link.id, link=_link, ttl=ttl)
session.add(_pointer)
_link.pointers.append(_pointer)
session.add(_link)
session.commit()
return _pointer.data
return None
def lengthen_it(session: Session, identifier: str):
try:
_pointer = session.query(Pointer).filter_by(data=identifier).one()
except exc.NoResultFound:
logger.debug("Pointer '%s' was not found in the database.", identifier)
return None
return _pointer.link.data
def argument_parse() -> argparse.ArgumentParser:
"""
Method to extract the arguments from the command line.
:returns: The argument parser.
"""
parser = argparse.ArgumentParser(
description="Generates rundeck resources " "file from different API sources."
)
parser.add_argument(
"-v", "--verbose", action="count", default=0, help="Verbosity level to use."
)
parser.add_argument(
"-l", "--logger", type=str, help="The logger YAML configuration file."
)
return parser
def verbosity(verbose: int):
"""
Method to set the verbosity.
:param verbose: The verbosity set by user.
:returns: The verbosity level.
"""
if verbose == 0:
return logging.ERROR
elif verbose == 1:
return logging.WARNING
elif verbose == 2:
return logging.INFO
elif verbose > 2:
return logging.DEBUG
if __name__ == "__main__":
main()