Source code for asab.storage.service
import abc
import secrets
import hashlib
import logging
import asab
import re
try:
import cryptography.hazmat.primitives.ciphers
import cryptography.hazmat.primitives.ciphers.algorithms
import cryptography.hazmat.primitives.ciphers.modes
except ModuleNotFoundError:
cryptography = None
#
# Module-level logger, named after this module.
L = logging.getLogger(__name__)
#
# Marker that `aes_encrypt()` prepends to every value it produces;
# `aes_decrypt()` rejects input that does not start with it.
ENCRYPTED_PREFIX = b"$aes-cbc$"
class StorageServiceABC(asab.Service):
    """
    Base class of storage services.

    It declares the storage interface (`upsertor`, `get`, `get_by`, `delete`) that concrete
    storage services implement, and provides AES-CBC helpers (`aes_encrypt`, `aes_decrypt`)
    for encrypting selected fields.
    """

    def __init__(self, app, service_name):
        """
        Read the webhook and encryption settings from the configuration.

        :param app: The application object; it is asked for `asab.ProactorService` when webhooks are configured.
        :param service_name: Name of the service, passed to the parent constructor.
        :raise Exception: Webhook URIs are configured but `asab.ProactorService` is not registered.
        :raise ModuleNotFoundError: `aes_key` is configured but the `cryptography` package is not installed.
        """
        super().__init__(app, service_name)

        # An empty `webhook_uri` option means "no webhooks" and is stored as None.
        self.WebhookURIs = asab.Config.get("asab:storage:changestream", "webhook_uri", fallback="") or None
        if self.WebhookURIs is not None:
            # The option holds whitespace-separated URIs.
            self.WebhookURIs = [uri for uri in re.split(r"\s+", self.WebhookURIs) if len(uri) > 0]
            try:
                self.ProactorService = app.get_service("asab.ProactorService")
            except KeyError as e:
                raise Exception("Storage webhooks require ProactorService") from e
        self.WebhookAuth = asab.Config.get("asab:storage:changestream", "webhook_auth", fallback="") or None

        # Specify a non-empty AES key to enable AES encryption of selected fields
        self._AESKey = asab.Config.get("asab:storage", "aes_key", fallback="")
        if len(self._AESKey) > 0:
            if cryptography is None:
                raise ModuleNotFoundError(
                    "You are using storage encryption without 'cryptography' installed. "
                    "Please run 'pip install cryptography' "
                    "or install asab with 'storage_encryption' optional dependency.")
            # The configured passphrase is hashed with SHA-256, which yields a 32-byte AES key.
            self._AESKey = hashlib.sha256(self._AESKey.encode("utf-8")).digest()
        else:
            self._AESKey = None

    @abc.abstractmethod
    def upsertor(self, collection: str, obj_id=None, version: int = 0):
        """
        Create an upsertor object for the specified collection.

        If updating an existing object, specify its `obj_id` and also the `version`, which you need
        to read from the storage upfront.

        If `obj_id` is None, it is assumed that you want to insert a new object and have its `obj_id`
        generated; `version` should be set to 0 (default) in that case.

        If you want to insert a new object with a specific `obj_id`, specify `obj_id` and set `version` to 0.
        If a colliding object is already stored in the storage, the `execute()` method will fail
        with `DuplicateError`.

        :param collection: Name of the collection to work with.
        :param obj_id: Primary identification of an object in the storage (e.g. primary key).
        :param version: The current version of the object; specifying it prevents byzantine faults.
            You should always read the version from the storage upfront, prior to using an upsertor.
            That creates a soft lock on the record: if the object is updated by another component
            in the meantime, your upsertor will fail and you should retry the whole operation.
            New objects should have `version` set to 0.
        """
        pass

    @abc.abstractmethod
    async def get(self, collection: str, obj_id, decrypt=None) -> dict:
        """
        Get object from collection by its ID.

        :param collection: Collection to get from.
        :type collection: str
        :param obj_id: Object identification.
        :param decrypt: Set of fields to decrypt.
        :return: The object retrieved from a storage.
        :raise KeyError: Raised if `obj_id` is not found in `collection`.
        """
        pass

    @abc.abstractmethod
    async def get_by(self, collection: str, key: str, value, decrypt=None):
        """
        Get object from collection by its key and value.

        :param collection: Collection to get from.
        :type collection: str
        :param key: Key to filter on.
        :type key: str
        :param value: Value to filter on.
        :param decrypt: Set of fields to decrypt.
        :return: The object retrieved from a storage.
        :raise KeyError: Raised if an object matching {key: value} is not found in `collection`.
        """
        pass

    @abc.abstractmethod
    async def delete(self, collection: str, obj_id):
        """
        Delete object from collection.

        :param collection: Collection to delete from.
        :type collection: str
        :param obj_id: Object identification.
        :return: ID of the deleted object.
        :raise KeyError: Raised when `obj_id` cannot be found in `collection`.
        """
        pass

    def aes_encrypt(self, raw: bytes, iv: bytes = None) -> bytes:
        """
        Take an array of bytes and encrypt it using AES-CBC.

        The result is the `ENCRYPTED_PREFIX` marker, followed by the initialization vector,
        followed by the ciphertext.

        Note: the data are padded with zero bytes up to a multiple of the AES block size and
        `aes_decrypt()` strips all trailing zero bytes, so data that themselves end with
        a zero byte do not round-trip exactly.

        :param raw: The data to be encrypted.
        :type raw: bytes
        :param iv: AES-CBC initialization vector, 16 bytes long. If left empty, a random 16-byte array will be used.
        :type iv: bytes
        :return: The encrypted data.
        :raise RuntimeError: No `aes_key` is specified in the `asab:storage` configuration.
        :raise TypeError: The data are not in binary format.
        """
        # Check the key before touching `cryptography`: without a configured key the module
        # may be missing (None), and the caller should get this error rather than AttributeError.
        if self._AESKey is None:
            raise RuntimeError(
                "No aes_key specified in asab:storage configuration. "
                "If you want to use encryption, specify a non-empty aes_key."
            )

        if not isinstance(raw, bytes):
            if isinstance(raw, str):
                raise TypeError("String objects must be encoded before encryption")
            else:
                raise TypeError("Only 'bytes' objects can be encrypted")

        # `AES.block_size` is in bits; convert to bytes.
        block_size = cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size // 8

        # Pad the text to fit the blocks (the pad length is 0 when the data are already aligned)
        pad_length = -len(raw) % block_size
        raw = raw + b"\x00" * pad_length

        if iv is None:
            iv = secrets.token_bytes(block_size)

        algorithm = cryptography.hazmat.primitives.ciphers.algorithms.AES(self._AESKey)
        mode = cryptography.hazmat.primitives.ciphers.modes.CBC(iv)
        cipher = cryptography.hazmat.primitives.ciphers.Cipher(algorithm, mode)
        encryptor = cipher.encryptor()
        encrypted = ENCRYPTED_PREFIX + iv + (encryptor.update(raw) + encryptor.finalize())
        return encrypted

    def aes_decrypt(self, encrypted: bytes) -> bytes:
        """
        Decrypt encrypted data using AES-CBC.

        Trailing zero bytes of the decrypted data are treated as padding and removed.

        :param encrypted: The encrypted data to decrypt.
            It must start with b"$aes-cbc$" prefix, followed by one-block-long initialization vector.
        :type encrypted: bytes
        :return: The decrypted data.
        :raise RuntimeError: No `aes_key` is specified in the `asab:storage` configuration.
        :raise TypeError: The encrypted data are not of type `bytes`.
        :raise ValueError: The encrypted data do not start with the expected prefix.
        """
        # Check the key before touching `cryptography`: without a configured key the module
        # may be missing (None), and the caller should get this error rather than AttributeError.
        if self._AESKey is None:
            raise RuntimeError(
                "No aes_key specified in asab:storage configuration. "
                "If you want to use encryption, specify a non-empty aes_key."
            )

        if not isinstance(encrypted, bytes):
            raise TypeError("Only values of type 'bytes' can be decrypted")

        # Strip the prefix
        if not encrypted.startswith(ENCRYPTED_PREFIX):
            raise ValueError("Encrypted data must start with {!r} prefix".format(ENCRYPTED_PREFIX))
        encrypted = encrypted[len(ENCRYPTED_PREFIX):]

        # `AES.block_size` is in bits; convert to bytes.
        block_size = cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size // 8

        # Separate the initialization vector
        iv, encrypted = encrypted[:block_size], encrypted[block_size:]

        algorithm = cryptography.hazmat.primitives.ciphers.algorithms.AES(self._AESKey)
        mode = cryptography.hazmat.primitives.ciphers.modes.CBC(iv)
        cipher = cryptography.hazmat.primitives.ciphers.Cipher(algorithm, mode)
        decryptor = cipher.decryptor()
        raw = decryptor.update(encrypted) + decryptor.finalize()

        # Strip padding
        raw = raw.rstrip(b"\x00")
        return raw

    def encryption_enabled(self) -> bool:
        """
        Check if AESKey is not empty.

        :return: True if an AES key is configured, False otherwise.
        """
        return self._AESKey is not None