65 lines
2 KiB
Python
65 lines
2 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
import logging
|
|
|
|
from cloudant.document import Document
|
|
|
|
|
|
|
|
# Alphabet of the ten decimal digits followed by the 26 uppercase ASCII
# letters (36 symbols in total).
# NOTE(review): not referenced by the Pointer class below; presumably used
# by identifier-generation code elsewhere -- confirm before removing.
CHARS = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
|
|
|
|
|
|
class Pointer:
    """
    Pointer object.

    A pointer is persisted as a single document in the pointers database.
    The document is keyed by ``identifier`` and carries the fields
    ``value`` (the data hash), ``ttl`` and ``timestamp``.
    """

    def __init__(self, pointers_db: object,
                 identifier: str | None = None) -> None:
        """
        Initialize the Pointer object.

        Nothing is read from or written to the database here.

        :param pointers_db: The Pointer database object.
        :param identifier: A uniquely generated ID identifying the pointer
            object, or ``None`` if it is not known yet.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.pointers_db = pointers_db
        self.identifier = identifier
        self.data_hash = None
        self.ttl = None
        # Creation time of this in-memory object; replaced by the stored
        # value when an existing pointer is loaded via get_pointer().
        self.timestamp = time.time()

    def generate_pointer(self, data_hash: str, ttl: float) -> Pointer:
        """
        Generates a pointer object and saves it into the database.

        :param data_hash: A uniquely generated ID identifying the data object.
        :param ttl: The "Time to Live" of the pointer (presumably in
            seconds -- confirm against callers).
        :returns: The Pointer object.
        """
        self.logger.debug("identifier is %s", self.identifier)
        # The Document context manager fetches any existing document on
        # entry and saves it on a clean exit, so leaving the block is what
        # actually writes the pointer.
        with Document(self.pointers_db, self.identifier) as pointer:
            pointer['value'] = data_hash
            pointer['ttl'] = ttl
            pointer['timestamp'] = self.timestamp

        # When no identifier was supplied the database assigns one on save;
        # keep it so this object can be looked up again later.
        if self.identifier is None:
            self.identifier = pointer.get('_id')

        # Only mirror the values locally once the save has succeeded.
        self.data_hash = data_hash
        self.ttl = ttl
        return self

    def get_pointer(self, identifier: str) -> Pointer | None:
        """
        Retrieve a pointer object from the database.

        This is a read-only lookup. The Document context manager is
        deliberately not used here, because it saves on exit and would
        create an empty document for an unknown identifier.

        :param identifier: A uniquely generated ID identifying the Pointer
            object.
        :returns: The Pointer object requested, or ``None`` if no such
            document exists or it lacks one of the expected fields.
        """
        document = Document(self.pointers_db, identifier)
        if not document.exists():
            return None
        document.fetch()

        # Read every field before touching self, so a malformed document
        # cannot leave this object half-updated.
        try:
            stored_id = document['_id']
            stored_hash = document['value']
            stored_ttl = document['ttl']
            stored_timestamp = document['timestamp']
        except KeyError as missing:
            self.logger.warning("pointer %s is missing field %s",
                                identifier, missing)
            return None

        self.identifier = stored_id
        self.data_hash = stored_hash
        self.ttl = stored_ttl
        self.timestamp = stored_timestamp
        return self
|