chore(#5): Initial SQLAlchemy building blocks #6

Merged
elia merged 3 commits from sqlalchemy into master 2024-12-18 23:01:58 +00:00
12 changed files with 143 additions and 393 deletions
Showing only changes of commit 6800c90936 - Show all commits

1
.gitignore vendored
View file

@ -9,3 +9,4 @@ build/
# IDE
.vscode/
*.db

View file

@ -6,15 +6,7 @@ Shortenit is a tool to shorten urls.
## Running
To run `shortenit`, first we need to have a running database. Shortenit uses [CouchDB](https://couchdb.apache.org/) as a database. CouchDB can run as a docker container.
```text
$ docker run -p 5984:5984 -e COUCHDB_USER=root -e COUCHDB_PASSWORD=root -d couchdb
```
At this point, visit the local instance link [http://localhost:5984](http://localhost:5984/) and create the user credentials configured in [config/config.yaml](config/config.yaml).
Once the database is up and running and the credentials have been created, shortenit can be ran.
To run `shortenit`, edit the configuration file found in [config/config.yaml](config/config.yaml) then run the following commands.
```text
$ pip install -e .

View file

@ -1,14 +1,15 @@
Server:
host: 127.0.0.1
port: 8000
CouchDB:
username: root
password: root
url: http://localhost:5984
Database:
username: foo
password: bar
#url: "sqlite+pysqlite:///:memory:"
url: "sqlite+pysqlite:///shortenit.db"
Shortener:
# *CAUTION*: Enabling this check if the ID already exists before returning it.
# Even though this guarantees that the ID doesn't exist, this might inflict
# some performance hit.
check_duplicate_id: False
id_length: 32
id_upper_case: False
check_duplicate_id: True
id_upper_case: False

View file

@ -1,83 +0,0 @@
import logging
import time
import typing
from hashlib import sha256
from cloudant.document import Document
class Data:
"""
Data object.
"""
def __init__(
self, data_db: object, identifier: str = None, data: str = None
) -> typing.NoReturn:
"""
Initialize the Data object.
:param data_db: The Data database object.
:param identifier: A uniquely generated ID identifying the data object.
:param data: The data to save.
"""
self.logger = logging.getLogger(self.__class__.__name__)
self.data_db = data_db
self.identifier = identifier
self.data = data
self.timestamp = time.time()
self.pointers = []
self.data_found = None
self.populate()
def generate_identifier(self) -> typing.NoReturn:
"""
Method to generate and save a new unique ID as the Data object identifier.
"""
hash_object = sha256(self.data.encode("utf-8"))
self.identifier = hash_object.hexdigest()
def populate(self, pointer: str = None) -> typing.NoReturn:
"""
Method to populate the Data object fields with proper data and save it in the database.
:param pointer: The unique ID of the pointer object to save with the data.
"""
if self.identifier:
self.logger.debug("The identifier is set, retrieving data...")
self.get_data()
elif self.data:
self.logger.debug("The data is set, generating an identifier...")
self.generate_identifier()
self.logger.debug(
"Attempting to get the data with " "the identifier generated..."
)
self.get_data()
if not self.data_found:
self.logger.debug("The data generated is not found, " "creating...")
self.set_data(pointer)
def get_data(self) -> typing.NoReturn:
"""
Method to retrieve the Data ojbect from the database.
"""
with Document(self.data_db, self.identifier) as data:
try:
self.data = data["value"]
self.timestamp = data["timestamp"]
self.pointers = data["pointers"]
self.data_found = True
except KeyError:
self.data_found = False
def set_data(self, pointer: str) -> typing.NoReturn:
"""
Method to save Data object to the database.
"""
with Document(self.data_db, self.identifier) as data:
data["value"] = self.data
data["timestamp"] = self.timestamp
try:
data["pointers"].append(pointer)
except KeyError:
data["pointers"] = [pointer]

View file

@ -1,75 +0,0 @@
import logging
import typing
import requests
from cloudant.client import CouchDB
from shortenit.exceptions import DBConnectionFailed
class DB:
"""
Database object class
"""
def __init__(self, config: dict) -> typing.NoReturn:
"""
Initialize the Database object.
:param config: The Database configuration.
"""
self.logger = logging.getLogger(self.__class__.__name__)
self.username = config["username"]
self.password = config["password"]
self.url = config["url"]
self.client = None
self.session = None
def initialize_shortenit(self) -> typing.NoReturn:
"""
Method to initialize the database for shortenit.
This will check if all the needed tables already exist in the database.
Otherwise, it will create the database tables.
"""
try:
self.data_db = self.client["data"]
except KeyError:
self.logger.warn("The 'data' database was not found, creating...")
self.data_db = self.client.create_database("data")
if self.data_db.exists():
self.logger.info("The 'data' database was successfully created.")
try:
self.pointers_db = self.client["pointers"]
except KeyError:
self.logger.warn("The 'pointers' database was not found, creating...")
self.pointers_db = self.client.create_database("pointers")
if self.pointers_db.exists():
self.logger.info("The 'pointers' database was successfully created.")
def __enter__(self) -> CouchDB:
"""
Method used when entering the database context.
:returns: The CouchDB object.
"""
try:
self.client = CouchDB(
self.username, self.password, url=self.url, connect=True
)
except requests.exceptions.ConnectionError as e:
self.logger.fatal("Failed to connect to database, is it on?")
self.logger.fatal("%s", e)
raise DBConnectionFailed
except requests.exceptions.HTTPError as e:
self.logger.fatal("Failed to authenticate to database.")
self.logger.fatal("%s", e)
raise DBConnectionFailed
self.session = self.client.session()
return self
def __exit__(self, *args) -> typing.NoReturn:
"""
Method used when exiting the database context.
"""
self.client.disconnect()

View file

@ -1,9 +0,0 @@
import sys
class DBConnectionFailed(Exception):
"""
DBConnectionFailed class exception.
"""
pass

View file

@ -7,13 +7,14 @@ import sys
import time
import typing
from sqlalchemy import create_engine, exc
from sqlalchemy.orm import Session
from shortenit.config import Config
from shortenit.data import Data
from shortenit.db import DB
from shortenit.exceptions import DBConnectionFailed
from shortenit.logger import setup_logging
from shortenit.pointer import Pointer
from shortenit.shortener import Shortener
from shortenit.models.base import Base
from shortenit.models.objects import Link, Pointer
from shortenit.models.shortener import Shortener
from shortenit.web import SiteHandler, Web
PROJECT_ROOT = pathlib.Path(__file__).parent.parent
@ -33,42 +34,53 @@ def main() -> typing.NoReturn:
verbosity_level = verbosity(args.verbose)
setup_logging(args.logger, verbosity_level)
config = Config(CONFIGURATION).get_config()
db_config = config.get("CouchDB", None)
db_config = config.get("Database", None)
server_config = config.get("Server", None)
if db_config:
try:
with DB(db_config) as db:
db.initialize_shortenit()
engine = create_engine(db_config["url"], echo=False, future=True)
Base.metadata.create_all(bind=engine)
handler = SiteHandler(config, db, shorten_url, lenghten_url)
with Session(bind=engine, autoflush=True, future=True) as session:
handler = SiteHandler(config, session, shorten_it, lengthen_it)
web = Web(handler, debug=debug)
web.host = server_config.get("host", None)
web.port = server_config.get("port", None)
web.start_up()
except DBConnectionFailed as e:
except:
sys.exit(1)
sys.exit(0)
def shorten_url(configuration: dict, database: DB, data: str, ttl):
shortener = Shortener(database.pointers_db, configuration.get("Shortener", None))
identifier = shortener.get_id()
def shorten_it(config: dict, session: Session, data: str, ttl: int):
shortener_config = config.get("Shortener", None)
shortener = Shortener(session, shortener_config)
identifier = shortener.generate_uuid()
if identifier:
_data = Data(database.data_db, data=data)
_data.populate()
pointer = Pointer(database.pointers_db, identifier)
pointer.generate_pointer(_data.identifier, ttl)
_data.set_data(pointer.identifier)
return pointer.identifier
try:
_link = session.query(Link).filter_by(data=data).one()
except exc.NoResultFound:
logger.debug("Link '%s' was not found in the database.", data)
_link = Link(data=data, pointers=[])
_pointer = Pointer(data=identifier, link_id=_link.id, link=_link, ttl=ttl)
_link.pointers.append(_pointer)
session.add(_link)
session.add(_pointer)
session.commit()
return _pointer.data
return None
def lenghten_url(database: DB, identifier: str):
pointer = Pointer(database.pointers_db)
pointer.get_pointer(identifier)
data = Data(database.data_db, identifier=pointer.data_hash)
data.populate()
return data.data
def lengthen_it(session: Session, identifier: str):
try:
_pointer = session.query(Pointer).filter_by(data=identifier).one()
except exc.NoResultFound:
logger.debug("Pointer '%s' was not found in the database.", identifier)
return None
return _pointer.link.data
def argument_parse() -> argparse.ArgumentParser:

View file

@ -1,27 +0,0 @@
from datetime import datetime
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from shortenit.models.objects import Link, Pointer
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True)
Link.metadata.create_all(engine)
Pointer.metadata.create_all(engine)
link = Link("00001", "https://duckduckgo.com", [])
pointer = Pointer("00001", "duckduckgo!", "30d", link.id, link)
link.pointers.append(pointer)
with Session(engine) as session:
session.add(link)
session.add(pointer)
session.commit()
with Session(engine) as session:
result = session.execute(text("SELECT * FROM pointers"))
for row in result:
print(row)
result = session.execute(text("SELECT * FROM links"))
for row in result:
print(row)

View file

@ -6,24 +6,21 @@ from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from shortenit.models.base import Base
from shortenit.models.shortener import Shortener
import shortenit.models.base as base
class Link(Base):
class Link(base.Base):
__tablename__ = "links"
id: Mapped[int] = mapped_column(primary_key=True, index=True, init=False)
identifier: Mapped[str] = mapped_column(index=True)
data: Mapped[AnyHttpUrl] = mapped_column(String)
data: Mapped[AnyHttpUrl] = mapped_column(String, index=True)
pointers: Mapped[List["Pointer"]] = relationship(back_populates="link")
timestamp: Mapped[datetime] = mapped_column(default=func.now())
class Pointer(Base):
class Pointer(base.Base):
__tablename__ = "pointers"
id: Mapped[int] = mapped_column(primary_key=True, index=True, init=False)
identifier: Mapped[str] = mapped_column(index=True)
data: Mapped[str]
data: Mapped[str] = mapped_column(index=True)
ttl: Mapped[str]
link_id: Mapped[int] = mapped_column(ForeignKey("links.id"))
link: Mapped["Link"] = relationship(back_populates="pointers")

View file

@ -0,0 +1,92 @@
import logging
import typing
import uuid
from sqlalchemy.orm import exc
import shortenit.models.objects as objects
class Shortener:
"""
Shortener Object
"""
def __init__(self, session, configuration: dict) -> typing.NoReturn:
"""
Initialize the Shortener object.
:param configuration: The shortenit configuration
"""
self.logger = logging.getLogger(self.__class__.__name__)
self.session = session
self.configuration = configuration
self.length = 32
self.check_duplicate = False
self.upper_case = False
self.init()
def init(self) -> typing.NoReturn:
"""
Initialize the shortener from the configuration.
"""
length = self.configuration.get("id_length", 32)
if length > 32 or length <= 8:
self.length = 32
self.logger.warn(
"ID length provided is not between '8' and '32', reverting to default of '32'"
)
else:
self.length = length
self.check_duplicate = self.configuration.get("check_duplicate_id", False)
self.upper_case = self.configuration.get("id_upper_case", False)
def generate_short_uuid(self) -> str:
"""
Method to generate UUID.
:returns: A UUID.
"""
_uuid = uuid.uuid1().hex
if self.upper_case:
return _uuid.upper()[-self.length :]
return _uuid.lower()[-self.length :]
def check_uuid(self, short_uuid: str) -> bool:
"""
Method to check the short UUID against the database.
:returns: True if UUID exists in the database.
"""
try:
_ = self.session.query(objects.Pointer).filter_by(data=short_uuid).one()
except exc.NoResultFound:
self.logger.debug(
"Generated short uuid '%s' was not found in the database.", short_uuid
)
return False
self.logger.warn("Generated short uuid '%s' was found in the database.")
return True
def generate_uuid(self) -> str:
"""
Method to generate a UUID.
This method will generate a UUID and check if it already exists.
:returns: A UUID.
"""
short_uuid = self.generate_short_uuid()
if self.check_duplicate:
counter = 0
while self.check_uuid(short_uuid=short_uuid):
if counter > 10:
self.logger.err(
"Cannot generate a new unique ID,"
"try to configure a longer ID length."
)
return None
short_uuid = self.generate_short_uuid()
counter += 1
self.logger.debug("Returning ID: '%s'", short_uuid)
return short_uuid

View file

@ -1,62 +0,0 @@
from __future__ import annotations
import logging
import time
import typing
from cloudant.document import Document
class Pointer:
"""
Pointer object.
"""
def __init__(self, pointers_db: object, identifier: str = None) -> typing.NoReturn:
"""
Initialize the Pointer object.
:param pointers_db: The Pointer database object.
:param identifier: A uniquely generated ID identifying the pointer object.
"""
self.logger = logging.getLogger(self.__class__.__name__)
self.pointers_db = pointers_db
self.identifier = identifier
self.data_hash = None
self.ttl = None
self.timestamp = time.time()
def generate_pointer(self, data_hash: str, ttl: time.time) -> Pointer:
"""
Generates a pointer object and saves it into the database.
:param data_hash: A uniquely generated ID identifying the data object.
:param ttl: The "Time to Live" of the pointer.
:returns: The Pointer object.
"""
self.logger.debug("identifier is %s", self.identifier)
with Document(self.pointers_db, self.identifier) as pointer:
pointer["value"] = data_hash
pointer["ttl"] = ttl
pointer["timestamp"] = self.timestamp
self.data_hash = data_hash
self.ttl = ttl
return self
def get_pointer(self, identifier: str) -> Pointer:
"""
Retrieve a pointer object from the database.
:param identifier: A uniquely generated ID identifying the Pointer object.
:returns: The Pointer object requested.
"""
with Document(self.pointers_db, identifier) as pointer:
try:
self.identifier = pointer["_id"]
self.data_hash = pointer["value"]
self.ttl = pointer["ttl"]
self.timestamp = pointer["timestamp"]
return self
except KeyError:
pass
return None

View file

@ -1,89 +0,0 @@
import logging
import typing
import uuid
from cloudant.document import Document
class Shortener:
"""
Shortener object
"""
def __init__(self, pointer_db, configuration: dict) -> typing.NoReturn:
"""
Initialize the Shortener object.
:param pointer_db: The Pointer Database object.
:param configuration: The shortenit configuration.
"""
self.logger = logging.getLogger(self.__class__.__name__)
self.pointer_db = pointer_db
self.uuid = None
self.length = 32
self.check_duplicate = False
self.upper_case = False
self.configuration = configuration
self.init()
def init(self) -> typing.NoReturn:
"""
Initialize the shortener from the configuration.
"""
length = self.configuration.get("id_length", 32)
if length >= 32 or length <= 0:
self.length = 32
else:
self.length = length
self.check_duplicate = self.configuration.get("check_duplicate_id", False)
self.upper_case = self.configuration.get("id_upper_case", False)
def generate_short_uuid(self) -> str:
"""
Generate a short UUID in Hex format.
:returns: A short UUID in Hex format.
"""
short_uuid = uuid.uuid1().hex
if self.upper_case:
return short_uuid.upper()[0 : self.length]
return short_uuid.lower()[0 : self.length]
def check_uuid(self, short_uuid) -> bool:
"""
Check a short UUID against the database.
:returns: Whether the UUID exists in the database or not.
"""
with Document(self.pointer_db, "pointer") as pointer:
self.logger.debug("Pointer: %s", pointer)
try:
self.uuid = pointer[short_uuid]
except KeyError:
self.logger.info(
"Generated short uuid '%s'" "was not found in database", short_uuid
)
return False
return True
def get_id(self) -> str:
"""
Method to get a UUID.
This method will generate a UUID and checks if it already exists in the database.
:returns: A UUID.
"""
short_uuid = self.generate_short_uuid()
if self.check_duplicate:
counter = 0
while self.check_uuid(short_uuid):
if counter > 10:
self.logger.err(
"Cannot generate new unique ID,"
"try to configure a longer ID length."
)
return None
short_uuid = self.generate_short_uuid()
counter += 1
self.logger.debug("Returning ID: '%s'", short_uuid)
return short_uuid